This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 721f7fc9d30 [HUDI-9072] Support decimal in JsonKafkaSource (#12879)
721f7fc9d30 is described below

commit 721f7fc9d30c907645d24ce43df4cbe0baba4d19
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Feb 25 19:34:19 2025 -0800

    [HUDI-9072] Support decimal in JsonKafkaSource (#12879)
---
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 16 +++++
 .../hudi/avro/MercifulJsonConverterTestBase.java   | 16 +++++
 .../org/apache/hudi/avro/TestHoodieAvroUtils.java  | 71 +++++++++++++++++---
 .../hudi/avro/TestMercifulJsonConverter.java       | 64 ++++++++++++++++++
 .../common/testutils/HoodieTestDataGenerator.java  | 49 ++++++++++++++
 .../utilities/schema/FilebasedSchemaProvider.java  | 60 +++++++++++++----
 .../utilities/streamer/SourceFormatAdapter.java    | 16 +++--
 .../deltastreamer/HoodieDeltaStreamerTestBase.java |  7 ++
 .../schema/TestFilebasedSchemaProvider.java        | 35 +++++-----
 .../converter/TestJsonToAvroSchemaConverter.java   |  1 +
 .../utilities/sources/TestJsonKafkaSource.java     | 41 ++++++++++++
 .../json/kafka-decimal-simple/expected.json        | 24 +++++++
 .../json/kafka-decimal-simple/input.json           | 28 ++++++++
 .../source_uber_encoded_decimal.avsc               | 77 ++++++++++++++++++++++
 .../source_uber_encoded_decimal.json               | 54 +++++++++++++++
 15 files changed, 519 insertions(+), 40 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index b232ee304b5..188311a5666 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -1149,6 +1149,8 @@ public class HoodieAvroUtils {
             // due to Java, there will be precision problems in direct 
conversion, we should use string instead of use double
             BigDecimal bigDecimal = new 
java.math.BigDecimal(oldValue.toString()).setScale(decimal.getScale(), 
RoundingMode.HALF_UP);
             return DECIMAL_CONVERSION.toFixed(bigDecimal, newSchema, 
newSchema.getLogicalType());
+          } else if (oldSchema.getType() == Schema.Type.BYTES) {
+            return convertBytesToFixed(((ByteBuffer) oldValue).array(), 
newSchema);
           }
         }
         break;
@@ -1157,6 +1159,16 @@ public class HoodieAvroUtils {
     throw new HoodieAvroSchemaException(String.format("cannot support rewrite 
value for schema type: %s since the old schema type is: %s", newSchema, 
oldSchema));
   }
 
+  /**
+   * bytes is the result of BigDecimal.unscaledValue().toByteArray();
+   * This is also what Conversions.DecimalConversion.toBytes() outputs inside 
a byte buffer
+   */
+  public static Object convertBytesToFixed(byte[] bytes, Schema schema) {
+    LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
schema.getLogicalType();
+    BigDecimal bigDecimal = convertBytesToBigDecimal(bytes, decimal);
+    return DECIMAL_CONVERSION.toFixed(bigDecimal, schema, decimal);
+  }
+
   /**
    * Use this instead of DECIMAL_CONVERSION.fromBytes() because that method 
does not add in precision
    *
@@ -1168,6 +1180,10 @@ public class HoodieAvroUtils {
         decimal.getScale(), new MathContext(decimal.getPrecision(), 
RoundingMode.HALF_UP));
   }
 
+  public static boolean hasDecimalField(Schema schema) {
+    return hasDecimalWithCondition(schema, unused -> true);
+  }
+
   /**
    * Checks whether the provided schema contains a decimal with a precision 
less than or equal to 18,
    * which allows the decimal to be stored as int/long instead of a fixed size 
byte array in
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/avro/MercifulJsonConverterTestBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/avro/MercifulJsonConverterTestBase.java
index c9e643042d6..3cf60e49e89 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/avro/MercifulJsonConverterTestBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/avro/MercifulJsonConverterTestBase.java
@@ -353,4 +353,20 @@ public class MercifulJsonConverterTestBase {
         "{\"first\":\"John\",\"last\":\"Smith\",\"suffix\":3}"
     );
   }
+
+  static Stream<Object> encodedDecimalScalePrecisionProvider() {
+    return Stream.of(
+        Arguments.of(6, 10),
+        Arguments.of(30, 32),
+        Arguments.of(1, 3)
+    );
+  }
+
+  static Stream<Object> encodedDecimalFixedScalePrecisionProvider() {
+    return Stream.of(
+        Arguments.of(5, 6, 10),
+        Arguments.of(14, 30, 32),
+        Arguments.of(2, 1, 3)
+    );
+  }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index 625453ef2e5..b1c8c453f82 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -34,13 +34,16 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.RewriteAvroPayload;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.SchemaCompatibilityException;
 
 import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Conversions;
 import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
@@ -63,6 +66,8 @@ import org.slf4j.LoggerFactory;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
 import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Timestamp;
@@ -75,6 +80,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -99,7 +105,7 @@ public class TestHoodieAvroUtils {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TestHoodieAvroUtils.class);
 
-  private static String EVOLVED_SCHEMA = "{\"type\": \"record\",\"name\": 
\"testrec1\",\"fields\": [ "
+  private static final String EVOLVED_SCHEMA = "{\"type\": 
\"record\",\"name\": \"testrec1\",\"fields\": [ "
       + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": 
\"_row_key\", \"type\": \"string\"},"
       + "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
       + "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": 
\"user_profile\"},"
@@ -108,7 +114,7 @@ public class TestHoodieAvroUtils {
       + "{\"name\": \"new_col_nullable_default_null\", \"type\": [\"null\" 
,\"string\"],\"default\": null},"
       + "{\"name\": \"new_col_nullable_default_dummy_val\", \"type\": 
[\"string\" ,\"null\"],\"default\": \"dummy_val\"}]}";
 
-  private static String EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": 
\"testrec\",\"fields\": [ "
+  private static final String EXAMPLE_SCHEMA = "{\"type\": 
\"record\",\"name\": \"testrec\",\"fields\": [ "
       + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": 
\"_row_key\", \"type\": \"string\"},"
       + "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
       + "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": 
\"user_profile\"}]}";
@@ -119,9 +125,9 @@ public class TestHoodieAvroUtils {
       + "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": 
\"user_profile\"}], "
       + "\"custom_schema_property\": \"custom_schema_property_value\"}";
 
-  private static int NUM_FIELDS_IN_EXAMPLE_SCHEMA = 4;
+  private static final int NUM_FIELDS_IN_EXAMPLE_SCHEMA = 4;
 
-  private static String SCHEMA_WITH_METADATA_FIELD = "{\"type\": 
\"record\",\"name\": \"testrec2\",\"fields\": [ "
+  private static final String SCHEMA_WITH_METADATA_FIELD = "{\"type\": 
\"record\",\"name\": \"testrec2\",\"fields\": [ "
       + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": 
\"_row_key\", \"type\": \"string\"},"
       + "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
       + "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": 
\"user_profile\"},"
@@ -129,7 +135,8 @@ public class TestHoodieAvroUtils {
       + "{\"name\": \"nullable_field\",\"type\": [\"null\" 
,\"string\"],\"default\": null},"
       + "{\"name\": \"nullable_field_wo_default\",\"type\": [\"null\" 
,\"string\"]}]}";
 
-  private static String SCHEMA_WITH_NON_NULLABLE_FIELD = "{\"type\": 
\"record\",\"name\": \"testrec3\",\"fields\": [ "
+  private static final String SCHEMA_WITH_NON_NULLABLE_FIELD =
+      "{\"type\": \"record\",\"name\": \"testrec3\",\"fields\": [ "
       + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": 
\"_row_key\", \"type\": \"string\"},"
       + "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
       + "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": 
\"user_profile\"},"
@@ -137,14 +144,15 @@ public class TestHoodieAvroUtils {
       + "{\"name\": \"non_nullable_field_wo_default\",\"type\": \"string\"},"
       + "{\"name\": \"non_nullable_field_with_default\",\"type\": \"string\", 
\"default\": \"dummy\"}]}";
 
-  private static String SCHEMA_WITH_NON_NULLABLE_FIELD_WITH_DEFAULT = 
"{\"type\": \"record\",\"name\": \"testrec4\",\"fields\": [ "
+  private static final String SCHEMA_WITH_NON_NULLABLE_FIELD_WITH_DEFAULT =
+      "{\"type\": \"record\",\"name\": \"testrec4\",\"fields\": [ "
       + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": 
\"_row_key\", \"type\": \"string\"},"
       + "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
       + "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": 
\"user_profile\"},"
       + "{\"name\": \"nullable_field\",\"type\": [\"null\" 
,\"string\"],\"default\": null},"
       + "{\"name\": \"non_nullable_field_with_default\",\"type\": \"string\", 
\"default\": \"dummy\"}]}";
 
-  private static String SCHEMA_WITH_DECIMAL_FIELD = 
"{\"type\":\"record\",\"name\":\"record\",\"fields\":["
+  private static final String SCHEMA_WITH_DECIMAL_FIELD = 
"{\"type\":\"record\",\"name\":\"record\",\"fields\":["
       + "{\"name\":\"key_col\",\"type\":[\"null\",\"int\"],\"default\":null},"
       + "{\"name\":\"decimal_col\",\"type\":[\"null\","
       + 
"{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":8,\"scale\":4}],\"default\":null}]}";
@@ -155,7 +163,8 @@ public class TestHoodieAvroUtils {
       + 
"{\"name\":\"student\",\"type\":{\"name\":\"student\",\"type\":\"record\",\"fields\":["
       + "{\"name\":\"firstname\",\"type\":[\"null\" ,\"string\"],\"default\": 
null},{\"name\":\"lastname\",\"type\":[\"null\" ,\"string\"],\"default\": 
null}]}}]}";
 
-  private static String SCHEMA_WITH_NESTED_FIELD_RENAMED = 
"{\"name\":\"MyClass\",\"type\":\"record\",\"namespace\":\"com.acme.avro\",\"fields\":["
+  private static final String SCHEMA_WITH_NESTED_FIELD_RENAMED =
+      
"{\"name\":\"MyClass\",\"type\":\"record\",\"namespace\":\"com.acme.avro\",\"fields\":["
       + "{\"name\":\"fn\",\"type\":\"string\"},"
       + "{\"name\":\"ln\",\"type\":\"string\"},"
       + 
"{\"name\":\"ss\",\"type\":{\"name\":\"ss\",\"type\":\"record\",\"fields\":["
@@ -773,6 +782,52 @@ public class TestHoodieAvroUtils {
     assertEquals("{\"timestamp\": \"foo\", \"_row_key\": \"key\", 
\"non_pii_col\": \"val1\", \"pii_col\": \"val2\"}", jsonString);
   }
 
+  @Test
+  void testConvertBytesToFixed() {
+    Random rand = new Random();
+    //size calculated using 
AvroInternalSchemaConverter.computeMinBytesForPrecision
+    testConverBytesToFixedHelper(rand.nextDouble(), 13, 7, 6);
+    testConverBytesToFixedHelper(rand.nextDouble(), 4, 2, 2);
+    testConverBytesToFixedHelper(rand.nextDouble(), 32, 12, 14);
+  }
+
+  private static void testConverBytesToFixedHelper(double value, int 
precision, int scale, int size) {
+    BigDecimal decfield = BigDecimal.valueOf(value * Math.pow(10, precision - 
scale))
+        .setScale(scale, RoundingMode.HALF_UP).round(new 
MathContext(precision, RoundingMode.HALF_UP));
+    byte[] encodedDecimal = decfield.unscaledValue().toByteArray();
+    Schema fixedSchema = new Schema.Parser().parse("{\"type\": 
\"record\",\"name\": \"testrec\",\"fields\": [{\"name\": \"decfield\", 
\"type\": {\"type\": \"fixed\", \"name\": \"idk\","
+        + " \"logicalType\": \"decimal\", \"precision\": " + precision + ", 
\"scale\": " + scale + ", \"size\": " + size + 
"}}]}").getFields().get(0).schema();
+    GenericData.Fixed fixed = (GenericData.Fixed) 
HoodieAvroUtils.convertBytesToFixed(encodedDecimal, fixedSchema);
+    BigDecimal after = new Conversions.DecimalConversion().fromFixed(fixed, 
fixedSchema, fixedSchema.getLogicalType());
+    assertEquals(decfield, after);
+  }
+
+  @Test
+  void testHasDecimalField() {
+    assertTrue(HoodieAvroUtils.hasDecimalField(new 
Schema.Parser().parse(SCHEMA_WITH_DECIMAL_FIELD)));
+    assertFalse(HoodieAvroUtils.hasDecimalField(new 
Schema.Parser().parse(EVOLVED_SCHEMA)));
+    assertFalse(HoodieAvroUtils.hasDecimalField(new 
Schema.Parser().parse(SCHEMA_WITH_NON_NULLABLE_FIELD)));
+    
assertTrue(HoodieAvroUtils.hasDecimalField(HoodieTestDataGenerator.AVRO_SCHEMA));
+    
assertTrue(HoodieAvroUtils.hasDecimalField(HoodieTestDataGenerator.AVRO_TRIP_ENCODED_DECIMAL_SCHEMA));
+    Schema recordWithMapAndArray = 
Schema.createRecord("recordWithMapAndArray", null, null, false,
+        Arrays.asList(new Schema.Field("mapfield", 
Schema.createMap(Schema.create(Schema.Type.INT)), null, null),
+            new Schema.Field("arrayfield", 
Schema.createArray(Schema.create(Schema.Type.INT)), null, null)
+        ));
+    assertFalse(HoodieAvroUtils.hasDecimalField(recordWithMapAndArray));
+    Schema recordWithDecMapAndArray = 
Schema.createRecord("recordWithDecMapAndArray", null, null, false,
+        Arrays.asList(new Schema.Field("mapfield",
+                
Schema.createMap(LogicalTypes.decimal(10,6).addToSchema(Schema.create(Schema.Type.BYTES))),
 null, null),
+            new Schema.Field("arrayfield", 
Schema.createArray(Schema.create(Schema.Type.INT)), null, null)
+        ));
+    assertTrue(HoodieAvroUtils.hasDecimalField(recordWithDecMapAndArray));
+    Schema recordWithMapAndDecArray = 
Schema.createRecord("recordWithMapAndDecArray", null, null, false,
+        Arrays.asList(new Schema.Field("mapfield",
+            Schema.createMap(Schema.create(Schema.Type.INT)), null, null), new 
Schema.Field("arrayfield",
+            
Schema.createArray(LogicalTypes.decimal(10,6).addToSchema(Schema.create(Schema.Type.BYTES))),
 null, null)
+        ));
+    assertTrue(HoodieAvroUtils.hasDecimalField(recordWithMapAndDecArray));
+  }
+
   @Test
   void testCreateFullName() {
     String result = HoodieAvroUtils.createFullName(new 
ArrayDeque<>(Arrays.asList("a", "b", "c")));
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java 
b/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
index 0844ac98fdb..6a451989bbe 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
@@ -21,8 +21,10 @@ package org.apache.hudi.avro;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.exception.HoodieJsonToAvroConversionException;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.Conversions;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericFixed;
@@ -34,14 +36,19 @@ import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.stream.Stream;
 
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_ENCODED_DECIMAL_SCHEMA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -534,4 +541,61 @@ public class TestMercifulJsonConverter extends 
MercifulJsonConverterTestBase {
 
     assertEquals(rec, CONVERTER.convert(json, sanitizedSchema));
   }
+
+  @ParameterizedTest
+  @MethodSource("encodedDecimalScalePrecisionProvider")
+  void testEncodedDecimal(int scale, int precision) throws 
JsonProcessingException {
+    Random rand = new Random();
+    BigDecimal decfield = BigDecimal.valueOf(rand.nextDouble())
+        .setScale(scale, RoundingMode.HALF_UP).round(new 
MathContext(precision, RoundingMode.HALF_UP));
+    Map<String, Object> data = new HashMap<>();
+    data.put("_row_key", "mykey");
+    long timestamp = 214523432;
+    data.put("timestamp", timestamp);
+    data.put("rider", "myrider");
+    data.put("decfield", 
Base64.getEncoder().encodeToString(decfield.unscaledValue().toByteArray()));
+    data.put("driver", "mydriver");
+    data.put("fare", rand.nextDouble() * 100);
+    data.put("_hoodie_is_deleted", false);
+    String json = MAPPER.writeValueAsString(data);
+    Schema tripSchema = new Schema.Parser().parse(
+        TRIP_ENCODED_DECIMAL_SCHEMA.replace("6", 
Integer.toString(scale)).replace("10", Integer.toString(precision)));
+    GenericRecord genrec = CONVERTER.convert(json, tripSchema);
+    Schema decimalFieldSchema = tripSchema.getField("decfield").schema();
+    BigDecimal decoded = 
HoodieAvroUtils.convertBytesToBigDecimal(((ByteBuffer) 
genrec.get("decfield")).array(),
+        (LogicalTypes.Decimal) decimalFieldSchema.getLogicalType());
+    assertEquals(decfield, decoded);
+  }
+
+  @ParameterizedTest
+  @MethodSource("encodedDecimalFixedScalePrecisionProvider")
+  void testEncodedDecimalAvroSparkPostProcessorCase(int size, int scale, int 
precision) throws JsonProcessingException {
+    Random rand = new Random();
+    String postProcessSchemaString = 
String.format("{\"type\":\"record\",\"name\":\"tripUberRec\","
+        + 
"\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"doc\":\"\"},{\"name\":\"_row_key\","
+        + 
"\"type\":\"string\",\"doc\":\"\"},{\"name\":\"rider\",\"type\":\"string\",\"doc\":\"\"},"
+        + 
"{\"name\":\"decfield\",\"type\":{\"type\":\"fixed\",\"name\":\"fixed\","
+        + 
"\"namespace\":\"tripUberRec.decfield\",\"size\":%d,\"logicalType\":\"decimal\","
+        + 
"\"precision\":%d,\"scale\":%d},\"doc\":\"\"},{\"name\":\"driver\",\"type\":\"string\","
+        + 
"\"doc\":\"\"},{\"name\":\"fare\",\"type\":\"double\",\"doc\":\"\"},{\"name\":\"_hoodie_is_deleted\","
+        + "\"type\":\"boolean\",\"doc\":\"\"}]}", size, precision, scale);
+    Schema postProcessSchema = new 
Schema.Parser().parse(postProcessSchemaString);
+    BigDecimal decfield = BigDecimal.valueOf(rand.nextDouble())
+        .setScale(scale, RoundingMode.HALF_UP).round(new 
MathContext(precision, RoundingMode.HALF_UP));
+    Map<String, Object> data = new HashMap<>();
+    data.put("_row_key", "mykey");
+    long timestamp = 214523432;
+    data.put("timestamp", timestamp);
+    data.put("rider", "myrider");
+    data.put("decfield", 
Base64.getEncoder().encodeToString(decfield.unscaledValue().toByteArray()));
+    data.put("driver", "mydriver");
+    data.put("fare", rand.nextDouble() * 100);
+    data.put("_hoodie_is_deleted", false);
+    String json = MAPPER.writeValueAsString(data);
+    GenericRecord genrec = CONVERTER.convert(json, postProcessSchema);
+    GenericData.Fixed fixed = (GenericData.Fixed) genrec.get("decfield");
+    Conversions.DecimalConversion decimalConverter = new 
Conversions.DecimalConversion();
+    Schema decimalFieldSchema = 
postProcessSchema.getField("decfield").schema();
+    assertEquals(decfield, decimalConverter.fromFixed(fixed, 
decimalFieldSchema, decimalFieldSchema.getLogicalType()));
+  }
 }
\ No newline at end of file
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 4122ab452ca..e4d37e67e5c 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -56,12 +56,15 @@ import java.io.Serializable;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
 import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -170,6 +173,7 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
   public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS =
       HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA);
   public static final Schema AVRO_SHORT_TRIP_SCHEMA = new 
Schema.Parser().parse(SHORT_TRIP_SCHEMA);
+  public static final Schema AVRO_TRIP_ENCODED_DECIMAL_SCHEMA = new 
Schema.Parser().parse(TRIP_ENCODED_DECIMAL_SCHEMA);
   public static final Schema AVRO_TRIP_SCHEMA = new 
Schema.Parser().parse(TRIP_SCHEMA);
   public static final Schema FLATTENED_AVRO_SCHEMA = new 
Schema.Parser().parse(TRIP_FLATTENED_SCHEMA);
   private final Random rand;
@@ -294,6 +298,8 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
       return generateRandomValue(key, commitTime, true);
     } else if (TRIP_EXAMPLE_SCHEMA.equals(schemaStr)) {
       return generateRandomValue(key, commitTime, isFlattened);
+    } else if (TRIP_ENCODED_DECIMAL_SCHEMA.equals(schemaStr)) {
+      return generatePayloadForTripEncodedDecimalSchema(key, commitTime);
     } else if (TRIP_SCHEMA.equals(schemaStr)) {
       return generatePayloadForTripSchema(key, commitTime);
     } else if (SHORT_TRIP_SCHEMA.equals(schemaStr)) {
@@ -353,6 +359,17 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
     return new RawTripTestPayload(rec.toString(), key.getRecordKey(), 
key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
   }
 
+  /**
+   * Generates a new avro record with TRIP_ENCODED_DECIMAL_SCHEMA, retaining 
the key if optionally provided.
+   */
+  public RawTripTestPayload 
generatePayloadForTripEncodedDecimalSchema(HoodieKey key, String commitTime)
+      throws IOException {
+    GenericRecord rec =
+        generateRecordForTripEncodedDecimalSchema(key.getRecordKey(), "rider-" 
+ commitTime, "driver-" + commitTime, 0);
+    return new RawTripTestPayload(rec.toString(), key.getRecordKey(), 
key.getPartitionPath(),
+        TRIP_ENCODED_DECIMAL_SCHEMA);
+  }
+
   /**
    * Generates a new avro record with TRIP_SCHEMA, retaining the key if 
optionally provided.
    */
@@ -508,6 +525,38 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
     return rec;
   }
 
