This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ccb8f1d6945 [HUDI-8008] Handle proto schema references in returned schema (#11660)
ccb8f1d6945 is described below
commit ccb8f1d69455e69568166c545ec61925cfcd53ce
Author: Tim Brown <[email protected]>
AuthorDate: Mon Jul 22 17:53:47 2024 -0700
[HUDI-8008] Handle proto schema references in returned schema (#11660)
---
hudi-utilities/pom.xml | 8 +
.../utilities/schema/SchemaRegistryProvider.java | 186 ++++++++++++---------
.../converter/JsonToAvroSchemaConverter.java | 12 +-
.../ProtoSchemaToAvroSchemaConverter.java | 6 +-
.../schema/TestSchemaRegistryProvider.java | 148 ++++++++--------
.../converter/TestJsonToAvroSchemaConverter.java | 3 +-
.../TestProtoSchemaToAvroSchemaConverter.java | 4 +-
pom.xml | 16 ++
8 files changed, 229 insertions(+), 154 deletions(-)
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 47c172b7791..8e2d8387332 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -379,6 +379,14 @@
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-serializer</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-json-schema-serializer</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.kjetland</groupId>
+ <artifactId>mbknor-jackson-jsonschema_${scala.binary.version}</artifactId>
+ </dependency>
<!-- Httpcomponents -->
<dependency>
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
index 1c2e9181fd7..bdc47225a6e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
@@ -19,41 +19,50 @@
package org.apache.hudi.utilities.schema;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Triple;
import org.apache.hudi.internal.schema.HoodieSchemaException;
import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
import org.apache.hudi.utilities.exception.HoodieSchemaFetchException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.RestService;
+import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import org.apache.avro.Schema;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.spark.api.java.JavaSparkContext;
-import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
+import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static
org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
-import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
/**
* Obtains latest schema from the Confluent/Kafka schema-registry.
@@ -61,6 +70,8 @@ import static
org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
* https://github.com/confluentinc/schema-registry
*/
public class SchemaRegistryProvider extends SchemaProvider {
+ private static final Pattern URL_PATTERN =
Pattern.compile("(.*/)subjects/(.*)/versions/(.*)");
+ private static final String LATEST = "latest";
/**
* Configs supported.
@@ -82,34 +93,53 @@ public class SchemaRegistryProvider extends SchemaProvider {
public static final String SSL_KEY_PASSWORD_PROP =
"schema.registry.ssl.key.password";
}
- protected Schema cachedSourceSchema;
- protected Schema cachedTargetSchema;
+ private final Option<SchemaConverter> schemaConverter;
+ private final SerializableFunctionUnchecked<String, RestService>
restServiceProvider;
+ private final SerializableFunctionUnchecked<RestService,
SchemaRegistryClient> registryClientProvider;
- private final String srcSchemaRegistryUrl;
- private final String targetSchemaRegistryUrl;
+ public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
+ super(props, jssc);
+ checkRequiredConfigProperties(props,
Collections.singletonList(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL));
+ if (config.containsKey(Config.SSL_KEYSTORE_LOCATION_PROP)
+ || config.containsKey(Config.SSL_TRUSTSTORE_LOCATION_PROP)) {
+ setUpSSLStores();
+ }
+ String schemaConverter = getStringWithAltKeys(config,
HoodieSchemaProviderConfig.SCHEMA_CONVERTER, true);
+ this.schemaConverter = !StringUtils.isNullOrEmpty(schemaConverter)
+ ? Option.of((SchemaConverter) ReflectionUtils.loadClass(
+ schemaConverter, new Class<?>[] {TypedProperties.class}, config))
+ : Option.empty();
+ this.restServiceProvider = RestService::new;
+ this.registryClientProvider = restService -> new
CachedSchemaRegistryClient(restService, 100,
+ Arrays.asList(new ProtobufSchemaProvider(), new JsonSchemaProvider(),
new AvroSchemaProvider()), null, null);
+ }
+
+ @VisibleForTesting
+ SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc,
+ Option<SchemaConverter> schemaConverter,
+ SerializableFunctionUnchecked<String, RestService>
restServiceProvider,
+ SerializableFunctionUnchecked<RestService,
SchemaRegistryClient> registryClientProvider) {
+ super(props, jssc);
+ checkRequiredConfigProperties(props,
Collections.singletonList(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL));
+ this.schemaConverter = schemaConverter;
+ this.restServiceProvider = restServiceProvider;
+ this.registryClientProvider = registryClientProvider;
+ }
@FunctionalInterface
public interface SchemaConverter {
/**
* Convert original schema string to avro schema string.
*
- * @param schema original schema string (e.g., JSON)
+ * @param schema original schema returned from the registry
* @return avro schema string
*/
- String convert(String schema) throws IOException;
+ String convert(ParsedSchema schema) throws IOException;
}
public Schema parseSchemaFromRegistry(String registryUrl) {
String schema = fetchSchemaFromRegistry(registryUrl);
- try {
- String schemaConverter = getStringWithAltKeys(config,
HoodieSchemaProviderConfig.SCHEMA_CONVERTER, true);
- SchemaConverter converter = !StringUtils.isNullOrEmpty(schemaConverter)
- ? ReflectionUtils.loadClass(schemaConverter)
- : s -> s;
- return new Schema.Parser().parse(converter.convert(schema));
- } catch (Exception e) {
- throw new HoodieSchemaException("Failed to parse schema from registry: "
+ schema, e);
- }
+ return new Schema.Parser().parse(schema);
}
/**
@@ -123,55 +153,73 @@ public class SchemaRegistryProvider extends
SchemaProvider {
*/
public String fetchSchemaFromRegistry(String registryUrl) {
try {
- HttpURLConnection connection;
Matcher matcher = Pattern.compile("://(.*?)@").matcher(registryUrl);
+ Triple<String, String, String> registryInfo;
+ String creds = null;
if (matcher.find()) {
- String creds = matcher.group(1);
+ creds = matcher.group(1);
String urlWithoutCreds = registryUrl.replace(creds + "@", "");
- connection = getConnection(urlWithoutCreds);
- setAuthorizationHeader(matcher.group(1), connection);
+ registryInfo = getUrlSubjectAndVersion(urlWithoutCreds);
} else {
- connection = getConnection(registryUrl);
+ registryInfo = getUrlSubjectAndVersion(registryUrl);
+ }
+ String url = registryInfo.getLeft();
+ RestService restService = getRestService(url);
+ if (creds != null) {
+ setAuthorizationHeader(creds, restService);
+ }
+ String subject = registryInfo.getMiddle();
+ String version = registryInfo.getRight();
+ SchemaRegistryClient registryClient =
registryClientProvider.apply(restService);
+ SchemaMetadata schemaMetadata = version.equals(LATEST) ?
registryClient.getLatestSchemaMetadata(subject) :
registryClient.getSchemaMetadata(subject, Integer.parseInt(version));
+ ParsedSchema parsedSchema = registryClient.parseSchema(schemaMetadata.getSchemaType(), schemaMetadata.getSchema(), schemaMetadata.getReferences())
+ .orElseThrow(() -> new HoodieSchemaException("Failed to parse schema from registry"));
+ if (schemaConverter.isPresent()) {
+ return schemaConverter.get().convert(parsedSchema);
+ } else {
+ return parsedSchema.canonicalString();
}
- ObjectMapper mapper = new ObjectMapper();
- JsonNode node = mapper.readTree(getStream(connection));
- return node.get("schema").asText();
} catch (Exception e) {
throw new HoodieSchemaFetchException("Failed to fetch schema from
registry", e);
}
}
- private SSLSocketFactory sslSocketFactory;
-
- protected HttpURLConnection getConnection(String url) throws IOException {
- URL registry = new URL(url);
- if (sslSocketFactory != null) {
- // we cannot cast to HttpsURLConnection if url is http so only cast when
sslSocketFactory is set
- HttpsURLConnection connection = (HttpsURLConnection)
registry.openConnection();
- connection.setSSLSocketFactory(sslSocketFactory);
- return connection;
+ private Triple<String, String, String> getUrlSubjectAndVersion(String
registryUrl) {
+ // url may be list of urls
+ String[] splitRegistryUrls = registryUrl.split(",");
+ String subjectName = null;
+ String version = null;
+ List<String> urls = new ArrayList<>(splitRegistryUrls.length);
+ // url will end with /subjects/{subject}/versions/{version}
+ for (String url : splitRegistryUrls) {
+ Matcher matcher = URL_PATTERN.matcher(url);
+ if (!matcher.matches()) {
+ throw new HoodieSchemaFetchException("Failed to extract subject name
and version from registry url");
+ }
+ urls.add(matcher.group(1));
+ subjectName = matcher.group(2);
+ version = matcher.group(3);
+ }
+ if (subjectName == null) {
+ throw new HoodieSchemaFetchException("Failed to extract subject name
from registry url");
}
- return (HttpURLConnection) registry.openConnection();
+ return Triple.of(String.join(",", urls), subjectName, version);
}
- protected void setAuthorizationHeader(String creds, HttpURLConnection
connection) {
- String encodedAuth =
Base64.getEncoder().encodeToString(getUTF8Bytes(creds));
- connection.setRequestProperty("Authorization", "Basic " + encodedAuth);
- }
+ private SSLSocketFactory sslSocketFactory;
- protected InputStream getStream(HttpURLConnection connection) throws
IOException {
- return connection.getInputStream();
+ protected RestService getRestService(String url) {
+ RestService restService = restServiceProvider.apply(url);
+ if (sslSocketFactory != null) {
+ restService.setSslSocketFactory(sslSocketFactory);
+ return restService;
+ }
+ return restService;
}
- public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
- super(props, jssc);
- checkRequiredConfigProperties(props,
Collections.singletonList(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL));
- this.srcSchemaRegistryUrl = getStringWithAltKeys(config,
HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL);
- this.targetSchemaRegistryUrl = getStringWithAltKeys(config,
HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL, srcSchemaRegistryUrl);
- if (config.containsKey(Config.SSL_KEYSTORE_LOCATION_PROP)
- || config.containsKey(Config.SSL_TRUSTSTORE_LOCATION_PROP)) {
- setUpSSLStores();
- }
+ protected void setAuthorizationHeader(String creds, RestService restService)
{
+ String encodedAuth =
Base64.getEncoder().encodeToString(creds.getBytes(StandardCharsets.UTF_8));
+ restService.setHttpHeaders(Collections.singletonMap("Authorization",
"Basic " + encodedAuth));
}
private void setUpSSLStores() {
@@ -199,42 +247,30 @@ public class SchemaRegistryProvider extends
SchemaProvider {
@Override
public Schema getSourceSchema() {
+ String registryUrl = getStringWithAltKeys(config,
HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL);
try {
- if (cachedSourceSchema == null) {
- cachedSourceSchema =
parseSchemaFromRegistry(this.srcSchemaRegistryUrl);
- }
- return cachedSourceSchema;
+ return parseSchemaFromRegistry(registryUrl);
} catch (Exception e) {
throw new HoodieSchemaFetchException(String.format(
"Error reading source schema from registry. Please check %s is
configured correctly. Truncated URL: %s",
Config.SRC_SCHEMA_REGISTRY_URL_PROP,
- StringUtils.truncate(srcSchemaRegistryUrl, 10, 10)), e);
+ StringUtils.truncate(registryUrl, 10, 10)), e);
}
}
@Override
public Schema getTargetSchema() {
+ String registryUrl = getStringWithAltKeys(config,
HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL);
+ String targetRegistryUrl =
+ getStringWithAltKeys(config,
HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL, registryUrl);
try {
- if (cachedTargetSchema == null) {
- cachedTargetSchema =
parseSchemaFromRegistry(this.targetSchemaRegistryUrl);
- }
- return cachedTargetSchema;
+ return parseSchemaFromRegistry(targetRegistryUrl);
} catch (Exception e) {
throw new HoodieSchemaFetchException(String.format(
"Error reading target schema from registry. Please check %s is
configured correctly. If that is not configured then check %s. Truncated URL:
%s",
Config.SRC_SCHEMA_REGISTRY_URL_PROP,
Config.TARGET_SCHEMA_REGISTRY_URL_PROP,
- StringUtils.truncate(targetSchemaRegistryUrl, 10, 10)), e);
+ StringUtils.truncate(targetRegistryUrl, 10, 10)), e);
}
}
-
- // Per SyncOnce call, the cachedschema for the provider is dropped and
SourceSchema re-attained
- // Subsequent calls to getSourceSchema within the write batch should be
cached.
- @Override
- public void refresh() {
- cachedSourceSchema = null;
- cachedTargetSchema = null;
- getSourceSchema();
- getTargetSchema();
- }
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverter.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverter.java
index 9f892ab8f0e..cb9a4eb63d3 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverter.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverter.java
@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.schema.converter;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.JsonUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
@@ -29,6 +30,8 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
import java.io.IOException;
import java.net.URI;
@@ -58,9 +61,14 @@ public class JsonToAvroSchemaConverter implements
SchemaRegistryProvider.SchemaC
}).collect(Collectors.collectingAndThen(Collectors.toMap(p -> p[0], p ->
p[1]), Collections::<String, String>unmodifiableMap));
private static final Pattern SYMBOL_REGEX =
Pattern.compile("^[A-Za-z_][A-Za-z0-9_]*$");
+ public JsonToAvroSchemaConverter(TypedProperties properties) {
+ // properties unused in this converter
+ }
+
@Override
- public String convert(String jsonSchema) throws IOException {
- JsonNode jsonNode = MAPPER.readTree(jsonSchema);
+ public String convert(ParsedSchema parsedSchema) throws IOException {
+ JsonSchema jsonSchema = (JsonSchema) parsedSchema;
+ JsonNode jsonNode = MAPPER.readTree(jsonSchema.canonicalString());
ObjectNode avroRecord = MAPPER.createObjectNode()
.put("type", "record")
.put("name", getAvroSchemaRecordName(jsonNode))
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/ProtoSchemaToAvroSchemaConverter.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/ProtoSchemaToAvroSchemaConverter.java
index 78ef25e9a04..2845c3fc1f9 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/ProtoSchemaToAvroSchemaConverter.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/ProtoSchemaToAvroSchemaConverter.java
@@ -21,6 +21,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import org.apache.hudi.utilities.sources.helpers.ProtoConversionUtil;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import java.io.IOException;
@@ -35,9 +36,8 @@ public class ProtoSchemaToAvroSchemaConverter implements
SchemaRegistryProvider.
this.schemaConfig =
ProtoConversionUtil.SchemaConfig.fromProperties(config);
}
- @Override
- public String convert(String schema) throws IOException {
- ProtobufSchema protobufSchema = new ProtobufSchema(schema);
+ public String convert(ParsedSchema schema) throws IOException {
+ ProtobufSchema protobufSchema = (ProtobufSchema) schema;
return
ProtoConversionUtil.getAvroSchemaForMessageDescriptor(protobufSchema.toDescriptor(),
schemaConfig).toString();
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
index 88f67723c85..35272a33906 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
@@ -19,33 +19,30 @@
package org.apache.hudi.utilities.schema;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
+import org.apache.hudi.common.util.Option;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.fasterxml.jackson.databind.node.TextNode;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.RestService;
import org.apache.avro.Schema;
import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.HttpURLConnection;
+import java.util.Base64;
+import java.util.Collections;
-import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.times;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
class TestSchemaRegistryProvider {
private static final String BASIC_AUTH = "foo:bar";
-
- private static final String REGISTRY_RESPONSE =
"{\"schema\":\"{\\\"type\\\": \\\"record\\\", \\\"namespace\\\":
\\\"example\\\", "
- + "\\\"name\\\": \\\"FullName\\\",\\\"fields\\\": [{ \\\"name\\\":
\\\"first\\\", \\\"type\\\": "
- + "\\\"string\\\" }]}\"}";
private static final String RAW_SCHEMA = "{\"type\": \"record\",
\"namespace\": \"example\", "
+ "\"name\": \"FullName\",\"fields\": [{ \"name\": \"first\", \"type\": "
+ "\"string\" }]}";
@@ -64,93 +61,100 @@ class TestSchemaRegistryProvider {
private static TypedProperties getProps() {
return new TypedProperties() {
{
- put("hoodie.streamer.schemaprovider.registry.baseUrl", "http://" +
BASIC_AUTH + "@localhost");
- put("hoodie.streamer.schemaprovider.registry.urlSuffix", "-value");
- put("hoodie.streamer.schemaprovider.registry.url",
"http://foo:bar@localhost");
- put("hoodie.streamer.source.kafka.topic", "foo");
+ put("hoodie.deltastreamer.schemaprovider.registry.baseUrl", "http://"
+ BASIC_AUTH + "@localhost");
+ put("hoodie.deltastreamer.schemaprovider.registry.urlSuffix",
"-value");
+ put("hoodie.deltastreamer.schemaprovider.registry.url",
"http://foo:bar@localhost/subjects/test/versions/latest");
+ put("hoodie.deltastreamer.source.kafka.topic", "foo");
}
};
}
- private static SchemaRegistryProvider getUnderTest(TypedProperties props)
throws IOException {
- InputStream is = new ByteArrayInputStream(getUTF8Bytes(REGISTRY_RESPONSE));
- SchemaRegistryProvider spyUnderTest = Mockito.spy(new
SchemaRegistryProvider(props, null));
- Mockito.doReturn(is).when(spyUnderTest).getStream(Mockito.any());
- return spyUnderTest;
+ private final SchemaRegistryProvider.SchemaConverter mockSchemaConverter =
mock(SchemaRegistryProvider.SchemaConverter.class);
+ private final RestService mockRestService = mock(RestService.class);
+ private final SchemaRegistryClient mockRegistryClient =
mock(SchemaRegistryClient.class);
+
+ private SchemaRegistryProvider getUnderTest(TypedProperties props, int
version, boolean useConverter) throws Exception {
+ SerializableFunctionUnchecked<String, RestService> mockRestServiceFactory
= mock(SerializableFunctionUnchecked.class);
+
when(mockRestServiceFactory.apply("http://localhost/")).thenReturn(mockRestService);
+ SerializableFunctionUnchecked<RestService, SchemaRegistryClient>
mockRegistryClientFactory = mock(SerializableFunctionUnchecked.class);
+
when(mockRegistryClientFactory.apply(mockRestService)).thenReturn(mockRegistryClient);
+ SchemaRegistryProvider underTest = new SchemaRegistryProvider(props, null,
useConverter ? Option.of(mockSchemaConverter) : Option.empty(),
+ mockRestServiceFactory, mockRegistryClientFactory);
+ SchemaMetadata metadata = new SchemaMetadata(1, 1, RAW_SCHEMA);
+ if (version == -1) {
+
when(mockRegistryClient.getLatestSchemaMetadata("test")).thenReturn(metadata);
+ } else {
+ when(mockRegistryClient.getSchemaMetadata("test",
version)).thenReturn(metadata);
+ }
+ ParsedSchema mockParsedSchema = mock(ParsedSchema.class);
+ when(mockRegistryClient.parseSchema("AVRO", RAW_SCHEMA,
Collections.emptyList())).thenReturn(java.util.Optional.of(mockParsedSchema));
+ if (useConverter) {
+
when(mockSchemaConverter.convert(mockParsedSchema)).thenReturn(CONVERTED_SCHEMA);
+ } else {
+ when(mockParsedSchema.canonicalString()).thenReturn(RAW_SCHEMA);
+ }
+ return underTest;
}
@Test
- public void testGetSourceSchemaShouldRequestSchemaWithCreds() throws
IOException {
- SchemaRegistryProvider spyUnderTest = getUnderTest(getProps());
- Schema actual = spyUnderTest.getSourceSchema();
+ public void testGetSourceSchemaShouldRequestSchemaWithCreds() throws
Exception {
+ SchemaRegistryProvider underTest = getUnderTest(getProps(), -1, true);
+ Schema actual = underTest.getSourceSchema();
assertNotNull(actual);
- assertEquals(getExpectedSchema(), actual);
- verify(spyUnderTest, times(1)).setAuthorizationHeader(eq(BASIC_AUTH),
- Mockito.any(HttpURLConnection.class));
+ assertEquals(getExpectedConvertedSchema(), actual);
+
verify(mockRestService).setHttpHeaders(Collections.singletonMap("Authorization",
"Basic " + Base64.getEncoder().encodeToString(BASIC_AUTH.getBytes())));
}
@Test
- public void testGetTargetSchemaShouldRequestSchemaWithCreds() throws
IOException {
- SchemaRegistryProvider spyUnderTest = getUnderTest(getProps());
- Schema actual = spyUnderTest.getTargetSchema();
+ public void testGetTargetSchemaShouldRequestSchemaWithCreds() throws
Exception {
+ SchemaRegistryProvider underTest = getUnderTest(getProps(), -1, true);
+ Schema actual = underTest.getTargetSchema();
assertNotNull(actual);
- assertEquals(getExpectedSchema(), actual);
- verify(spyUnderTest, times(1)).setAuthorizationHeader(eq(BASIC_AUTH),
- Mockito.any(HttpURLConnection.class));
+ assertEquals(getExpectedConvertedSchema(), actual);
+
verify(mockRestService).setHttpHeaders(Collections.singletonMap("Authorization",
"Basic " + Base64.getEncoder().encodeToString(BASIC_AUTH.getBytes())));
}
@Test
- public void testGetSourceSchemaShouldRequestSchemaWithoutCreds() throws
IOException {
+ public void testGetSourceSchemaShouldRequestSchemaWithoutCreds() throws
Exception {
TypedProperties props = getProps();
- props.put("hoodie.streamer.schemaprovider.registry.url",
"http://localhost");
- props.put("hoodie.streamer.schemaprovider.registry.schemaconverter",
DummySchemaConverter.class.getName());
- SchemaRegistryProvider spyUnderTest = getUnderTest(props);
- Schema actual = spyUnderTest.getSourceSchema();
+ props.put("hoodie.deltastreamer.schemaprovider.registry.url",
"http://localhost/subjects/test/versions/latest");
+ SchemaRegistryProvider underTest = getUnderTest(props, -1, true);
+ Schema actual = underTest.getSourceSchema();
assertNotNull(actual);
assertEquals(getExpectedConvertedSchema(), actual);
- verify(spyUnderTest, times(0)).setAuthorizationHeader(Mockito.any(),
Mockito.any());
+ verify(mockRestService, never()).setHttpHeaders(any());
}
@Test
- public void testGetTargetSchemaShouldRequestSchemaWithoutCreds() throws
IOException {
+ public void testGetTargetSchemaShouldRequestSchemaWithoutCreds() throws
Exception {
TypedProperties props = getProps();
- props.put("hoodie.streamer.schemaprovider.registry.url",
"http://localhost");
- props.put("hoodie.streamer.schemaprovider.registry.schemaconverter",
DummySchemaConverter.class.getName());
- SchemaRegistryProvider spyUnderTest = getUnderTest(props);
- Schema actual = spyUnderTest.getTargetSchema();
+ props.put("hoodie.deltastreamer.schemaprovider.registry.url",
"http://localhost/subjects/test/versions/latest");
+ SchemaRegistryProvider underTest = getUnderTest(props, -1, true);
+ Schema actual = underTest.getTargetSchema();
assertNotNull(actual);
assertEquals(getExpectedConvertedSchema(), actual);
- verify(spyUnderTest, times(0)).setAuthorizationHeader(Mockito.any(),
Mockito.any());
+ verify(mockRestService, never()).setHttpHeaders(any());
}
- public static class DummySchemaConverter implements
SchemaRegistryProvider.SchemaConverter {
-
- @Override
- public String convert(String schema) throws IOException {
- return ((ObjectNode) new ObjectMapper()
- .readTree(schema))
- .set("namespace", TextNode.valueOf("com.example.hoodie"))
- .toString();
- }
+ @Test
+ public void testGetTargetSchemaWithoutConverter() throws Exception {
+ TypedProperties props = getProps();
+ props.put("hoodie.deltastreamer.schemaprovider.registry.url",
"http://localhost/subjects/test/versions/latest");
+ SchemaRegistryProvider underTest = getUnderTest(props, -1, false);
+ Schema actual = underTest.getTargetSchema();
+ assertNotNull(actual);
+ assertEquals(getExpectedSchema(), actual);
+ verify(mockRestService, never()).setHttpHeaders(any());
}
- // The SR is checked when cachedSchema is empty, when not empty, the
cachedSchema is used.
@Test
- public void testGetSourceSchemaUsesCachedSchema() throws IOException {
+ public void testUrlWithSpecificSchemaVerson() throws Exception {
TypedProperties props = getProps();
- SchemaRegistryProvider spyUnderTest = getUnderTest(props);
-
- // Call when cachedSchema is empty
- Schema actual = spyUnderTest.getSourceSchema();
+ props.put("hoodie.deltastreamer.schemaprovider.registry.url",
"http://localhost/subjects/test/versions/3");
+ SchemaRegistryProvider underTest = getUnderTest(props, 3, false);
+ Schema actual = underTest.getTargetSchema();
assertNotNull(actual);
- verify(spyUnderTest, times(1)).parseSchemaFromRegistry(Mockito.any());
-
- assert spyUnderTest.cachedSourceSchema != null;
-
- Schema actualTwo = spyUnderTest.getSourceSchema();
-
- // cachedSchema should now be set, a subsequent call should not call
parseSchemaFromRegistry
- // Assuming this verify() has the scope of the whole test? so it should
still be 1 from previous call?
- verify(spyUnderTest, times(1)).parseSchemaFromRegistry(Mockito.any());
+ assertEquals(getExpectedSchema(), actual);
+ verify(mockRestService, never()).setHttpHeaders(any());
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
index 335ddeee616..dd44704fb5d 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.schema.converter;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
import org.apache.avro.Schema;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -43,7 +44,7 @@ class TestJsonToAvroSchemaConverter {
})
void testConvertJsonSchemaToAvroSchema(String inputCase) throws IOException {
String jsonSchema = loadJsonSchema(inputCase);
- String avroSchema = new JsonToAvroSchemaConverter().convert(jsonSchema);
+ String avroSchema = new JsonToAvroSchemaConverter(null).convert(new
JsonSchema(jsonSchema));
Schema schema = new Schema.Parser().parse(avroSchema);
Schema expected = new Schema.Parser().parse(loadAvroSchema(inputCase));
assertEquals(expected, schema);
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestProtoSchemaToAvroSchemaConverter.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestProtoSchemaToAvroSchemaConverter.java
index fed4bc5e0ed..39de96cbdad 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestProtoSchemaToAvroSchemaConverter.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestProtoSchemaToAvroSchemaConverter.java
@@ -21,6 +21,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
import org.apache.hudi.utilities.test.proto.Parent;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import org.apache.avro.Schema;
import org.junit.jupiter.api.Test;
@@ -37,7 +38,8 @@ class TestProtoSchemaToAvroSchemaConverter {
TypedProperties properties = new TypedProperties();
properties.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key(),
Parent.class.getName());
Schema.Parser parser = new Schema.Parser();
- String actual = new
ProtoSchemaToAvroSchemaConverter(properties).convert(getProtoSchemaString());
+ ProtobufSchema protobufSchema = new ProtobufSchema(getProtoSchemaString());
+ String actual = new
ProtoSchemaToAvroSchemaConverter(properties).convert(protobufSchema);
Schema actualSchema = new Schema.Parser().parse(actual);
Schema expectedSchema =
parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/parent_schema_recursive_default_limit.avsc"));
diff --git a/pom.xml b/pom.xml
index 635c746c2f2..9f7349556a5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1810,6 +1810,22 @@
<artifactId>kafka-protobuf-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-json-schema-serializer</artifactId>
+ <version>${confluent.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.kjetland</groupId>
+ <artifactId>mbknor-jackson-jsonschema_2.12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.kjetland</groupId>
+ <artifactId>mbknor-jackson-jsonschema_${scala.binary.version}</artifactId>
+ <version>1.0.39</version>
+ </dependency>
</dependencies>
</dependencyManagement>
<repositories>