This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b359f76e8c5 [HUDI-9061] JSON to AVRO schema converter (#12864)
b359f76e8c5 is described below

commit b359f76e8c5f67bc64a5eb3fb7561aaf87aeba47
Author: Davis-Zhang-Onehouse 
<[email protected]>
AuthorDate: Mon Feb 24 19:35:54 2025 -0800

    [HUDI-9061] JSON to AVRO schema converter (#12864)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../common/testutils/HoodieTestDataGenerator.java  |   7 +-
 hudi-kafka-connect/pom.xml                         |   5 +-
 hudi-utilities/pom.xml                             |  17 +-
 .../converter/JsonToAvroSchemaConverter.java       | 464 ++++++++++++++++-----
 .../converter/JsonToAvroSchemaConverterConfig.java |  51 +++
 .../hudi/utilities/sources/JsonKafkaSource.java    |  73 +++-
 .../converter/TestJsonToAvroSchemaConverter.java   |  58 ++-
 .../utilities/sources/TestJsonKafkaSource.java     | 124 +++++-
 .../json/array-with-item-type-union/expected.json  |  78 ++++
 .../json/array-with-item-type-union/input.json     |  93 +++++
 .../json/complex-json-union-types/expected.json    |  66 +++
 .../json/complex-json-union-types/input.json       |  42 ++
 .../expected_no_stripping_quotes.json              | 138 ++++++
 .../expected_stripping_quotes.json                 | 138 ++++++
 .../json/not-null-default-value-schema/input.json  |  67 +++
 .../json/schema-repeating-names/expected.json      | 153 +++++++
 .../json/schema-repeating-names/input.json         | 124 ++++++
 packaging/hudi-utilities-bundle/pom.xml            |   1 +
 pom.xml                                            |  12 +
 19 files changed, 1557 insertions(+), 154 deletions(-)

diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 6c1d526bd8e..4122ab452ca 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -147,7 +147,12 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
 
   public static final String TRIP_NESTED_EXAMPLE_SCHEMA =
       TRIP_SCHEMA_PREFIX + FARE_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
-
+  public static final String TRIP_ENCODED_DECIMAL_SCHEMA = 
"{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":["
+      + 
"{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},"
+      + 
"{\"name\":\"decfield\",\"type\":{\"type\":\"bytes\",\"name\":\"abc\",\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},"
+      + 
"{\"name\":\"lowprecision\",\"type\":{\"type\":\"bytes\",\"name\":\"def\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}},"
+      + 
"{\"name\":\"highprecision\",\"type\":{\"type\":\"bytes\",\"name\":\"ghi\",\"logicalType\":\"decimal\",\"precision\":32,\"scale\":12}},"
+      + 
"{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\":
 \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
   public static final String TRIP_SCHEMA = 
"{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":["
       + 
"{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},"
       + 
"{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\":
 \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
diff --git a/hudi-kafka-connect/pom.xml b/hudi-kafka-connect/pom.xml
index 68844d6a3e4..5ba03e20267 100644
--- a/hudi-kafka-connect/pom.xml
+++ b/hudi-kafka-connect/pom.xml
@@ -30,7 +30,6 @@
 
     <properties>
         <main.basedir>${project.parent.basedir}</main.basedir>
-        <connect.api.version>2.5.0</connect.api.version>
     </properties>
 
     <build>
@@ -81,13 +80,13 @@
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>connect-api</artifactId>
-            <version>${connect.api.version}</version>
+            <version>${kafka.connect.api.version}</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>connect-json</artifactId>
-            <version>${connect.api.version}</version>
+            <version>${kafka.connect.api.version}</version>
             <scope>provided</scope>
         </dependency>
 
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 3494605092c..669385e3901 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -197,6 +197,10 @@
       <artifactId>kafka-clients</artifactId>
       <version>${kafka.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>connect-api</artifactId>
+    </dependency>
 
     <!-- Pulsar -->
     <dependency>
@@ -369,7 +373,6 @@
     <dependency>
       <groupId>io.confluent</groupId>
       <artifactId>common-utils</artifactId>
-      <version>${confluent.version}</version>
     </dependency>
     <dependency>
       <groupId>io.confluent</groupId>
@@ -379,13 +382,19 @@
       <groupId>io.confluent</groupId>
       <artifactId>kafka-protobuf-serializer</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.kjetland</groupId>
+      
<artifactId>mbknor-jackson-jsonschema_${scala.binary.version}</artifactId>
+    </dependency>
     <dependency>
       <groupId>io.confluent</groupId>
       <artifactId>kafka-json-schema-serializer</artifactId>
     </dependency>
+
     <dependency>
-      <groupId>com.kjetland</groupId>
-      
<artifactId>mbknor-jackson-jsonschema_${scala.binary.version}</artifactId>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-streams-test-utils</artifactId>
+      <scope>test</scope>
     </dependency>
 
     <!-- Httpcomponents -->
@@ -507,7 +516,7 @@
       <artifactId>sqs</artifactId>
       <version>${aws.sdk.version}</version>
     </dependency>
-    
+
     <!-- Hive - Test -->
     <dependency>
       <groupId>${hive.groupid}</groupId>
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverter.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverter.java
index cb9a4eb63d3..46d6a2567b5 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverter.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverter.java
@@ -22,16 +22,22 @@ package org.apache.hudi.utilities.schema.converter;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.JsonUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.BooleanNode;
+import com.fasterxml.jackson.databind.node.DoubleNode;
+import com.fasterxml.jackson.databind.node.LongNode;
 import com.fasterxml.jackson.databind.node.NullNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.databind.node.TextNode;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.json.JsonSchema;
+import org.apache.kafka.connect.data.Decimal;
 
 import java.io.IOException;
 import java.net.URI;
@@ -42,171 +48,400 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.hudi.common.util.StringUtils.getSuffixBy;
-import static org.apache.hudi.common.util.StringUtils.removeSuffixBy;
+import static 
org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverterConfig.CONVERT_DEFAULT_VALUE_TYPE;
+import static 
org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverterConfig.STRIP_DEFAULT_VALUE_QUOTES;
 
+/**
+ * Converts a JSON schema to an Avro schema.
+ */
 public class JsonToAvroSchemaConverter implements 
SchemaRegistryProvider.SchemaConverter {
+  private static final String CONNECT_PARAMETERS = "connect.parameters";
+  private static final String CONNECT_DECIMAL_PRECISION = 
"connect.decimal.precision";
+  private static final String CONNECT_TYPE = "connect.type";
 
   private static final ObjectMapper MAPPER = JsonUtils.getObjectMapper();
-  private static final Map<String, String> JSON_TO_AVRO_TYPE = Stream.of(new 
String[][] {
-      {"string", "string"},
-      {"null", "null"},
-      {"boolean", "boolean"},
-      {"integer", "long"},
-      {"number", "double"}
-  }).collect(Collectors.collectingAndThen(Collectors.toMap(p -> p[0], p -> 
p[1]), Collections::<String, String>unmodifiableMap));
+  private static final Map<String, String> JSON_TO_AVRO_TYPE =
+      Stream.of(
+              new String[][] {
+                  {"string", "string"},
+                  {"null", "null"},
+                  {"boolean", "boolean"},
+                  {"integer", "long"},
+                  {"number", "double"}
+              })
+          .collect(
+              Collectors.collectingAndThen(
+                  Collectors.toMap(p -> p[0], p -> p[1]),
+                  Collections::<String, String>unmodifiableMap));
   private static final Pattern SYMBOL_REGEX = 
Pattern.compile("^[A-Za-z_][A-Za-z0-9_]*$");
+  private static final String LONG_TYPE = "long";
+  private static final String DOUBLE_TYPE = "double";
+  private static final String BOOLEAN_TYPE = "boolean";
+  private static final String STRING_TYPE = "string";
+  private static final String DEFAULT_FIELD = "default";
+  private static final String TYPE = "type";
+  private static final String TITLE = "title";
+  private static final String RECORD = "record";
+  private static final String ARRAY = "array";
+
+  private final boolean convertDefaultValueType;
+  private final boolean stripDefaultValueQuotes;
 
-  public JsonToAvroSchemaConverter(TypedProperties properties) {
-    // properties unused in this converter
+  public JsonToAvroSchemaConverter(TypedProperties config) {
+    this.convertDefaultValueType = ConfigUtils.getBooleanWithAltKeys(config, 
CONVERT_DEFAULT_VALUE_TYPE);
+    this.stripDefaultValueQuotes = ConfigUtils.getBooleanWithAltKeys(config, 
STRIP_DEFAULT_VALUE_QUOTES);
   }
 
-  @Override
   public String convert(ParsedSchema parsedSchema) throws IOException {
     JsonSchema jsonSchema = (JsonSchema) parsedSchema;
     JsonNode jsonNode = MAPPER.readTree(jsonSchema.canonicalString());
-    ObjectNode avroRecord = MAPPER.createObjectNode()
-        .put("type", "record")
-        .put("name", getAvroSchemaRecordName(jsonNode))
-        .put("doc", getAvroDoc(jsonNode));
+    ObjectNode avroRecord = convertJsonNodeToAvroNode(jsonNode, new 
AtomicInteger(1), new HashSet<>());
+    return avroRecord.toString();
+  }
+
+  private ObjectNode convertJsonNodeToAvroNode(JsonNode jsonNode, 
AtomicInteger schemaCounter, Set<String> seenNames) {
+    ObjectNode avroRecord =
+        MAPPER
+            .createObjectNode()
+            .put(TYPE, RECORD)
+            .put("name", getAvroSchemaRecordName(jsonNode))
+            .put("doc", getAvroDoc(jsonNode));
     Option<String> namespace = getAvroSchemaRecordNamespace(jsonNode);
     if (namespace.isPresent()) {
       avroRecord.put("namespace", namespace.get());
     }
     if (jsonNode.hasNonNull("properties")) {
-      avroRecord.set("fields", convertProperties(jsonNode.get("properties"), 
getRequired(jsonNode)));
+      avroRecord.set(
+          "fields", convertProperties(jsonNode.get("properties"), 
getRequired(jsonNode), schemaCounter, seenNames));
     } else {
       avroRecord.set("fields", MAPPER.createArrayNode());
     }
-    return avroRecord.toString();
+    return avroRecord;
   }
 
-  private static ArrayNode convertProperties(JsonNode jsonProperties, 
Set<String> required) {
-    List<JsonNode> avroFields = new ArrayList<>(jsonProperties.size());
-    jsonProperties.fieldNames().forEachRemaining(name ->
-        avroFields.add(tryConvertNestedProperty(name, jsonProperties.get(name))
-            .or(() -> tryConvertArrayProperty(name, jsonProperties.get(name)))
-            .or(() -> tryConvertEnumProperty(name, jsonProperties.get(name)))
-            .orElseGet(() -> convertProperty(name, jsonProperties.get(name), 
required.contains(name)))));
+  private ArrayNode convertProperties(JsonNode jsonProperties, Set<String> 
required, AtomicInteger schemaCounter,
+                                      Set<String> seenNames) {
+    List<JsonNode> avroFields = new ArrayList<>();
+    jsonProperties
+        .fieldNames()
+        .forEachRemaining(
+            name -> {
+              avroFields.add(
+                  tryConvertNestedProperty(name, jsonProperties.get(name), 
schemaCounter, seenNames)
+                      .or(() -> tryConvertArrayProperty(name, 
jsonProperties.get(name), schemaCounter, seenNames))
+                      .or(() -> tryConvertEnumProperty(name, 
jsonProperties.get(name), schemaCounter, seenNames))
+                      .orElseGet(() ->
+                          convertProperty(
+                              name, jsonProperties.get(name), 
required.contains(name), schemaCounter, seenNames, false)));
+            });
+
     return MAPPER.createArrayNode().addAll(avroFields);
   }
 
-  private static Option<JsonNode> tryConvertNestedProperty(String name, 
JsonNode jsonProperty) {
+  private Option<JsonNode> tryConvertNestedProperty(String name, JsonNode 
jsonProperty,
+                                                    AtomicInteger 
schemaCounter, Set<String> seenNames) {
     if (!isJsonNestedType(jsonProperty)) {
       return Option.empty();
     }
-    JsonNode avroNode = MAPPER.createObjectNode()
-        .put("name", sanitizeAsAvroName(name))
-        .put("doc", getAvroDoc(jsonProperty))
-        .set("type", MAPPER.createObjectNode()
-            .put("type", "record")
-            .put("name", getAvroTypeName(jsonProperty, name))
-            .set("fields", convertProperties(jsonProperty.get("properties"), 
getRequired(jsonProperty))));
+    JsonNode avroNode =
+        MAPPER
+            .createObjectNode()
+            .put("name", sanitizeAsAvroName(name))
+            .put("doc", getAvroDoc(jsonProperty))
+            .set(
+                TYPE,
+                MAPPER
+                    .createObjectNode()
+                    .put(TYPE, RECORD)
+                    .put("name", getAvroTypeName(jsonProperty, name, 
schemaCounter, seenNames))
+                    .set(
+                        "fields",
+                        convertProperties(
+                            jsonProperty.get("properties"), 
getRequired(jsonProperty), schemaCounter, seenNames)));
 
     return Option.of(avroNode);
   }
 
-  private static Option<JsonNode> tryConvertArrayProperty(String name, 
JsonNode jsonProperty) {
+  private Option<JsonNode> tryConvertArrayProperty(String name, JsonNode 
jsonProperty,
+                                                   AtomicInteger 
schemaCounter, Set<String> seenNames) {
     if (!isJsonArrayType(jsonProperty)) {
       return Option.empty();
     }
     JsonNode avroItems;
     JsonNode jsonItems = jsonProperty.get("items");
-    String itemName = getAvroTypeName(jsonItems, name) + "_child";
+    String itemName = getAvroTypeName(jsonItems, name, schemaCounter, 
seenNames) + "_child";
     if (isJsonNestedType(jsonItems)) {
-      avroItems = MAPPER.createObjectNode()
-          .put("type", "record")
-          .put("name", itemName)
-          .set("fields", convertProperties(jsonItems.get("properties"), 
getRequired(jsonItems)));
+      avroItems =
+          MAPPER
+              .createObjectNode()
+              .put(TYPE, RECORD)
+              .put("name", itemName)
+              .set(
+                  "fields", convertProperties(jsonItems.get("properties"), 
getRequired(jsonItems),
+                      schemaCounter, seenNames));
     } else {
-      avroItems = convertProperty(itemName, jsonItems, true);
+      avroItems = convertProperty(itemName, jsonItems, true, schemaCounter, 
seenNames, true);
     }
-    JsonNode avroNode = MAPPER.createObjectNode()
-        .put("name", sanitizeAsAvroName(name))
-        .put("doc", getAvroDoc(jsonProperty))
-        .set("type", MAPPER.createObjectNode()
-            .put("type", "array")
-            .set("items", avroItems));
+    JsonNode avroNode =
+        MAPPER
+            .createObjectNode()
+            .put("name", sanitizeAsAvroName(name))
+            .put("doc", getAvroDoc(jsonProperty))
+            .set(TYPE, MAPPER.createObjectNode().put(TYPE, ARRAY).set("items", 
avroItems));
 
     return Option.of(avroNode);
   }
 
-  private static Option<JsonNode> tryConvertEnumProperty(String name, JsonNode 
jsonProperty) {
+  private static Option<JsonNode> tryConvertEnumProperty(String name, JsonNode 
jsonProperty,
+                                                         AtomicInteger 
schemaCounter, Set<String> seenNames) {
     if (!isJsonEnumType(jsonProperty)) {
       return Option.empty();
     }
     List<String> enums = new ArrayList<>();
     jsonProperty.get("enum").iterator().forEachRemaining(e -> 
enums.add(e.asText()));
-    JsonNode avroType = enums.stream().allMatch(e -> 
SYMBOL_REGEX.matcher(e).matches())
-        ? MAPPER.createObjectNode()
-        .put("type", "enum")
-        .put("name", getAvroTypeName(jsonProperty, name))
-        .set("symbols", jsonProperty.get("enum"))
-        : TextNode.valueOf("string");
-    JsonNode avroNode = MAPPER.createObjectNode()
-        .put("name", sanitizeAsAvroName(name))
-        .put("doc", getAvroDoc(jsonProperty))
-        .set("type", avroType);
+    JsonNode avroType =
+        enums.stream().allMatch(e -> SYMBOL_REGEX.matcher(e).matches())
+            ? MAPPER
+            .createObjectNode()
+            .put(TYPE, "enum")
+            .put("name", getAvroTypeName(jsonProperty, name, schemaCounter, 
seenNames))
+            .set("symbols", jsonProperty.get("enum"))
+            : TextNode.valueOf("string");
+    JsonNode avroNode =
+        MAPPER
+            .createObjectNode()
+            .put("name", sanitizeAsAvroName(name))
+            .put("doc", getAvroDoc(jsonProperty))
+            .set(TYPE, avroType);
 
     return Option.of(avroNode);
   }
 
-  private static JsonNode convertProperty(String name, JsonNode jsonProperty, 
boolean isRequired) {
-    ObjectNode avroNode = MAPPER.createObjectNode()
-        .put("name", sanitizeAsAvroName(name))
-        .put("doc", getAvroDoc(jsonProperty));
+  private JsonNode convertProperty(String name, JsonNode jsonProperty, boolean 
isRequired,
+                                   AtomicInteger schemaCounter, Set<String> 
seenNames,
+                                   boolean isArrayType) {
+    ObjectNode avroNode =
+        MAPPER
+            .createObjectNode()
+            .put("name", sanitizeAsAvroName(name))
+            .put("doc", getAvroDoc(jsonProperty));
 
-    // infer `default`
     boolean nullable = !isRequired;
-    if (jsonProperty.has("default")) {
-      avroNode.set("default", jsonProperty.get("default"));
-    } else if (nullable) {
-      avroNode.set("default", NullNode.getInstance());
-    }
 
     // infer `types`
-    Set<String> avroTypeSet = new HashSet<>();
+    Set<String> avroSimpleTypeSet = new HashSet<>();
+    List<JsonNode> defaultValueList = new ArrayList<>();
+    List<JsonNode> avroComplexTypeSet = new ArrayList<>();
+    boolean unionType = false;
     if (jsonProperty.hasNonNull("oneOf") || jsonProperty.hasNonNull("allOf")) {
+      unionType = true;
       // prefer to look for `oneOf` and `allOf` for types
       Option<JsonNode> oneOfTypes = 
Option.ofNullable(jsonProperty.get("oneOf"));
-      if (oneOfTypes.isPresent()) {
-        oneOfTypes.get().elements().forEachRemaining(e -> 
avroTypeSet.add(JSON_TO_AVRO_TYPE.get(e.get("type").asText())));
-      }
+      Pair<Pair<Set<String>, List<JsonNode>>, List<JsonNode>> allOneOfTypes =
+          getAllTypesFromOneOfAllOfTypes(oneOfTypes, name, schemaCounter, 
seenNames);
+      avroSimpleTypeSet.addAll(allOneOfTypes.getLeft().getLeft());
+      defaultValueList.addAll(allOneOfTypes.getLeft().getRight());
+      avroComplexTypeSet.addAll(allOneOfTypes.getRight());
+
       Option<JsonNode> allOfTypes = 
Option.ofNullable(jsonProperty.get("allOf"));
-      if (allOfTypes.isPresent()) {
-        allOfTypes.get().elements().forEachRemaining(e -> 
avroTypeSet.add(JSON_TO_AVRO_TYPE.get(e.get("type").asText())));
-      }
-    } else if (jsonProperty.has("type")) {
-      // fall back to `type` parameter
-      JsonNode jsonType = jsonProperty.get("type");
-      if (jsonType.isArray()) {
-        jsonType.elements().forEachRemaining(e -> 
avroTypeSet.add(JSON_TO_AVRO_TYPE.get(e.asText())));
-      } else {
-        avroTypeSet.add(JSON_TO_AVRO_TYPE.get(jsonType.asText()));
+      Pair<Pair<Set<String>, List<JsonNode>>, List<JsonNode>> allAllOfTypes =
+          getAllTypesFromOneOfAllOfTypes(allOfTypes, name, schemaCounter, 
seenNames);
+      avroSimpleTypeSet.addAll(allAllOfTypes.getLeft().getLeft());
+      avroComplexTypeSet.addAll(allAllOfTypes.getRight());
+    } else if (jsonProperty.has(TYPE)) {
+      Pair<Set<String>, Option<JsonNode>> types = 
getPrimitiveTypeFromType(jsonProperty);
+      avroSimpleTypeSet.addAll(types.getLeft());
+      if (types.getRight().isPresent()) {
+        avroComplexTypeSet.add(types.getRight().get());
       }
     }
     List<String> avroTypes = new ArrayList<>();
-    if (nullable || avroTypeSet.contains("null")) {
+    if (nullable || avroSimpleTypeSet.contains("null")) {
       avroTypes.add("null");
     }
-    avroTypeSet.remove("null");
-    avroTypes.addAll(avroTypeSet);
-    avroNode.set("type", avroTypes.size() > 1
-        ? 
MAPPER.createArrayNode().addAll(avroTypes.stream().map(TextNode::valueOf).collect(Collectors.toList()))
-        : TextNode.valueOf(avroTypes.get(0)));
-    return avroNode;
+    avroSimpleTypeSet.remove("null");
+    avroTypes.addAll(avroSimpleTypeSet);
+
+    // infer `default`
+    // Either there is explicit default value defined for a single simple type,
+    // e.g., "field":{"type":"integer","default":50},
+    // or we take the default value from the single simple type in "oneOf" 
type,
+    // e.g., "oneOf":[{"type":"null"},{"type":"integer","default":"60"}],
+    // in which the default value is 60.
+    JsonNode defaultNode = jsonProperty.has(DEFAULT_FIELD) ? 
jsonProperty.get(DEFAULT_FIELD)
+        : (avroComplexTypeSet.isEmpty() && avroSimpleTypeSet.size() == 1 && 
!defaultValueList.isEmpty())
+        ? defaultValueList.get(0) : NullNode.getInstance();
+    if (jsonProperty.has(DEFAULT_FIELD)
+        || (avroComplexTypeSet.isEmpty() && avroSimpleTypeSet.size() == 1 && 
!defaultValueList.isEmpty())) {
+      if (this.convertDefaultValueType && avroComplexTypeSet.isEmpty() && 
avroSimpleTypeSet.size() == 1) {
+        String defaultType = avroSimpleTypeSet.stream().findFirst().get();
+        switch (defaultType) {
+          case LONG_TYPE:
+            defaultNode = 
LongNode.valueOf(Long.parseLong(defaultNode.asText()));
+            break;
+          case DOUBLE_TYPE:
+            defaultNode = 
DoubleNode.valueOf(Double.parseDouble(defaultNode.asText()));
+            break;
+          case BOOLEAN_TYPE:
+            defaultNode = 
BooleanNode.valueOf(Boolean.parseBoolean(defaultNode.asText()));
+            break;
+          case STRING_TYPE:
+            if (this.stripDefaultValueQuotes) {
+              defaultNode = 
TextNode.valueOf(stripQuotesFromStringValue(defaultNode.asText()));
+            }
+            break;
+          default:
+        }
+      }
+      avroNode.set(DEFAULT_FIELD, defaultNode);
+    } else if (nullable) {
+      avroNode.set(DEFAULT_FIELD, defaultNode);
+    }
+    avroTypes = arrangeTypeOrderOnDefault(avroTypes, defaultNode);
+    List<JsonNode> allTypes =
+        avroTypes.stream().map(TextNode::valueOf).collect(Collectors.toList());
+    allTypes.addAll(avroComplexTypeSet);
+    ArrayNode typeValue = MAPPER.createArrayNode().addAll(allTypes);
+    avroNode.set(
+        TYPE, allTypes.size() > 1 ? typeValue : allTypes.get(0));
+    return unionType && isArrayType ? typeValue : avroNode;
+  }
+
+  protected Pair<Set<String>, Option<JsonNode>> 
getPrimitiveTypeFromType(JsonNode jsonNode) {
+    if (!jsonNode.has(TYPE)) {
+      return Pair.of(Collections.emptySet(), Option.empty());
+    }
+
+    JsonNode typeNode = jsonNode.get(TYPE);
+    if (!typeNode.isArray() && typeNode.asText().equals("number")
+        && jsonNode.has(TITLE) && 
Decimal.LOGICAL_NAME.equals(jsonNode.get(TITLE).asText())) {
+      return convertKafkaConnectDecimal(jsonNode);
+    }
+
+    Set<String> avroSimpleTypeSet = new HashSet<>();
+    // fall back to `type` parameter
+    String jsonType = jsonNode.get(TYPE).asText();
+    if (!jsonType.equals("object") && !jsonType.equals(ARRAY)) {
+      avroSimpleTypeSet.add(JSON_TO_AVRO_TYPE.get(jsonType));
+    }
+    return Pair.of(avroSimpleTypeSet, Option.empty());
+  }
+
+  private static Pair<Set<String>, Option<JsonNode>> 
convertKafkaConnectDecimal(JsonNode jsonNode) {
+    String precision;
+    String scale = "0";
+    if (jsonNode.has(CONNECT_PARAMETERS)) {
+      JsonNode parameters = jsonNode.get(CONNECT_PARAMETERS);
+      if (parameters.has(CONNECT_DECIMAL_PRECISION)) {
+        precision = parameters.get(CONNECT_DECIMAL_PRECISION).asText();
+      } else {
+        throw new IllegalArgumentException("Missing " + 
CONNECT_DECIMAL_PRECISION + " from properties in decimal type");
+      }
+      if (parameters.has(Decimal.SCALE_FIELD)) {
+        scale = parameters.get(Decimal.SCALE_FIELD).asText();
+      }
+    } else {
+      throw new IllegalArgumentException("Missing " + CONNECT_PARAMETERS + " 
from decimal type in json schema");
+    }
+
+    if (jsonNode.has(CONNECT_TYPE)) {
+      String connectType = jsonNode.get(CONNECT_TYPE).asText();
+      if (!connectType.equals("bytes")) {
+        throw new IllegalArgumentException(connectType + " is not a supported 
type for decimal");
+      }
+    }
+
+    JsonNode avroNode =
+        MAPPER
+            .createObjectNode()
+            .put(TYPE, "bytes")
+            .put("logicalType", "decimal")
+            .put("precision", Integer.valueOf(precision))
+            .put("scale", Integer.valueOf(scale));
+    return Pair.of(Collections.emptySet(), Option.of(avroNode));
+  }
+
+  private Pair<Pair<Set<String>, List<JsonNode>>, List<JsonNode>> 
getAllTypesFromOneOfAllOfTypes(Option<JsonNode> jsonUnionType, String name, 
AtomicInteger schemaCounter, Set<String> seenNames) {
+    Set<String> avroSimpleTypeSet = new HashSet<>();
+    List<JsonNode> defaultValueList = new ArrayList<>();
+    List<JsonNode> avroComplexTypeSet = new ArrayList<>();
+    if (jsonUnionType.isPresent()) {
+      jsonUnionType
+          .get()
+          .elements()
+          .forEachRemaining(
+              e -> {
+                Pair<Set<String>, Option<JsonNode>> types = 
getPrimitiveTypeFromType(e);
+                if (types.getLeft().isEmpty() && 
!types.getRight().isPresent()) {
+                  if (isJsonNestedType(e)) {
+                    avroComplexTypeSet.add(tryConvertNestedProperty(name, e, 
schemaCounter, seenNames).get().get(TYPE));
+                  } else if (isJsonArrayType(e)) {
+                    avroComplexTypeSet.add(tryConvertArrayProperty(name, e, 
schemaCounter, seenNames).get().get(TYPE));
+                  } else if (isJsonEnumType(e)) {
+                    avroComplexTypeSet.add(tryConvertEnumProperty(name, e, 
schemaCounter, seenNames).get().get(TYPE));
+                  } else {
+                    throw new RuntimeException("unknown complex type 
encountered");
+                  }
+                } else {
+                  avroSimpleTypeSet.addAll(types.getLeft());
+                  if (types.getRight().isPresent()) {
+                    avroComplexTypeSet.add(types.getRight().get());
+                  }
+                  if (e.has(DEFAULT_FIELD)) {
+                    // This is only valid if one simple-type exists in the 
oneOf type
+                    defaultValueList.add(e.get(DEFAULT_FIELD));
+                  }
+                }
+              });
+    }
+    return Pair.of(Pair.of(avroSimpleTypeSet, defaultValueList), 
avroComplexTypeSet);
+  }
+
+  private static List<String> arrangeTypeOrderOnDefault(List<String> 
avroTypes, JsonNode defaultNode) {
+    // Nothing to be done as null is already the first one.
+    if (defaultNode == null || defaultNode.isNull()) {
+      return avroTypes;
+    }
+    Set<String> avroTypesSet = new HashSet<>(avroTypes);
+    if (defaultNode.isInt() || defaultNode.isBigInteger() || 
defaultNode.isIntegralNumber()) {
+      // Maps to Long Type in Avro. Place it first, defaulting to null if not 
found.
+      return modifyListOrderingBasedOnDefaultValue(avroTypes, avroTypesSet, 
LONG_TYPE);
+    } else if (defaultNode.isNumber() || defaultNode.isBigDecimal() || 
defaultNode.isDouble()
+        || defaultNode.isFloatingPointNumber()) {
+      return modifyListOrderingBasedOnDefaultValue(avroTypes, avroTypesSet, 
DOUBLE_TYPE);
+    } else if (defaultNode.isTextual()) {
+      return modifyListOrderingBasedOnDefaultValue(avroTypes, avroTypesSet, 
STRING_TYPE);
+    } else if (defaultNode.isBoolean()) {
+      return modifyListOrderingBasedOnDefaultValue(avroTypes, avroTypesSet, 
BOOLEAN_TYPE);
+    }
+    return avroTypes;
+  }
+
+  private static List<String> 
modifyListOrderingBasedOnDefaultValue(List<String> typeList,
+                                                                    
Set<String> avroTypesSet,
+                                                                    String 
type) {
+    List<String> modifiedAvroTypeList = new ArrayList<>();
+    if (avroTypesSet.contains(type)) {
+      modifiedAvroTypeList.add(type);
+      avroTypesSet.remove(type);
+      modifiedAvroTypeList.addAll(avroTypesSet);
+      return modifiedAvroTypeList;
+    }
+    // Return original list.
+    return typeList;
   }
 
   private static boolean isJsonNestedType(JsonNode jsonNode) {
-    return jsonNode.has("type") && 
Objects.equals(jsonNode.get("type").asText(), "object");
+    return jsonNode.has(TYPE) && Objects.equals(jsonNode.get(TYPE).asText(), 
"object");
   }
 
   private static boolean isJsonArrayType(JsonNode jsonNode) {
-    return jsonNode.has("type") && 
Objects.equals(jsonNode.get("type").asText(), "array");
+    return jsonNode.has(TYPE) && Objects.equals(jsonNode.get(TYPE).asText(), 
ARRAY);
   }
 
   private static boolean isJsonEnumType(JsonNode jsonNode) {
@@ -216,17 +451,18 @@ public class JsonToAvroSchemaConverter implements 
SchemaRegistryProvider.SchemaC
   private static Option<String> getAvroSchemaRecordNamespace(JsonNode 
jsonNode) {
     if (jsonNode.hasNonNull("$id")) {
       String host = URI.create(jsonNode.get("$id").asText()).getHost();
-      String avroNamespace = Stream.of(host.split("\\."))
-          .map(JsonToAvroSchemaConverter::sanitizeAsAvroName)
-          .collect(Collectors.joining("."));
+      String avroNamespace =
+          Stream.of(host.split("\\."))
+              .map(JsonToAvroSchemaConverter::sanitizeAsAvroName)
+              .collect(Collectors.joining("."));
       return Option.of(avroNamespace);
     }
     return Option.empty();
   }
 
   private static String getAvroSchemaRecordName(JsonNode jsonNode) {
-    if (jsonNode.hasNonNull("title")) {
-      return sanitizeAsAvroName(jsonNode.get("title").asText());
+    if (jsonNode.hasNonNull(TITLE)) {
+      return sanitizeAsAvroName(jsonNode.get(TITLE).asText());
     }
     if (jsonNode.hasNonNull("$id")) {
       // infer name from host: http://www.my-example.com => "my_example"
@@ -252,11 +488,45 @@ public class JsonToAvroSchemaConverter implements 
SchemaRegistryProvider.SchemaC
     return required;
   }
 
-  private static String getAvroTypeName(JsonNode jsonNode, String defaultName) 
{
-    return jsonNode.hasNonNull("title") ? jsonNode.get("title").asText() : 
defaultName;
+  private static String getAvroTypeName(JsonNode jsonNode, String defaultName,
+                                        AtomicInteger schemaCounter,
+                                        Set<String> seenNames) {
+    String typeName = jsonNode.hasNonNull(TITLE) ? jsonNode.get(TITLE).asText()
+        : defaultName;
+    if (!seenNames.contains(typeName)) {
+      seenNames.add(typeName);
+      return typeName;
+    }
+    String modifiedTypeName = typeName + schemaCounter.getAndIncrement();
+    seenNames.add(modifiedTypeName);
+    return modifiedTypeName;
   }
 
   private static String getAvroDoc(JsonNode jsonNode) {
     return jsonNode.hasNonNull("description") ? 
jsonNode.get("description").asText() : "";
   }
+
+  public static String getSuffixBy(String input, int ch) {
+    int i = input.lastIndexOf(ch);
+    if (i == -1) {
+      return input;
+    }
+    return input.substring(i);
+  }
+
+  public static String removeSuffixBy(String input, int ch) {
+    int i = input.lastIndexOf(ch);
+    if (i == -1) {
+      return input;
+    }
+    return input.substring(0, i);
+  }
+
+  public static String stripQuotesFromStringValue(String input) {
+    if (input != null && input.length() >= 2
+        && input.charAt(0) == '\"' && input.charAt(input.length() - 1) == 
'\"') {
+      return input.substring(1, input.length() - 1);
+    }
+    return input;
+  }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverterConfig.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverterConfig.java
new file mode 100644
index 00000000000..dc6a98f8306
--- /dev/null
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverterConfig.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.schema.converter;
+
+import org.apache.hudi.common.config.ConfigProperty;
+
+/**
+ * Configs for `JsonToAvroSchemaConverter`
+ */
+public class JsonToAvroSchemaConverterConfig {
+  public static final ConfigProperty<Boolean> CONVERT_DEFAULT_VALUE_TYPE = 
ConfigProperty
+      
.key("hoodie.streamer.schemaconverter.json_to_avro.convert_default_value_type")
+      .defaultValue(true)
+      .sinceVersion("1.1.0")
+      .withDocumentation("In 
`org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter`, "
+          + "whether to automatically convert the default value based on the 
field type in the "
+          + "schema. For example, the field schema is "
+          + "`\"col1\":{\"type\":\"integer\",\"default\":\"0\"`, and in this 
case, the default "
+          + "value is a String in JSON. After converting the type of the 
default value from JSON, "
+          + "the converter can properly insert the default value to the Avro 
schema without "
+          + "hitting Avro schema validation error. This should be turned on by 
default.");
+
+  public static final ConfigProperty<Boolean> STRIP_DEFAULT_VALUE_QUOTES = 
ConfigProperty
+      
.key("hoodie.streamer.schemaconverter.json_to_avro.strip_default_value_quotes")
+      .defaultValue(true)
+      .sinceVersion("1.1.0")
+      .withDocumentation("In 
`org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter`, "
+          + "whether to automatically strip the surrounding quotes on the 
default value of the "
+          + "String-typed field in the schema. For example, the field schema 
is "
+          + "`\"col1\":{\"type\":\"string\",\"default\":\"\\\"abc\\\"\"`, and 
in this case, the "
+          + "default string value is surrounded by additional quotes (`\"`) in 
JSON. The converter "
+          + "can strip the quotes, if exist, as they are redundant before 
adding the default value "
+          + "to the Avro schema. This should be turned on by default.");
+}
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index 6ce0a548ec3..7c47691ac11 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -18,11 +18,14 @@
 
 package org.apache.hudi.utilities.sources;
 
+import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.config.JsonKafkaPostProcessorConfig;
 import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
@@ -33,6 +36,7 @@ import 
org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
 import org.apache.hudi.utilities.streamer.DefaultStreamContext;
 import org.apache.hudi.utilities.streamer.StreamContext;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -61,6 +65,19 @@ import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SO
 public class JsonKafkaSource extends KafkaSource<JavaRDD<String>> {
   private static final Logger LOG = 
LoggerFactory.getLogger(JsonKafkaSource.class);
 
  /**
   * Configs specific to {@link JsonKafkaSource}.
   */
  public static class Config  {
    // Fully-qualified class name of the Kafka value deserializer; defaults to plain
    // StringDeserializer so existing JSON-string topics keep working unchanged.
    // NOTE(review): the key uses the legacy "hoodie.deltastreamer." prefix while the new
    // JsonToAvroSchemaConverter configs in this commit use "hoodie.streamer." — confirm
    // this prefix choice is intentional for a config introduced in 1.1.0.
    public static final ConfigProperty<String> KAFKA_JSON_VALUE_DESERIALIZER_CLASS = ConfigProperty
        .key("hoodie.deltastreamer.source.kafka.json.value.deserializer.class")
        .defaultValue(StringDeserializer.class.getName())
        .sinceVersion("1.1.0")
        .withDocumentation("Kafka Json Payload Deserializer Class");
  }
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
   public JsonKafkaSource(TypedProperties properties, JavaSparkContext 
sparkContext, SparkSession sparkSession,
                          SchemaProvider schemaProvider, HoodieIngestionMetrics 
metrics) {
     this(properties, sparkContext, sparkSession, metrics, new 
DefaultStreamContext(schemaProvider, Option.empty()));
@@ -69,48 +86,61 @@ public class JsonKafkaSource extends 
KafkaSource<JavaRDD<String>> {
  /**
   * Main constructor: wires the schema provider and source profile into the Kafka source
   * and configures the Kafka consumer deserializers before creating the offset generator.
   *
   * @param properties    source/streamer configuration
   * @param sparkContext  active Spark context
   * @param sparkSession  active Spark session
   * @param metrics       ingestion metrics sink
   * @param streamContext carries the schema provider and source profile supplier
   */
  public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) {
    super(properties, sparkContext, sparkSession, SourceType.JSON, metrics,
        new DefaultStreamContext(UtilHelpers.getSchemaProviderForKafkaSource(streamContext.getSchemaProvider(), properties, sparkContext), streamContext.getSourceProfileSupplier()));
    // Keys are always read as plain strings; the value deserializer is user-configurable
    // (defaults to StringDeserializer) via KAFKA_JSON_VALUE_DESERIALIZER_CLASS.
    props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName());
    props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, ConfigUtils.getStringWithAltKeys(props, Config.KAFKA_JSON_VALUE_DESERIALIZER_CLASS, true));
    // Must be created after the deserializer props are set so the consumer sees them.
    this.offsetGen = new KafkaOffsetGen(props);
  }
 
   @Override
   protected JavaRDD<String> toBatch(OffsetRange[] offsetRanges) {
+    String deserializerClass = 
props.getString(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP);
     JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD = 
KafkaUtils.createRDD(sparkContext,
             offsetGen.getKafkaParams(),
             offsetRanges,
             LocationStrategies.PreferConsistent())
-        .filter(x -> !StringUtils.isNullOrEmpty((String) x.value()));
-    return postProcess(maybeAppendKafkaOffsets(kafkaRDD));
+        .filter(x -> filterForNullValues(x.value(), deserializerClass));
+    return postProcess(maybeAppendKafkaOffsets(kafkaRDD, deserializerClass));
   }
 
  /**
   * Converts Kafka consumer records to JSON strings, and, when offset tracking is enabled,
   * appends the Kafka offset/partition/timestamp (and key, when present) as extra JSON
   * fields to each record.
   *
   * @param kafkaRDD          records already filtered of null values
   * @param deserializerClass value deserializer class name, used to decide how to turn the
   *                          record value into a JSON string
   * @return RDD of JSON strings, with Kafka metadata fields added when enabled
   */
  protected JavaRDD<String> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD, String deserializerClass) {
    if (shouldAddOffsets) {
      return kafkaRDD.mapPartitions(partitionIterator -> {
        TaskContext taskContext = TaskContext.get();
        LOG.info("Converting Kafka source objects to strings with stageId : {}, stage attempt no: {}, taskId : {}, task attempt no : {}, task attempt id : {} ",
            taskContext.stageId(), taskContext.stageAttemptNumber(), taskContext.partitionId(), taskContext.attemptNumber(),
            taskContext.taskAttemptId());
        return new CloseableMappingIterator<>(ClosableIterator.wrap(partitionIterator), consumerRecord -> {
          String recordKey;
          String record;
          try {
            // Serialization failure of the value itself is fatal for the batch.
            record = getValueAsString(consumerRecord.value(), deserializerClass);
            recordKey = StringUtils.objToString(consumerRecord.key());
          } catch (JsonProcessingException e) {
            throw new HoodieException(e);
          }
          try {
            ObjectNode jsonNode = (ObjectNode) OBJECT_MAPPER.readTree(record);
            jsonNode.put(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
            jsonNode.put(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition());
            jsonNode.put(KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp());
            if (recordKey != null) {
              jsonNode.put(KAFKA_SOURCE_KEY_COLUMN, recordKey);
            }
            return OBJECT_MAPPER.writeValueAsString(jsonNode);
          } catch (Throwable e) {
            // Best-effort: a value that is not a JSON object (parse failure or cast to
            // ObjectNode fails) is passed through unmodified without offset columns —
            // presumably left for downstream error handling; confirm before changing.
            return record;
          }
        });
      });
    } else {
      // Offsets not requested: just stringify each value.
      return kafkaRDD.map(consumerRecord -> {
        try {
          return getValueAsString(consumerRecord.value(), deserializerClass);
        } catch (JsonProcessingException e) {
          throw new HoodieException(e);
        }
      });
    }
  }
 
   private JavaRDD<String> postProcess(JavaRDD<String> jsonStringRDD) {
@@ -130,4 +160,21 @@ public class JsonKafkaSource extends 
KafkaSource<JavaRDD<String>> {
 
     return processor.process(jsonStringRDD);
   }
+
+  private static Boolean filterForNullValues(Object value, String 
valueDeserializerClass) {
+    if (value == null) {
+      return false;
+    }
+    if (valueDeserializerClass.equals(StringDeserializer.class.getName())) {
+      return StringUtils.nonEmpty((String) value);
+    }
+    return true;
+  }
+
+  private static String getValueAsString(Object value, String 
valueDeserializerClass) throws JsonProcessingException {
+    if (StringDeserializer.class.getName().equals(valueDeserializerClass)) {
+      return (String) value;
+    }
+    return OBJECT_MAPPER.writeValueAsString(value);
+  }
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
index dd44704fb5d..6f5766e81dc 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
@@ -19,46 +19,74 @@
 
 package org.apache.hudi.utilities.schema.converter;
 
+import org.apache.hudi.common.config.TypedProperties;
+
 import io.confluent.kafka.schemaregistry.json.JsonSchema;
 import org.apache.avro.Schema;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
 
 import java.io.IOException;
 
 import static org.apache.hudi.common.util.FileIOUtils.readAsUTFString;
+import static 
org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter.stripQuotesFromStringValue;
+import static 
org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverterConfig.STRIP_DEFAULT_VALUE_QUOTES;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 class TestJsonToAvroSchemaConverter {
 
  // Each CSV row is: <test-case resource dir>, <expected-file suffix>. An empty second
  // column maps to a null suffix (expected.json); the "not-null-default-value-schema"
  // case is run twice to cover both settings of STRIP_DEFAULT_VALUE_QUOTES.
  @ParameterizedTest
  @CsvSource({
      "enum-properties,",
      "example-address,",
      "example-calendar,",
      "example-card,",
      "example-geographical-location,",
      "multiple-properties,",
      "nested-properties,",
      "single-properties,",
      "schema-repeating-names,",
      "complex-json-union-types,",
      "not-null-default-value-schema,_no_stripping_quotes",
      "not-null-default-value-schema,_stripping_quotes",
      "array-with-item-type-union,",
  })
  void testConvertJsonSchemaToAvroSchema(String inputCase, String avroSchemaFileSuffix) throws IOException {
    String jsonSchema = loadJsonSchema(inputCase);
    TypedProperties config = new TypedProperties();
    // Only the "_no_stripping_quotes" variant disables quote stripping; every other
    // case exercises the default converter configuration.
    if ("_no_stripping_quotes".equals(avroSchemaFileSuffix)) {
      config.put(STRIP_DEFAULT_VALUE_QUOTES.key(), "false");
    }
    String avroSchema = new JsonToAvroSchemaConverter(config).convert(new JsonSchema(jsonSchema));
    // Parse both sides so the comparison is structural, not sensitive to JSON formatting.
    Schema schema = new Schema.Parser().parse(avroSchema);
    Schema expected = new Schema.Parser().parse(loadAvroSchema(inputCase, avroSchemaFileSuffix));
    assertEquals(expected, schema);
  }
 
  @Test
  void testStripQuotesFromStringValue() {
    // Null and inputs shorter than two characters pass through unchanged.
    assertNull(stripQuotesFromStringValue(null));
    assertEquals("", stripQuotesFromStringValue(""));
    assertEquals("\"", stripQuotesFromStringValue("\""));
    // Exactly one surrounding pair of quotes is removed, even if the content is empty
    // or itself a quote character.
    assertEquals("", stripQuotesFromStringValue("\"\""));
    assertEquals("\"", stripQuotesFromStringValue("\"\"\""));
    // Unquoted values are returned as-is.
    assertEquals("123", stripQuotesFromStringValue("123"));
    assertEquals("123", stripQuotesFromStringValue("\"123\""));
    assertEquals("x", stripQuotesFromStringValue("x"));
  }
+
   private String loadJsonSchema(String inputCase) throws IOException {
     return readAsUTFString(getClass()
         .getClassLoader()
         
.getResourceAsStream(String.format("schema-provider/json/%s/input.json", 
inputCase)));
   }
 
-  private String loadAvroSchema(String inputCase) throws IOException {
+  private String loadAvroSchema(String inputCase, String avroSchemaFileSuffix) 
throws IOException {
     return readAsUTFString(getClass()
         .getClassLoader()
-        
.getResourceAsStream(String.format("schema-provider/json/%s/expected.json", 
inputCase)));
+        
.getResourceAsStream(String.format("schema-provider/json/%s/expected%s.json", 
inputCase,
+            avroSchemaFileSuffix == null ? "" : avroSchemaFileSuffix)));
   }
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index dcfc97e4eae..82bbd3a01c5 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.utilities.sources;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
@@ -26,6 +27,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.InProcessTimeGenerator;
+import org.apache.hudi.common.testutils.RawTripTestPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.config.HoodieStreamerConfig;
@@ -38,9 +40,15 @@ import org.apache.hudi.utilities.streamer.ErrorEvent;
 import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -50,13 +58,16 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Properties;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -70,16 +81,19 @@ import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SO
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+import static 
org.apache.hudi.utilities.sources.JsonKafkaSource.Config.KAFKA_JSON_VALUE_DESERIALIZER_CLASS;
 import static 
org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords;
 import static 
org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions;
 import static 
org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitionsWithNullKafkaKey;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Tests against {@link JsonKafkaSource}.
  */
 public class TestJsonKafkaSource extends BaseTestKafkaSource {
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final HoodieTestDataGenerator DATA_GENERATOR = new 
HoodieTestDataGenerator(1L);
   static final URL SCHEMA_FILE_URL = 
TestJsonKafkaSource.class.getClassLoader().getResource("streamer-config/source_short_trip_uber.avsc");
 
   @BeforeEach
@@ -136,6 +150,36 @@ public class TestJsonKafkaSource extends 
BaseTestKafkaSource {
     assertEquals(1000, fetch1.getBatch().get().count());
   }
 
  // Verifies that JsonKafkaSource works with the Confluent KafkaJsonSchemaDeserializer
  // (configured via KAFKA_JSON_VALUE_DESERIALIZER_CLASS against a mock schema registry)
  // and that records deserialized to null are filtered out of the batch.
  @Test
  public void testJsonKafkaSourceWithJsonSchemaDeserializer() {
    // topic setup.
    final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceWithJsonSchemaDeserializer";
    testUtils.createTopic(topic, 2);
    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
    props.put(KAFKA_JSON_VALUE_DESERIALIZER_CLASS.key(),
        "io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer");
    // In-memory mock registry; no real service is contacted.
    props.put("schema.registry.url", "mock://127.0.0.1:8081");

    Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);

    // 1. Extract without any checkpoint => get all the data, respecting sourceLimit.
    // This fetch runs before any messages are sent, so the batch must be empty.
    assertEquals(Option.empty(),
        kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
    // Send 1000 non-null messages to Kafka
    List<RawTripTestPayload> insertRecords = DATA_GENERATOR.generateInserts("000", 1000)
        .stream()
        .map(hr -> (RawTripTestPayload) hr.getData()).collect(Collectors.toList());
    sendMessagesToKafkaWithJsonSchemaSerializer(topic, 2, insertRecords);
    // send 200 null messages to Kafka
    List<RawTripTestPayload> nullInsertedRecords = Arrays.asList(new RawTripTestPayload[200]);
    sendMessagesToKafkaWithJsonSchemaSerializer(topic, 2, nullInsertedRecords);
    InputBatch<JavaRDD<GenericRecord>> fetch1 =
        kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
    // Verify that messages with null values are filtered: only the 1000 real records remain.
    assertEquals(1000, fetch1.getBatch().get().count());
  }
+
   @Test
   public void testJsonKafkaSourceWithDefaultUpperCap() {
     // topic setup.
@@ -207,6 +251,18 @@ public class TestJsonKafkaSource extends 
BaseTestKafkaSource {
     assertEquals(Option.empty(), fetch6.getBatch());
   }
 
  /**
   * Asserts that every record's decimal field value lies within one decade of the
   * maximum magnitude representable by the field's decimal logical type.
   *
   * @param records   records to check
   * @param schema    record schema containing the decimal field
   * @param fieldname name of the bytes-backed decimal field
   */
  private static void verifyDecimalValue(List<GenericRecord> records, Schema schema, String fieldname) {
    Schema fieldSchema = schema.getField(fieldname).schema();
    LogicalTypes.Decimal decField = (LogicalTypes.Decimal) fieldSchema.getLogicalType();
    // Upper bound: 10^(precision - scale), the magnitude limit of the decimal type.
    double maxVal = Math.pow(10, decField.getPrecision() - decField.getScale());
    // Lower bound is one decade below the max — presumably matching the value range the
    // test data generator produces; confirm against HoodieTestDataGenerator if changed.
    double minVal = maxVal * 0.1;
    for (GenericRecord record : records) {
      // Decimal values are stored as bytes; decode with the field's precision/scale.
      BigDecimal dec = HoodieAvroUtils.convertBytesToBigDecimal(((ByteBuffer) record.get(fieldname)).array(), decField);
      double doubleValue = dec.doubleValue();
      assertTrue(doubleValue <= maxVal && doubleValue >= minVal);
    }
  }
+
   @Override
   protected void sendMessagesToKafka(String topic, int count, int 
numPartitions) {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
@@ -239,7 +295,7 @@ public class TestJsonKafkaSource extends 
BaseTestKafkaSource {
 
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
-  public void testErrorEventsForDataInRowFormat(boolean persistSourceRdd) 
throws IOException {
+  public void testErrorEventsForDataInRowFormat(boolean persistSourceRdd) {
     // topic setup.
     final String topic = TEST_TOPIC_PREFIX + 
"testErrorEventsForDataInRowFormat_" + persistSourceRdd;
 
@@ -251,14 +307,14 @@ public class TestJsonKafkaSource extends 
BaseTestKafkaSource {
     topicPartitions.add(topicPartition1);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     sendJsonSafeMessagesToKafka(topic, 1000, 2);
-    testUtils.sendMessages(topic, new String[]{"error_event1", 
"error_event2"});
+    testUtils.sendMessages(topic, new String[] {"error_event1", 
"error_event2"});
 
     TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
     props.put(ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
-    
props.put(ERROR_TABLE_BASE_PATH.key(),"/tmp/qurantine_table_test/json_kafka_row_events");
-    props.put(ERROR_TARGET_TABLE.key(),"json_kafka_row_events");
+    props.put(ERROR_TABLE_BASE_PATH.key(), 
"/tmp/qurantine_table_test/json_kafka_row_events");
+    props.put(ERROR_TARGET_TABLE.key(), "json_kafka_row_events");
     props.put("hoodie.errortable.validate.targetschema.enable", "true");
-    props.put("hoodie.base.path","/tmp/json_kafka_row_events");
+    props.put("hoodie.base.path", "/tmp/json_kafka_row_events");
     props.setProperty(ERROR_TABLE_PERSIST_SOURCE_RDD.key(), 
String.valueOf(persistSourceRdd));
 
     Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), 
schemaProvider, metrics);
@@ -266,7 +322,7 @@ public class TestJsonKafkaSource extends 
BaseTestKafkaSource {
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource, 
errorTableWriter, Option.of(props));
     InputBatch<Dataset<Row>> fetch1 = 
kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
     assertEquals(1000, fetch1.getBatch().get().count());
-    assertEquals(2,((JavaRDD)errorTableWriter.get().getErrorEvents(
+    assertEquals(2, ((JavaRDD) errorTableWriter.get().getErrorEvents(
         InProcessTimeGenerator.createNewInstantTime(), 
Option.empty()).get()).count());
     verifyRddsArePersisted(kafkaSource.getSource(), 
fetch1.getBatch().get().rdd().toDebugString(), persistSourceRdd);
   }
@@ -283,28 +339,27 @@ public class TestJsonKafkaSource extends 
BaseTestKafkaSource {
     TopicPartition topicPartition1 = new TopicPartition(topic, 1);
     topicPartitions.add(topicPartition1);
     sendJsonSafeMessagesToKafka(topic, 1000, 2);
-    testUtils.sendMessages(topic, new String[]{"error_event1", 
"error_event2"});
+    testUtils.sendMessages(topic, new String[] {"error_event1", 
"error_event2"});
 
     TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
     props.put(ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
-    
props.put(ERROR_TABLE_BASE_PATH.key(),"/tmp/qurantine_table_test/json_kafka_row_events");
-    props.put(ERROR_TARGET_TABLE.key(),"json_kafka_row_events");
+    props.put(ERROR_TABLE_BASE_PATH.key(), 
"/tmp/qurantine_table_test/json_kafka_row_events");
+    props.put(ERROR_TARGET_TABLE.key(), "json_kafka_row_events");
     props.put(HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.key(), true);
     props.put(HoodieStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), 
"__");
     props.put("hoodie.errortable.validate.targetschema.enable", "true");
-    props.put("hoodie.base.path","/tmp/json_kafka_row_events");
+    props.put("hoodie.base.path", "/tmp/json_kafka_row_events");
     Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), 
schemaProvider, metrics);
     Option<BaseErrorTableWriter> errorTableWriter = 
Option.of(getAnonymousErrorTableWriter(props));
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource, 
errorTableWriter, Option.of(props));
-    assertEquals(1000, 
kafkaSource.fetchNewDataInRowFormat(Option.empty(),Long.MAX_VALUE).getBatch().get().count());
-    assertEquals(2,((JavaRDD)errorTableWriter.get().getErrorEvents(
+    assertEquals(1000, kafkaSource.fetchNewDataInRowFormat(Option.empty(), 
Long.MAX_VALUE).getBatch().get().count());
+    assertEquals(2, ((JavaRDD) errorTableWriter.get().getErrorEvents(
         InProcessTimeGenerator.createNewInstantTime(), 
Option.empty()).get()).count());
   }
 
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testErrorEventsForDataInAvroFormat(boolean persistSourceRdd) 
throws IOException {
-
     // topic setup.
     final String topic = TEST_TOPIC_PREFIX + 
"testErrorEventsForDataInAvroFormat_" + persistSourceRdd;
 
@@ -317,20 +372,20 @@ public class TestJsonKafkaSource extends 
BaseTestKafkaSource {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     testUtils.sendMessages(topic, 
jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 1000,
         HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
-    testUtils.sendMessages(topic, new String[]{"error_event1", 
"error_event2"});
+    testUtils.sendMessages(topic, new String[] {"error_event1", 
"error_event2"});
 
     TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
     props.put(ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
-    
props.put(ERROR_TABLE_BASE_PATH.key(),"/tmp/qurantine_table_test/json_kafka_events");
-    props.put(ERROR_TARGET_TABLE.key(),"json_kafka_events");
-    props.put("hoodie.base.path","/tmp/json_kafka_events");
+    props.put(ERROR_TABLE_BASE_PATH.key(), 
"/tmp/qurantine_table_test/json_kafka_events");
+    props.put(ERROR_TARGET_TABLE.key(), "json_kafka_events");
+    props.put("hoodie.base.path", "/tmp/json_kafka_events");
     props.setProperty(ERROR_TABLE_PERSIST_SOURCE_RDD.key(), 
String.valueOf(persistSourceRdd));
 
     Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), 
schemaProvider, metrics);
     Option<BaseErrorTableWriter> errorTableWriter = 
Option.of(getAnonymousErrorTableWriter(props));
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource, 
errorTableWriter, Option.of(props));
-    InputBatch<JavaRDD<GenericRecord>> fetch1 = 
kafkaSource.fetchNewDataInAvroFormat(Option.empty(),Long.MAX_VALUE);
-    assertEquals(1000,fetch1.getBatch().get().count());
+    InputBatch<JavaRDD<GenericRecord>> fetch1 = 
kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+    assertEquals(1000, fetch1.getBatch().get().count());
     assertEquals(2, ((JavaRDD) errorTableWriter.get().getErrorEvents(
         InProcessTimeGenerator.createNewInstantTime(), 
Option.empty()).get()).count());
     verifyRddsArePersisted(kafkaSource.getSource(), 
fetch1.getBatch().get().rdd().toDebugString(), persistSourceRdd);
@@ -357,8 +412,7 @@ public class TestJsonKafkaSource extends 
BaseTestKafkaSource {
       }
 
       @Override
-      public Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String 
baseTableInstantTime,
-                                                              Option 
commitedInstantTime) {
+      public Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String 
baseTableInstantTime, Option commitedInstantTime) {
         return Option.of(errorEvents.stream().reduce((rdd1, rdd2) -> 
rdd1.union(rdd2)).get());
       }
 
@@ -415,6 +469,34 @@ public class TestJsonKafkaSource extends 
BaseTestKafkaSource {
     dfWithOffsetInfoAndNullKafkaKey.unpersist();
   }
 
+  private void sendMessagesToKafkaWithJsonSchemaSerializer(String topic, int 
numPartitions,
+                                                           
List<RawTripTestPayload> insertRecords) {
+    Properties config = getProducerPropertiesForJsonKafkaSchemaSerializer();
+    try (Producer<String, RawTripTestPayload> producer = new 
KafkaProducer<>(config)) {
+      for (int i = 0; i < insertRecords.size(); i++) {
+        // use consistent keys to get even spread over partitions for test 
expectations
+        producer.send(new ProducerRecord<>(topic, Integer.toString(i % 
numPartitions),
+            insertRecords.get(i)));
+      }
+    }
+  }
+
+  private Properties getProducerPropertiesForJsonKafkaSchemaSerializer() {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", testUtils.brokerAddress());
+    props.put("value.serializer",
+        "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer");
+    props.put("value.deserializer",
+        "io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer");
+    // Key serializer is required.
+    props.put("key.serializer", StringSerializer.class.getName());
+    props.put("schema.registry.url", "mock://127.0.0.1:8081");
+    props.put("auto.register.schemas", "true");
+    // wait for all in-sync replicas to ack sends
+    props.put("acks", "all");
+    return props;
+  }
+
   @Test
   public void testCreateSource() throws IOException {
     final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceCreation";
diff --git 
a/hudi-utilities/src/test/resources/schema-provider/json/array-with-item-type-union/expected.json
 
b/hudi-utilities/src/test/resources/schema-provider/json/array-with-item-type-union/expected.json
new file mode 100644
index 00000000000..86f31868d71
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/schema-provider/json/array-with-item-type-union/expected.json
@@ -0,0 +1,78 @@
+{
+  "type" : "record",
+  "name" : "namespace1",
+  "doc" : "",
+  "fields" : [ {
+    "name" : "field1",
+    "type" : [ "null", "long" ],
+    "doc" : "",
+    "default" : null
+  }, {
+    "name" : "field2",
+    "type" : [ "null", "string" ],
+    "doc" : "",
+    "default" : null
+  }, {
+    "name" : "field3",
+    "type" : [ "null", "long" ],
+    "doc" : "",
+    "default" : null
+  }, {
+    "name" : "field4",
+    "type" : [ "null", "long" ],
+    "doc" : "",
+    "default" : null
+  }, {
+    "name" : "field5",
+    "type" : [ "null", "long" ],
+    "doc" : "",
+    "default" : null
+  }, {
+    "name" : "field6",
+    "type" : [ "null", "string" ],
+    "doc" : "",
+    "default" : null
+  }, {
+    "name" : "field7",
+    "type" : [ "null", "string" ],
+    "doc" : "",
+    "default" : null
+  }, {
+    "name" : "field8",
+    "type" : [ "null", "long" ],
+    "doc" : "",
+    "default" : null
+  }, {
+    "name" : "field9",
+    "type" : [ "null", "string" ],
+    "doc" : "",
+    "default" : null
+  }, {
+    "name" : "field10",
+    "type" : [ "null", "string" ],
+    "doc" : "",
+    "default" : null
+  }, {
+    "name" : "field11",
+    "type" : [ "null", {
+      "type" : "array",
+      "items" : [ "null", {
+        "type" : "record",
+        "name" : "namespace2",
+        "fields" : [ {
+          "name" : "field12",
+          "type" : [ "null", "string" ],
+          "doc" : "",
+          "default" : null
+        }, {
+          "name" : "field13",
+          "type" : [ "null", "long" ],
+          "doc" : "",
+          "default" : null
+        } ]
+      } ]
+    } ],
+    "doc" : "",
+    "default" : null
+  } ]
+}
\ No newline at end of file
diff --git 
a/hudi-utilities/src/test/resources/schema-provider/json/array-with-item-type-union/input.json
 
b/hudi-utilities/src/test/resources/schema-provider/json/array-with-item-type-union/input.json
new file mode 100644
index 00000000000..0b1bad988ee
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/schema-provider/json/array-with-item-type-union/input.json
@@ -0,0 +1,93 @@
+{
+  "type": "object",
+  "title": "namespace1",
+  "connect.version": 1,
+  "properties": {
+    "field1": {
+      "type": "integer",
+      "connect.index": 5,
+      "connect.type": "int32"
+    },
+    "field2": {
+      "type": "string",
+      "connect.index": 8
+    },
+    "field3": {
+      "connect.index": 2,
+      "oneOf": [
+        {
+          "type": "null"
+        },
+        {
+          "type": "integer",
+          "connect.type": "int64"
+        }
+      ]
+    },
+    "field4": {
+      "type": "integer",
+      "connect.index": 7,
+      "connect.type": "int32"
+    },
+    "field5": {
+      "type": "integer",
+      "connect.index": 10,
+      "connect.type": "int64"
+    },
+    "field6": {
+      "type": "string",
+      "connect.index": 1
+    },
+    "field7": {
+      "type": "string",
+      "connect.index": 6
+    },
+    "field8": {
+      "type": "integer",
+      "connect.index": 4,
+      "connect.type": "int64"
+    },
+    "field9": {
+      "type": "string",
+      "connect.index": 9
+    },
+    "field10": {
+      "type": "string",
+      "connect.index": 0
+    },
+    "field11": {
+      "connect.index": 3,
+      "oneOf": [
+        {
+          "type": "null"
+        },
+        {
+          "type": "array",
+          "items": {
+            "oneOf": [
+              {
+                "type": "null"
+              },
+              {
+                "type": "object",
+                "title": "namespace2",
+                "connect.version": 1,
+                "properties": {
+                  "field12": {
+                    "type": "string",
+                    "connect.index": 0
+                  },
+                  "field13": {
+                    "type": "integer",
+                    "connect.index": 1,
+                    "connect.type": "int64"
+                  }
+                }
+              }
+            ]
+          }
+        }
+      ]
+    }
+  }
+}
diff --git 
a/hudi-utilities/src/test/resources/schema-provider/json/complex-json-union-types/expected.json
 
b/hudi-utilities/src/test/resources/schema-provider/json/complex-json-union-types/expected.json
new file mode 100644
index 00000000000..12d2f8f1a8f
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/schema-provider/json/complex-json-union-types/expected.json
@@ -0,0 +1,66 @@
+{
+  "type": "record",
+  "name": "json_schema_with_complex_union_types",
+  "doc": "",
+  "fields": [
+    {
+      "name": "field1",
+      "type": [
+        "null",
+        {
+          "type": "record",
+          "name": "field1",
+          "fields": [
+            {
+              "name": "property1",
+              "type": [
+                "null",
+                "string"
+              ],
+              "doc": "",
+              "default": null
+            },
+            {
+              "name": "property2",
+              "type": [
+                "null",
+                "double"
+              ],
+              "doc": "",
+              "default": null
+            }
+          ]
+        }
+      ],
+      "doc": "",
+      "default": null
+    },
+    {
+      "name": "field2",
+      "type": [
+        "null",
+        {
+          "type": "array",
+          "items": "string"
+        },
+        {
+          "type": "record",
+          "name": "field21",
+          "fields": [
+            {
+              "name": "property3",
+              "type": [
+                "null",
+                "long"
+              ],
+              "doc": "",
+              "default": null
+            }
+          ]
+        }
+      ],
+      "doc": "",
+      "default": null
+    }
+  ]
+}
diff --git 
a/hudi-utilities/src/test/resources/schema-provider/json/complex-json-union-types/input.json
 
b/hudi-utilities/src/test/resources/schema-provider/json/complex-json-union-types/input.json
new file mode 100644
index 00000000000..1ee57ff9d71
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/schema-provider/json/complex-json-union-types/input.json
@@ -0,0 +1,42 @@
+{
+  "title": "json schema with complex union types",
+  "type": "object",
+  "properties": {
+    "field1": {
+      "oneOf": [
+        {
+          "type": "object",
+          "properties": {
+            "property1": {
+              "type": "string"
+            },
+            "property2": {
+              "type": "number"
+            }
+          }
+        },
+        {
+          "type": "null"
+        }
+      ]
+    },
+    "field2": {
+      "allOf": [
+        {
+          "type": "array",
+          "items": {
+            "type": "string"
+          }
+        },
+        {
+          "type": "object",
+          "properties": {
+            "property3": {
+              "type": "integer"
+            }
+          }
+        }
+      ]
+    }
+  }
+}
diff --git 
a/hudi-utilities/src/test/resources/schema-provider/json/not-null-default-value-schema/expected_no_stripping_quotes.json
 
b/hudi-utilities/src/test/resources/schema-provider/json/not-null-default-value-schema/expected_no_stripping_quotes.json
new file mode 100644
index 00000000000..c1a6619816e
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/schema-provider/json/not-null-default-value-schema/expected_no_stripping_quotes.json
@@ -0,0 +1,138 @@
+{
+  "type": "record",
+  "name": "json_schema_with_non_null_default_values",
+  "doc": "",
+  "fields": [
+    {
+      "name": "field1",
+      "type": [
+        "null",
+        {
+          "type": "record",
+          "name": "field1",
+          "fields": [
+            {
+              "name": "property1",
+              "type": [
+                "string",
+                "null"
+              ],
+              "doc": "",
+              "default": "\"property1_defaultvalue\""
+            },
+            {
+              "name": "property2",
+              "type": [
+                "null",
+                "double"
+              ],
+              "doc": "",
+              "default": null
+            },
+            {
+              "name": "property3",
+              "type": [
+                "long",
+                "null"
+              ],
+              "doc": "",
+              "default": 10
+            },
+            {
+              "name": "property4",
+              "type": [
+                "double",
+                "null"
+              ],
+              "doc": "",
+              "default": 8.25
+            }
+          ]
+        }
+      ],
+      "doc": "",
+      "default": null
+    },
+    {
+      "name": "field2",
+      "type": [
+        "string",
+        "null"
+      ],
+      "doc": "",
+      "default": "\"property2_defaultvalue\""
+    },
+    {
+      "name": "field3",
+      "type": [
+        "string",
+        "null"
+      ],
+      "doc": "",
+      "default": "\"N\""
+    },
+    {
+      "name": "field4",
+      "type": [
+        "long",
+        "null"
+      ],
+      "doc": "",
+      "default": 40
+    },
+    {
+      "name": "field5",
+      "type": [
+        "long",
+        "null"
+      ],
+      "doc": "",
+      "default": 50
+    },
+    {
+      "name": "field6",
+      "type": [
+        "long",
+        "null"
+      ],
+      "doc": "",
+      "default": 60
+    },
+    {
+      "name": "field7",
+      "type": [
+        "double",
+        "null"
+      ],
+      "doc": "",
+      "default": 7.75
+    },
+    {
+      "name": "field8",
+      "type": [
+        "double",
+        "null"
+      ],
+      "doc": "",
+      "default": 8.375
+    },
+    {
+      "name": "field9",
+      "type": [
+        "boolean",
+        "null"
+      ],
+      "doc": "",
+      "default": true
+    },
+    {
+      "name": "field10",
+      "type": [
+        "boolean",
+        "null"
+      ],
+      "doc": "",
+      "default": false
+    }
+  ]
+}
diff --git 
a/hudi-utilities/src/test/resources/schema-provider/json/not-null-default-value-schema/expected_stripping_quotes.json
 
b/hudi-utilities/src/test/resources/schema-provider/json/not-null-default-value-schema/expected_stripping_quotes.json
new file mode 100644
index 00000000000..68df4c24b78
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/schema-provider/json/not-null-default-value-schema/expected_stripping_quotes.json
@@ -0,0 +1,138 @@
+{
+  "type": "record",
+  "name": "json_schema_with_non_null_default_values",
+  "doc": "",
+  "fields": [
+    {
+      "name": "field1",
+      "type": [
+        "null",
+        {
+          "type": "record",
+          "name": "field1",
+          "fields": [
+            {
+              "name": "property1",
+              "type": [
+                "string",
+                "null"
+              ],
+              "doc": "",
+              "default": "property1_defaultvalue"
+            },
+            {
+              "name": "property2",
+              "type": [
+                "null",
+                "double"
+              ],
+              "doc": "",
+              "default": null
+            },
+            {
+              "name": "property3",
+              "type": [
+                "long",
+                "null"
+              ],
+              "doc": "",
+              "default": 10
+            },
+            {
+              "name": "property4",
+              "type": [
+                "double",
+                "null"
+              ],
+              "doc": "",
+              "default": 8.25
+            }
+          ]
+        }
+      ],
+      "doc": "",
+      "default": null
+    },
+    {
+      "name": "field2",
+      "type": [
+        "string",
+        "null"
+      ],
+      "doc": "",
+      "default": "property2_defaultvalue"
+    },
+    {
+      "name": "field3",
+      "type": [
+        "string",
+        "null"
+      ],
+      "doc": "",
+      "default": "N"
+    },
+    {
+      "name": "field4",
+      "type": [
+        "long",
+        "null"
+      ],
+      "doc": "",
+      "default": 40
+    },
+    {
+      "name": "field5",
+      "type": [
+        "long",
+        "null"
+      ],
+      "doc": "",
+      "default": 50
+    },
+    {
+      "name": "field6",
+      "type": [
+        "long",
+        "null"
+      ],
+      "doc": "",
+      "default": 60
+    },
+    {
+      "name": "field7",
+      "type": [
+        "double",
+        "null"
+      ],
+      "doc": "",
+      "default": 7.75
+    },
+    {
+      "name": "field8",
+      "type": [
+        "double",
+        "null"
+      ],
+      "doc": "",
+      "default": 8.375
+    },
+    {
+      "name": "field9",
+      "type": [
+        "boolean",
+        "null"
+      ],
+      "doc": "",
+      "default": true
+    },
+    {
+      "name": "field10",
+      "type": [
+        "boolean",
+        "null"
+      ],
+      "doc": "",
+      "default": false
+    }
+  ]
+}
diff --git 
a/hudi-utilities/src/test/resources/schema-provider/json/not-null-default-value-schema/input.json
 
b/hudi-utilities/src/test/resources/schema-provider/json/not-null-default-value-schema/input.json
new file mode 100644
index 00000000000..74466123e6a
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/schema-provider/json/not-null-default-value-schema/input.json
@@ -0,0 +1,67 @@
+{
+  "title": "json schema with non null default values",
+  "type": "object",
+  "properties": {
+    "field1": {
+      "oneOf": [
+        {
+          "type": "object",
+          "properties": {
+            "property1": {
+              "type": "string",
+              "default": "\"property1_defaultvalue\""
+            },
+            "property2": {
+              "type": "number"
+            },
+            "property3": {
+              "type": "integer",
+              "default": "10"
+            },
+            "property4": {
+              "type": "number",
+              "default": "8.25"
+            }
+          }
+        },
+        {
+          "type": "null"
+        }
+      ]
+    },
+    "field2": {
+      "type": "string",
+      "default": "\"property2_defaultvalue\""
+    },
+    "field3": {
+      "oneOf":[{"type":"null"},{"type":"string","default":"\"N\""}]
+    },
+    "field4": {
+      "type": "integer",
+      "default": "40"
+    },
+    "field5": {
+      "type": "integer",
+      "default": 50
+    },
+    "field6": {
+      "oneOf":[{"type":"null"},{"type":"integer","default":"60"}]
+    },
+    "field7": {
+      "type": "number",
+      "default": "7.75"
+    },
+    "field8": {
+      "type": "number",
+      "default": 8.375
+    },
+    "field9": {
+      "type": "boolean",
+      "default": "true"
+    },
+    "field10": {
+      "type": "boolean",
+      "default": false
+    }
+  }
+}
diff --git 
a/hudi-utilities/src/test/resources/schema-provider/json/schema-repeating-names/expected.json
 
b/hudi-utilities/src/test/resources/schema-provider/json/schema-repeating-names/expected.json
new file mode 100644
index 00000000000..29b76565746
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/schema-provider/json/schema-repeating-names/expected.json
@@ -0,0 +1,153 @@
+{
+  "type": "record",
+  "name": "schema_repeating_names",
+  "doc": "",
+  "fields": [
+    {
+      "name": "op",
+      "type": [
+        "null",
+        "string"
+      ],
+      "doc": "",
+      "default": null
+    },
+    {
+      "name": "before",
+      "type": [
+        "null",
+        {
+          "type": "record",
+          "name": "ValueSchema",
+          "fields": [
+            {
+              "name": "fileName",
+              "type": [
+                "null",
+                "string"
+              ],
+              "doc": "",
+              "default": null
+            },
+            {
+              "name": "functionCode",
+              "type": [
+                "null",
+                "string"
+              ],
+              "doc": "",
+              "default": null
+            }
+          ]
+        }
+      ],
+      "doc": "",
+      "default": null
+    },
+    {
+      "name": "after",
+      "type": [
+        "null",
+        {
+          "type": "record",
+          "name": "ValueSchema1",
+          "fields": [
+            {
+              "name": "fileName",
+              "type": [
+                "null",
+                "string"
+              ],
+              "doc": "",
+              "default": null
+            },
+            {
+              "name": "functionCode",
+              "type": [
+                "null",
+                "string"
+              ],
+              "doc": "",
+              "default": null
+            }
+          ]
+        }
+      ],
+      "doc": "",
+      "default": null
+    },
+    {
+      "name": "source",
+      "type": {
+        "type": "record",
+        "name": "io.debezium.connector.mysql.Source",
+        "fields": [
+          {
+            "name": "query",
+            "type": [
+              "null",
+              "string"
+            ],
+            "doc": "",
+            "default": null
+          },
+          {
+            "name": "thread",
+            "type": [
+              "null",
+              "long"
+            ],
+            "doc": "",
+            "default": null
+          },
+          {
+            "name": "version",
+            "type": [
+              "null",
+              "string"
+            ],
+            "doc": "",
+            "default": null
+          },
+          {
+            "name": "pos",
+            "type": [
+              "null",
+              "long"
+            ],
+            "doc": "",
+            "default": null
+          },
+          {
+            "name": "ts_ms",
+            "type": [
+              "null",
+              "long"
+            ],
+            "doc": "",
+            "default": null
+          }
+        ]
+      },
+      "doc": ""
+    },
+    {
+      "name": "program",
+      "type": [
+        "null",
+        "string"
+      ],
+      "doc": "",
+      "default": null
+    },
+    {
+      "name": "ts_ms",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "",
+      "default": null
+    }
+  ]
+}
diff --git 
a/hudi-utilities/src/test/resources/schema-provider/json/schema-repeating-names/input.json
 
b/hudi-utilities/src/test/resources/schema-provider/json/schema-repeating-names/input.json
new file mode 100644
index 00000000000..a4165dc4ffa
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/schema-provider/json/schema-repeating-names/input.json
@@ -0,0 +1,124 @@
+{
+  "type": "object",
+  "title": "schema_repeating_names",
+  "properties": {
+    "op": {
+      "type": "string"
+    },
+    "before": {
+      "oneOf": [
+        {
+          "type": "null"
+        },
+        {
+          "type": "object",
+          "title": "ValueSchema",
+          "properties": {
+            "fileName": {
+              "oneOf": [
+                {
+                  "type": "null"
+                },
+                {
+                  "type": "string"
+                }
+              ]
+            },
+            "functionCode": {
+              "oneOf": [
+                {
+                  "type": "null"
+                },
+                {
+                  "type": "string"
+                }
+              ]
+            }
+          }
+        }
+      ]
+    },
+    "after": {
+      "oneOf": [
+        {
+          "type": "null"
+        },
+        {
+          "type": "object",
+          "title": "ValueSchema",
+          "properties": {
+            "fileName": {
+              "oneOf": [
+                {
+                  "type": "null"
+                },
+                {
+                  "type": "string"
+                }
+              ]
+            },
+            "functionCode": {
+              "oneOf": [
+                {
+                  "type": "null"
+                },
+                {
+                  "type": "string"
+                }
+              ]
+            }
+          }
+        }
+      ]
+    },
+    "source": {
+      "type": "object",
+      "title": "io.debezium.connector.mysql.Source",
+      "properties": {
+        "query": {
+          "oneOf": [
+            {
+              "type": "null"
+            },
+            {
+              "type": "string"
+            }
+          ]
+        },
+        "thread": {
+          "oneOf": [
+            {
+              "type": "null"
+            },
+            {
+              "type": "integer"
+            }
+          ]
+        },
+        "version": {
+          "type": "string"
+        },
+        "pos": {
+          "type": "integer"
+        },
+        "ts_ms": {
+          "type": "integer",
+          "connect.type": "int64"
+        }
+      }
+    },
+    "program": {
+      "type": "string"
+    },
+    "ts_ms": {
+      "oneOf": [
+        {
+          "type": "null"
+        },
+        {
+          "type": "integer"
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/packaging/hudi-utilities-bundle/pom.xml 
b/packaging/hudi-utilities-bundle/pom.xml
index 9d56415f376..97c82850155 100644
--- a/packaging/hudi-utilities-bundle/pom.xml
+++ b/packaging/hudi-utilities-bundle/pom.xml
@@ -129,6 +129,7 @@
                   <include>com.twitter:chill-protobuf</include>
                   <include>io.confluent:kafka-avro-serializer</include>
                   <include>io.confluent:kafka-schema-serializer</include>
+                  <include>io.confluent:kafka-json-schema-serializer</include>
                   <include>io.confluent:common-config</include>
                   <include>io.confluent:common-utils</include>
                   <include>io.confluent:kafka-schema-registry-client</include>
diff --git a/pom.xml b/pom.xml
index f1e87dfa888..794de9ea118 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,6 +102,8 @@
     
<fasterxml.jackson.dataformat.yaml.version>${fasterxml.spark3.version}</fasterxml.jackson.dataformat.yaml.version>
     <kafka.version>2.0.0</kafka.version>
     <pulsar.version>3.0.2</pulsar.version>
+    <kafka.connect.api.version>2.5.0</kafka.connect.api.version>
+    <kafka.spark3.version>2.8.2</kafka.spark3.version>
     
<pulsar.spark.version>${pulsar.spark.scala12.version}</pulsar.spark.version>
     <pulsar.spark.scala12.version>3.1.1.4</pulsar.spark.scala12.version>
     <pulsar.spark.scala13.version>3.4.1.1</pulsar.spark.scala13.version>
@@ -1305,6 +1307,16 @@
           </exclusion>
         </exclusions>
       </dependency>
+      <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>connect-api</artifactId>
+        <version>${kafka.connect.api.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka-streams-test-utils</artifactId>
+        <version>3.4.0</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-mapreduce-client-common</artifactId>


Reply via email to