+  /*
+Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
+ */
+  public GenericRecord generateRecordForTripEncodedDecimalSchema(String 
rowKey, String riderName, String driverName,
+                                                                 long 
timestamp) {
+    GenericRecord rec = new 
GenericData.Record(AVRO_TRIP_ENCODED_DECIMAL_SCHEMA);
+    rec.put("_row_key", rowKey);
+    rec.put("timestamp", timestamp);
+    rec.put("rider", riderName);
+    rec.put("decfield", getNonzeroEncodedBigDecimal(rand, 6, 10));
+    rec.put("lowprecision", getNonzeroEncodedBigDecimal(rand, 2, 4));
+    rec.put("highprecision", getNonzeroEncodedBigDecimal(rand, 12, 32));
+    rec.put("driver", driverName);
+    rec.put("fare", rand.nextDouble() * 100);
+    rec.put("_hoodie_is_deleted", false);
+    return rec;
+  }
+
+  private static String getNonzeroEncodedBigDecimal(Random rand, int scale, 
int precision) {
+    //scale the value because rand.nextDouble() only returns a val that is 
between 0 and 1
+
+    //make it between 0.1 and 1 so that we keep all values in the same order 
of magnitude
+    double nextDouble = rand.nextDouble();
+    while (nextDouble <= 0.1) {
+      nextDouble = rand.nextDouble();
+    }
+    double valuescale = Math.pow(10, precision - scale);
+    BigDecimal dec = BigDecimal.valueOf(nextDouble * valuescale)
+        .setScale(scale, RoundingMode.HALF_UP).round(new 
MathContext(precision, RoundingMode.HALF_UP));
+    return 
Base64.getEncoder().encodeToString(dec.unscaledValue().toByteArray());
+  }
+
   /*
   Generate random record using TRIP_SCHEMA
    */
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
index e4d2bf58e43..dd3849fcc71 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
@@ -20,18 +20,24 @@ package org.apache.hudi.utilities.schema;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.utilities.config.FilebasedSchemaProviderConfig;
+import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
 import org.apache.hudi.utilities.exception.HoodieSchemaProviderException;
 import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
 
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
 import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.Collections;
 
 import static 
