This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b359f76e8c5 [HUDI-9061] JSON to AVRO schema converter (#12864)
b359f76e8c5 is described below
commit b359f76e8c5f67bc64a5eb3fb7561aaf87aeba47
Author: Davis-Zhang-Onehouse <[email protected]>
AuthorDate: Mon Feb 24 19:35:54 2025 -0800
[HUDI-9061] JSON to AVRO schema converter (#12864)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../common/testutils/HoodieTestDataGenerator.java | 7 +-
hudi-kafka-connect/pom.xml | 5 +-
hudi-utilities/pom.xml | 17 +-
.../converter/JsonToAvroSchemaConverter.java | 464 ++++++++++++++++-----
.../converter/JsonToAvroSchemaConverterConfig.java | 51 +++
.../hudi/utilities/sources/JsonKafkaSource.java | 73 +++-
.../converter/TestJsonToAvroSchemaConverter.java | 58 ++-
.../utilities/sources/TestJsonKafkaSource.java | 124 +++++-
.../json/array-with-item-type-union/expected.json | 78 ++++
.../json/array-with-item-type-union/input.json | 93 +++++
.../json/complex-json-union-types/expected.json | 66 +++
.../json/complex-json-union-types/input.json | 42 ++
.../expected_no_stripping_quotes.json | 138 ++++++
.../expected_stripping_quotes.json | 138 ++++++
.../json/not-null-default-value-schema/input.json | 67 +++
.../json/schema-repeating-names/expected.json | 153 +++++++
.../json/schema-repeating-names/input.json | 124 ++++++
packaging/hudi-utilities-bundle/pom.xml | 1 +
pom.xml | 12 +
19 files changed, 1557 insertions(+), 154 deletions(-)
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 6c1d526bd8e..4122ab452ca 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -147,7 +147,12 @@ public class HoodieTestDataGenerator implements AutoCloseable {
public static final String TRIP_NESTED_EXAMPLE_SCHEMA =
TRIP_SCHEMA_PREFIX + FARE_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
-
+ public static final String TRIP_ENCODED_DECIMAL_SCHEMA =
"{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":["
+ +
"{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},"
+ +
"{\"name\":\"decfield\",\"type\":{\"type\":\"bytes\",\"name\":\"abc\",\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},"
+ +
"{\"name\":\"lowprecision\",\"type\":{\"type\":\"bytes\",\"name\":\"def\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}},"
+ +
"{\"name\":\"highprecision\",\"type\":{\"type\":\"bytes\",\"name\":\"ghi\",\"logicalType\":\"decimal\",\"precision\":32,\"scale\":12}},"
+ +
"{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\":
\"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
public static final String TRIP_SCHEMA =
"{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":["
+
"{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},"
+
"{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\":
\"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
diff --git a/hudi-kafka-connect/pom.xml b/hudi-kafka-connect/pom.xml
index 68844d6a3e4..5ba03e20267 100644
--- a/hudi-kafka-connect/pom.xml
+++ b/hudi-kafka-connect/pom.xml
@@ -30,7 +30,6 @@
<properties>
<main.basedir>${project.parent.basedir}</main.basedir>
- <connect.api.version>2.5.0</connect.api.version>
</properties>
<build>
@@ -81,13 +80,13 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
- <version>${connect.api.version}</version>
+ <version>${kafka.connect.api.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
- <version>${connect.api.version}</version>
+ <version>${kafka.connect.api.version}</version>
<scope>provided</scope>
</dependency>
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 3494605092c..669385e3901 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -197,6 +197,10 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-api</artifactId>
+ </dependency>
<!-- Pulsar -->
<dependency>
@@ -369,7 +373,6 @@
<dependency>
<groupId>io.confluent</groupId>
<artifactId>common-utils</artifactId>
- <version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
@@ -379,13 +382,19 @@
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-serializer</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.kjetland</groupId>
+
<artifactId>mbknor-jackson-jsonschema_${scala.binary.version}</artifactId>
+ </dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-json-schema-serializer</artifactId>
</dependency>
+
<dependency>
- <groupId>com.kjetland</groupId>
-
<artifactId>mbknor-jackson-jsonschema_${scala.binary.version}</artifactId>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-streams-test-utils</artifactId>
+ <scope>test</scope>
</dependency>
<!-- Httpcomponents -->
@@ -507,7 +516,7 @@
<artifactId>sqs</artifactId>
<version>${aws.sdk.version}</version>
</dependency>
-
+
<!-- Hive - Test -->
<dependency>
<groupId>${hive.groupid}</groupId>
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverter.java
index cb9a4eb63d3..46d6a2567b5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverter.java
@@ -22,16 +22,22 @@ package org.apache.hudi.utilities.schema.converter;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.JsonUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.BooleanNode;
+import com.fasterxml.jackson.databind.node.DoubleNode;
+import com.fasterxml.jackson.databind.node.LongNode;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
+import org.apache.kafka.connect.data.Decimal;
import java.io.IOException;
import java.net.URI;
@@ -42,171 +48,400 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.hudi.common.util.StringUtils.getSuffixBy;
-import static org.apache.hudi.common.util.StringUtils.removeSuffixBy;
+import static
org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverterConfig.CONVERT_DEFAULT_VALUE_TYPE;
+import static
org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverterConfig.STRIP_DEFAULT_VALUE_QUOTES;
+/**
+ * Converts a JSON schema to an Avro schema.
+ */
public class JsonToAvroSchemaConverter implements
SchemaRegistryProvider.SchemaConverter {
+ private static final String CONNECT_PARAMETERS = "connect.parameters";
+ private static final String CONNECT_DECIMAL_PRECISION =
"connect.decimal.precision";
+ private static final String CONNECT_TYPE = "connect.type";
private static final ObjectMapper MAPPER = JsonUtils.getObjectMapper();
- private static final Map<String, String> JSON_TO_AVRO_TYPE = Stream.of(new
String[][] {
- {"string", "string"},
- {"null", "null"},
- {"boolean", "boolean"},
- {"integer", "long"},
- {"number", "double"}
- }).collect(Collectors.collectingAndThen(Collectors.toMap(p -> p[0], p ->
p[1]), Collections::<String, String>unmodifiableMap));
+ private static final Map<String, String> JSON_TO_AVRO_TYPE =
+ Stream.of(
+ new String[][] {
+ {"string", "string"},
+ {"null", "null"},
+ {"boolean", "boolean"},
+ {"integer", "long"},
+ {"number", "double"}
+ })
+ .collect(
+ Collectors.collectingAndThen(
+ Collectors.toMap(p -> p[0], p -> p[1]),
+ Collections::<String, String>unmodifiableMap));
private static final Pattern SYMBOL_REGEX =
Pattern.compile("^[A-Za-z_][A-Za-z0-9_]*$");
+ private static final String LONG_TYPE = "long";
+ private static final String DOUBLE_TYPE = "double";
+ private static final String BOOLEAN_TYPE = "boolean";
+ private static final String STRING_TYPE = "string";
+ private static final String DEFAULT_FIELD = "default";
+ private static final String TYPE = "type";
+ private static final String TITLE = "title";
+ private static final String RECORD = "record";
+ private static final String ARRAY = "array";
+
+ private final boolean convertDefaultValueType;
+ private final boolean stripDefaultValueQuotes;
- public JsonToAvroSchemaConverter(TypedProperties properties) {
- // properties unused in this converter
+ public JsonToAvroSchemaConverter(TypedProperties config) {
+ this.convertDefaultValueType = ConfigUtils.getBooleanWithAltKeys(config,
CONVERT_DEFAULT_VALUE_TYPE);
+ this.stripDefaultValueQuotes = ConfigUtils.getBooleanWithAltKeys(config,
STRIP_DEFAULT_VALUE_QUOTES);
}
- @Override
public String convert(ParsedSchema parsedSchema) throws IOException {
JsonSchema jsonSchema = (JsonSchema) parsedSchema;
JsonNode jsonNode = MAPPER.readTree(jsonSchema.canonicalString());
- ObjectNode avroRecord = MAPPER.createObjectNode()
- .put("type", "record")
- .put("name", getAvroSchemaRecordName(jsonNode))
- .put("doc", getAvroDoc(jsonNode));
+ ObjectNode avroRecord = convertJsonNodeToAvroNode(jsonNode, new
AtomicInteger(1), new HashSet<>());
+ return avroRecord.toString();
+ }
+
+ private ObjectNode convertJsonNodeToAvroNode(JsonNode jsonNode,
AtomicInteger schemaCounter, Set<String> seenNames) {
+ ObjectNode avroRecord =
+ MAPPER
+ .createObjectNode()
+ .put(TYPE, RECORD)
+ .put("name", getAvroSchemaRecordName(jsonNode))
+ .put("doc", getAvroDoc(jsonNode));
Option<String> namespace = getAvroSchemaRecordNamespace(jsonNode);
if (namespace.isPresent()) {
avroRecord.put("namespace", namespace.get());
}
if (jsonNode.hasNonNull("properties")) {
- avroRecord.set("fields", convertProperties(jsonNode.get("properties"),
getRequired(jsonNode)));
+ avroRecord.set(
+ "fields", convertProperties(jsonNode.get("properties"),
getRequired(jsonNode), schemaCounter, seenNames));
} else {
avroRecord.set("fields", MAPPER.createArrayNode());
}
- return avroRecord.toString();
+ return avroRecord;
}
- private static ArrayNode convertProperties(JsonNode jsonProperties,
Set<String> required) {
- List<JsonNode> avroFields = new ArrayList<>(jsonProperties.size());
- jsonProperties.fieldNames().forEachRemaining(name ->
- avroFields.add(tryConvertNestedProperty(name, jsonProperties.get(name))
- .or(() -> tryConvertArrayProperty(name, jsonProperties.get(name)))
- .or(() -> tryConvertEnumProperty(name, jsonProperties.get(name)))
- .orElseGet(() -> convertProperty(name, jsonProperties.get(name),
required.contains(name)))));
+ private ArrayNode convertProperties(JsonNode jsonProperties, Set<String>
required, AtomicInteger schemaCounter,
+ Set<String> seenNames) {
+ List<JsonNode> avroFields = new ArrayList<>();
+ jsonProperties
+ .fieldNames()
+ .forEachRemaining(
+ name -> {
+ avroFields.add(
+ tryConvertNestedProperty(name, jsonProperties.get(name),
schemaCounter, seenNames)
+ .or(() -> tryConvertArrayProperty(name,
jsonProperties.get(name), schemaCounter, seenNames))
+ .or(() -> tryConvertEnumProperty(name,
jsonProperties.get(name), schemaCounter, seenNames))
+ .orElseGet(() ->
+ convertProperty(
+ name, jsonProperties.get(name),
required.contains(name), schemaCounter, seenNames, false)));
+ });
+
return MAPPER.createArrayNode().addAll(avroFields);
}
- private static Option<JsonNode> tryConvertNestedProperty(String name,
JsonNode jsonProperty) {
+ private Option<JsonNode> tryConvertNestedProperty(String name, JsonNode
jsonProperty,
+ AtomicInteger
schemaCounter, Set<String> seenNames) {
if (!isJsonNestedType(jsonProperty)) {
return Option.empty();
}
- JsonNode avroNode = MAPPER.createObjectNode()
- .put("name", sanitizeAsAvroName(name))
- .put("doc", getAvroDoc(jsonProperty))
- .set("type", MAPPER.createObjectNode()
- .put("type", "record")
- .put("name", getAvroTypeName(jsonProperty, name))
- .set("fields", convertProperties(jsonProperty.get("properties"),
getRequired(jsonProperty))));
+ JsonNode avroNode =
+ MAPPER
+ .createObjectNode()
+ .put("name", sanitizeAsAvroName(name))
+ .put("doc", getAvroDoc(jsonProperty))
+ .set(
+ TYPE,
+ MAPPER
+ .createObjectNode()
+ .put(TYPE, RECORD)
+ .put("name", getAvroTypeName(jsonProperty, name,
schemaCounter, seenNames))
+ .set(
+ "fields",
+ convertProperties(
+ jsonProperty.get("properties"),
getRequired(jsonProperty), schemaCounter, seenNames)));
return Option.of(avroNode);
}
- private static Option<JsonNode> tryConvertArrayProperty(String name,
JsonNode jsonProperty) {
+ private Option<JsonNode> tryConvertArrayProperty(String name, JsonNode
jsonProperty,
+ AtomicInteger
schemaCounter, Set<String> seenNames) {
if (!isJsonArrayType(jsonProperty)) {
return Option.empty();
}
JsonNode avroItems;
JsonNode jsonItems = jsonProperty.get("items");
- String itemName = getAvroTypeName(jsonItems, name) + "_child";
+ String itemName = getAvroTypeName(jsonItems, name, schemaCounter,
seenNames) + "_child";
if (isJsonNestedType(jsonItems)) {
- avroItems = MAPPER.createObjectNode()
- .put("type", "record")
- .put("name", itemName)
- .set("fields", convertProperties(jsonItems.get("properties"),
getRequired(jsonItems)));
+ avroItems =
+ MAPPER
+ .createObjectNode()
+ .put(TYPE, RECORD)
+ .put("name", itemName)
+ .set(
+ "fields", convertProperties(jsonItems.get("properties"),
getRequired(jsonItems),
+ schemaCounter, seenNames));
} else {
- avroItems = convertProperty(itemName, jsonItems, true);
+ avroItems = convertProperty(itemName, jsonItems, true, schemaCounter,
seenNames, true);
}
- JsonNode avroNode = MAPPER.createObjectNode()
- .put("name", sanitizeAsAvroName(name))
- .put("doc", getAvroDoc(jsonProperty))
- .set("type", MAPPER.createObjectNode()
- .put("type", "array")
- .set("items", avroItems));
+ JsonNode avroNode =
+ MAPPER
+ .createObjectNode()
+ .put("name", sanitizeAsAvroName(name))
+ .put("doc", getAvroDoc(jsonProperty))
+ .set(TYPE, MAPPER.createObjectNode().put(TYPE, ARRAY).set("items",
avroItems));
return Option.of(avroNode);
}
- private static Option<JsonNode> tryConvertEnumProperty(String name, JsonNode
jsonProperty) {
+ private static Option<JsonNode> tryConvertEnumProperty(String name, JsonNode
jsonProperty,
+ AtomicInteger
schemaCounter, Set<String> seenNames) {
if (!isJsonEnumType(jsonProperty)) {
return Option.empty();
}
List<String> enums = new ArrayList<>();
jsonProperty.get("enum").iterator().forEachRemaining(e ->
enums.add(e.asText()));
- JsonNode avroType = enums.stream().allMatch(e ->
SYMBOL_REGEX.matcher(e).matches())
- ? MAPPER.createObjectNode()
- .put("type", "enum")
- .put("name", getAvroTypeName(jsonProperty, name))
- .set("symbols", jsonProperty.get("enum"))
- : TextNode.valueOf("string");
- JsonNode avroNode = MAPPER.createObjectNode()
- .put("name", sanitizeAsAvroName(name))
- .put("doc", getAvroDoc(jsonProperty))
- .set("type", avroType);
+ JsonNode avroType =
+ enums.stream().allMatch(e -> SYMBOL_REGEX.matcher(e).matches())
+ ? MAPPER
+ .createObjectNode()
+ .put(TYPE, "enum")
+ .put("name", getAvroTypeName(jsonProperty, name, schemaCounter,
seenNames))
+ .set("symbols", jsonProperty.get("enum"))
+ : TextNode.valueOf("string");
+ JsonNode avroNode =
+ MAPPER
+ .createObjectNode()
+ .put("name", sanitizeAsAvroName(name))
+ .put("doc", getAvroDoc(jsonProperty))
+ .set(TYPE, avroType);
return Option.of(avroNode);
}
- private static JsonNode convertProperty(String name, JsonNode jsonProperty,
boolean isRequired) {
- ObjectNode avroNode = MAPPER.createObjectNode()
- .put("name", sanitizeAsAvroName(name))
- .put("doc", getAvroDoc(jsonProperty));
+ private JsonNode convertProperty(String name, JsonNode jsonProperty, boolean
isRequired,
+ AtomicInteger schemaCounter, Set<String>
seenNames,
+ boolean isArrayType) {
+ ObjectNode avroNode =
+ MAPPER
+ .createObjectNode()
+ .put("name", sanitizeAsAvroName(name))
+ .put("doc", getAvroDoc(jsonProperty));
- // infer `default`
boolean nullable = !isRequired;
- if (jsonProperty.has("default")) {
- avroNode.set("default", jsonProperty.get("default"));
- } else if (nullable) {
- avroNode.set("default", NullNode.getInstance());
- }
// infer `types`
- Set<String> avroTypeSet = new HashSet<>();
+ Set<String> avroSimpleTypeSet = new HashSet<>();
+ List<JsonNode> defaultValueList = new ArrayList<>();
+ List<JsonNode> avroComplexTypeSet = new ArrayList<>();
+ boolean unionType = false;
if (jsonProperty.hasNonNull("oneOf") || jsonProperty.hasNonNull("allOf")) {
+ unionType = true;
// prefer to look for `oneOf` and `allOf` for types
Option<JsonNode> oneOfTypes =
Option.ofNullable(jsonProperty.get("oneOf"));
- if (oneOfTypes.isPresent()) {
- oneOfTypes.get().elements().forEachRemaining(e ->
avroTypeSet.add(JSON_TO_AVRO_TYPE.get(e.get("type").asText())));
- }
+ Pair<Pair<Set<String>, List<JsonNode>>, List<JsonNode>> allOneOfTypes =
+ getAllTypesFromOneOfAllOfTypes(oneOfTypes, name, schemaCounter,
seenNames);
+ avroSimpleTypeSet.addAll(allOneOfTypes.getLeft().getLeft());
+ defaultValueList.addAll(allOneOfTypes.getLeft().getRight());
+ avroComplexTypeSet.addAll(allOneOfTypes.getRight());
+
Option<JsonNode> allOfTypes =
Option.ofNullable(jsonProperty.get("allOf"));
- if (allOfTypes.isPresent()) {
- allOfTypes.get().elements().forEachRemaining(e ->
avroTypeSet.add(JSON_TO_AVRO_TYPE.get(e.get("type").asText())));
- }
- } else if (jsonProperty.has("type")) {
- // fall back to `type` parameter
- JsonNode jsonType = jsonProperty.get("type");
- if (jsonType.isArray()) {
- jsonType.elements().forEachRemaining(e ->
avroTypeSet.add(JSON_TO_AVRO_TYPE.get(e.asText())));
- } else {
- avroTypeSet.add(JSON_TO_AVRO_TYPE.get(jsonType.asText()));
+ Pair<Pair<Set<String>, List<JsonNode>>, List<JsonNode>> allAllOfTypes =
+ getAllTypesFromOneOfAllOfTypes(allOfTypes, name, schemaCounter,
seenNames);
+ avroSimpleTypeSet.addAll(allAllOfTypes.getLeft().getLeft());
+ avroComplexTypeSet.addAll(allAllOfTypes.getRight());
+ } else if (jsonProperty.has(TYPE)) {
+ Pair<Set<String>, Option<JsonNode>> types =
getPrimitiveTypeFromType(jsonProperty);
+ avroSimpleTypeSet.addAll(types.getLeft());
+ if (types.getRight().isPresent()) {
+ avroComplexTypeSet.add(types.getRight().get());
}
}
List<String> avroTypes = new ArrayList<>();
- if (nullable || avroTypeSet.contains("null")) {
+ if (nullable || avroSimpleTypeSet.contains("null")) {
avroTypes.add("null");
}
- avroTypeSet.remove("null");
- avroTypes.addAll(avroTypeSet);
- avroNode.set("type", avroTypes.size() > 1
- ?
MAPPER.createArrayNode().addAll(avroTypes.stream().map(TextNode::valueOf).collect(Collectors.toList()))
- : TextNode.valueOf(avroTypes.get(0)));
- return avroNode;
+ avroSimpleTypeSet.remove("null");
+ avroTypes.addAll(avroSimpleTypeSet);
+
+ // infer `default`
+ // Either there is explicit default value defined for a single simple type,
+ // e.g., "field":{"type":"integer","default":50},
+ // or we take the default value from the single simple type in "oneOf"
type,
+ // e.g., "oneOf":[{"type":"null"},{"type":"integer","default":"60"}],
+ // in which the default value is 60.
+ JsonNode defaultNode = jsonProperty.has(DEFAULT_FIELD) ?
jsonProperty.get(DEFAULT_FIELD)
+ : (avroComplexTypeSet.isEmpty() && avroSimpleTypeSet.size() == 1 &&
!defaultValueList.isEmpty())
+ ? defaultValueList.get(0) : NullNode.getInstance();
+ if (jsonProperty.has(DEFAULT_FIELD)
+ || (avroComplexTypeSet.isEmpty() && avroSimpleTypeSet.size() == 1 &&
!defaultValueList.isEmpty())) {
+ if (this.convertDefaultValueType && avroComplexTypeSet.isEmpty() &&
avroSimpleTypeSet.size() == 1) {
+ String defaultType = avroSimpleTypeSet.stream().findFirst().get();
+ switch (defaultType) {
+ case LONG_TYPE:
+ defaultNode =
LongNode.valueOf(Long.parseLong(defaultNode.asText()));
+ break;
+ case DOUBLE_TYPE:
+ defaultNode =
DoubleNode.valueOf(Double.parseDouble(defaultNode.asText()));
+ break;
+ case BOOLEAN_TYPE:
+ defaultNode =
BooleanNode.valueOf(Boolean.parseBoolean(defaultNode.asText()));
+ break;
+ case STRING_TYPE:
+ if (this.stripDefaultValueQuotes) {
+ defaultNode =
TextNode.valueOf(stripQuotesFromStringValue(defaultNode.asText()));
+ }
+ break;
+ default:
+ }
+ }
+ avroNode.set(DEFAULT_FIELD, defaultNode);
+ } else if (nullable) {
+ avroNode.set(DEFAULT_FIELD, defaultNode);
+ }
+ avroTypes = arrangeTypeOrderOnDefault(avroTypes, defaultNode);
+ List<JsonNode> allTypes =
+ avroTypes.stream().map(TextNode::valueOf).collect(Collectors.toList());
+ allTypes.addAll(avroComplexTypeSet);
+ ArrayNode typeValue = MAPPER.createArrayNode().addAll(allTypes);
+ avroNode.set(
+ TYPE, allTypes.size() > 1 ? typeValue : allTypes.get(0));
+ return unionType && isArrayType ? typeValue : avroNode;
+ }
+
+ protected Pair<Set<String>, Option<JsonNode>>
getPrimitiveTypeFromType(JsonNode jsonNode) {
+ if (!jsonNode.has(TYPE)) {
+ return Pair.of(Collections.emptySet(), Option.empty());
+ }
+
+ JsonNode typeNode = jsonNode.get(TYPE);
+ if (!typeNode.isArray() && typeNode.asText().equals("number")
+ && jsonNode.has(TITLE) &&
Decimal.LOGICAL_NAME.equals(jsonNode.get(TITLE).asText())) {
+ return convertKafkaConnectDecimal(jsonNode);
+ }
+
+ Set<String> avroSimpleTypeSet = new HashSet<>();
+ // fall back to `type` parameter
+ String jsonType = jsonNode.get(TYPE).asText();
+ if (!jsonType.equals("object") && !jsonType.equals(ARRAY)) {
+ avroSimpleTypeSet.add(JSON_TO_AVRO_TYPE.get(jsonType));
+ }
+ return Pair.of(avroSimpleTypeSet, Option.empty());
+ }
+
+ private static Pair<Set<String>, Option<JsonNode>>
convertKafkaConnectDecimal(JsonNode jsonNode) {
+ String precision;
+ String scale = "0";
+ if (jsonNode.has(CONNECT_PARAMETERS)) {
+ JsonNode parameters = jsonNode.get(CONNECT_PARAMETERS);
+ if (parameters.has(CONNECT_DECIMAL_PRECISION)) {
+ precision = parameters.get(CONNECT_DECIMAL_PRECISION).asText();
+ } else {
+ throw new IllegalArgumentException("Missing " +
CONNECT_DECIMAL_PRECISION + " from properties in decimal type");
+ }
+ if (parameters.has(Decimal.SCALE_FIELD)) {
+ scale = parameters.get(Decimal.SCALE_FIELD).asText();
+ }
+ } else {
+ throw new IllegalArgumentException("Missing " + CONNECT_PARAMETERS + "
from decimal type in json schema");
+ }
+
+ if (jsonNode.has(CONNECT_TYPE)) {
+ String connectType = jsonNode.get(CONNECT_TYPE).asText();
+ if (!connectType.equals("bytes")) {
+ throw new IllegalArgumentException(connectType + " is not a supported
type for decimal");
+ }
+ }
+
+ JsonNode avroNode =
+ MAPPER
+ .createObjectNode()
+ .put(TYPE, "bytes")
+ .put("logicalType", "decimal")
+ .put("precision", Integer.valueOf(precision))
+ .put("scale", Integer.valueOf(scale));
+ return Pair.of(Collections.emptySet(), Option.of(avroNode));
+ }
+
+ private Pair<Pair<Set<String>, List<JsonNode>>, List<JsonNode>>
getAllTypesFromOneOfAllOfTypes(Option<JsonNode> jsonUnionType, String name,
AtomicInteger schemaCounter, Set<String> seenNames) {
+ Set<String> avroSimpleTypeSet = new HashSet<>();
+ List<JsonNode> defaultValueList = new ArrayList<>();
+ List<JsonNode> avroComplexTypeSet = new ArrayList<>();
+ if (jsonUnionType.isPresent()) {
+ jsonUnionType
+ .get()
+ .elements()
+ .forEachRemaining(
+ e -> {
+ Pair<Set<String>, Option<JsonNode>> types =
getPrimitiveTypeFromType(e);
+ if (types.getLeft().isEmpty() &&
!types.getRight().isPresent()) {
+ if (isJsonNestedType(e)) {
+ avroComplexTypeSet.add(tryConvertNestedProperty(name, e,
schemaCounter, seenNames).get().get(TYPE));
+ } else if (isJsonArrayType(e)) {
+ avroComplexTypeSet.add(tryConvertArrayProperty(name, e,
schemaCounter, seenNames).get().get(TYPE));
+ } else if (isJsonEnumType(e)) {
+ avroComplexTypeSet.add(tryConvertEnumProperty(name, e,
schemaCounter, seenNames).get().get(TYPE));
+ } else {
+ throw new RuntimeException("unknown complex type
encountered");
+ }
+ } else {
+ avroSimpleTypeSet.addAll(types.getLeft());
+ if (types.getRight().isPresent()) {
+ avroComplexTypeSet.add(types.getRight().get());
+ }
+ if (e.has(DEFAULT_FIELD)) {
+ // This is only valid if one simple-type exists in the
oneOf type
+ defaultValueList.add(e.get(DEFAULT_FIELD));
+ }
+ }
+ });
+ }
+ return Pair.of(Pair.of(avroSimpleTypeSet, defaultValueList),
avroComplexTypeSet);
+ }
+
+ private static List<String> arrangeTypeOrderOnDefault(List<String>
avroTypes, JsonNode defaultNode) {
+ // Nothing to be done as null is already the first one.
+ if (defaultNode == null || defaultNode.isNull()) {
+ return avroTypes;
+ }
+ Set<String> avroTypesSet = new HashSet<>(avroTypes);
+ if (defaultNode.isInt() || defaultNode.isBigInteger() ||
defaultNode.isIntegralNumber()) {
+ // Maps to Long Type in Avro. Place it first, defaulting to null if not
found.
+ return modifyListOrderingBasedOnDefaultValue(avroTypes, avroTypesSet,
LONG_TYPE);
+ } else if (defaultNode.isNumber() || defaultNode.isBigDecimal() ||
defaultNode.isDouble()
+ || defaultNode.isFloatingPointNumber()) {
+ return modifyListOrderingBasedOnDefaultValue(avroTypes, avroTypesSet,
DOUBLE_TYPE);
+ } else if (defaultNode.isTextual()) {
+ return modifyListOrderingBasedOnDefaultValue(avroTypes, avroTypesSet,
STRING_TYPE);
+ } else if (defaultNode.isBoolean()) {
+ return modifyListOrderingBasedOnDefaultValue(avroTypes, avroTypesSet,
BOOLEAN_TYPE);
+ }
+ return avroTypes;
+ }
+
+ private static List<String>
modifyListOrderingBasedOnDefaultValue(List<String> typeList,
+
Set<String> avroTypesSet,
+ String
type) {
+ List<String> modifiedAvroTypeList = new ArrayList<>();
+ if (avroTypesSet.contains(type)) {
+ modifiedAvroTypeList.add(type);
+ avroTypesSet.remove(type);
+ modifiedAvroTypeList.addAll(avroTypesSet);
+ return modifiedAvroTypeList;
+ }
+ // Return original list.
+ return typeList;
}
private static boolean isJsonNestedType(JsonNode jsonNode) {
- return jsonNode.has("type") &&
Objects.equals(jsonNode.get("type").asText(), "object");
+ return jsonNode.has(TYPE) && Objects.equals(jsonNode.get(TYPE).asText(),
"object");
}
private static boolean isJsonArrayType(JsonNode jsonNode) {
- return jsonNode.has("type") &&
Objects.equals(jsonNode.get("type").asText(), "array");
+ return jsonNode.has(TYPE) && Objects.equals(jsonNode.get(TYPE).asText(),
ARRAY);
}
private static boolean isJsonEnumType(JsonNode jsonNode) {
@@ -216,17 +451,18 @@ public class JsonToAvroSchemaConverter implements SchemaRegistryProvider.SchemaC
private static Option<String> getAvroSchemaRecordNamespace(JsonNode
jsonNode) {
if (jsonNode.hasNonNull("$id")) {
String host = URI.create(jsonNode.get("$id").asText()).getHost();
- String avroNamespace = Stream.of(host.split("\\."))
- .map(JsonToAvroSchemaConverter::sanitizeAsAvroName)
- .collect(Collectors.joining("."));
+ String avroNamespace =
+ Stream.of(host.split("\\."))
+ .map(JsonToAvroSchemaConverter::sanitizeAsAvroName)
+ .collect(Collectors.joining("."));
return Option.of(avroNamespace);
}
return Option.empty();
}
private static String getAvroSchemaRecordName(JsonNode jsonNode) {
- if (jsonNode.hasNonNull("title")) {
- return sanitizeAsAvroName(jsonNode.get("title").asText());
+ if (jsonNode.hasNonNull(TITLE)) {
+ return sanitizeAsAvroName(jsonNode.get(TITLE).asText());
}
if (jsonNode.hasNonNull("$id")) {
// infer name from host: http://www.my-example.com => "my_example"
@@ -252,11 +488,45 @@ public class JsonToAvroSchemaConverter implements SchemaRegistryProvider.SchemaC
return required;
}
- private static String getAvroTypeName(JsonNode jsonNode, String defaultName)
{
- return jsonNode.hasNonNull("title") ? jsonNode.get("title").asText() :
defaultName;
+ private static String getAvroTypeName(JsonNode jsonNode, String defaultName,
+ AtomicInteger schemaCounter,
+ Set<String> seenNames) {
+ String typeName = jsonNode.hasNonNull(TITLE) ? jsonNode.get(TITLE).asText()
+ : defaultName;
+ if (!seenNames.contains(typeName)) {
+ seenNames.add(typeName);
+ return typeName;
+ }
+ String modifiedTypeName = typeName + schemaCounter.getAndIncrement();
+ seenNames.add(modifiedTypeName);
+ return modifiedTypeName;
}
private static String getAvroDoc(JsonNode jsonNode) {
return jsonNode.hasNonNull("description") ?
jsonNode.get("description").asText() : "";
}
+
+ public static String getSuffixBy(String input, int ch) {
+ int i = input.lastIndexOf(ch);
+ if (i == -1) {
+ return input;
+ }
+ return input.substring(i);
+ }
+
+ public static String removeSuffixBy(String input, int ch) {
+ int i = input.lastIndexOf(ch);
+ if (i == -1) {
+ return input;
+ }
+ return input.substring(0, i);
+ }
+
+ public static String stripQuotesFromStringValue(String input) {
+ if (input != null && input.length() >= 2
+ && input.charAt(0) == '\"' && input.charAt(input.length() - 1) ==
'\"') {
+ return input.substring(1, input.length() - 1);
+ }
+ return input;
+ }
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverterConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverterConfig.java
new file mode 100644
index 00000000000..dc6a98f8306
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverterConfig.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.schema.converter;
+
+import org.apache.hudi.common.config.ConfigProperty;
+
+/**
+ * Configs for `JsonToAvroSchemaConverter`
+ */
+public class JsonToAvroSchemaConverterConfig {
+ public static final ConfigProperty<Boolean> CONVERT_DEFAULT_VALUE_TYPE =
ConfigProperty
+
.key("hoodie.streamer.schemaconverter.json_to_avro.convert_default_value_type")
+ .defaultValue(true)
+ .sinceVersion("1.1.0")
+ .withDocumentation("In
`org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter`, "
+ + "whether to automatically convert the default value based on the
field type in the "
+ + "schema. For example, the field schema is "
+ + "`\"col1\":{\"type\":\"integer\",\"default\":\"0\"`, and in this
case, the default "
+ + "value is a String in JSON. After converting the type of the
default value from JSON, "
+ + "the converter can properly insert the default value to the Avro
schema without "
+ + "hitting Avro schema validation error. This should be turned on by
default.");
+
+ public static final ConfigProperty<Boolean> STRIP_DEFAULT_VALUE_QUOTES =
ConfigProperty
+
.key("hoodie.streamer.schemaconverter.json_to_avro.strip_default_value_quotes")
+ .defaultValue(true)
+ .sinceVersion("1.1.0")
+ .withDocumentation("In
`org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter`, "
+ + "whether to automatically strip the surrounding quotes on the
default value of the "
+ + "String-typed field in the schema. For example, the field schema
is "
+ + "`\"col1\":{\"type\":\"string\",\"default\":\"\\\"abc\\\"\"`, and
in this case, the "
+ + "default string value is surrounded by additional quotes (`\"`) in
JSON. The converter "
+ + "can strip the quotes, if exist, as they are redundant before
adding the default value "
+ + "to the Avro schema. This should be turned on by default.");
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index 6ce0a548ec3..7c47691ac11 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -18,11 +18,14 @@
package org.apache.hudi.utilities.sources;
+import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.JsonKafkaPostProcessorConfig;
import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
@@ -33,6 +36,7 @@ import
org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.StreamContext;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -61,6 +65,19 @@ import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SO
public class JsonKafkaSource extends KafkaSource<JavaRDD<String>> {
private static final Logger LOG =
LoggerFactory.getLogger(JsonKafkaSource.class);
+ /**
+ * Configs specific to {@link JsonKafkaSource}.
+ *
+ * <p>{@code KAFKA_JSON_VALUE_DESERIALIZER_CLASS} names the Kafka value deserializer class
+ * for this source and defaults to {@code StringDeserializer}. The constructor resolves it
+ * via {@code ConfigUtils.getStringWithAltKeys} and stores the result under the native
+ * Kafka value-deserializer property.
+ *
+ * <p>NOTE(review): the key uses the {@code hoodie.deltastreamer.} prefix, while the
+ * JSON-to-Avro converter configs introduced in the same change use {@code hoodie.streamer.}
+ * - confirm the prefix is intentional.
+ */
+ public static class Config {
+ public static final ConfigProperty<String>
KAFKA_JSON_VALUE_DESERIALIZER_CLASS = ConfigProperty
+ .key("hoodie.deltastreamer.source.kafka.json.value.deserializer.class")
+ .defaultValue(StringDeserializer.class.getName())
+ .sinceVersion("1.1.0")
+ .withDocumentation("Kafka Json Payload Deserializer Class");
+ }
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // Shared Jackson mapper used by this class to parse and serialize record values.
+
public JsonKafkaSource(TypedProperties properties, JavaSparkContext
sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider, HoodieIngestionMetrics
metrics) {
this(properties, sparkContext, sparkSession, metrics, new
DefaultStreamContext(schemaProvider, Option.empty()));
@@ -69,48 +86,61 @@ public class JsonKafkaSource extends
KafkaSource<JavaRDD<String>> {
public JsonKafkaSource(TypedProperties properties, JavaSparkContext
sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics,
StreamContext streamContext) {
super(properties, sparkContext, sparkSession, SourceType.JSON, metrics,
new
DefaultStreamContext(UtilHelpers.getSchemaProviderForKafkaSource(streamContext.getSchemaProvider(),
properties, sparkContext), streamContext.getSourceProfileSupplier()));
- properties.put("key.deserializer", StringDeserializer.class.getName());
- properties.put("value.deserializer", StringDeserializer.class.getName());
+ props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP,
StringDeserializer.class.getName());
+ props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP,
ConfigUtils.getStringWithAltKeys(props,
Config.KAFKA_JSON_VALUE_DESERIALIZER_CLASS, true));
this.offsetGen = new KafkaOffsetGen(props);
}
@Override
protected JavaRDD<String> toBatch(OffsetRange[] offsetRanges) {
+ String deserializerClass =
props.getString(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP);
JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD =
KafkaUtils.createRDD(sparkContext,
offsetGen.getKafkaParams(),
offsetRanges,
LocationStrategies.PreferConsistent())
- .filter(x -> !StringUtils.isNullOrEmpty((String) x.value()));
- return postProcess(maybeAppendKafkaOffsets(kafkaRDD));
+ .filter(x -> filterForNullValues(x.value(), deserializerClass));
+ return postProcess(maybeAppendKafkaOffsets(kafkaRDD, deserializerClass));
}
- protected JavaRDD<String>
maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD) {
- if (this.shouldAddOffsets) {
+ protected JavaRDD<String>
maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD,
String deserializerClass) {
+ if (shouldAddOffsets) {
return kafkaRDD.mapPartitions(partitionIterator -> {
TaskContext taskContext = TaskContext.get();
LOG.info("Converting Kafka source objects to strings with stageId :
{}, stage attempt no: {}, taskId : {}, task attempt no : {}, task attempt id :
{} ",
taskContext.stageId(), taskContext.stageAttemptNumber(),
taskContext.partitionId(), taskContext.attemptNumber(),
taskContext.taskAttemptId());
- ObjectMapper objectMapper = new ObjectMapper();
return new
CloseableMappingIterator<>(ClosableIterator.wrap(partitionIterator),
consumerRecord -> {
- String recordValue = consumerRecord.value().toString();
- String recordKey = StringUtils.objToString(consumerRecord.key());
+ String recordKey;
+ String record;
try {
- ObjectNode jsonNode = (ObjectNode)
objectMapper.readTree(recordValue);
+ record = getValueAsString(consumerRecord.value(),
deserializerClass);
+ recordKey = StringUtils.objToString(consumerRecord.key());
+ } catch (JsonProcessingException e) {
+ throw new HoodieException(e);
+ }
+ try {
+ ObjectNode jsonNode = (ObjectNode) OBJECT_MAPPER.readTree(record);
jsonNode.put(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
jsonNode.put(KAFKA_SOURCE_PARTITION_COLUMN,
consumerRecord.partition());
jsonNode.put(KAFKA_SOURCE_TIMESTAMP_COLUMN,
consumerRecord.timestamp());
if (recordKey != null) {
jsonNode.put(KAFKA_SOURCE_KEY_COLUMN, recordKey);
}
- return objectMapper.writeValueAsString(jsonNode);
+ return OBJECT_MAPPER.writeValueAsString(jsonNode);
} catch (Throwable e) {
- return recordValue;
+ return record;
}
});
});
+ } else {
+ return kafkaRDD.map(consumerRecord -> {
+ try {
+ return getValueAsString(consumerRecord.value(), deserializerClass);
+ } catch (JsonProcessingException e) {
+ throw new HoodieException(e);
+ }
+ });
}
- return kafkaRDD.map(consumerRecord -> (String) consumerRecord.value());
}
private JavaRDD<String> postProcess(JavaRDD<String> jsonStringRDD) {
@@ -130,4 +160,21 @@ public class JsonKafkaSource extends
KafkaSource<JavaRDD<String>> {
return processor.process(jsonStringRDD);
}
+
+  /**
+   * Decides whether a consumed Kafka record value should be kept in the batch.
+   *
+   * @param value                  the deserialized record value, possibly {@code null}
+   * @param valueDeserializerClass fully-qualified name of the configured value deserializer
+   * @return {@code false} for {@code null} values and, when the value deserializer is
+   *         {@code StringDeserializer}, for empty strings; {@code true} otherwise
+   */
+  private static Boolean filterForNullValues(Object value, String valueDeserializerClass) {
+    if (value == null) {
+      return false;
+    }
+    // Constant-first comparison is null-safe and matches the check in getValueAsString.
+    if (StringDeserializer.class.getName().equals(valueDeserializerClass)) {
+      return StringUtils.nonEmpty((String) value);
+    }
+    return true;
+  }
+
+  /**
+   * Renders a consumed Kafka record value as a string.
+   *
+   * @param value                  the deserialized record value
+   * @param valueDeserializerClass fully-qualified name of the configured value deserializer
+   * @return the value cast to {@code String} when the deserializer is {@code StringDeserializer},
+   *         otherwise the value serialized through the shared {@code ObjectMapper}
+   * @throws JsonProcessingException if the mapper cannot serialize the value
+   */
+  private static String getValueAsString(Object value, String valueDeserializerClass) throws JsonProcessingException {
+    boolean alreadyString = StringDeserializer.class.getName().equals(valueDeserializerClass);
+    return alreadyString ? (String) value : OBJECT_MAPPER.writeValueAsString(value);
+  }
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
index dd44704fb5d..6f5766e81dc 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
@@ -19,46 +19,74 @@
package org.apache.hudi.utilities.schema.converter;
+import org.apache.hudi.common.config.TypedProperties;
+
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import org.apache.avro.Schema;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
import java.io.IOException;
import static org.apache.hudi.common.util.FileIOUtils.readAsUTFString;
+import static
org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter.stripQuotesFromStringValue;
+import static
org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverterConfig.STRIP_DEFAULT_VALUE_QUOTES;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
class TestJsonToAvroSchemaConverter {
@ParameterizedTest
- @ValueSource(strings = {
- "enum-properties",
- "example-address",
- "example-calendar",
- "example-card",
- "example-geographical-location",
- "multiple-properties",
- "nested-properties",
- "single-properties"
+ @CsvSource({
+ "enum-properties,",
+ "example-address,",
+ "example-calendar,",
+ "example-card,",
+ "example-geographical-location,",
+ "multiple-properties,",
+ "nested-properties,",
+ "single-properties,",
+ "schema-repeating-names,",
+ "complex-json-union-types,",
+ "not-null-default-value-schema,_no_stripping_quotes",
+ "not-null-default-value-schema,_stripping_quotes",
+ "array-with-item-type-union,",
})
- void testConvertJsonSchemaToAvroSchema(String inputCase) throws IOException {
+ void testConvertJsonSchemaToAvroSchema(String inputCase, String
avroSchemaFileSuffix) throws IOException {
String jsonSchema = loadJsonSchema(inputCase);
- String avroSchema = new JsonToAvroSchemaConverter(null).convert(new
JsonSchema(jsonSchema));
+ TypedProperties config = new TypedProperties();
+ if ("_no_stripping_quotes".equals(avroSchemaFileSuffix)) {
+ config.put(STRIP_DEFAULT_VALUE_QUOTES.key(), "false");
+ }
+ String avroSchema = new JsonToAvroSchemaConverter(config).convert(new
JsonSchema(jsonSchema));
Schema schema = new Schema.Parser().parse(avroSchema);
- Schema expected = new Schema.Parser().parse(loadAvroSchema(inputCase));
+ Schema expected = new Schema.Parser().parse(loadAvroSchema(inputCase,
avroSchemaFileSuffix));
assertEquals(expected, schema);
}
+  @Test
+  void testStripQuotesFromStringValue() {
+    // A null input is expected to come back as null.
+    assertNull(stripQuotesFromStringValue(null));
+    // Each row is {input, expected result}.
+    String[][] cases = {
+        {"", ""},
+        {"\"", "\""},
+        {"\"\"", ""},
+        {"\"\"\"", "\""},
+        {"123", "123"},
+        {"\"123\"", "123"},
+        {"x", "x"},
+    };
+    for (String[] inputAndExpected : cases) {
+      assertEquals(inputAndExpected[1], stripQuotesFromStringValue(inputAndExpected[0]));
+    }
+  }
+
private String loadJsonSchema(String inputCase) throws IOException {
return readAsUTFString(getClass()
.getClassLoader()
.getResourceAsStream(String.format("schema-provider/json/%s/input.json",
inputCase)));
}
- private String loadAvroSchema(String inputCase) throws IOException {
+ private String loadAvroSchema(String inputCase, String avroSchemaFileSuffix)
throws IOException {
return readAsUTFString(getClass()
.getClassLoader()
-
.getResourceAsStream(String.format("schema-provider/json/%s/expected.json",
inputCase)));
+
.getResourceAsStream(String.format("schema-provider/json/%s/expected%s.json",
inputCase,
+ avroSchemaFileSuffix == null ? "" : avroSchemaFileSuffix)));
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index dcfc97e4eae..82bbd3a01c5 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -18,6 +18,7 @@
package org.apache.hudi.utilities.sources;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
@@ -26,6 +27,7 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.InProcessTimeGenerator;
+import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
@@ -38,9 +40,15 @@ import org.apache.hudi.utilities.streamer.ErrorEvent;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -50,13 +58,16 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
+import java.math.BigDecimal;
import java.net.URL;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -70,16 +81,19 @@ import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SO
import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+import static
org.apache.hudi.utilities.sources.JsonKafkaSource.Config.KAFKA_JSON_VALUE_DESERIALIZER_CLASS;
import static
org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords;
import static
org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions;
import static
org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitionsWithNullKafkaKey;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Tests against {@link JsonKafkaSource}.
*/
public class TestJsonKafkaSource extends BaseTestKafkaSource {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final HoodieTestDataGenerator DATA_GENERATOR = new
HoodieTestDataGenerator(1L);
static final URL SCHEMA_FILE_URL =
TestJsonKafkaSource.class.getClassLoader().getResource("streamer-config/source_short_trip_uber.avsc");
@BeforeEach
@@ -136,6 +150,36 @@ public class TestJsonKafkaSource extends
BaseTestKafkaSource {
assertEquals(1000, fetch1.getBatch().get().count());
}
+ @Test
+ public void testJsonKafkaSourceWithJsonSchemaDeserializer() {
+ // Topic setup: a two-partition topic consumed with the Confluent JSON-schema value deserializer.
+ final String topic = TEST_TOPIC_PREFIX +
"testJsonKafkaSourceWithJsonSchemaDeserializer";
+ testUtils.createTopic(topic, 2);
+ TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+ props.put(KAFKA_JSON_VALUE_DESERIALIZER_CLASS.key(),
+ "io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer");
+ props.put("schema.registry.url", "mock://127.0.0.1:8081"); // same registry URL the producer helper in this test class uses
+
+ Source jsonSource = new JsonKafkaSource(props, jsc(), spark(),
schemaProvider, metrics);
+ SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
+
+ // 1. Extract without any checkpoint => get all the data, respecting
sourceLimit
+ assertEquals(Option.empty(),
+ kafkaSource.fetchNewDataInAvroFormat(Option.empty(),
Long.MAX_VALUE).getBatch()); // nothing has been produced yet, so an empty batch is expected
+ // Send 1000 non-null messages to Kafka
+ List<RawTripTestPayload> insertRecords =
DATA_GENERATOR.generateInserts("000", 1000)
+ .stream()
+ .map(hr -> (RawTripTestPayload)
hr.getData()).collect(Collectors.toList());
+ sendMessagesToKafkaWithJsonSchemaSerializer(topic, 2, insertRecords);
+ // send 200 null messages to Kafka
+ List<RawTripTestPayload> nullInsertedRecords = Arrays.asList(new
RawTripTestPayload[200]); // freshly allocated array, so all 200 elements are null
+ sendMessagesToKafkaWithJsonSchemaSerializer(topic, 2, nullInsertedRecords);
+ InputBatch<JavaRDD<GenericRecord>> fetch1 =
+ kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+ // Verify that messages with null values are filtered
+ assertEquals(1000, fetch1.getBatch().get().count());
+ }
+
@Test
public void testJsonKafkaSourceWithDefaultUpperCap() {
// topic setup.
@@ -207,6 +251,18 @@ public class TestJsonKafkaSource extends
BaseTestKafkaSource {
assertEquals(Option.empty(), fetch6.getBatch());
}
+  /**
+   * Asserts that the decimal stored in {@code fieldname} of every record lies within
+   * {@code [10^(precision - scale) * 0.1, 10^(precision - scale)]}, using the precision and
+   * scale of the field's decimal logical type.
+   *
+   * @param records   records whose {@code fieldname} value is read as a {@link ByteBuffer}
+   * @param schema    schema that declares {@code fieldname} with a decimal logical type
+   * @param fieldname name of the decimal field to check
+   */
+  private static void verifyDecimalValue(List<GenericRecord> records, Schema schema, String fieldname) {
+    LogicalTypes.Decimal decimalType =
+        (LogicalTypes.Decimal) schema.getField(fieldname).schema().getLogicalType();
+    int integerDigits = decimalType.getPrecision() - decimalType.getScale();
+    double upperBound = Math.pow(10, integerDigits);
+    double lowerBound = upperBound * 0.1;
+    records.forEach(rec -> {
+      // NOTE(review): array() exposes the whole backing array; presumably the buffer spans it - confirm.
+      byte[] rawBytes = ((ByteBuffer) rec.get(fieldname)).array();
+      double actual = HoodieAvroUtils.convertBytesToBigDecimal(rawBytes, decimalType).doubleValue();
+      assertTrue(actual >= lowerBound && actual <= upperBound);
+    });
+  }
+
@Override
protected void sendMessagesToKafka(String topic, int count, int
numPartitions) {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
@@ -239,7 +295,7 @@ public class TestJsonKafkaSource extends
BaseTestKafkaSource {
@ParameterizedTest
@ValueSource(booleans = {true, false})
- public void testErrorEventsForDataInRowFormat(boolean persistSourceRdd)
throws IOException {
+ public void testErrorEventsForDataInRowFormat(boolean persistSourceRdd) {
// topic setup.
final String topic = TEST_TOPIC_PREFIX +
"testErrorEventsForDataInRowFormat_" + persistSourceRdd;
@@ -251,14 +307,14 @@ public class TestJsonKafkaSource extends
BaseTestKafkaSource {
topicPartitions.add(topicPartition1);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
sendJsonSafeMessagesToKafka(topic, 1000, 2);
- testUtils.sendMessages(topic, new String[]{"error_event1",
"error_event2"});
+ testUtils.sendMessages(topic, new String[] {"error_event1",
"error_event2"});
TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
props.put(ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
-
props.put(ERROR_TABLE_BASE_PATH.key(),"/tmp/qurantine_table_test/json_kafka_row_events");
- props.put(ERROR_TARGET_TABLE.key(),"json_kafka_row_events");
+ props.put(ERROR_TABLE_BASE_PATH.key(),
"/tmp/qurantine_table_test/json_kafka_row_events");
+ props.put(ERROR_TARGET_TABLE.key(), "json_kafka_row_events");
props.put("hoodie.errortable.validate.targetschema.enable", "true");
- props.put("hoodie.base.path","/tmp/json_kafka_row_events");
+ props.put("hoodie.base.path", "/tmp/json_kafka_row_events");
props.setProperty(ERROR_TABLE_PERSIST_SOURCE_RDD.key(),
String.valueOf(persistSourceRdd));
Source jsonSource = new JsonKafkaSource(props, jsc(), spark(),
schemaProvider, metrics);
@@ -266,7 +322,7 @@ public class TestJsonKafkaSource extends
BaseTestKafkaSource {
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource,
errorTableWriter, Option.of(props));
InputBatch<Dataset<Row>> fetch1 =
kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
assertEquals(1000, fetch1.getBatch().get().count());
- assertEquals(2,((JavaRDD)errorTableWriter.get().getErrorEvents(
+ assertEquals(2, ((JavaRDD) errorTableWriter.get().getErrorEvents(
InProcessTimeGenerator.createNewInstantTime(),
Option.empty()).get()).count());
verifyRddsArePersisted(kafkaSource.getSource(),
fetch1.getBatch().get().rdd().toDebugString(), persistSourceRdd);
}
@@ -283,28 +339,27 @@ public class TestJsonKafkaSource extends
BaseTestKafkaSource {
TopicPartition topicPartition1 = new TopicPartition(topic, 1);
topicPartitions.add(topicPartition1);
sendJsonSafeMessagesToKafka(topic, 1000, 2);
- testUtils.sendMessages(topic, new String[]{"error_event1",
"error_event2"});
+ testUtils.sendMessages(topic, new String[] {"error_event1",
"error_event2"});
TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
props.put(ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
-
props.put(ERROR_TABLE_BASE_PATH.key(),"/tmp/qurantine_table_test/json_kafka_row_events");
- props.put(ERROR_TARGET_TABLE.key(),"json_kafka_row_events");
+ props.put(ERROR_TABLE_BASE_PATH.key(),
"/tmp/qurantine_table_test/json_kafka_row_events");
+ props.put(ERROR_TARGET_TABLE.key(), "json_kafka_row_events");
props.put(HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.key(), true);
props.put(HoodieStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(),
"__");
props.put("hoodie.errortable.validate.targetschema.enable", "true");
- props.put("hoodie.base.path","/tmp/json_kafka_row_events");
+ props.put("hoodie.base.path", "/tmp/json_kafka_row_events");
Source jsonSource = new JsonKafkaSource(props, jsc(), spark(),
schemaProvider, metrics);
Option<BaseErrorTableWriter> errorTableWriter =
Option.of(getAnonymousErrorTableWriter(props));
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource,
errorTableWriter, Option.of(props));
- assertEquals(1000,
kafkaSource.fetchNewDataInRowFormat(Option.empty(),Long.MAX_VALUE).getBatch().get().count());
- assertEquals(2,((JavaRDD)errorTableWriter.get().getErrorEvents(
+ assertEquals(1000, kafkaSource.fetchNewDataInRowFormat(Option.empty(),
Long.MAX_VALUE).getBatch().get().count());
+ assertEquals(2, ((JavaRDD) errorTableWriter.get().getErrorEvents(
InProcessTimeGenerator.createNewInstantTime(),
Option.empty()).get()).count());
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testErrorEventsForDataInAvroFormat(boolean persistSourceRdd)
throws IOException {
-
// topic setup.
final String topic = TEST_TOPIC_PREFIX +
"testErrorEventsForDataInAvroFormat_" + persistSourceRdd;
@@ -317,20 +372,20 @@ public class TestJsonKafkaSource extends
BaseTestKafkaSource {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.sendMessages(topic,
jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 1000,
HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
- testUtils.sendMessages(topic, new String[]{"error_event1",
"error_event2"});
+ testUtils.sendMessages(topic, new String[] {"error_event1",
"error_event2"});
TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
props.put(ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
-
props.put(ERROR_TABLE_BASE_PATH.key(),"/tmp/qurantine_table_test/json_kafka_events");
- props.put(ERROR_TARGET_TABLE.key(),"json_kafka_events");
- props.put("hoodie.base.path","/tmp/json_kafka_events");
+ props.put(ERROR_TABLE_BASE_PATH.key(),
"/tmp/qurantine_table_test/json_kafka_events");
+ props.put(ERROR_TARGET_TABLE.key(), "json_kafka_events");
+ props.put("hoodie.base.path", "/tmp/json_kafka_events");
props.setProperty(ERROR_TABLE_PERSIST_SOURCE_RDD.key(),
String.valueOf(persistSourceRdd));
Source jsonSource = new JsonKafkaSource(props, jsc(), spark(),
schemaProvider, metrics);
Option<BaseErrorTableWriter> errorTableWriter =
Option.of(getAnonymousErrorTableWriter(props));
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource,
errorTableWriter, Option.of(props));
- InputBatch<JavaRDD<GenericRecord>> fetch1 =
kafkaSource.fetchNewDataInAvroFormat(Option.empty(),Long.MAX_VALUE);
- assertEquals(1000,fetch1.getBatch().get().count());
+ InputBatch<JavaRDD<GenericRecord>> fetch1 =
kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+ assertEquals(1000, fetch1.getBatch().get().count());
assertEquals(2, ((JavaRDD) errorTableWriter.get().getErrorEvents(
InProcessTimeGenerator.createNewInstantTime(),
Option.empty()).get()).count());
verifyRddsArePersisted(kafkaSource.getSource(),
fetch1.getBatch().get().rdd().toDebugString(), persistSourceRdd);
@@ -357,8 +412,7 @@ public class TestJsonKafkaSource extends
BaseTestKafkaSource {
}
@Override
- public Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String
baseTableInstantTime,
- Option
commitedInstantTime) {
+ public Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String
baseTableInstantTime, Option commitedInstantTime) {
return Option.of(errorEvents.stream().reduce((rdd1, rdd2) ->
rdd1.union(rdd2)).get());
}
@@ -415,6 +469,34 @@ public class TestJsonKafkaSource extends
BaseTestKafkaSource {
dfWithOffsetInfoAndNullKafkaKey.unpersist();
}
+  /**
+   * Sends each payload to {@code topic} through a producer built from
+   * {@code getProducerPropertiesForJsonKafkaSchemaSerializer()}; the producer is closed when
+   * the try-with-resources block exits.
+   *
+   * @param topic         destination topic
+   * @param numPartitions modulus used to derive the record key from the record index
+   * @param insertRecords payloads to send, in order; elements may be {@code null}
+   */
+  private void sendMessagesToKafkaWithJsonSchemaSerializer(String topic, int numPartitions,
+                                                           List<RawTripTestPayload> insertRecords) {
+    try (Producer<String, RawTripTestPayload> kafkaProducer =
+             new KafkaProducer<>(getProducerPropertiesForJsonKafkaSchemaSerializer())) {
+      int index = 0;
+      for (RawTripTestPayload payload : insertRecords) {
+        // use consistent keys to get even spread over partitions for test expectations
+        String key = Integer.toString(index % numPartitions);
+        kafkaProducer.send(new ProducerRecord<>(topic, key, payload));
+        index++;
+      }
+    }
+  }
+
+  /**
+   * Builds the producer properties used to publish JSON-schema serialized test payloads to
+   * the test broker.
+   *
+   * @return a fresh {@link Properties} instance with broker, serializer and registry settings
+   */
+  private Properties getProducerPropertiesForJsonKafkaSchemaSerializer() {
+    Properties producerProps = new Properties();
+    producerProps.put("bootstrap.servers", testUtils.brokerAddress());
+    // Key serializer is required.
+    producerProps.put("key.serializer", StringSerializer.class.getName());
+    producerProps.put("value.serializer",
+        "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer");
+    // NOTE(review): a deserializer entry is presumably ignored by a producer - confirm it is needed.
+    producerProps.put("value.deserializer",
+        "io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer");
+    producerProps.put("schema.registry.url", "mock://127.0.0.1:8081");
+    producerProps.put("auto.register.schemas", "true");
+    // wait for all in-sync replicas to ack sends
+    producerProps.put("acks", "all");
+    return producerProps;
+  }
+
@Test
public void testCreateSource() throws IOException {
final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceCreation";
diff --git
a/hudi-utilities/src/test/resources/schema-provider/json/array-with-item-type-union/expected.json
b/hudi-utilities/src/test/resources/schema-provider/json/array-with-item-type-union/expected.json
new file mode 100644
index 00000000000..86f31868d71
--- /dev/null
+++
b/hudi-utilities/src/test/resources/schema-provider/json/array-with-item-type-union/expected.json
@@ -0,0 +1,78 @@
+{
+ "type" : "record",
+ "name" : "namespace1",
+ "doc" : "",
+ "fields" : [ {
+ "name" : "field1",
+ "type" : [ "null", "long" ],
+ "doc" : "",
+ "default" : null
+ }, {
+ "name" : "field2",
+ "type" : [ "null", "string" ],
+ "doc" : "",
+ "default" : null
+ }, {
+ "name" : "field3",
+ "type" : [ "null", "long" ],
+ "doc" : "",
+ "default" : null
+ }, {
+ "name" : "field4",
+ "type" : [ "null", "long" ],
+ "doc" : "",
+ "default" : null
+ }, {
+ "name" : "field5",
+ "type" : [ "null", "long" ],
+ "doc" : "",
+ "default" : null
+ }, {
+ "name" : "field6",
+ "type" : [ "null", "string" ],
+ "doc" : "",
+ "default" : null
+ }, {
+ "name" : "field7",
+ "type" : [ "null", "string" ],
+ "doc" : "",
+ "default" : null
+ }, {
+ "name" : "field8",
+ "type" : [ "null", "long" ],
+ "doc" : "",
+ "default" : null
+ }, {
+ "name" : "field9",
+ "type" : [ "null", "string" ],
+ "doc" : "",
+ "default" : null
+ }, {
+ "name" : "field10",
+ "type" : [ "null", "string" ],
+ "doc" : "",
+ "default" : null
+ }, {
+ "name" : "field11",
+ "type" : [ "null", {
+ "type" : "array",
+ "items" : [ "null", {
+ "type" : "record",
+ "name" : "namespace2",
+ "fields" : [ {
+ "name" : "field12",
+ "type" : [ "null", "string" ],
+ "doc" : "",
+ "default" : null
+ }, {
+ "name" : "field13",
+ "type" : [ "null", "long" ],
+ "doc" : "",
+ "default" : null
+ } ]
+ } ]
+ } ],
+ "doc" : "",
+ "default" : null
+ } ]
+}
\ No newline at end of file
diff --git
a/hudi-utilities/src/test/resources/schema-provider/json/array-with-item-type-union/input.json
b/hudi-utilities/src/test/resources/schema-provider/json/array-with-item-type-union/input.json
new file mode 100644
index 00000000000..0b1bad988ee
--- /dev/null
+++
b/hudi-utilities/src/test/resources/schema-provider/json/array-with-item-type-union/input.json
@@ -0,0 +1,93 @@
+{
+ "type": "object",
+ "title": "namespace1",
+ "connect.version": 1,
+ "properties": {
+ "field1": {
+ "type": "integer",
+ "connect.index": 5,
+ "connect.type": "int32"
+ },
+ "field2": {
+ "type": "string",
+ "connect.index": 8
+ },
+ "field3": {
+ "connect.index": 2,
+ "oneOf": [
+ {
+ "type": "null"
+ },
+ {
+ "type": "integer",
+ "connect.type": "int64"
+ }
+ ]
+ },
+ "field4": {
+ "type": "integer",
+ "connect.index": 7,
+ "connect.type": "int32"
+ },
+ "field5": {
+ "type": "integer",
+ "connect.index": 10,
+ "connect.type": "int64"
+ },
+ "field6": {
+ "type": "string",
+ "connect.index": 1
+ },
+ "field7": {
+ "type": "string",
+ "connect.index": 6
+ },
+ "field8": {
+ "type": "integer",
+ "connect.index": 4,
+ "connect.type": "int64"
+ },
+ "field9": {
+ "type": "string",
+ "connect.index": 9
+ },
+ "field10": {
+ "type": "string",
+ "connect.index": 0
+ },
+ "field11": {
+ "connect.index": 3,
+ "oneOf": [
+ {
+ "type": "null"
+ },
+ {
+ "type": "array",
+ "items": {
+ "oneOf": [
+ {
+ "type": "null"
+ },
+ {
+ "type": "object",
+ "title": "namespace2",
+ "connect.version": 1,
+ "properties": {
+ "field12": {
+ "type": "string",
+ "connect.index": 0
+ },
+ "field13": {
+ "type": "integer",
+ "connect.index": 1,
+ "connect.type": "int64"
+ }
+ }
+ }
+ ]
+ }
+ }
+ ]
+ }
+ }
+}
diff --git
a/hudi-utilities/src/test/resources/schema-provider/json/complex-json-union-types/expected.json
b/hudi-utilities/src/test/resources/schema-provider/json/complex-json-union-types/expected.json
new file mode 100644
index 00000000000..12d2f8f1a8f
--- /dev/null
+++
b/hudi-utilities/src/test/resources/schema-provider/json/complex-json-union-types/expected.json
@@ -0,0 +1,66 @@
+{
+ "type": "record",
+ "name": "json_schema_with_complex_union_types",
+ "doc": "",
+ "fields": [
+ {
+ "name": "field1",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "field1",
+ "fields": [
+ {
+ "name": "property1",
+ "type": [
+ "null",
+ "string"
+ ],
+ "doc": "",
+ "default": null
+ },
+ {
+ "name": "property2",
+ "type": [
+ "null",
+ "double"
+ ],
+ "doc": "",
+ "default": null
+ }
+ ]
+ }
+ ],
+ "doc": "",
+ "default": null
+ },
+ {
+ "name": "field2",
+ "type": [
+ "null",
+ {
+ "type": "array",
+ "items": "string"
+ },
+ {
+ "type": "record",
+ "name": "field21",
+ "fields": [
+ {
+ "name": "property3",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "",
+ "default": null
+ }
+ ]
+ }
+ ],
+ "doc": "",
+ "default": null
+ }
+ ]
+}
diff --git
a/hudi-utilities/src/test/resources/schema-provider/json/complex-json-union-types/input.json
b/hudi-utilities/src/test/resources/schema-provider/json/complex-json-union-types/input.json
new file mode 100644
index 00000000000..1ee57ff9d71
--- /dev/null
+++
b/hudi-utilities/src/test/resources/schema-provider/json/complex-json-union-types/input.json
@@ -0,0 +1,42 @@
+{
+ "title": "json schema with complex union types",
+ "type": "object",
+ "properties": {
+ "field1": {
+ "oneOf": [
+ {
+ "type": "object",
+ "properties": {
+ "property1": {
+ "type": "string"
+ },
+ "property2": {
+ "type": "number"
+ }
+ }
+ },
+ {
+ "type": "null"
+ }
+ ]
+ },
+ "field2": {
+ "allOf": [
+ {
+ "type": "array",
+ "items": {
+ "type": "string"
+ }
+ },
+ {
+ "type": "object",
+ "properties": {
+ "property3": {
+ "type": "integer"
+ }
+ }
+ }
+ ]
+ }
+ }
+}
diff --git
a/hudi-utilities/src/test/resources/schema-provider/json/not-null-default-value-schema/expected_no_stripping_quotes.json
b/hudi-utilities/src/test/resources/schema-provider/json/not-null-default-value-schema/expected_no_stripping_quotes.json
new file mode 100644
index 00000000000..c1a6619816e
--- /dev/null
+++
b/hudi-utilities/src/test/resources/schema-provider/json/not-null-default-value-schema/expected_no_stripping_quotes.json
@@ -0,0 +1,138 @@
+{
+ "type": "record",
+ "name": "json_schema_with_non_null_default_values",
+ "doc": "",
+ "fields": [
+ {
+ "name": "field1",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "field1",
+ "fields": [
+ {
+ "name": "property1",
+ "type": [
+ "string",
+ "null"
+ ],
+ "doc": "",
+ "default": "\"property1_defaultvalue\""
+ },
+ {
+ "name": "property2",
+ "type": [
+ "null",
+ "double"
+ ],
+ "doc": "",
+ "default": null
+ },
+ {
+ "name": "property3",
+ "type": [
+ "long",
+ "null"
+ ],
+ "doc": "",
+ "default": 10
+ },
+ {
+ "name": "property4",
+ "type": [
+ "double",
+ "null"
+ ],
+ "doc": "",
+ "default": 8.25
+ }
+ ]
+ }
+ ],
+ "doc": "",
+ "default": null
+ },
+ {
+ "name": "field2",
+ "type": [
+ "string",
+ "null"
+ ],
+ "doc": "",
+ "default": "\"property2_defaultvalue\""
+ },
+ {
+ "name": "field3",
+ "type": [
+ "string",
+ "null"
+ ],
+ "doc": "",
+ "default": "\"N\""
+ },
+ {
+ "name": "field4",
+ "type": [
+ "long",
+ "null"
+ ],
+ "doc": "",
+ "default": 40
+ },
+ {
+ "name": "field5",
+ "type": [
+ "long",
+ "null"
+ ],
+ "doc": "",
+ "default": 50
+ },
+ {
+ "name": "field6",
+ "type": [
+ "long",
+ "null"
+ ],
+ "doc": "",
+ "default": 60
+ },
+ {
+ "name": "field7",
+ "type": [
+ "double",
+ "null"
+ ],
+ "doc": "",
+ "default": 7.75
+ },
+ {
+ "name": "field8",
+ "type": [
+ "double",
+ "null"
+ ],
+ "doc": "",
+ "default": 8.375
+ },
+ {
+ "name": "field9",
+ "type": [
+ "boolean",
+ "null"
+ ],
+ "doc": "",
+ "default": true
+ },
+ {
+ "name": "field10",
+ "type": [
+ "boolean",
+ "null"
+ ],
+ "doc": "",
+ "default": false
+ }
+ ]
+}
diff --git
a/hudi-utilities/src/test/resources/schema-provider/json/not-null-default-value-schema/expected_stripping_quotes.json
b/hudi-utilities/src/test/resources/schema-provider/json/not-null-default-value-schema/expected_stripping_quotes.json
new file mode 100644
index 00000000000..68df4c24b78
--- /dev/null
+++
b/hudi-utilities/src/test/resources/schema-provider/json/not-null-default-value-schema/expected_stripping_quotes.json
@@ -0,0 +1,138 @@
+{
+ "type": "record",
+ "name": "json_schema_with_non_null_default_values",
+ "doc": "",
+ "fields": [
+ {
+ "name": "field1",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "field1",
+ "fields": [
+ {
+ "name": "property1",
+ "type": [
+ "string",
+ "null"
+ ],
+ "doc": "",
+ "default": "property1_defaultvalue"
+ },
+ {
+ "name": "property2",
+ "type": [
+ "null",
+ "double"
+ ],
+ "doc": "",
+ "default": null
+ },
+ {
+ "name": "property3",
+ "type": [
+ "long",
+ "null"
+ ],
+ "doc": "",
+ "default": 10
+ },
+ {
+ "name": "property4",
+ "type": [
+ "double",
+ "null"
+ ],
+ "doc": "",
+ "default": 8.25
+ }
+ ]
+ }
+ ],
+ "doc": "",
+ "default": null
+ },
+ {
+ "name": "field2",
+ "type": [
+ "string",
+ "null"
+ ],
+ "doc": "",
+ "default": "property2_defaultvalue"
+ },
+ {
+ "name": "field3",
+ "type": [
+ "string",
+ "null"
+ ],
+ "doc": "",
+ "default": "N"
+ },
+ {
+ "name": "field4",
+ "type": [
+ "long",
+ "null"
+ ],
+ "doc": "",
+ "default": 40
+ },
+ {
+ "name": "field5",
+ "type": [
+ "long",
+ "null"
+ ],
+ "doc": "",
+ "default": 50
+ },
+ {
+ "name": "field6",
+ "type": [
+ "long",
+ "null"
+ ],
+ "doc": "",
+ "default": 60
+ },
+ {
+ "name": "field7",
+ "type": [
+ "double",
+ "null"
+ ],
+ "doc": "",
+ "default": 7.75
+ },
+ {
+ "name": "field8",
+ "type": [
+ "double",
+ "null"
+ ],
+ "doc": "",
+ "default": 8.375
+ },
+ {
+ "name": "field9",
+ "type": [
+ "boolean",
+ "null"
+ ],
+ "doc": "",
+ "default": true
+ },
+ {
+ "name": "field10",
+ "type": [
+ "boolean",
+ "null"
+ ],
+ "doc": "",
+ "default": false
+ }
+ ]
+}
diff --git
a/hudi-utilities/src/test/resources/schema-provider/json/not-null-default-value-schema/input.json
b/hudi-utilities/src/test/resources/schema-provider/json/not-null-default-value-schema/input.json
new file mode 100644
index 00000000000..74466123e6a
--- /dev/null
+++
b/hudi-utilities/src/test/resources/schema-provider/json/not-null-default-value-schema/input.json
@@ -0,0 +1,67 @@
+{
+ "title": "json schema with non null default values",
+ "type": "object",
+ "properties": {
+ "field1": {
+ "oneOf": [
+ {
+ "type": "object",
+ "properties": {
+ "property1": {
+ "type": "string",
+ "default": "\"property1_defaultvalue\""
+ },
+ "property2": {
+ "type": "number"
+ },
+ "property3": {
+ "type": "integer",
+ "default": "10"
+ },
+ "property4": {
+ "type": "number",
+ "default": "8.25"
+ }
+ }
+ },
+ {
+ "type": "null"
+ }
+ ]
+ },
+ "field2": {
+ "type": "string",
+ "default": "\"property2_defaultvalue\""
+ },
+ "field3": {
+ "oneOf":[{"type":"null"},{"type":"string","default":"\"N\""}]
+ },
+ "field4": {
+ "type": "integer",
+ "default": "40"
+ },
+ "field5": {
+ "type": "integer",
+ "default": 50
+ },
+ "field6": {
+ "oneOf":[{"type":"null"},{"type":"integer","default":"60"}]
+ },
+ "field7": {
+ "type": "number",
+ "default": "7.75"
+ },
+ "field8": {
+ "type": "number",
+ "default": 8.375
+ },
+ "field9": {
+ "type": "boolean",
+ "default": "true"
+ },
+ "field10": {
+ "type": "boolean",
+ "default": false
+ }
+ }
+}
diff --git
a/hudi-utilities/src/test/resources/schema-provider/json/schema-repeating-names/expected.json
b/hudi-utilities/src/test/resources/schema-provider/json/schema-repeating-names/expected.json
new file mode 100644
index 00000000000..29b76565746
--- /dev/null
+++
b/hudi-utilities/src/test/resources/schema-provider/json/schema-repeating-names/expected.json
@@ -0,0 +1,153 @@
+{
+ "type": "record",
+ "name": "schema_repeating_names",
+ "doc": "",
+ "fields": [
+ {
+ "name": "op",
+ "type": [
+ "null",
+ "string"
+ ],
+ "doc": "",
+ "default": null
+ },
+ {
+ "name": "before",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "ValueSchema",
+ "fields": [
+ {
+ "name": "fileName",
+ "type": [
+ "null",
+ "string"
+ ],
+ "doc": "",
+ "default": null
+ },
+ {
+ "name": "functionCode",
+ "type": [
+ "null",
+ "string"
+ ],
+ "doc": "",
+ "default": null
+ }
+ ]
+ }
+ ],
+ "doc": "",
+ "default": null
+ },
+ {
+ "name": "after",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "ValueSchema1",
+ "fields": [
+ {
+ "name": "fileName",
+ "type": [
+ "null",
+ "string"
+ ],
+ "doc": "",
+ "default": null
+ },
+ {
+ "name": "functionCode",
+ "type": [
+ "null",
+ "string"
+ ],
+ "doc": "",
+ "default": null
+ }
+ ]
+ }
+ ],
+ "doc": "",
+ "default": null
+ },
+ {
+ "name": "source",
+ "type": {
+ "type": "record",
+ "name": "io.debezium.connector.mysql.Source",
+ "fields": [
+ {
+ "name": "query",
+ "type": [
+ "null",
+ "string"
+ ],
+ "doc": "",
+ "default": null
+ },
+ {
+ "name": "thread",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "",
+ "default": null
+ },
+ {
+ "name": "version",
+ "type": [
+ "null",
+ "string"
+ ],
+ "doc": "",
+ "default": null
+ },
+ {
+ "name": "pos",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "",
+ "default": null
+ },
+ {
+ "name": "ts_ms",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "",
+ "default": null
+ }
+ ]
+ },
+ "doc": ""
+ },
+ {
+ "name": "program",
+ "type": [
+ "null",
+ "string"
+ ],
+ "doc": "",
+ "default": null
+ },
+ {
+ "name": "ts_ms",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "",
+ "default": null
+ }
+ ]
+}
diff --git
a/hudi-utilities/src/test/resources/schema-provider/json/schema-repeating-names/input.json
b/hudi-utilities/src/test/resources/schema-provider/json/schema-repeating-names/input.json
new file mode 100644
index 00000000000..a4165dc4ffa
--- /dev/null
+++
b/hudi-utilities/src/test/resources/schema-provider/json/schema-repeating-names/input.json
@@ -0,0 +1,124 @@
+{
+ "type": "object",
+ "title": "schema_repeating_names",
+ "properties": {
+ "op": {
+ "type": "string"
+ },
+ "before": {
+ "oneOf": [
+ {
+ "type": "null"
+ },
+ {
+ "type": "object",
+ "title": "ValueSchema",
+ "properties": {
+ "fileName": {
+ "oneOf": [
+ {
+ "type": "null"
+ },
+ {
+ "type": "string"
+ }
+ ]
+ },
+ "functionCode": {
+ "oneOf": [
+ {
+ "type": "null"
+ },
+ {
+ "type": "string"
+ }
+ ]
+ }
+ }
+ }
+ ]
+ },
+ "after": {
+ "oneOf": [
+ {
+ "type": "null"
+ },
+ {
+ "type": "object",
+ "title": "ValueSchema",
+ "properties": {
+ "fileName": {
+ "oneOf": [
+ {
+ "type": "null"
+ },
+ {
+ "type": "string"
+ }
+ ]
+ },
+ "functionCode": {
+ "oneOf": [
+ {
+ "type": "null"
+ },
+ {
+ "type": "string"
+ }
+ ]
+ }
+ }
+ }
+ ]
+ },
+ "source": {
+ "type": "object",
+ "title": "io.debezium.connector.mysql.Source",
+ "properties": {
+ "query": {
+ "oneOf": [
+ {
+ "type": "null"
+ },
+ {
+ "type": "string"
+ }
+ ]
+ },
+ "thread": {
+ "oneOf": [
+ {
+ "type": "null"
+ },
+ {
+ "type": "integer"
+ }
+ ]
+ },
+ "version": {
+ "type": "string"
+ },
+ "pos": {
+ "type": "integer"
+ },
+ "ts_ms": {
+ "type": "integer",
+ "connect.type": "int64"
+ }
+ }
+ },
+ "program": {
+ "type": "string"
+ },
+ "ts_ms": {
+ "oneOf": [
+ {
+ "type": "null"
+ },
+ {
+ "type": "integer"
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/packaging/hudi-utilities-bundle/pom.xml
b/packaging/hudi-utilities-bundle/pom.xml
index 9d56415f376..97c82850155 100644
--- a/packaging/hudi-utilities-bundle/pom.xml
+++ b/packaging/hudi-utilities-bundle/pom.xml
@@ -129,6 +129,7 @@
<include>com.twitter:chill-protobuf</include>
<include>io.confluent:kafka-avro-serializer</include>
<include>io.confluent:kafka-schema-serializer</include>
+ <include>io.confluent:kafka-json-schema-serializer</include>
<include>io.confluent:common-config</include>
<include>io.confluent:common-utils</include>
<include>io.confluent:kafka-schema-registry-client</include>
diff --git a/pom.xml b/pom.xml
index f1e87dfa888..794de9ea118 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,6 +102,8 @@
<fasterxml.jackson.dataformat.yaml.version>${fasterxml.spark3.version}</fasterxml.jackson.dataformat.yaml.version>
<kafka.version>2.0.0</kafka.version>
<pulsar.version>3.0.2</pulsar.version>
+ <kafka.connect.api.version>2.5.0</kafka.connect.api.version>
+ <kafka.spark3.version>2.8.2</kafka.spark3.version>
<pulsar.spark.version>${pulsar.spark.scala12.version}</pulsar.spark.version>
<pulsar.spark.scala12.version>3.1.1.4</pulsar.spark.scala12.version>
<pulsar.spark.scala13.version>3.4.1.1</pulsar.spark.scala13.version>
@@ -1305,6 +1307,16 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-api</artifactId>
+ <version>${kafka.connect.api.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-streams-test-utils</artifactId>
+ <version>3.4.0</version>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>