This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ccb8f1d6945 [HUDI-8008] Handle proto schema references in returned schema (#11660)
ccb8f1d6945 is described below

commit ccb8f1d69455e69568166c545ec61925cfcd53ce
Author: Tim Brown <[email protected]>
AuthorDate: Mon Jul 22 17:53:47 2024 -0700

    [HUDI-8008] Handle proto schema references in returned schema (#11660)
---
 hudi-utilities/pom.xml                             |   8 +
 .../utilities/schema/SchemaRegistryProvider.java   | 186 ++++++++++++---------
 .../converter/JsonToAvroSchemaConverter.java       |  12 +-
 .../ProtoSchemaToAvroSchemaConverter.java          |   6 +-
 .../schema/TestSchemaRegistryProvider.java         | 148 ++++++++--------
 .../converter/TestJsonToAvroSchemaConverter.java   |   3 +-
 .../TestProtoSchemaToAvroSchemaConverter.java      |   4 +-
 pom.xml                                            |  16 ++
 8 files changed, 229 insertions(+), 154 deletions(-)

diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 47c172b7791..8e2d8387332 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -379,6 +379,14 @@
       <groupId>io.confluent</groupId>
       <artifactId>kafka-protobuf-serializer</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-json-schema-serializer</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.kjetland</groupId>
+      
<artifactId>mbknor-jackson-jsonschema_${scala.binary.version}</artifactId>
+    </dependency>
 
     <!-- Httpcomponents -->
     <dependency>
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
index 1c2e9181fd7..bdc47225a6e 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
@@ -19,41 +19,50 @@
 package org.apache.hudi.utilities.schema;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Triple;
 import org.apache.hudi.internal.schema.HoodieSchemaException;
 import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
 import org.apache.hudi.utilities.exception.HoodieSchemaFetchException;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.RestService;
+import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
 import org.apache.avro.Schema;
 import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
 import org.apache.http.ssl.SSLContextBuilder;
 import org.apache.http.ssl.SSLContexts;
 import org.apache.spark.api.java.JavaSparkContext;
 
-import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLSocketFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.security.KeyManagementException;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
 import java.security.UnrecoverableKeyException;
 import java.security.cert.CertificateException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collections;
+import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static 
org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
-import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 
 /**
  * Obtains latest schema from the Confluent/Kafka schema-registry.
@@ -61,6 +70,8 @@ import static 
org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
  * https://github.com/confluentinc/schema-registry
  */
 public class SchemaRegistryProvider extends SchemaProvider {
+  private static final Pattern URL_PATTERN = 
Pattern.compile("(.*/)subjects/(.*)/versions/(.*)");
+  private static final String LATEST = "latest";
 
   /**
    * Configs supported.
@@ -82,34 +93,53 @@ public class SchemaRegistryProvider extends SchemaProvider {
     public static final String SSL_KEY_PASSWORD_PROP = 
"schema.registry.ssl.key.password";
   }
 
-  protected Schema cachedSourceSchema;
-  protected Schema cachedTargetSchema;
+  private final Option<SchemaConverter> schemaConverter;
+  private final SerializableFunctionUnchecked<String, RestService> 
restServiceProvider;
+  private final SerializableFunctionUnchecked<RestService, 
SchemaRegistryClient> registryClientProvider;
 
-  private final String srcSchemaRegistryUrl;
-  private final String targetSchemaRegistryUrl;
+  public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
+    super(props, jssc);
+    checkRequiredConfigProperties(props, 
Collections.singletonList(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL));
+    if (config.containsKey(Config.SSL_KEYSTORE_LOCATION_PROP)
+        || config.containsKey(Config.SSL_TRUSTSTORE_LOCATION_PROP)) {
+      setUpSSLStores();
+    }
+    String schemaConverter = getStringWithAltKeys(config, 
HoodieSchemaProviderConfig.SCHEMA_CONVERTER, true);
+    this.schemaConverter = !StringUtils.isNullOrEmpty(schemaConverter)
+        ? Option.of((SchemaConverter) ReflectionUtils.loadClass(
+        schemaConverter, new Class<?>[] {TypedProperties.class}, config))
+        : Option.empty();
+    this.restServiceProvider = RestService::new;
+    this.registryClientProvider = restService -> new 
CachedSchemaRegistryClient(restService, 100,
+        Arrays.asList(new ProtobufSchemaProvider(), new JsonSchemaProvider(), 
new AvroSchemaProvider()), null, null);
+  }
+
+  @VisibleForTesting
+  SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc,
+                         Option<SchemaConverter> schemaConverter,
+                         SerializableFunctionUnchecked<String, RestService> 
restServiceProvider,
+                         SerializableFunctionUnchecked<RestService, 
SchemaRegistryClient> registryClientProvider) {
+    super(props, jssc);
+    checkRequiredConfigProperties(props, 
Collections.singletonList(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL));
+    this.schemaConverter = schemaConverter;
+    this.restServiceProvider = restServiceProvider;
+    this.registryClientProvider = registryClientProvider;
+  }
 
   @FunctionalInterface
   public interface SchemaConverter {
     /**
      * Convert original schema string to avro schema string.
      *
-     * @param schema original schema string (e.g., JSON)
+     * @param schema original schema returned from the registry
      * @return avro schema string
      */
-    String convert(String schema) throws IOException;
+    String convert(ParsedSchema schema) throws IOException;
   }
 
   public Schema parseSchemaFromRegistry(String registryUrl) {
     String schema = fetchSchemaFromRegistry(registryUrl);
-    try {
-      String schemaConverter = getStringWithAltKeys(config, 
HoodieSchemaProviderConfig.SCHEMA_CONVERTER, true);
-      SchemaConverter converter = !StringUtils.isNullOrEmpty(schemaConverter)
-          ? ReflectionUtils.loadClass(schemaConverter)
-          : s -> s;
-      return new Schema.Parser().parse(converter.convert(schema));
-    } catch (Exception e) {
-      throw new HoodieSchemaException("Failed to parse schema from registry: " 
+ schema, e);
-    }
+    return new Schema.Parser().parse(schema);
   }
 
   /**
@@ -123,55 +153,73 @@ public class SchemaRegistryProvider extends 
SchemaProvider {
    */
   public String fetchSchemaFromRegistry(String registryUrl) {
     try {
-      HttpURLConnection connection;
       Matcher matcher = Pattern.compile("://(.*?)@").matcher(registryUrl);
+      Triple<String, String, String> registryInfo;
+      String creds = null;
       if (matcher.find()) {
-        String creds = matcher.group(1);
+        creds = matcher.group(1);
         String urlWithoutCreds = registryUrl.replace(creds + "@", "");
-        connection = getConnection(urlWithoutCreds);
-        setAuthorizationHeader(matcher.group(1), connection);
+        registryInfo = getUrlSubjectAndVersion(urlWithoutCreds);
       } else {
-        connection = getConnection(registryUrl);
+        registryInfo = getUrlSubjectAndVersion(registryUrl);
+      }
+      String url = registryInfo.getLeft();
+      RestService restService = getRestService(url);
+      if (creds != null) {
+        setAuthorizationHeader(creds, restService);
+      }
+      String subject = registryInfo.getMiddle();
+      String version = registryInfo.getRight();
+      SchemaRegistryClient registryClient = 
registryClientProvider.apply(restService);
+      SchemaMetadata schemaMetadata = version.equals(LATEST) ? 
registryClient.getLatestSchemaMetadata(subject) : 
registryClient.getSchemaMetadata(subject, Integer.parseInt(version));
+      ParsedSchema parsedSchema = 
registryClient.parseSchema(schemaMetadata.getSchemaType(), 
schemaMetadata.getSchema(), schemaMetadata.getReferences())
+          .orElseThrow(() -> new HoodieSchemaException("Failed to parse schema 
from registry"));
+      if (schemaConverter.isPresent()) {
+        return schemaConverter.get().convert(parsedSchema);
+      } else {
+        return parsedSchema.canonicalString();
       }
-      ObjectMapper mapper = new ObjectMapper();
-      JsonNode node = mapper.readTree(getStream(connection));
-      return node.get("schema").asText();
     } catch (Exception e) {
       throw new HoodieSchemaFetchException("Failed to fetch schema from 
registry", e);
     }
   }
 
-  private SSLSocketFactory sslSocketFactory;
-
-  protected HttpURLConnection getConnection(String url) throws IOException {
-    URL registry = new URL(url);
-    if (sslSocketFactory != null) {
-      // we cannot cast to HttpsURLConnection if url is http so only cast when 
sslSocketFactory is set
-      HttpsURLConnection connection = (HttpsURLConnection) 
registry.openConnection();
-      connection.setSSLSocketFactory(sslSocketFactory);
-      return connection;
+  private Triple<String, String, String> getUrlSubjectAndVersion(String 
registryUrl) {
+    // url may be list of urls
+    String[] splitRegistryUrls = registryUrl.split(",");
+    String subjectName = null;
+    String version = null;
+    List<String> urls = new ArrayList<>(splitRegistryUrls.length);
+    // url will end with /subjects/{subject}/versions/{version}
+    for (String url : splitRegistryUrls) {
+      Matcher matcher = URL_PATTERN.matcher(url);
+      if (!matcher.matches()) {
+        throw new HoodieSchemaFetchException("Failed to extract subject name 
and version from registry url");
+      }
+      urls.add(matcher.group(1));
+      subjectName = matcher.group(2);
+      version = matcher.group(3);
+    }
+    if (subjectName == null) {
+      throw new HoodieSchemaFetchException("Failed to extract subject name 
from registry url");
     }
-    return (HttpURLConnection) registry.openConnection();
+    return Triple.of(String.join(",", urls), subjectName, version);
   }
 
-  protected void setAuthorizationHeader(String creds, HttpURLConnection 
connection) {
-    String encodedAuth = 
Base64.getEncoder().encodeToString(getUTF8Bytes(creds));
-    connection.setRequestProperty("Authorization", "Basic " + encodedAuth);
-  }
+  private SSLSocketFactory sslSocketFactory;
 
-  protected InputStream getStream(HttpURLConnection connection) throws 
IOException {
-    return connection.getInputStream();
+  protected RestService getRestService(String url) {
+    RestService restService = restServiceProvider.apply(url);
+    if (sslSocketFactory != null) {
+      restService.setSslSocketFactory(sslSocketFactory);
+      return restService;
+    }
+    return restService;
   }
 
-  public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
-    super(props, jssc);
-    checkRequiredConfigProperties(props, 
Collections.singletonList(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL));
-    this.srcSchemaRegistryUrl = getStringWithAltKeys(config, 
HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL);
-    this.targetSchemaRegistryUrl = getStringWithAltKeys(config, 
HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL, srcSchemaRegistryUrl);
-    if (config.containsKey(Config.SSL_KEYSTORE_LOCATION_PROP)
-        || config.containsKey(Config.SSL_TRUSTSTORE_LOCATION_PROP)) {
-      setUpSSLStores();
-    }
+  protected void setAuthorizationHeader(String creds, RestService restService) 
{
+    String encodedAuth = 
Base64.getEncoder().encodeToString(creds.getBytes(StandardCharsets.UTF_8));
+    restService.setHttpHeaders(Collections.singletonMap("Authorization", 
"Basic " + encodedAuth));
   }
 
   private void setUpSSLStores() {
@@ -199,42 +247,30 @@ public class SchemaRegistryProvider extends 
SchemaProvider {
 
   @Override
   public Schema getSourceSchema() {
+    String registryUrl = getStringWithAltKeys(config, 
HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL);
     try {
-      if (cachedSourceSchema == null) {
-        cachedSourceSchema = 
parseSchemaFromRegistry(this.srcSchemaRegistryUrl);
-      }
-      return cachedSourceSchema;
+      return parseSchemaFromRegistry(registryUrl);
     } catch (Exception e) {
       throw new HoodieSchemaFetchException(String.format(
           "Error reading source schema from registry. Please check %s is 
configured correctly. Truncated URL: %s",
           Config.SRC_SCHEMA_REGISTRY_URL_PROP,
-          StringUtils.truncate(srcSchemaRegistryUrl, 10, 10)), e);
+          StringUtils.truncate(registryUrl, 10, 10)), e);
     }
   }
 
   @Override
   public Schema getTargetSchema() {
+    String registryUrl = getStringWithAltKeys(config, 
HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL);
+    String targetRegistryUrl =
+        getStringWithAltKeys(config, 
HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL, registryUrl);
     try {
-      if (cachedTargetSchema == null) {
-        cachedTargetSchema = 
parseSchemaFromRegistry(this.targetSchemaRegistryUrl);
-      }
-      return cachedTargetSchema;
+      return parseSchemaFromRegistry(targetRegistryUrl);
     } catch (Exception e) {
       throw new HoodieSchemaFetchException(String.format(
           "Error reading target schema from registry. Please check %s is 
configured correctly. If that is not configured then check %s. Truncated URL: 
%s",
           Config.SRC_SCHEMA_REGISTRY_URL_PROP,
           Config.TARGET_SCHEMA_REGISTRY_URL_PROP,
-          StringUtils.truncate(targetSchemaRegistryUrl, 10, 10)), e);
+          StringUtils.truncate(targetRegistryUrl, 10, 10)), e);
     }
   }
-
-  // Per SyncOnce call, the cachedschema for the provider is dropped and 
SourceSchema re-attained
-  // Subsequent calls to getSourceSchema within the write batch should be 
cached.
-  @Override
-  public void refresh() {
-    cachedSourceSchema = null;
-    cachedTargetSchema = null;
-    getSourceSchema();
-    getTargetSchema();
-  }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverter.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverter.java
index 9f892ab8f0e..cb9a4eb63d3 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverter.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverter.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.utilities.schema.converter;
 
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.JsonUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
@@ -29,6 +30,8 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.NullNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.databind.node.TextNode;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
 
 import java.io.IOException;
 import java.net.URI;
@@ -58,9 +61,14 @@ public class JsonToAvroSchemaConverter implements 
SchemaRegistryProvider.SchemaC
   }).collect(Collectors.collectingAndThen(Collectors.toMap(p -> p[0], p -> 
p[1]), Collections::<String, String>unmodifiableMap));
   private static final Pattern SYMBOL_REGEX = 
Pattern.compile("^[A-Za-z_][A-Za-z0-9_]*$");
 
+  public JsonToAvroSchemaConverter(TypedProperties properties) {
+    // properties unused in this converter
+  }
+
   @Override
-  public String convert(String jsonSchema) throws IOException {
-    JsonNode jsonNode = MAPPER.readTree(jsonSchema);
+  public String convert(ParsedSchema parsedSchema) throws IOException {
+    JsonSchema jsonSchema = (JsonSchema) parsedSchema;
+    JsonNode jsonNode = MAPPER.readTree(jsonSchema.canonicalString());
     ObjectNode avroRecord = MAPPER.createObjectNode()
         .put("type", "record")
         .put("name", getAvroSchemaRecordName(jsonNode))
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/ProtoSchemaToAvroSchemaConverter.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/ProtoSchemaToAvroSchemaConverter.java
index 78ef25e9a04..2845c3fc1f9 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/ProtoSchemaToAvroSchemaConverter.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/ProtoSchemaToAvroSchemaConverter.java
@@ -21,6 +21,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
 import org.apache.hudi.utilities.sources.helpers.ProtoConversionUtil;
 
+import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 
 import java.io.IOException;
@@ -35,9 +36,8 @@ public class ProtoSchemaToAvroSchemaConverter implements 
SchemaRegistryProvider.
     this.schemaConfig = 
ProtoConversionUtil.SchemaConfig.fromProperties(config);
   }
 
-  @Override
-  public String convert(String schema) throws IOException {
-    ProtobufSchema protobufSchema = new ProtobufSchema(schema);
+  public String convert(ParsedSchema schema) throws IOException {
+    ProtobufSchema protobufSchema = (ProtobufSchema) schema;
     return 
ProtoConversionUtil.getAvroSchemaForMessageDescriptor(protobufSchema.toDescriptor(),
 schemaConfig).toString();
   }
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
index 88f67723c85..35272a33906 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
@@ -19,33 +19,30 @@
 package org.apache.hudi.utilities.schema;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
+import org.apache.hudi.common.util.Option;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.fasterxml.jackson.databind.node.TextNode;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.RestService;
 import org.apache.avro.Schema;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.HttpURLConnection;
+import java.util.Base64;
+import java.util.Collections;
 
-import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.times;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 class TestSchemaRegistryProvider {
 
   private static final String BASIC_AUTH = "foo:bar";
-
-  private static final String REGISTRY_RESPONSE = 
"{\"schema\":\"{\\\"type\\\": \\\"record\\\", \\\"namespace\\\": 
\\\"example\\\", "
-      + "\\\"name\\\": \\\"FullName\\\",\\\"fields\\\": [{ \\\"name\\\": 
\\\"first\\\", \\\"type\\\": "
-      + "\\\"string\\\" }]}\"}";
   private static final String RAW_SCHEMA = "{\"type\": \"record\", 
\"namespace\": \"example\", "
       + "\"name\": \"FullName\",\"fields\": [{ \"name\": \"first\", \"type\": "
       + "\"string\" }]}";
@@ -64,93 +61,100 @@ class TestSchemaRegistryProvider {
   private static TypedProperties getProps() {
     return new TypedProperties() {
       {
-        put("hoodie.streamer.schemaprovider.registry.baseUrl", "http://"; + 
BASIC_AUTH + "@localhost");
-        put("hoodie.streamer.schemaprovider.registry.urlSuffix", "-value");
-        put("hoodie.streamer.schemaprovider.registry.url", 
"http://foo:bar@localhost";);
-        put("hoodie.streamer.source.kafka.topic", "foo");
+        put("hoodie.deltastreamer.schemaprovider.registry.baseUrl", "http://"; 
+ BASIC_AUTH + "@localhost");
+        put("hoodie.deltastreamer.schemaprovider.registry.urlSuffix", 
"-value");
+        put("hoodie.deltastreamer.schemaprovider.registry.url", 
"http://foo:bar@localhost/subjects/test/versions/latest";);
+        put("hoodie.deltastreamer.source.kafka.topic", "foo");
       }
     };
   }
 
-  private static SchemaRegistryProvider getUnderTest(TypedProperties props) 
throws IOException {
-    InputStream is = new ByteArrayInputStream(getUTF8Bytes(REGISTRY_RESPONSE));
-    SchemaRegistryProvider spyUnderTest = Mockito.spy(new 
SchemaRegistryProvider(props, null));
-    Mockito.doReturn(is).when(spyUnderTest).getStream(Mockito.any());
-    return spyUnderTest;
+  private final SchemaRegistryProvider.SchemaConverter mockSchemaConverter = 
mock(SchemaRegistryProvider.SchemaConverter.class);
+  private final RestService mockRestService = mock(RestService.class);
+  private final SchemaRegistryClient mockRegistryClient = 
mock(SchemaRegistryClient.class);
+
+  private SchemaRegistryProvider getUnderTest(TypedProperties props, int 
version, boolean useConverter) throws Exception {
+    SerializableFunctionUnchecked<String, RestService> mockRestServiceFactory 
= mock(SerializableFunctionUnchecked.class);
+    
when(mockRestServiceFactory.apply("http://localhost/";)).thenReturn(mockRestService);
+    SerializableFunctionUnchecked<RestService, SchemaRegistryClient> 
mockRegistryClientFactory = mock(SerializableFunctionUnchecked.class);
+    
when(mockRegistryClientFactory.apply(mockRestService)).thenReturn(mockRegistryClient);
+    SchemaRegistryProvider underTest = new SchemaRegistryProvider(props, null, 
useConverter ? Option.of(mockSchemaConverter) : Option.empty(),
+        mockRestServiceFactory, mockRegistryClientFactory);
+    SchemaMetadata metadata = new SchemaMetadata(1, 1, RAW_SCHEMA);
+    if (version == -1) {
+      
when(mockRegistryClient.getLatestSchemaMetadata("test")).thenReturn(metadata);
+    } else {
+      when(mockRegistryClient.getSchemaMetadata("test", 
version)).thenReturn(metadata);
+    }
+    ParsedSchema mockParsedSchema = mock(ParsedSchema.class);
+    when(mockRegistryClient.parseSchema("AVRO", RAW_SCHEMA, 
Collections.emptyList())).thenReturn(java.util.Optional.of(mockParsedSchema));
+    if (useConverter) {
+      
when(mockSchemaConverter.convert(mockParsedSchema)).thenReturn(CONVERTED_SCHEMA);
+    } else {
+      when(mockParsedSchema.canonicalString()).thenReturn(RAW_SCHEMA);
+    }
+    return underTest;
   }
 
   @Test
-  public void testGetSourceSchemaShouldRequestSchemaWithCreds() throws 
IOException {
-    SchemaRegistryProvider spyUnderTest = getUnderTest(getProps());
-    Schema actual = spyUnderTest.getSourceSchema();
+  public void testGetSourceSchemaShouldRequestSchemaWithCreds() throws 
Exception {
+    SchemaRegistryProvider underTest = getUnderTest(getProps(), -1, true);
+    Schema actual = underTest.getSourceSchema();
     assertNotNull(actual);
-    assertEquals(getExpectedSchema(), actual);
-    verify(spyUnderTest, times(1)).setAuthorizationHeader(eq(BASIC_AUTH),
-        Mockito.any(HttpURLConnection.class));
+    assertEquals(getExpectedConvertedSchema(), actual);
+    
verify(mockRestService).setHttpHeaders(Collections.singletonMap("Authorization",
 "Basic " + Base64.getEncoder().encodeToString(BASIC_AUTH.getBytes())));
   }
 
   @Test
-  public void testGetTargetSchemaShouldRequestSchemaWithCreds() throws 
IOException {
-    SchemaRegistryProvider spyUnderTest = getUnderTest(getProps());
-    Schema actual = spyUnderTest.getTargetSchema();
+  public void testGetTargetSchemaShouldRequestSchemaWithCreds() throws 
Exception {
+    SchemaRegistryProvider underTest = getUnderTest(getProps(), -1, true);
+    Schema actual = underTest.getTargetSchema();
     assertNotNull(actual);
-    assertEquals(getExpectedSchema(), actual);
-    verify(spyUnderTest, times(1)).setAuthorizationHeader(eq(BASIC_AUTH),
-        Mockito.any(HttpURLConnection.class));
+    assertEquals(getExpectedConvertedSchema(), actual);
+    
verify(mockRestService).setHttpHeaders(Collections.singletonMap("Authorization",
 "Basic " + Base64.getEncoder().encodeToString(BASIC_AUTH.getBytes())));
   }
 
   @Test
-  public void testGetSourceSchemaShouldRequestSchemaWithoutCreds() throws 
IOException {
+  public void testGetSourceSchemaShouldRequestSchemaWithoutCreds() throws 
Exception {
     TypedProperties props = getProps();
-    props.put("hoodie.streamer.schemaprovider.registry.url", 
"http://localhost";);
-    props.put("hoodie.streamer.schemaprovider.registry.schemaconverter", 
DummySchemaConverter.class.getName());
-    SchemaRegistryProvider spyUnderTest = getUnderTest(props);
-    Schema actual = spyUnderTest.getSourceSchema();
+    props.put("hoodie.deltastreamer.schemaprovider.registry.url", 
"http://localhost/subjects/test/versions/latest";);
+    SchemaRegistryProvider underTest = getUnderTest(props, -1, true);
+    Schema actual = underTest.getSourceSchema();
     assertNotNull(actual);
     assertEquals(getExpectedConvertedSchema(), actual);
-    verify(spyUnderTest, times(0)).setAuthorizationHeader(Mockito.any(), 
Mockito.any());
+    verify(mockRestService, never()).setHttpHeaders(any());
   }
 
   @Test
-  public void testGetTargetSchemaShouldRequestSchemaWithoutCreds() throws 
IOException {
+  public void testGetTargetSchemaShouldRequestSchemaWithoutCreds() throws 
Exception {
     TypedProperties props = getProps();
-    props.put("hoodie.streamer.schemaprovider.registry.url", 
"http://localhost";);
-    props.put("hoodie.streamer.schemaprovider.registry.schemaconverter", 
DummySchemaConverter.class.getName());
-    SchemaRegistryProvider spyUnderTest = getUnderTest(props);
-    Schema actual = spyUnderTest.getTargetSchema();
+    props.put("hoodie.deltastreamer.schemaprovider.registry.url", 
"http://localhost/subjects/test/versions/latest";);
+    SchemaRegistryProvider underTest = getUnderTest(props, -1, true);
+    Schema actual = underTest.getTargetSchema();
     assertNotNull(actual);
     assertEquals(getExpectedConvertedSchema(), actual);
-    verify(spyUnderTest, times(0)).setAuthorizationHeader(Mockito.any(), 
Mockito.any());
+    verify(mockRestService, never()).setHttpHeaders(any());
   }
 
-  public static class DummySchemaConverter implements 
SchemaRegistryProvider.SchemaConverter {
-
-    @Override
-    public String convert(String schema) throws IOException {
-      return ((ObjectNode) new ObjectMapper()
-          .readTree(schema))
-          .set("namespace", TextNode.valueOf("com.example.hoodie"))
-          .toString();
-    }
+  @Test
+  public void testGetTargetSchemaWithoutConverter() throws Exception {
+    TypedProperties props = getProps();
+    props.put("hoodie.deltastreamer.schemaprovider.registry.url", 
"http://localhost/subjects/test/versions/latest";);
+    SchemaRegistryProvider underTest = getUnderTest(props, -1, false);
+    Schema actual = underTest.getTargetSchema();
+    assertNotNull(actual);
+    assertEquals(getExpectedSchema(), actual);
+    verify(mockRestService, never()).setHttpHeaders(any());
   }
 
-  // The SR is checked when cachedSchema is empty, when not empty, the 
cachedSchema is used.
   @Test
-  public void testGetSourceSchemaUsesCachedSchema() throws IOException {
+  public void testUrlWithSpecificSchemaVerson() throws Exception {
     TypedProperties props = getProps();
-    SchemaRegistryProvider spyUnderTest = getUnderTest(props);
-
-    // Call when cachedSchema is empty
-    Schema actual = spyUnderTest.getSourceSchema();
+    props.put("hoodie.deltastreamer.schemaprovider.registry.url", 
"http://localhost/subjects/test/versions/3";);
+    SchemaRegistryProvider underTest = getUnderTest(props, 3, false);
+    Schema actual = underTest.getTargetSchema();
     assertNotNull(actual);
-    verify(spyUnderTest, times(1)).parseSchemaFromRegistry(Mockito.any());
-
-    assert spyUnderTest.cachedSourceSchema != null;
-
-    Schema actualTwo = spyUnderTest.getSourceSchema();
-    
-    // cachedSchema should now be set, a subsequent call should not call 
parseSchemaFromRegistry
-    // Assuming this verify() has the scope of the whole test? so it should 
still be 1 from previous call?
-    verify(spyUnderTest, times(1)).parseSchemaFromRegistry(Mockito.any());
+    assertEquals(getExpectedSchema(), actual);
+    verify(mockRestService, never()).setHttpHeaders(any());
   }
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
index 335ddeee616..dd44704fb5d 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.utilities.schema.converter;
 
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
 import org.apache.avro.Schema;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -43,7 +44,7 @@ class TestJsonToAvroSchemaConverter {
   })
   void testConvertJsonSchemaToAvroSchema(String inputCase) throws IOException {
     String jsonSchema = loadJsonSchema(inputCase);
-    String avroSchema = new JsonToAvroSchemaConverter().convert(jsonSchema);
+    String avroSchema = new JsonToAvroSchemaConverter(null).convert(new 
JsonSchema(jsonSchema));
     Schema schema = new Schema.Parser().parse(avroSchema);
     Schema expected = new Schema.Parser().parse(loadAvroSchema(inputCase));
     assertEquals(expected, schema);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestProtoSchemaToAvroSchemaConverter.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestProtoSchemaToAvroSchemaConverter.java
index fed4bc5e0ed..39de96cbdad 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestProtoSchemaToAvroSchemaConverter.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestProtoSchemaToAvroSchemaConverter.java
@@ -21,6 +21,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
 import org.apache.hudi.utilities.test.proto.Parent;
 
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import org.apache.avro.Schema;
 import org.junit.jupiter.api.Test;
 
@@ -37,7 +38,8 @@ class TestProtoSchemaToAvroSchemaConverter {
     TypedProperties properties = new TypedProperties();
     
properties.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key(),
 Parent.class.getName());
     Schema.Parser parser = new Schema.Parser();
-    String actual = new 
ProtoSchemaToAvroSchemaConverter(properties).convert(getProtoSchemaString());
+    ProtobufSchema protobufSchema = new ProtobufSchema(getProtoSchemaString());
+    String actual = new 
ProtoSchemaToAvroSchemaConverter(properties).convert(protobufSchema);
     Schema actualSchema = new Schema.Parser().parse(actual);
 
     Schema expectedSchema = 
parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/parent_schema_recursive_default_limit.avsc"));
diff --git a/pom.xml b/pom.xml
index 635c746c2f2..9f7349556a5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1810,6 +1810,22 @@
         <artifactId>kafka-protobuf-serializer</artifactId>
         <version>${confluent.version}</version>
       </dependency>
+      <dependency>
+        <groupId>io.confluent</groupId>
+        <artifactId>kafka-json-schema-serializer</artifactId>
+        <version>${confluent.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>com.kjetland</groupId>
+            <artifactId>mbknor-jackson-jsonschema_2.12</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>com.kjetland</groupId>
+        
<artifactId>mbknor-jackson-jsonschema_${scala.binary.version}</artifactId>
+        <version>1.0.39</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
   <repositories>

Reply via email to