This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 721f7fc9d30 [HUDI-9072] Support decimal in JsonKafkaSource (#12879)
721f7fc9d30 is described below
commit 721f7fc9d30c907645d24ce43df4cbe0baba4d19
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Feb 25 19:34:19 2025 -0800
[HUDI-9072] Support decimal in JsonKafkaSource (#12879)
---
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 16 +++++
.../hudi/avro/MercifulJsonConverterTestBase.java | 16 +++++
.../org/apache/hudi/avro/TestHoodieAvroUtils.java | 71 +++++++++++++++++---
.../hudi/avro/TestMercifulJsonConverter.java | 64 ++++++++++++++++++
.../common/testutils/HoodieTestDataGenerator.java | 49 ++++++++++++++
.../utilities/schema/FilebasedSchemaProvider.java | 60 +++++++++++++----
.../utilities/streamer/SourceFormatAdapter.java | 16 +++--
.../deltastreamer/HoodieDeltaStreamerTestBase.java | 7 ++
.../schema/TestFilebasedSchemaProvider.java | 35 +++++-----
.../converter/TestJsonToAvroSchemaConverter.java | 1 +
.../utilities/sources/TestJsonKafkaSource.java | 41 ++++++++++++
.../json/kafka-decimal-simple/expected.json | 24 +++++++
.../json/kafka-decimal-simple/input.json | 28 ++++++++
.../source_uber_encoded_decimal.avsc | 77 ++++++++++++++++++++++
.../source_uber_encoded_decimal.json | 54 +++++++++++++++
15 files changed, 519 insertions(+), 40 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index b232ee304b5..188311a5666 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -1149,6 +1149,8 @@ public class HoodieAvroUtils {
// due to Java, there will be precision problems in direct
conversion, we should use string instead of use double
BigDecimal bigDecimal = new
java.math.BigDecimal(oldValue.toString()).setScale(decimal.getScale(),
RoundingMode.HALF_UP);
return DECIMAL_CONVERSION.toFixed(bigDecimal, newSchema,
newSchema.getLogicalType());
+ } else if (oldSchema.getType() == Schema.Type.BYTES) {
+ return convertBytesToFixed(((ByteBuffer) oldValue).array(),
newSchema);
}
}
break;
@@ -1157,6 +1159,16 @@ public class HoodieAvroUtils {
throw new HoodieAvroSchemaException(String.format("cannot support rewrite
value for schema type: %s since the old schema type is: %s", newSchema,
oldSchema));
}
+ /**
+ * Converts a decimal given as raw bytes into the fixed representation described by {@code schema}.
+ *
+ * <p>{@code bytes} is the result of {@code BigDecimal.unscaledValue().toByteArray()}; this is also
+ * what {@code Conversions.DecimalConversion.toBytes()} outputs inside a byte buffer. The bytes are
+ * decoded with {@code convertBytesToBigDecimal} using the decimal logical type of {@code schema}
+ * and then re-encoded with {@code DECIMAL_CONVERSION.toFixed}.
+ *
+ * @param bytes  unscaled value of the decimal
+ * @param schema target schema; its logical type is cast to {@code LogicalTypes.Decimal} without a check
+ * @return the value returned by {@code DECIMAL_CONVERSION.toFixed}
+ */
+ public static Object convertBytesToFixed(byte[] bytes, Schema schema) {
+ LogicalTypes.Decimal decimal = (LogicalTypes.Decimal)
schema.getLogicalType();
+ BigDecimal bigDecimal = convertBytesToBigDecimal(bytes, decimal);
+ return DECIMAL_CONVERSION.toFixed(bigDecimal, schema, decimal);
+ }
+
/**
* Use this instead of DECIMAL_CONVERSION.fromBytes() because that method
does not add in precision
*
@@ -1168,6 +1180,10 @@ public class HoodieAvroUtils {
decimal.getScale(), new MathContext(decimal.getPrecision(),
RoundingMode.HALF_UP));
}
+ public static boolean hasDecimalField(Schema schema) { // delegates the schema walk to hasDecimalWithCondition
+ return hasDecimalWithCondition(schema, unused -> true); // the predicate accepts every decimal it is offered
+ }
+
/**
* Checks whether the provided schema contains a decimal with a precision
less than or equal to 18,
* which allows the decimal to be stored as int/long instead of a fixed size
byte array in
diff --git
a/hudi-common/src/test/java/org/apache/hudi/avro/MercifulJsonConverterTestBase.java
b/hudi-common/src/test/java/org/apache/hudi/avro/MercifulJsonConverterTestBase.java
index c9e643042d6..3cf60e49e89 100644
---
a/hudi-common/src/test/java/org/apache/hudi/avro/MercifulJsonConverterTestBase.java
+++
b/hudi-common/src/test/java/org/apache/hudi/avro/MercifulJsonConverterTestBase.java
@@ -353,4 +353,20 @@ public class MercifulJsonConverterTestBase {
"{\"first\":\"John\",\"last\":\"Smith\",\"suffix\":3}"
);
}
+
+ static Stream<Object> encodedDecimalScalePrecisionProvider() { // each entry is (scale, precision), in that order
+ return Stream.of(
+ Arguments.of(6, 10),
+ Arguments.of(30, 32),
+ Arguments.of(1, 3)
+ );
+ }
+
+ static Stream<Object> encodedDecimalFixedScalePrecisionProvider() { // each entry is (size, scale, precision), in that order
+ return Stream.of(
+ Arguments.of(5, 6, 10),
+ Arguments.of(14, 30, 32),
+ Arguments.of(2, 1, 3)
+ );
+ }
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index 625453ef2e5..b1c8c453f82 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -34,13 +34,16 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.RewriteAvroPayload;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.SchemaCompatibilityException;
import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Conversions;
import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
@@ -63,6 +66,8 @@ import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Timestamp;
@@ -75,6 +80,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -99,7 +105,7 @@ public class TestHoodieAvroUtils {
private static final Logger LOG =
LoggerFactory.getLogger(TestHoodieAvroUtils.class);
- private static String EVOLVED_SCHEMA = "{\"type\": \"record\",\"name\":
\"testrec1\",\"fields\": [ "
+ private static final String EVOLVED_SCHEMA = "{\"type\":
\"record\",\"name\": \"testrec1\",\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\":
\"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
+ "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\":
\"user_profile\"},"
@@ -108,7 +114,7 @@ public class TestHoodieAvroUtils {
+ "{\"name\": \"new_col_nullable_default_null\", \"type\": [\"null\"
,\"string\"],\"default\": null},"
+ "{\"name\": \"new_col_nullable_default_dummy_val\", \"type\":
[\"string\" ,\"null\"],\"default\": \"dummy_val\"}]}";
- private static String EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\":
\"testrec\",\"fields\": [ "
+ private static final String EXAMPLE_SCHEMA = "{\"type\":
\"record\",\"name\": \"testrec\",\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\":
\"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
+ "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\":
\"user_profile\"}]}";
@@ -119,9 +125,9 @@ public class TestHoodieAvroUtils {
+ "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\":
\"user_profile\"}], "
+ "\"custom_schema_property\": \"custom_schema_property_value\"}";
- private static int NUM_FIELDS_IN_EXAMPLE_SCHEMA = 4;
+ private static final int NUM_FIELDS_IN_EXAMPLE_SCHEMA = 4;
- private static String SCHEMA_WITH_METADATA_FIELD = "{\"type\":
\"record\",\"name\": \"testrec2\",\"fields\": [ "
+ private static final String SCHEMA_WITH_METADATA_FIELD = "{\"type\":
\"record\",\"name\": \"testrec2\",\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\":
\"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
+ "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\":
\"user_profile\"},"
@@ -129,7 +135,8 @@ public class TestHoodieAvroUtils {
+ "{\"name\": \"nullable_field\",\"type\": [\"null\"
,\"string\"],\"default\": null},"
+ "{\"name\": \"nullable_field_wo_default\",\"type\": [\"null\"
,\"string\"]}]}";
- private static String SCHEMA_WITH_NON_NULLABLE_FIELD = "{\"type\":
\"record\",\"name\": \"testrec3\",\"fields\": [ "
+ private static final String SCHEMA_WITH_NON_NULLABLE_FIELD =
+ "{\"type\": \"record\",\"name\": \"testrec3\",\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\":
\"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
+ "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\":
\"user_profile\"},"
@@ -137,14 +144,15 @@ public class TestHoodieAvroUtils {
+ "{\"name\": \"non_nullable_field_wo_default\",\"type\": \"string\"},"
+ "{\"name\": \"non_nullable_field_with_default\",\"type\": \"string\",
\"default\": \"dummy\"}]}";
- private static String SCHEMA_WITH_NON_NULLABLE_FIELD_WITH_DEFAULT =
"{\"type\": \"record\",\"name\": \"testrec4\",\"fields\": [ "
+ private static final String SCHEMA_WITH_NON_NULLABLE_FIELD_WITH_DEFAULT =
+ "{\"type\": \"record\",\"name\": \"testrec4\",\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\":
\"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
+ "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\":
\"user_profile\"},"
+ "{\"name\": \"nullable_field\",\"type\": [\"null\"
,\"string\"],\"default\": null},"
+ "{\"name\": \"non_nullable_field_with_default\",\"type\": \"string\",
\"default\": \"dummy\"}]}";
- private static String SCHEMA_WITH_DECIMAL_FIELD =
"{\"type\":\"record\",\"name\":\"record\",\"fields\":["
+ private static final String SCHEMA_WITH_DECIMAL_FIELD =
"{\"type\":\"record\",\"name\":\"record\",\"fields\":["
+ "{\"name\":\"key_col\",\"type\":[\"null\",\"int\"],\"default\":null},"
+ "{\"name\":\"decimal_col\",\"type\":[\"null\","
+
"{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":8,\"scale\":4}],\"default\":null}]}";
@@ -155,7 +163,8 @@ public class TestHoodieAvroUtils {
+
"{\"name\":\"student\",\"type\":{\"name\":\"student\",\"type\":\"record\",\"fields\":["
+ "{\"name\":\"firstname\",\"type\":[\"null\" ,\"string\"],\"default\":
null},{\"name\":\"lastname\",\"type\":[\"null\" ,\"string\"],\"default\":
null}]}}]}";
- private static String SCHEMA_WITH_NESTED_FIELD_RENAMED =
"{\"name\":\"MyClass\",\"type\":\"record\",\"namespace\":\"com.acme.avro\",\"fields\":["
+ private static final String SCHEMA_WITH_NESTED_FIELD_RENAMED =
+
"{\"name\":\"MyClass\",\"type\":\"record\",\"namespace\":\"com.acme.avro\",\"fields\":["
+ "{\"name\":\"fn\",\"type\":\"string\"},"
+ "{\"name\":\"ln\",\"type\":\"string\"},"
+
"{\"name\":\"ss\",\"type\":{\"name\":\"ss\",\"type\":\"record\",\"fields\":["
@@ -773,6 +782,52 @@ public class TestHoodieAvroUtils {
assertEquals("{\"timestamp\": \"foo\", \"_row_key\": \"key\",
\"non_pii_col\": \"val1\", \"pii_col\": \"val2\"}", jsonString);
}
+ @Test
+ void testConvertBytesToFixed() {
+ Random rand = new Random();
+ // The fixed sizes were calculated using AvroInternalSchemaConverter.computeMinBytesForPrecision
+ testConvertBytesToFixedHelper(rand.nextDouble(), 13, 7, 6);
+ testConvertBytesToFixedHelper(rand.nextDouble(), 4, 2, 2);
+ testConvertBytesToFixedHelper(rand.nextDouble(), 32, 12, 14);
+ }
+
+ /**
+ * Round-trips one decimal through {@code HoodieAvroUtils.convertBytesToFixed} and checks that decoding
+ * the resulting fixed value yields the original decimal.
+ *
+ * @param value     random draw, expected to be in [0, 1) as returned by {@code Random.nextDouble()}
+ * @param precision precision of the decimal logical type
+ * @param scale     scale of the decimal logical type
+ * @param size      size in bytes of the fixed schema
+ */
+ private static void testConvertBytesToFixedHelper(double value, int precision, int scale, int size) {
+ // Truncate instead of rounding half-up. With HALF_UP a draw close to 1 can round up to
+ // 10^(precision - scale) (e.g. 99.996 -> 100.00 for precision 4 / scale 2), which needs precision + 1
+ // digits; the following round() would then drop a digit of scale, and the unscaled bytes would no
+ // longer match the scale declared in the schema, failing the assertion intermittently.
+ BigDecimal decfield = BigDecimal.valueOf(value * Math.pow(10, precision - scale))
+ .setScale(scale, RoundingMode.DOWN).round(new MathContext(precision, RoundingMode.HALF_UP));
+ byte[] encodedDecimal = decfield.unscaledValue().toByteArray();
+ Schema fixedSchema = new Schema.Parser().parse("{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [{\"name\": \"decfield\", \"type\": {\"type\": \"fixed\", \"name\": \"idk\","
+ + " \"logicalType\": \"decimal\", \"precision\": " + precision + ", \"scale\": " + scale + ", \"size\": " + size + "}}]}").getFields().get(0).schema();
+ GenericData.Fixed fixed = (GenericData.Fixed) HoodieAvroUtils.convertBytesToFixed(encodedDecimal, fixedSchema);
+ BigDecimal after = new Conversions.DecimalConversion().fromFixed(fixed, fixedSchema, fixedSchema.getLogicalType());
+ assertEquals(decfield, after);
+ }
+
+ @Test
+ void testHasDecimalField() {
+ assertTrue(HoodieAvroUtils.hasDecimalField(new
Schema.Parser().parse(SCHEMA_WITH_DECIMAL_FIELD))); // decimal_col is a nullable union containing a bytes decimal
+ assertFalse(HoodieAvroUtils.hasDecimalField(new
Schema.Parser().parse(EVOLVED_SCHEMA)));
+ assertFalse(HoodieAvroUtils.hasDecimalField(new
Schema.Parser().parse(SCHEMA_WITH_NON_NULLABLE_FIELD)));
+
assertTrue(HoodieAvroUtils.hasDecimalField(HoodieTestDataGenerator.AVRO_SCHEMA));
+
assertTrue(HoodieAvroUtils.hasDecimalField(HoodieTestDataGenerator.AVRO_TRIP_ENCODED_DECIMAL_SCHEMA));
+ Schema recordWithMapAndArray =
Schema.createRecord("recordWithMapAndArray", null, null, false,
+ Arrays.asList(new Schema.Field("mapfield",
Schema.createMap(Schema.create(Schema.Type.INT)), null, null),
+ new Schema.Field("arrayfield",
Schema.createArray(Schema.create(Schema.Type.INT)), null, null)
+ ));
+ assertFalse(HoodieAvroUtils.hasDecimalField(recordWithMapAndArray)); // map and array of plain ints: no decimal anywhere
+ Schema recordWithDecMapAndArray =
Schema.createRecord("recordWithDecMapAndArray", null, null, false,
+ Arrays.asList(new Schema.Field("mapfield",
+
Schema.createMap(LogicalTypes.decimal(10,6).addToSchema(Schema.create(Schema.Type.BYTES))),
null, null),
+ new Schema.Field("arrayfield",
Schema.createArray(Schema.create(Schema.Type.INT)), null, null)
+ ));
+ assertTrue(HoodieAvroUtils.hasDecimalField(recordWithDecMapAndArray)); // decimal used as the map value type
+ Schema recordWithMapAndDecArray =
Schema.createRecord("recordWithMapAndDecArray", null, null, false,
+ Arrays.asList(new Schema.Field("mapfield",
+ Schema.createMap(Schema.create(Schema.Type.INT)), null, null), new
Schema.Field("arrayfield",
+
Schema.createArray(LogicalTypes.decimal(10,6).addToSchema(Schema.create(Schema.Type.BYTES))),
null, null)
+ ));
+ assertTrue(HoodieAvroUtils.hasDecimalField(recordWithMapAndDecArray)); // decimal used as the array element type
+ }
+
@Test
void testCreateFullName() {
String result = HoodieAvroUtils.createFullName(new
ArrayDeque<>(Arrays.asList("a", "b", "c")));
diff --git
a/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
b/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
index 0844ac98fdb..6a451989bbe 100644
---
a/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
+++
b/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
@@ -21,8 +21,10 @@ package org.apache.hudi.avro;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.exception.HoodieJsonToAvroConversionException;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Conversions;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
@@ -34,14 +36,19 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
+import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.stream.Stream;
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_ENCODED_DECIMAL_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -534,4 +541,61 @@ public class TestMercifulJsonConverter extends
MercifulJsonConverterTestBase {
assertEquals(rec, CONVERTER.convert(json, sanitizedSchema));
}
+
+ @ParameterizedTest
+ @MethodSource("encodedDecimalScalePrecisionProvider")
+ void testEncodedDecimal(int scale, int precision) throws
JsonProcessingException { // checks that a base64-encoded decimal in JSON survives CONVERTER.convert for a bytes decimal field
+ Random rand = new Random();
+ BigDecimal decfield = BigDecimal.valueOf(rand.nextDouble())
+ .setScale(scale, RoundingMode.HALF_UP).round(new
MathContext(precision, RoundingMode.HALF_UP));
+ Map<String, Object> data = new HashMap<>();
+ data.put("_row_key", "mykey");
+ long timestamp = 214523432;
+ data.put("timestamp", timestamp);
+ data.put("rider", "myrider");
+ data.put("decfield",
Base64.getEncoder().encodeToString(decfield.unscaledValue().toByteArray())); // the decimal travels as base64 of its unscaled bytes
+ data.put("driver", "mydriver");
+ data.put("fare", rand.nextDouble() * 100);
+ data.put("_hoodie_is_deleted", false);
+ String json = MAPPER.writeValueAsString(data);
+ Schema tripSchema = new Schema.Parser().parse(
+ TRIP_ENCODED_DECIMAL_SCHEMA.replace("6",
Integer.toString(scale)).replace("10", Integer.toString(precision))); // NOTE(review): replace() rewrites every "6" and then every "10" in the schema text, not only decfield's scale/precision - confirm no other occurrence matters
+ GenericRecord genrec = CONVERTER.convert(json, tripSchema);
+ Schema decimalFieldSchema = tripSchema.getField("decfield").schema();
+ BigDecimal decoded =
HoodieAvroUtils.convertBytesToBigDecimal(((ByteBuffer)
genrec.get("decfield")).array(), // NOTE(review): array() returns the whole backing array; assumes the buffer wraps exactly the decimal bytes - confirm
+ (LogicalTypes.Decimal) decimalFieldSchema.getLogicalType());
+ assertEquals(decfield, decoded);
+ }
+
+ @ParameterizedTest
+ @MethodSource("encodedDecimalFixedScalePrecisionProvider")
+ void testEncodedDecimalAvroSparkPostProcessorCase(int size, int scale, int
precision) throws JsonProcessingException { // same round trip as testEncodedDecimal, but decfield is a fixed-typed decimal
+ Random rand = new Random();
+ String postProcessSchemaString =
String.format("{\"type\":\"record\",\"name\":\"tripUberRec\","
+ +
"\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"doc\":\"\"},{\"name\":\"_row_key\","
+ +
"\"type\":\"string\",\"doc\":\"\"},{\"name\":\"rider\",\"type\":\"string\",\"doc\":\"\"},"
+ +
"{\"name\":\"decfield\",\"type\":{\"type\":\"fixed\",\"name\":\"fixed\","
+ +
"\"namespace\":\"tripUberRec.decfield\",\"size\":%d,\"logicalType\":\"decimal\","
+ +
"\"precision\":%d,\"scale\":%d},\"doc\":\"\"},{\"name\":\"driver\",\"type\":\"string\","
+ +
"\"doc\":\"\"},{\"name\":\"fare\",\"type\":\"double\",\"doc\":\"\"},{\"name\":\"_hoodie_is_deleted\","
+ + "\"type\":\"boolean\",\"doc\":\"\"}]}", size, precision, scale); // the %d placeholders are filled in the order size, precision, scale
+ Schema postProcessSchema = new
Schema.Parser().parse(postProcessSchemaString);
+ BigDecimal decfield = BigDecimal.valueOf(rand.nextDouble())
+ .setScale(scale, RoundingMode.HALF_UP).round(new
MathContext(precision, RoundingMode.HALF_UP));
+ Map<String, Object> data = new HashMap<>();
+ data.put("_row_key", "mykey");
+ long timestamp = 214523432;
+ data.put("timestamp", timestamp);
+ data.put("rider", "myrider");
+ data.put("decfield",
Base64.getEncoder().encodeToString(decfield.unscaledValue().toByteArray())); // the decimal travels as base64 of its unscaled bytes
+ data.put("driver", "mydriver");
+ data.put("fare", rand.nextDouble() * 100);
+ data.put("_hoodie_is_deleted", false);
+ String json = MAPPER.writeValueAsString(data);
+ GenericRecord genrec = CONVERTER.convert(json, postProcessSchema);
+ GenericData.Fixed fixed = (GenericData.Fixed) genrec.get("decfield"); // the converter is expected to hand back a fixed value for this field
+ Conversions.DecimalConversion decimalConverter = new
Conversions.DecimalConversion();
+ Schema decimalFieldSchema =
postProcessSchema.getField("decfield").schema();
+ assertEquals(decfield, decimalConverter.fromFixed(fixed,
decimalFieldSchema, decimalFieldSchema.getLogicalType()));
+ }
}
\ No newline at end of file
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 4122ab452ca..e4d37e67e5c 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -56,12 +56,15 @@ import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -170,6 +173,7 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS =
HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA);
public static final Schema AVRO_SHORT_TRIP_SCHEMA = new
Schema.Parser().parse(SHORT_TRIP_SCHEMA);
+ public static final Schema AVRO_TRIP_ENCODED_DECIMAL_SCHEMA = new
Schema.Parser().parse(TRIP_ENCODED_DECIMAL_SCHEMA);
public static final Schema AVRO_TRIP_SCHEMA = new
Schema.Parser().parse(TRIP_SCHEMA);
public static final Schema FLATTENED_AVRO_SCHEMA = new
Schema.Parser().parse(TRIP_FLATTENED_SCHEMA);
private final Random rand;
@@ -294,6 +298,8 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
return generateRandomValue(key, commitTime, true);
} else if (TRIP_EXAMPLE_SCHEMA.equals(schemaStr)) {
return generateRandomValue(key, commitTime, isFlattened);
+ } else if (TRIP_ENCODED_DECIMAL_SCHEMA.equals(schemaStr)) {
+ return generatePayloadForTripEncodedDecimalSchema(key, commitTime);
} else if (TRIP_SCHEMA.equals(schemaStr)) {
return generatePayloadForTripSchema(key, commitTime);
} else if (SHORT_TRIP_SCHEMA.equals(schemaStr)) {
@@ -353,6 +359,17 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
return new RawTripTestPayload(rec.toString(), key.getRecordKey(),
key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
}
+ /**
+ * Generates a payload for TRIP_ENCODED_DECIMAL_SCHEMA. The record is built by
+ * generateRecordForTripEncodedDecimalSchema from the record key of {@code key}, rider and driver names
+ * derived from {@code commitTime}, and a timestamp of 0.
+ *
+ * @param key        supplies the record key and the partition path of the payload
+ * @param commitTime appended to "rider-" and "driver-" to form the rider and driver names
+ * @return a RawTripTestPayload created from the string form of the generated record
+ * @throws IOException declared by this method; presumably raised while constructing the payload
+ */
+ public RawTripTestPayload
generatePayloadForTripEncodedDecimalSchema(HoodieKey key, String commitTime)
+ throws IOException {
+ GenericRecord rec =
+ generateRecordForTripEncodedDecimalSchema(key.getRecordKey(), "rider-"
+ commitTime, "driver-" + commitTime, 0);
+ return new RawTripTestPayload(rec.toString(), key.getRecordKey(),
key.getPartitionPath(),
+ TRIP_ENCODED_DECIMAL_SCHEMA);
+ }
+
/**
* Generates a new avro record with TRIP_SCHEMA, retaining the key if
optionally provided.
*/
@@ -508,6 +525,38 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
return rec;
}
+ /*
+ * Generate a random record using TRIP_ENCODED_DECIMAL_SCHEMA. The three decimal fields are written as
+ * base64 strings produced by getNonzeroEncodedBigDecimal, called with (scale, precision) of
+ * (6, 10), (2, 4) and (12, 32); "fare" is a random double below 100 and "_hoodie_is_deleted" is false.
+ */
+ public GenericRecord generateRecordForTripEncodedDecimalSchema(String
rowKey, String riderName, String driverName,
+ long
timestamp) {
+ GenericRecord rec = new
GenericData.Record(AVRO_TRIP_ENCODED_DECIMAL_SCHEMA);
+ rec.put("_row_key", rowKey);
+ rec.put("timestamp", timestamp);
+ rec.put("rider", riderName);
+ rec.put("decfield", getNonzeroEncodedBigDecimal(rand, 6, 10));
+ rec.put("lowprecision", getNonzeroEncodedBigDecimal(rand, 2, 4));
+ rec.put("highprecision", getNonzeroEncodedBigDecimal(rand, 12, 32));
+ rec.put("driver", driverName);
+ rec.put("fare", rand.nextDouble() * 100);
+ rec.put("_hoodie_is_deleted", false);
+ return rec;
+ }
+
+ /**
+ * Builds a random positive decimal with the given scale and precision and returns the base64 encoding of
+ * its unscaled value ({@code BigDecimal.unscaledValue().toByteArray()}).
+ *
+ * @param rand      source of randomness
+ * @param scale     number of fractional digits of the generated decimal
+ * @param precision maximum number of digits of the generated decimal
+ * @return base64 string of the unscaled value's bytes
+ */
+ private static String getNonzeroEncodedBigDecimal(Random rand, int scale, int precision) {
+ // rand.nextDouble() only returns a value in [0, 1); redraw until it is above 0.1 so that all
+ // generated values keep the same order of magnitude once they are scaled up.
+ double nextDouble = rand.nextDouble();
+ while (nextDouble <= 0.1) {
+ nextDouble = rand.nextDouble();
+ }
+ // Scale the draw so that the value uses all (precision - scale) integer digits.
+ double valuescale = Math.pow(10, precision - scale);
+ // Truncate instead of rounding half-up. With HALF_UP a draw close to 1 can round up to
+ // 10^(precision - scale), which needs precision + 1 digits; round() would then drop a digit of scale
+ // and the encoded unscaled value would decode to a number 10x smaller under the schema's scale.
+ BigDecimal dec = BigDecimal.valueOf(nextDouble * valuescale)
+ .setScale(scale, RoundingMode.DOWN).round(new MathContext(precision, RoundingMode.HALF_UP));
+ return Base64.getEncoder().encodeToString(dec.unscaledValue().toByteArray());
+ }
+
/*
Generate random record using TRIP_SCHEMA
*/
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
index e4d2bf58e43..dd3849fcc71 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
@@ -20,18 +20,24 @@ package org.apache.hudi.utilities.schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.utilities.config.FilebasedSchemaProviderConfig;
+import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
import org.apache.hudi.utilities.exception.HoodieSchemaProviderException;
import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
-import java.io.InputStream;
import java.util.Collections;
import static
org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
@@ -47,8 +53,6 @@ public class FilebasedSchemaProvider extends SchemaProvider {
private final String sourceFile;
private final String targetFile;
- private final boolean shouldSanitize;
- private final String invalidCharMask;
protected Schema sourceSchema;
@@ -59,8 +63,6 @@ public class FilebasedSchemaProvider extends SchemaProvider {
checkRequiredConfigProperties(props,
Collections.singletonList(FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE));
this.sourceFile = getStringWithAltKeys(props,
FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE);
this.targetFile = getStringWithAltKeys(props,
FilebasedSchemaProviderConfig.TARGET_SCHEMA_FILE, sourceFile);
- this.shouldSanitize = SanitizationUtils.shouldSanitize(props);
- this.invalidCharMask = SanitizationUtils.getInvalidCharMask(props);
this.fs = HadoopFSUtils.getFs(sourceFile, jssc.hadoopConfiguration(),
true);
this.sourceSchema = parseSchema(this.sourceFile);
if (containsConfigProperty(props,
FilebasedSchemaProviderConfig.TARGET_SCHEMA_FILE)) {
@@ -69,7 +71,7 @@ public class FilebasedSchemaProvider extends SchemaProvider {
}
private Schema parseSchema(String schemaFile) {
- return readAvroSchemaFromFile(schemaFile, this.fs, shouldSanitize,
invalidCharMask);
+ return readSchemaFromFile(schemaFile, this.fs, config);
}
@Override
@@ -86,14 +88,50 @@ public class FilebasedSchemaProvider extends SchemaProvider
{
}
}
- private static Schema readAvroSchemaFromFile(String schemaPath, FileSystem
fs, boolean sanitizeSchema, String invalidCharMask) {
- String schemaStr;
- try (InputStream in = fs.open(new Path(schemaPath))) {
- schemaStr = FileIOUtils.readAsUTFString(in);
+ private static Schema readSchemaFromFile(String schemaPath, FileSystem fs,
TypedProperties props) { // picks the reader from the file name alone
+ return schemaPath.endsWith(".json") // case-sensitive suffix check
+ ? readJsonSchemaFromFile(schemaPath, fs, props)
+ : readAvroSchemaFromFile(schemaPath, fs, props); // every other name is read as an Avro schema file
+ }
+
+ private static Schema readJsonSchemaFromFile(String schemaPath, FileSystem
fs, TypedProperties props) { // reads a JSON schema file and turns it into an Avro Schema through the configured converter
+ String schemaConverterClass = getStringWithAltKeys(props,
HoodieSchemaProviderConfig.SCHEMA_CONVERTER, true);
+ SchemaRegistryProvider.SchemaConverter schemaConverter;
+ try {
+
ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(schemaConverterClass),
+ "Schema converter class must be set for the json file based schema
provider");
+ schemaConverter = (SchemaRegistryProvider.SchemaConverter)
ReflectionUtils.loadClass(
+ schemaConverterClass, new Class<?>[] {TypedProperties.class}, props); // the converter is built through a constructor taking TypedProperties
+ } catch (Exception e) { // any exception from the argument check or the class load ends up here
+ throw new HoodieSchemaProviderException("Error loading json schema
converter", e);
+ }
+ String schemaStr = readSchemaString(schemaPath, fs);
+ ParsedSchema parsedSchema = new JsonSchema(schemaStr); // wrap the raw file text so the converter can consume it
+ String convertedSchema;
+ try {
+ convertedSchema = schemaConverter.convert(parsedSchema);
+ } catch (IOException e) {
+ throw new HoodieSchemaProviderException(String.format("Error converting
json schema from file %s", schemaPath),
+ e);
+ }
+ return new Schema.Parser().parse(convertedSchema); // the converter's string output is parsed as an Avro schema
+ }
+
+ private static Schema readAvroSchemaFromFile(String schemaPath,
+ FileSystem fs,
+ TypedProperties props) { // reads the file as text and parses it as an Avro schema
+ boolean shouldSanitize = SanitizationUtils.shouldSanitize(props); // sanitization settings are read from props on every call
+ String invalidCharMask = SanitizationUtils.getInvalidCharMask(props);
+ String schemaStr = readSchemaString(schemaPath, fs);
+ return SanitizationUtils.parseAvroSchema(schemaStr, shouldSanitize,
invalidCharMask);
+ }
+
+ private static String readSchemaString(String schemaPath, FileSystem fs) {
+ try (FSDataInputStream in = fs.open(new Path(schemaPath))) {
+ return FileIOUtils.readAsUTFString(in);
} catch (IOException ioe) {
throw new HoodieSchemaProviderException(String.format("Error reading
schema from file %s", schemaPath), ioe);
}
- return SanitizationUtils.parseAvroSchema(schemaStr, sanitizeSchema,
invalidCharMask);
}
// Per write batch, refresh the schemas from the file
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
index fa9e0dc5add..f326c98826e 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.streamer;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.MercifulJsonConverter;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
@@ -33,6 +34,7 @@ import
org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.KafkaSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.hudi.utilities.sources.helpers.RowConverter;
@@ -165,6 +167,11 @@ public class SourceFormatAdapter implements Closeable {
);
}
+ private InputBatch<JavaRDD<GenericRecord>>
convertJsonStringToAvroFormat(InputBatch<JavaRDD<String>> r) { // turns a batch of JSON strings into a batch of Avro records
+ JavaRDD<GenericRecord> eventsRdd = transformJsonToGenericRdd(r);
+ return new InputBatch<>(Option.ofNullable(eventsRdd),
r.getCheckpointForNextBatch(), r.getSchemaProvider()); // checkpoint and schema provider are carried over from the input batch; ofNullable allows a null RDD
+ }
+
/**
* Fetch new data in avro format. If the source provides data in different
format, they are translated to Avro format
*/
@@ -176,8 +183,7 @@ public class SourceFormatAdapter implements Closeable {
case JSON: {
//sanitizing is done inside the convertor in transformJsonToGenericRdd
if enabled
InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>)
source).fetchNext(lastCheckpoint, sourceLimit);
- JavaRDD<GenericRecord> eventsRdd = transformJsonToGenericRdd(r);
- return new
InputBatch<>(Option.ofNullable(eventsRdd),r.getCheckpointForNextBatch(),
r.getSchemaProvider());
+ return convertJsonStringToAvroFormat(r);
}
case ROW: {
//we do the sanitizing here if enabled
@@ -240,8 +246,10 @@ public class SourceFormatAdapter implements Closeable {
case JSON: {
InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>) source).fetchNext(lastCheckpoint, sourceLimit);
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
-
- if (isFieldNameSanitizingEnabled()) {
+ // Decimal fields in JSON generated by Kafka Connect need additional decoding. The JSON -> ROW conversion is done through
+ // a Spark library that has not implemented this decoding.
+ if (isFieldNameSanitizingEnabled()
+ || (HoodieAvroUtils.hasDecimalField(sourceSchema) && source instanceof KafkaSource)) {
StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
JavaRDD<Row> rowRDD = transformJsonToRowRdd(r);
if (rowRDD != null) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 278bafac001..09e3113407d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -195,6 +195,13 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
UtilitiesTestBase.Helpers.copyToDFS("streamer-config/source_short_trip_uber.avsc", storage, dfsBasePath + "/source_short_trip_uber.avsc");
UtilitiesTestBase.Helpers.copyToDFS("streamer-config/source_uber.avsc", storage, dfsBasePath + "/source_uber.avsc");
+ UtilitiesTestBase.Helpers.copyToDFS(
+ "streamer-config/source_uber_encoded_decimal.json", storage,
+ dfsBasePath + "/source_uber_encoded_decimal.json");
+ UtilitiesTestBase.Helpers.copyToDFS(
+ "streamer-config/source_uber_encoded_decimal.avsc", storage,
+ dfsBasePath + "/source_uber_encoded_decimal.avsc");
+
UtilitiesTestBase.Helpers.copyToDFS("streamer-config/target_short_trip_uber.avsc", storage, dfsBasePath + "/target_short_trip_uber.avsc");
UtilitiesTestBase.Helpers.copyToDFS("streamer-config/target_uber.avsc", storage, dfsBasePath + "/target_uber.avsc");
UtilitiesTestBase.Helpers.copyToDFS("streamer-config/invalid_hive_sync_uber_config.properties", storage, dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java
index 389282ddcdb..6d310ff3db2 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java
@@ -19,13 +19,13 @@
package org.apache.hudi.utilities.schema;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
+import org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.avro.SchemaParseException;
import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -41,7 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
/**
* Unit tests for {@link FilebasedSchemaProvider}.
*/
-public class TestFilebasedSchemaProvider extends UtilitiesTestBase {
+class TestFilebasedSchemaProvider extends UtilitiesTestBase {
private FilebasedSchemaProvider schemaProvider;
@@ -55,25 +55,15 @@ public class TestFilebasedSchemaProvider extends UtilitiesTestBase {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
}
- @BeforeEach
- public void setup() throws Exception {
- super.setup();
- }
-
- @AfterEach
- public void teardown() throws Exception {
- super.teardown();
- }
-
@Test
- public void properlyFormattedNestedSchemaTest() throws IOException {
+ void properlyFormattedNestedSchemaTest() throws IOException {
this.schemaProvider = new FilebasedSchemaProvider(
Helpers.setupSchemaOnDFS("streamer-config", "file_schema_provider_valid.avsc"), jsc);
assertEquals(this.schemaProvider.getSourceSchema(), generateProperFormattedSchema());
}
@Test
- public void renameBadlyFormattedSchemaTest() throws IOException {
+ void renameBadlyFormattedSchemaTest() throws IOException {
TypedProperties props = Helpers.setupSchemaOnDFS("streamer-config", "file_schema_provider_invalid.avsc");
props.put(SANITIZE_SCHEMA_FIELD_NAMES.key(), "true");
this.schemaProvider = new FilebasedSchemaProvider(props, jsc);
@@ -81,7 +71,7 @@ public class TestFilebasedSchemaProvider extends UtilitiesTestBase {
}
@Test
- public void renameBadlyFormattedSchemaWithPropertyDisabledTest() {
+ void renameBadlyFormattedSchemaWithPropertyDisabledTest() {
assertThrows(SchemaParseException.class, () -> {
new FilebasedSchemaProvider(
Helpers.setupSchemaOnDFS("streamer-config", "file_schema_provider_invalid.avsc"), jsc);
@@ -89,11 +79,22 @@ public class TestFilebasedSchemaProvider extends UtilitiesTestBase {
}
@Test
- public void renameBadlyFormattedSchemaWithAltCharMaskConfiguredTest() throws IOException {
+ void renameBadlyFormattedSchemaWithAltCharMaskConfiguredTest() throws IOException {
TypedProperties props = Helpers.setupSchemaOnDFS("streamer-config", "file_schema_provider_invalid.avsc");
props.put(SANITIZE_SCHEMA_FIELD_NAMES.key(), "true");
props.put(SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), "_");
this.schemaProvider = new FilebasedSchemaProvider(props, jsc);
assertEquals(this.schemaProvider.getSourceSchema(), generateRenamedSchemaWithConfiguredReplacement());
}
+
+ @Test
+ void testJsonSchema() throws IOException {
+ TypedProperties jsonProps = Helpers.setupSchemaOnDFS("streamer-config", "source_uber_encoded_decimal.json");
+ jsonProps.setProperty(HoodieSchemaProviderConfig.SCHEMA_CONVERTER.key(), JsonToAvroSchemaConverter.class.getName());
+ FilebasedSchemaProvider jsonFilebasedSchemaProvider = new FilebasedSchemaProvider(jsonProps, jsc);
+ FilebasedSchemaProvider filebasedSchemaProvider = new FilebasedSchemaProvider(
+ Helpers.setupSchemaOnDFS("streamer-config", "source_uber_encoded_decimal.avsc"), jsc);
+
+ assertEquals(filebasedSchemaProvider.getSourceSchema(), jsonFilebasedSchemaProvider.getSourceSchema());
+ }
}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
index 6f5766e81dc..e0684fecf8e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestJsonToAvroSchemaConverter.java
@@ -52,6 +52,7 @@ class TestJsonToAvroSchemaConverter {
"not-null-default-value-schema,_no_stripping_quotes",
"not-null-default-value-schema,_stripping_quotes",
"array-with-item-type-union,",
+ "kafka-decimal-simple,"
})
void testConvertJsonSchemaToAvroSchema(String inputCase, String avroSchemaFileSuffix) throws IOException {
String jsonSchema = loadJsonSchema(inputCase);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 82bbd3a01c5..fb9c05d0fd1 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -18,6 +18,7 @@
package org.apache.hudi.utilities.sources;
+import org.apache.hudi.HoodieSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -30,10 +31,12 @@ import org.apache.hudi.common.testutils.InProcessTimeGenerator;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter;
import org.apache.hudi.utilities.streamer.BaseErrorTableWriter;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.ErrorEvent;
@@ -59,6 +62,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.math.BigDecimal;
+import java.net.URISyntaxException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -251,6 +255,43 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
assertEquals(Option.empty(), fetch6.getBatch());
}
+ @Test
+ void testJsonKafkaSourceWithEncodedDecimals() throws URISyntaxException {
+ String schemaFilePath = Objects.requireNonNull(TestJsonKafkaSource.class.getClassLoader()
+ .getResource("streamer-config/source_uber_encoded_decimal.json")).toURI().getPath();
+ final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceWithEncodedDecimals";
+ testUtils.createTopic(topic, 2);
+ TypedProperties props = createPropsForKafkaSource(topic, Long.MAX_VALUE, "earliest");
+ props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
+ props.put(HoodieSchemaProviderConfig.SCHEMA_CONVERTER.key(), JsonToAvroSchemaConverter.class.getName());
+ schemaProvider = new FilebasedSchemaProvider(props, jsc());
+
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+ SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
+ List<HoodieRecord> send1 =
+ dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.TRIP_ENCODED_DECIMAL_SCHEMA);
+ testUtils.sendMessages(topic, jsonifyRecords(send1));
+ InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+ List<GenericRecord> recs = fetch1.getBatch().get().collect();
+ assertEquals(10, recs.size());
+
+ Schema deducedSchema =
+ HoodieSchemaUtils.deduceWriterSchema(schemaProvider.getSourceSchema(), Option.empty(), Option.empty(), props);
+ verifyDecimalValue(recs, deducedSchema, "decfield");
+ verifyDecimalValue(recs, deducedSchema, "lowprecision");
+ verifyDecimalValue(recs, deducedSchema, "highprecision");
+
+ testUtils.sendMessages(topic, jsonifyRecords(
+ dataGenerator.generateInsertsAsPerSchema("001", 20, HoodieTestDataGenerator.TRIP_ENCODED_DECIMAL_SCHEMA)));
+ InputBatch<Dataset<Row>> fetch2 =
+ kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 30);
+ assertEquals(20, fetch2.getBatch().get().count());
+ assertEquals(20, fetch2.getBatch().get().filter("decfield < 10000.0").filter("decfield > 1000.0")
+ .filter("lowprecision < 100.0").filter("lowprecision > 10.0")
+ .filter("highprecision < 100000000000000000000.0").filter("highprecision > 10000000000000000000.0").count());
+ }
+
private static void verifyDecimalValue(List<GenericRecord> records, Schema schema, String fieldname) {
Schema fieldSchema = schema.getField(fieldname).schema();
LogicalTypes.Decimal decField = (LogicalTypes.Decimal) fieldSchema.getLogicalType();
diff --git a/hudi-utilities/src/test/resources/schema-provider/json/kafka-decimal-simple/expected.json b/hudi-utilities/src/test/resources/schema-provider/json/kafka-decimal-simple/expected.json
new file mode 100644
index 00000000000..8dac6d45da0
--- /dev/null
+++ b/hudi-utilities/src/test/resources/schema-provider/json/kafka-decimal-simple/expected.json
@@ -0,0 +1,24 @@
+{
+ "type" : "record",
+ "name" : "Product",
+ "doc" : "A product from Acme's catalog",
+ "namespace" : "example.com",
+ "fields" : [ {
+ "name" : "product",
+ "doc" : "The unique identifier for a product",
+ "type" : "long"
+ }, {
+ "name" : "productId",
+ "doc" : "The unique identifier for a product",
+ "type" : {
+ "type": "bytes",
+ "logicalType": "decimal",
+ "precision": 14,
+ "scale": 2
+ }
+ }, {
+ "name" : "productName",
+ "doc" : "Name of the product",
+ "type" : "string"
+ }]
+}
diff --git a/hudi-utilities/src/test/resources/schema-provider/json/kafka-decimal-simple/input.json b/hudi-utilities/src/test/resources/schema-provider/json/kafka-decimal-simple/input.json
new file mode 100644
index 00000000000..1d13d79a6bb
--- /dev/null
+++ b/hudi-utilities/src/test/resources/schema-provider/json/kafka-decimal-simple/input.json
@@ -0,0 +1,28 @@
+{
+ "$schema": "https://json-schema.org/draft/2020-12/schema",
+ "$id": "https://example.com/product.schema.json",
+ "title": "Product",
+ "description": "A product from Acme's catalog",
+ "type": "object",
+ "properties": {
+ "product": {
+ "description": "The unique identifier for a product",
+ "type": "integer"
+ },
+ "productId": {
+ "description": "The unique identifier for a product",
+ "title": "org.apache.kafka.connect.data.Decimal",
+ "connect.parameters": {
+ "connect.decimal.precision": "14",
+ "scale": "2"
+ },
+ "connect.type": "bytes",
+ "type": "number"
+ },
+ "productName": {
+ "description": "Name of the product",
+ "type": "string"
+ }
+ },
+ "required": [ "product", "productId", "productName" ]
+}
diff --git a/hudi-utilities/src/test/resources/streamer-config/source_uber_encoded_decimal.avsc b/hudi-utilities/src/test/resources/streamer-config/source_uber_encoded_decimal.avsc
new file mode 100644
index 00000000000..a4666511836
--- /dev/null
+++ b/hudi-utilities/src/test/resources/streamer-config/source_uber_encoded_decimal.avsc
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "type" : "record",
+ "name" : "tripUberRec",
+ "doc" : "",
+ "fields" : [
+ {
+ "name" : "timestamp",
+ "doc" : "",
+ "type" : "long"
+ }, {
+ "name" : "_row_key",
+ "doc" : "",
+ "type" : "string"
+ }, {
+ "name" : "rider",
+ "doc" : "",
+ "type" : "string"
+ }, {
+ "name" : "decfield",
+ "doc" : "",
+ "type": {
+ "type": "bytes",
+ "logicalType": "decimal",
+ "precision": 10,
+ "scale": 6
+ }
+ }, {
+ "name" : "lowprecision",
+ "doc" : "",
+ "type": {
+ "type": "bytes",
+ "logicalType": "decimal",
+ "precision": 4,
+ "scale": 2
+ }
+ }, {
+ "name" : "highprecision",
+ "doc" : "",
+ "type": {
+ "type": "bytes",
+ "logicalType": "decimal",
+ "precision": 32,
+ "scale": 12
+ }
+ }, {
+ "name" : "driver",
+ "doc" : "",
+ "type" : "string"
+ }, {
+ "name" : "fare",
+ "doc" : "",
+ "type" : "double"
+ },
+ {
+ "name" : "_hoodie_is_deleted",
+ "doc" : "",
+ "type" : "boolean",
+ "default" : false
+ } ]
+}
diff --git a/hudi-utilities/src/test/resources/streamer-config/source_uber_encoded_decimal.json b/hudi-utilities/src/test/resources/streamer-config/source_uber_encoded_decimal.json
new file mode 100644
index 00000000000..a36492cba5b
--- /dev/null
+++ b/hudi-utilities/src/test/resources/streamer-config/source_uber_encoded_decimal.json
@@ -0,0 +1,54 @@
+{
+ "$schema": "https://json-schema.org/draft/2020-12/schema",
+ "title": "tripUberRec",
+ "type": "object",
+ "properties": {
+ "timestamp": {
+ "type": "integer"
+ },
+ "_row_key": {
+ "type": "string"
+ },
+ "rider": {
+ "type": "string"
+ },
+ "decfield": {
+ "title": "org.apache.kafka.connect.data.Decimal",
+ "connect.parameters": {
+ "connect.decimal.precision": "10",
+ "scale": "6"
+ },
+ "connect.type": "bytes",
+ "type": "number"
+ },
+ "lowprecision": {
+ "title": "org.apache.kafka.connect.data.Decimal",
+ "connect.parameters": {
+ "connect.decimal.precision": "4",
+ "scale": "2"
+ },
+ "connect.type": "bytes",
+ "type": "number"
+ },
+ "highprecision": {
+ "title": "org.apache.kafka.connect.data.Decimal",
+ "connect.parameters": {
+ "connect.decimal.precision": "32",
+ "scale": "12"
+ },
+ "connect.type": "bytes",
+ "type": "number"
+ },
+ "driver": {
+ "type":"string"
+ },
+ "fare": {
+ "type": "number"
+ },
+ "_hoodie_is_deleted": {
+ "type": "boolean",
+ "default": "false"
+ }
+ },
+ "required": [ "timestamp", "_row_key", "rider", "decfield", "lowprecision",
"highprecision", "driver", "fare", "_hoodie_is_deleted" ]
+}