org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
@@ -47,8 +53,6 @@ public class FilebasedSchemaProvider extends SchemaProvider {
 
   private final String sourceFile;
   private final String targetFile;
-  private final boolean shouldSanitize;
-  private final String invalidCharMask;
 
   protected Schema sourceSchema;
 
@@ -59,8 +63,6 @@ public class FilebasedSchemaProvider extends SchemaProvider {
     checkRequiredConfigProperties(props, 
Collections.singletonList(FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE));
     this.sourceFile = getStringWithAltKeys(props, 
FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE);
     this.targetFile = getStringWithAltKeys(props, 
FilebasedSchemaProviderConfig.TARGET_SCHEMA_FILE, sourceFile);
-    this.shouldSanitize = SanitizationUtils.shouldSanitize(props);
-    this.invalidCharMask = SanitizationUtils.getInvalidCharMask(props);
     this.fs = HadoopFSUtils.getFs(sourceFile, jssc.hadoopConfiguration(), 
true);
     this.sourceSchema = parseSchema(this.sourceFile);
     if (containsConfigProperty(props, 
FilebasedSchemaProviderConfig.TARGET_SCHEMA_FILE)) {
@@ -69,7 +71,7 @@ public class FilebasedSchemaProvider extends SchemaProvider {
   }
 
   private Schema parseSchema(String schemaFile) {
-    return readAvroSchemaFromFile(schemaFile, this.fs, shouldSanitize, 
invalidCharMask);
+    return readSchemaFromFile(schemaFile, this.fs, config);
   }
 
   @Override
@@ -86,14 +88,50 @@ public class FilebasedSchemaProvider extends SchemaProvider 
{
     }
   }
 
-  private static Schema readAvroSchemaFromFile(String schemaPath, FileSystem 
fs, boolean sanitizeSchema, String invalidCharMask) {
-    String schemaStr;
-    try (InputStream in = fs.open(new Path(schemaPath))) {
-      schemaStr = FileIOUtils.readAsUTFString(in);
+  private static Schema readSchemaFromFile(String schemaPath, FileSystem fs, 
TypedProperties props) {
+    return schemaPath.endsWith(".json")
+        ? readJsonSchemaFromFile(schemaPath, fs, props)
+        : readAvroSchemaFromFile(schemaPath, fs, props);
+  }
+
+  private static Schema readJsonSchemaFromFile(String schemaPath, FileSystem 
fs, TypedProperties props) {
+    String schemaConverterClass = getStringWithAltKeys(props, 
HoodieSchemaProviderConfig.SCHEMA_CONVERTER, true);
+    SchemaRegistryProvider.SchemaConverter schemaConverter;
+    try {
+      
ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(schemaConverterClass),
+          "Schema converter class must be set for the json file based schema 
provider");
+      schemaConverter = (SchemaRegistryProvider.SchemaConverter) 
ReflectionUtils.loadClass(
+          schemaConverterClass, new Class<?>[] {TypedProperties.class}, props);
+    } catch (Exception e) {
+      throw new HoodieSchemaProviderException("Error loading json schema 
converter", e);
+    }
+    String schemaStr = readSchemaString(schemaPath, fs);
+    ParsedSchema parsedSchema = new JsonSchema(schemaStr);
+    String convertedSchema;
+    try {
+      convertedSchema = schemaConverter.convert(parsedSchema);
+    } catch (IOException e) {
+      throw new HoodieSchemaProviderException(String.format("Error converting 
json schema from file %s", schemaPath),
+          e);
+    }
+    return new Schema.Parser().parse(convertedSchema);
+  }
+
+  private static Schema readAvroSchemaFromFile(String schemaPath,
+                                               FileSystem fs,
+                                               TypedProperties props) {
+    boolean shouldSanitize = SanitizationUtils.shouldSanitize(props);
+    String invalidCharMask = SanitizationUtils.getInvalidCharMask(props);
+    String schemaStr = readSchemaString(schemaPath, fs);
+    return SanitizationUtils.parseAvroSchema(schemaStr, shouldSanitize, 
invalidCharMask);
+  }
+
+  private static String readSchemaString(String schemaPath, FileSystem fs) {
+    try (FSDataInputStream in = fs.open(new Path(schemaPath))) {
+      return FileIOUtils.readAsUTFString(in);
     } catch (IOException ioe) {
       throw new HoodieSchemaProviderException(String.format("Error reading 
schema from file %s", schemaPath), ioe);
     }
-    return SanitizationUtils.parseAvroSchema(schemaStr, sanitizeSchema, 
invalidCharMask);
   }
 
   // Per write batch, refresh the schemas from the file
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
index fa9e0dc5add..f326c98826e 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.streamer;
 
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.MercifulJsonConverter;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
@@ -33,6 +34,7 @@ import 
org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
 import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.KafkaSource;
 import org.apache.hudi.utilities.sources.Source;
 import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
 import org.apache.hudi.utilities.sources.helpers.RowConverter;
@@ -165,6 +167,11 @@ public class SourceFormatAdapter implements Closeable {
     );
   }
 
+  private InputBatch<JavaRDD<GenericRecord>> 
convertJsonStringToAvroFormat(InputBatch<JavaRDD<String>> r) {
+    JavaRDD<GenericRecord> eventsRdd = transformJsonToGenericRdd(r);
+    return new InputBatch<>(Option.ofNullable(eventsRdd), 
r.getCheckpointForNextBatch(), r.getSchemaProvider());
+  }
+
   /**
    * Fetch new data in avro format. If the source provides data in different 
format, they are translated to Avro format
    */
@@ -176,8 +183,7 @@ public class SourceFormatAdapter implements Closeable {
       case JSON: {
         //sanitizing is done inside the convertor in transformJsonToGenericRdd 
if enabled
         InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>) 
source).fetchNext(lastCheckpoint, sourceLimit);
-        JavaRDD<GenericRecord> eventsRdd = transformJsonToGenericRdd(r);
-        return new 
InputBatch<>(Option.ofNullable(eventsRdd),r.getCheckpointForNextBatch(), 
r.getSchemaProvider());
+        return convertJsonStringToAvroFormat(r);
       }
       case ROW: {
         //we do the sanitizing here if enabled
@@ -240,8 +246,10 @@ public class SourceFormatAdapter implements Closeable {
       case JSON: {
         InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>) 
source).fetchNext(lastCheckpoint, sourceLimit);
         Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
-
-        if (isFieldNameSanitizingEnabled()) {
+        // Decimal fields need additional decoding from json generated by 
kafka-connect. JSON -> ROW conversion is done through
+        // a spark library that has not implemented this decoding
+        if (isFieldNameSanitizingEnabled()
+            || (HoodieAvroUtils.hasDecimalField(sourceSchema) && source 
instanceof KafkaSource)) {
           StructType dataType = 
AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
           JavaRDD<Row> rowRDD = transformJsonToRowRdd(r);
           if (rowRDD != null) {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 278bafac001..09e3113407d 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -195,6 +195,13 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
 
     
UtilitiesTestBase.Helpers.copyToDFS("streamer-config/source_short_trip_uber.avsc",
 storage, dfsBasePath + "/source_short_trip_uber.avsc");
     UtilitiesTestBase.Helpers.copyToDFS("streamer-config/source_uber.avsc", 
storage, dfsBasePath + "/source_uber.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS(
+        "streamer-config/source_uber_encoded_decimal.json", storage,
+        dfsBasePath + "/source_uber_encoded_decimal.json");
+    UtilitiesTestBase.Helpers.copyToDFS(
+        "streamer-config/source_uber_encoded_decimal.avsc", storage,
+        dfsBasePath + "/source_uber_encoded_decimal.avsc");
+
     
UtilitiesTestBase.Helpers.copyToDFS("streamer-config/target_short_trip_uber.avsc",
 storage, dfsBasePath + "/target_short_trip_uber.avsc");
     UtilitiesTestBase.Helpers.copyToDFS("streamer-config/target_uber.avsc", 
storage, dfsBasePath + "/target_uber.avsc");
     
UtilitiesTestBase.Helpers.copyToDFS("streamer-config/invalid_hive_sync_uber_config.properties",
 storage, dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java
index 389282ddcdb..6d310ff3db2 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java
@@ -19,13 +19,13 @@
 package org.apache.hudi.utilities.schema;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
+import org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 
 import org.apache.avro.SchemaParseException;
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
@@ -41,7 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 /**
  * Unit tests for {@link FilebasedSchemaProvider}.
  */
-public class TestFilebasedSchemaProvider extends UtilitiesTestBase {
+class TestFilebasedSchemaProvider extends UtilitiesTestBase {
 
   private FilebasedSchemaProvider schemaProvider;
 
@@ -55,25 +55,15 @@ public class TestFilebasedSchemaProvider extends 
UtilitiesTestBase {
     UtilitiesTestBase.cleanUpUtilitiesTestServices();
   }
 
-  @BeforeEach
-  public void setup() throws Exception {
-    super.setup();
-  }
-
-  @AfterEach
-  public void teardown() throws Exception {
-    super.teardown();
-  }
-
   @Test
-  public void properlyFormattedNestedSchemaTest() throws IOException {
+  void properlyFormattedNestedSchemaTest() throws IOException {
     this.schemaProvider = new FilebasedSchemaProvider(
         Helpers.setupSchemaOnDFS("streamer-config", 
"file_schema_provider_valid.avsc"), jsc);
     assertEquals(this.schemaProvider.getSourceSchema(), 
generateProperFormattedSchema());
   }
 
   @Test
-  public void renameBadlyFormattedSchemaTest() throws IOException {
+  void renameBadlyFormattedSchemaTest() throws IOException {
     TypedProperties props = Helpers.setupSchemaOnDFS("streamer-config", 
"file_schema_provider_invalid.avsc");
     props.put(SANITIZE_SCHEMA_FIELD_NAMES.key(), "true");
     this.schemaProvider = new FilebasedSchemaProvider(props, jsc);
@@ -81,7 +71,7 @@ public class TestFilebasedSchemaProvider extends 
UtilitiesTestBase {
   }
 
   @Test
-  public void renameBadlyFormattedSchemaWithPropertyDisabledTest() {
+  void renameBadlyFormattedSchemaWithPropertyDisabledTest() {
     assertThrows(SchemaParseException.class, () -> {
       new FilebasedSchemaProvider(
           Helpers.setupSchemaOnDFS("streamer-config", 
"file_schema_provider_invalid.avsc"), jsc);
@@ -89,11 +79,22 @@ public class TestFilebasedSchemaProvider extends 
UtilitiesTestBase {
   }
 
   @Test
-  public void renameBadlyFormattedSchemaWithAltCharMaskConfiguredTest() throws 
IOException {
+  void renameBadlyFormattedSchemaWithAltCharMaskConfiguredTest() throws 
IOException {
     TypedProperties props = Helpers.setupSchemaOnDFS("streamer-config", 
"file_schema_provider_invalid.avsc");
     props.put(SANITIZE_SCHEMA_FIELD_NAMES.key(), "true");
     props.put(SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), "_");
     this.schemaProvider = new FilebasedSchemaProvider(props, jsc);
     assertEquals(this.schemaProvider.getSourceSchema(), 
generateRenamedSchemaWithConfiguredReplacement());
   }
+
+  @Test
+  void testJsonSchema() throws IOException {
+    TypedProperties jsonProps = Helpers.setupSchemaOnDFS("streamer-config", 
"source_uber_encoded_decimal.json");
+    jsonProps.setProperty(HoodieSchemaProviderConfig.SCHEMA_CONVERTER.key(), 
JsonToAvroSchemaConverter.class.getName());
+    FilebasedSchemaProvider jsonFilebasedSchemaProvider = new 
FilebasedSchemaProvider(jsonProps, jsc);
+    FilebasedSchemaProvider filebasedSchemaProvider = new 
FilebasedSchemaProvider(
+        Helpers.setupSchemaOnDFS("streamer-config", 
"source_uber_encoded_decimal.avsc"), jsc);
+
+    assertEquals(filebasedSchemaProvider.getSourceSchema(), 
jsonFilebasedSchemaProvider.getSourceSchema());
+  }
 }
\ No newline at end of file
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
index 6f5766e81dc..e0684fecf8e 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
@@ -52,6 +52,7 @@ class TestJsonToAvroSchemaConverter {
       "not-null-default-value-schema,_no_stripping_quotes",
       "not-null-default-value-schema,_stripping_quotes",
       "array-with-item-type-union,",
+      "kafka-decimal-simple,"
   })
   void testConvertJsonSchemaToAvroSchema(String inputCase, String 
avroSchemaFileSuffix) throws IOException {
     String jsonSchema = loadJsonSchema(inputCase);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 82bbd3a01c5..fb9c05d0fd1 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.utilities.sources;
 
+import org.apache.hudi.HoodieSchemaUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -30,10 +31,12 @@ import 
org.apache.hudi.common.testutils.InProcessTimeGenerator;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
 import org.apache.hudi.utilities.config.HoodieStreamerConfig;
 import org.apache.hudi.utilities.config.KafkaSourceConfig;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter;
 import org.apache.hudi.utilities.streamer.BaseErrorTableWriter;
 import org.apache.hudi.utilities.streamer.DefaultStreamContext;
 import org.apache.hudi.utilities.streamer.ErrorEvent;
@@ -59,6 +62,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -251,6 +255,43 @@ public class TestJsonKafkaSource extends 
BaseTestKafkaSource {
     assertEquals(Option.empty(), fetch6.getBatch());
   }
 
+  @Test
+  void testJsonKafkaSourceWithEncodedDecimals() throws URISyntaxException {
+    String schemaFilePath = 
Objects.requireNonNull(TestJsonKafkaSource.class.getClassLoader()
+        
.getResource("streamer-config/source_uber_encoded_decimal.json")).toURI().getPath();
+    final String topic = TEST_TOPIC_PREFIX + 
"testJsonKafkaSourceWithEncodedDecimals";
+    testUtils.createTopic(topic, 2);
+    TypedProperties props = createPropsForKafkaSource(topic, Long.MAX_VALUE, 
"earliest");
+    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", 
schemaFilePath);
+    props.put(HoodieSchemaProviderConfig.SCHEMA_CONVERTER.key(), 
JsonToAvroSchemaConverter.class.getName());
+    schemaProvider = new FilebasedSchemaProvider(props, jsc());
+
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), 
schemaProvider, metrics);
+    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
+    List<HoodieRecord> send1 =
+        dataGenerator.generateInsertsAsPerSchema("000", 10, 
HoodieTestDataGenerator.TRIP_ENCODED_DECIMAL_SCHEMA);
+    testUtils.sendMessages(topic, jsonifyRecords(send1));
+    InputBatch<JavaRDD<GenericRecord>> fetch1 = 
kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+    List<GenericRecord> recs = fetch1.getBatch().get().collect();
+    assertEquals(10, recs.size());
+
+    Schema deducedSchema =
+        HoodieSchemaUtils.deduceWriterSchema(schemaProvider.getSourceSchema(), 
Option.empty(), Option.empty(), props);
+    verifyDecimalValue(recs, deducedSchema, "decfield");
+    verifyDecimalValue(recs, deducedSchema, "lowprecision");
+    verifyDecimalValue(recs, deducedSchema, "highprecision");
+
+    testUtils.sendMessages(topic, jsonifyRecords(
+        dataGenerator.generateInsertsAsPerSchema("001", 20, 
HoodieTestDataGenerator.TRIP_ENCODED_DECIMAL_SCHEMA)));
+    InputBatch<Dataset<Row>> fetch2 =
+        
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()),
 30);
+    assertEquals(20, fetch2.getBatch().get().count());
+    assertEquals(20, fetch2.getBatch().get().filter("decfield < 
10000.0").filter("decfield > 1000.0")
+        .filter("lowprecision < 100.0").filter("lowprecision > 10.0")
+        .filter("highprecision < 
100000000000000000000.0").filter("highprecision > 
10000000000000000000.0").count());
+  }
+
   private static void verifyDecimalValue(List<GenericRecord> records, Schema 
schema, String fieldname) {
     Schema fieldSchema = schema.getField(fieldname).schema();
     LogicalTypes.Decimal decField = (LogicalTypes.Decimal) 
fieldSchema.getLogicalType();
diff --git 
a/hudi-utilities/src/test/resources/schema-provider/json/kafka-decimal-simple/expected.json
 
b/hudi-utilities/src/test/resources/schema-provider/json/kafka-decimal-simple/expected.json
new file mode 100644
index 00000000000..8dac6d45da0
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/schema-provider/json/kafka-decimal-simple/expected.json
@@ -0,0 +1,24 @@
+{
+  "type" : "record",
+  "name" : "Product",
+  "doc" : "A product from Acme's catalog",
+  "namespace" : "example.com",
+  "fields" : [ {
+    "name" : "product",
+    "doc" : "The unique identifier for a product",
+    "type" : "long"
+  }, {
+    "name" : "productId",
+    "doc" : "The unique identifier for a product",
+    "type" : {
+      "type": "bytes",
+      "logicalType": "decimal",
+      "precision": 14,
+      "scale": 2
+    }
+  }, {
+    "name" : "productName",
+    "doc" : "Name of the product",
+    "type" : "string"
+  }]
+}
diff --git 
a/hudi-utilities/src/test/resources/schema-provider/json/kafka-decimal-simple/input.json
 
b/hudi-utilities/src/test/resources/schema-provider/json/kafka-decimal-simple/input.json
new file mode 100644
index 00000000000..1d13d79a6bb
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/schema-provider/json/kafka-decimal-simple/input.json
@@ -0,0 +1,28 @@
+{
+  "$schema": "https://json-schema.org/draft/2020-12/schema",
+  "$id": "https://example.com/product.schema.json",
+  "title": "Product",
+  "description": "A product from Acme's catalog",
+  "type": "object",
+  "properties": {
+    "product": {
+      "description": "The unique identifier for a product",
+      "type": "integer"
+    },
+    "productId": {
+      "description": "The unique identifier for a product",
+      "title": "org.apache.kafka.connect.data.Decimal",
+      "connect.parameters": {
+        "connect.decimal.precision": "14",
+        "scale": "2"
+      },
+      "connect.type": "bytes",
+      "type": "number"
+    },
+    "productName": {
+      "description": "Name of the product",
+      "type": "string"
+    }
+  },
+  "required": [ "product", "productId", "productName" ]
+}
diff --git 
a/hudi-utilities/src/test/resources/streamer-config/source_uber_encoded_decimal.avsc
 
b/hudi-utilities/src/test/resources/streamer-config/source_uber_encoded_decimal.avsc
new file mode 100644
index 00000000000..a4666511836
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/streamer-config/source_uber_encoded_decimal.avsc
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "type" : "record",
+  "name" : "tripUberRec",
+  "doc" : "",
+  "fields" : [
+  {
+    "name" : "timestamp",
+    "doc" : "",
+    "type" : "long"
+  }, {
+    "name" : "_row_key",
+    "doc" : "",
+    "type" : "string"
+  }, {
+    "name" : "rider",
+    "doc" : "",
+    "type" : "string"
+  }, {
+    "name" : "decfield",
+    "doc" : "",
+    "type": {
+      "type": "bytes",
+      "logicalType": "decimal",
+      "precision": 10,
+      "scale": 6
+    }
+  }, {
+      "name" : "lowprecision",
+      "doc" : "",
+      "type": {
+        "type": "bytes",
+        "logicalType": "decimal",
+        "precision": 4,
+        "scale": 2
+      }
+  }, {
+    "name" : "highprecision",
+    "doc" : "",
+    "type": {
+      "type": "bytes",
+      "logicalType": "decimal",
+      "precision": 32,
+      "scale": 12
+    }
+  }, {
+    "name" : "driver",
+    "doc" : "",
+    "type" : "string"
+  }, {
+    "name" : "fare",
+    "doc" : "",
+    "type" : "double"
+  },
+  {
+    "name" : "_hoodie_is_deleted",
+    "doc" : "",
+    "type" : "boolean",
+    "default" : false
+  } ]
+}
diff --git 
a/hudi-utilities/src/test/resources/streamer-config/source_uber_encoded_decimal.json
 
b/hudi-utilities/src/test/resources/streamer-config/source_uber_encoded_decimal.json
new file mode 100644
index 00000000000..a36492cba5b
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/streamer-config/source_uber_encoded_decimal.json
@@ -0,0 +1,54 @@
+{
+  "$schema": "https://json-schema.org/draft/2020-12/schema",
+  "title": "tripUberRec",
+  "type": "object",
+  "properties": {
+    "timestamp": {
+      "type": "integer"
+    },
+    "_row_key": {
+      "type": "string"
+    },
+    "rider": {
+      "type": "string"
+    },
+    "decfield": {
+      "title": "org.apache.kafka.connect.data.Decimal",
+      "connect.parameters": {
+        "connect.decimal.precision": "10",
+        "scale": "6"
+      },
+      "connect.type": "bytes",
+      "type": "number"
+    },
+    "lowprecision": {
+      "title": "org.apache.kafka.connect.data.Decimal",
+      "connect.parameters": {
+        "connect.decimal.precision": "4",
+        "scale": "2"
+      },
+      "connect.type": "bytes",
+      "type": "number"
+    },
+    "highprecision": {
+      "title": "org.apache.kafka.connect.data.Decimal",
+      "connect.parameters": {
+        "connect.decimal.precision": "32",
+        "scale": "12"
+      },
+      "connect.type": "bytes",
+      "type": "number"
+    },
+    "driver": {
+      "type":"string"
+    },
+    "fare": {
+      "type": "number"
+    },
+    "_hoodie_is_deleted": {
+      "type": "boolean",
+      "default": "false"
+    }
+  },
+  "required": [ "timestamp", "_row_key", "rider", "decfield", "lowprecision", 
"highprecision", "driver", "fare", "_hoodie_is_deleted" ]
+}

Reply via email to