This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c88df86a0120 fix: Ensure that MercifulJsonConverter returns Avro Utf8 string (#17953)
c88df86a0120 is described below
commit c88df86a012037a0e822740e2e4d4b7bb470b7cb
Author: voonhous <[email protected]>
AuthorDate: Thu Jan 22 11:30:33 2026 +0800
fix: Ensure that MercifulJsonConverter returns Avro Utf8 string (#17953)
---
.../apache/hudi/avro/MercifulJsonConverter.java | 7 +-
.../helpers/MercifulJsonToRowConverter.java | 26 +++++++
.../deltastreamer/TestHoodieDeltaStreamer.java | 85 ++++++++++++++++------
3 files changed, 92 insertions(+), 26 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
index 0c79bc2bd5b2..afa9259326ce 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
@@ -49,6 +49,7 @@ import org.apache.avro.LogicalTypes;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
import java.io.IOException;
import java.math.BigDecimal;
@@ -434,7 +435,7 @@ public class MercifulJsonConverter {
};
}
- private static JsonFieldProcessor generateStringTypeHandler() {
+ protected JsonFieldProcessor generateStringTypeHandler() {
return new StringProcessor();
}
@@ -444,10 +445,10 @@ public class MercifulJsonConverter {
@Override
public Pair<Boolean, Object> convert(Object value, String name, HoodieSchema schema) {
if (value instanceof String) {
- return Pair.of(true, value);
+ return Pair.of(true, new Utf8((String) value));
} else {
try {
- return Pair.of(true, STRING_MAPPER.writeValueAsString(value));
+ return Pair.of(true, new Utf8(STRING_MAPPER.writeValueAsString(value)));
} catch (IOException ex) {
return Pair.of(false, null);
}
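For context on the change above: Avro's generic readers materialize string fields as org.apache.avro.util.Utf8, and Utf8 never compares equal to a java.lang.String, so records built by the old converter could diverge from natively-read ones. A minimal standalone sketch of that mismatch (illustrative only, not part of this commit):

    import org.apache.avro.util.Utf8;

    public class Utf8VsStringDemo {
      public static void main(String[] args) {
        Utf8 fromAvroReader = new Utf8("rider-123"); // what Avro readers yield
        String fromOldConverter = "rider-123";       // what the converter used to yield
        // Utf8.equals checks instanceof Utf8, so mixing the two breaks equality:
        System.out.println(fromAvroReader.equals(fromOldConverter));      // false
        System.out.println(fromAvroReader.equals(new Utf8("rider-123"))); // true, as after this fix
      }
    }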
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java
index 031850073074..ed9bef9adbef 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java
@@ -189,6 +189,32 @@ public class MercifulJsonToRowConverter extends MercifulJsonConverter {
};
}
+ /**
+ * Override to return java.lang.String instead of Avro Utf8.
+ * Spark's encoder expects String, not Avro Utf8.
+ */
+ @Override
+ protected JsonFieldProcessor generateStringTypeHandler() {
+ return new StringToRowProcessor();
+ }
+
+ private static class StringToRowProcessor extends JsonFieldProcessor {
+ private static final ObjectMapper STRING_MAPPER = new ObjectMapper();
+
+ @Override
+ public Pair<Boolean, Object> convert(Object value, String name, HoodieSchema schema) {
+ if (value instanceof String) {
+ return Pair.of(true, value);
+ } else {
+ try {
+ return Pair.of(true, STRING_MAPPER.writeValueAsString(value));
+ } catch (IOException ex) {
+ return Pair.of(false, null);
+ }
+ }
+ }
+ }
+
@Override
protected JsonFieldProcessor generateFixedTypeHandler() {
return new FixedToRowTypeProcessor();
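Conversely, the override above must keep java.lang.String because Spark's Row accessors cast string columns to String, so an Avro Utf8 leaking into a Row fails at read time. A hedged sketch of that failure mode (assumes Spark is on the classpath; illustrative only, not part of this commit):

    import org.apache.avro.util.Utf8;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;

    public class RowStringDemo {
      public static void main(String[] args) {
        Row ok = RowFactory.create("rider-123");
        System.out.println(ok.getString(0)); // "rider-123"

        Row bad = RowFactory.create(new Utf8("rider-123"));
        bad.getString(0); // throws ClassCastException: Utf8 cannot be cast to String
      }
    }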
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 95c4b75f1181..94c2d1f7056f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -797,9 +797,40 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
}
}
+ /**
+ * Arguments for testLogicalTypesWithJsonSource.
+ * Parameters: hasTransformer, orderingField, recordType, tableType
+ */
+ private static Stream<Arguments> logicalTypesWithJsonSourceArgs() {
+ return Stream.of(
+ // Test with timestamp ordering field (long type)
+ Arguments.of(true, "timestamp", HoodieRecordType.AVRO, HoodieTableType.MERGE_ON_READ),
+ Arguments.of(false, "timestamp", HoodieRecordType.AVRO, HoodieTableType.MERGE_ON_READ),
+ Arguments.of(true, "timestamp", HoodieRecordType.SPARK, HoodieTableType.MERGE_ON_READ),
+ Arguments.of(false, "timestamp", HoodieRecordType.SPARK, HoodieTableType.MERGE_ON_READ),
+ Arguments.of(true, "timestamp", HoodieRecordType.AVRO, HoodieTableType.COPY_ON_WRITE),
+ Arguments.of(false, "timestamp", HoodieRecordType.AVRO, HoodieTableType.COPY_ON_WRITE),
+ Arguments.of(true, "timestamp", HoodieRecordType.SPARK, HoodieTableType.COPY_ON_WRITE),
+ Arguments.of(false, "timestamp", HoodieRecordType.SPARK, HoodieTableType.COPY_ON_WRITE),
+ // Test with rider ordering field (string type)
+ Arguments.of(true, "rider", HoodieRecordType.AVRO, HoodieTableType.MERGE_ON_READ),
+ Arguments.of(false, "rider", HoodieRecordType.AVRO, HoodieTableType.MERGE_ON_READ),
+ Arguments.of(true, "rider", HoodieRecordType.SPARK, HoodieTableType.MERGE_ON_READ),
+ Arguments.of(false, "rider", HoodieRecordType.SPARK, HoodieTableType.MERGE_ON_READ),
+ Arguments.of(true, "rider", HoodieRecordType.AVRO, HoodieTableType.COPY_ON_WRITE),
+ Arguments.of(false, "rider", HoodieRecordType.AVRO, HoodieTableType.COPY_ON_WRITE),
+ Arguments.of(true, "rider", HoodieRecordType.SPARK, HoodieTableType.COPY_ON_WRITE),
+ Arguments.of(false, "rider", HoodieRecordType.SPARK, HoodieTableType.COPY_ON_WRITE));
+ }
+
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- void testLogicalTypesWithJsonSource(boolean hasTransformer) throws Exception {
+ @MethodSource("logicalTypesWithJsonSourceArgs")
+ void testLogicalTypesWithJsonSource(boolean hasTransformer, String orderingField,
+ HoodieRecordType recordType, HoodieTableType tableType) throws Exception {
+ // Fix a seed so we can generate repeated rows for updates
+ final long seed = 123L;
+ final int numPartitions = 2;
+
try {
//use v6 schema because decimal parsing iso 8859-1 support not available currently
String schemaStr;
@@ -818,14 +849,14 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
}
defaultSchemaProviderClassName = TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.class.getName();
- String tableBasePath = basePath + "testTimestampMillis";
+ String tableBasePath = basePath + "testTimestampMillis_" + orderingField + "_" + recordType + "_" + tableType.name();
prepareJsonKafkaDFSSource(
PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName);
// Insert data produced with Schema A, pass Schema A
- prepareJsonKafkaDFSFilesWithSchema(
- 1000, true, topicName, schemaStr);
- HoodieDeltaStreamer.Config cfg = getConfigForLogicalTypesWithJsonSource(tableBasePath, WriteOperationType.INSERT, hasTransformer);
+ prepareJsonKafkaDFSFiles(1000, true, topicName, numPartitions, schemaStr, seed);
+ HoodieDeltaStreamer.Config cfg = getConfigForLogicalTypesWithJsonSource(
+ tableBasePath, WriteOperationType.INSERT, hasTransformer, orderingField, recordType, tableType);
syncOnce(cfg);
// Validate.
assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, tableBasePath));
@@ -838,15 +869,15 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
hudiOpts.put("hoodie.datasource.write.recordkey.field", "id");
logicalAssertions(tableSchema, tableBasePath, hudiOpts, HoodieTableVersion.EIGHT.versionCode());
- // Update data.
- prepareJsonKafkaDFSFilesWithSchema(
- 1000, false, topicName, schemaStr);
- cfg = getConfigForLogicalTypesWithJsonSource(tableBasePath, WriteOperationType.UPSERT, hasTransformer);
+ // Update data, since we are using the same seed, 1000 rows will be updated, and 500 new rows will be inserted
+ prepareJsonKafkaDFSFiles(1500, false, topicName, numPartitions, schemaStr, seed);
+ cfg = getConfigForLogicalTypesWithJsonSource(
+ tableBasePath, WriteOperationType.UPSERT, hasTransformer, orderingField, recordType, tableType);
syncOnce(cfg);
// Validate.
assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, tableBasePath));
- assertRecordCount(2000, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata(topicName + ",0:1000,1:1000", tableBasePath, 2);
+ assertRecordCount(1500, tableBasePath, sqlContext);
+ TestHelpers.assertCommitMetadata(topicName + ",0:1250,1:1250", tableBasePath, 2);
tableSchemaResolver = new TableSchemaResolver(
HoodieTestUtils.createMetaClient(storage, tableBasePath));
tableSchema = tableSchemaResolver.getTableSchema(false);
@@ -856,7 +887,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
}
}
- private static HoodieDeltaStreamer.Config getConfigForLogicalTypesWithJsonSource(String tableBasePath, WriteOperationType operationType, boolean hasTransformer) {
+ private static HoodieDeltaStreamer.Config getConfigForLogicalTypesWithJsonSource(
+ String tableBasePath, WriteOperationType operationType, boolean hasTransformer,
+ String orderingField, HoodieRecordType recordType, HoodieTableType tableType) {
List<String> transformerClassNames;
if (hasTransformer) {
transformerClassNames = Collections.singletonList(TestIdentityTransformer.class.getName());
@@ -868,14 +901,24 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
transformerClassNames,
PROPS_FILENAME_TEST_JSON_KAFKA,
false, true, 100000, false, null,
- HoodieTableType.MERGE_ON_READ.name(),
- "timestamp", null);
+ tableType.name(),
+ orderingField, null);
cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
cfg.recordMergeStrategyId = HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
cfg.recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
cfg.configs.add(String.format("%s=%s", HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
cfg.configs.add("hoodie.datasource.write.row.writer.enable=false");
cfg.configs.add("hoodie.streamer.source.sanitize.invalid.schema.field.names=true");
+ // Configure record merger based on record type
+ if (recordType == HoodieRecordType.SPARK) {
+ cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(),
+ DefaultSparkRecordMerger.class.getName()));
+ cfg.configs.add(String.format("%s=%s", HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"));
+ } else {
+ cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(),
+ HoodieAvroRecordMerger.class.getName()));
+ cfg.configs.add(String.format("%s=%s", HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "avro"));
+ }
return cfg;
}
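The test changes above hinge on HoodieTestDataGenerator being deterministic under a fixed seed: replaying the same seed reproduces the same record keys, so the second batch of 1500 records updates the first 1000 and inserts 500 new ones. A generic illustration of the idea (plain java.util.Random with a hypothetical key format, not Hudi's actual generator):

    import java.util.Random;

    public class SeededKeysDemo {
      // Stand-in for a seeded generator's n-th key draw; real field names differ.
      static String keyAt(long seed, int n) {
        Random r = new Random(seed);
        String key = null;
        for (int i = 0; i <= n; i++) {
          key = "key-" + r.nextInt(1_000_000);
        }
        return key;
      }

      public static void main(String[] args) {
        // Same seed => same key sequence => resent records become updates.
        System.out.println(keyAt(123L, 42).equals(keyAt(123L, 42))); // true
      }
    }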
@@ -2829,18 +2872,14 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
}
private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) {
- prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, 2, HoodieTestDataGenerator.TRIP_SCHEMA);
+ prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, 2, HoodieTestDataGenerator.TRIP_SCHEMA, System.nanoTime());
}
private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName, int numPartitions) {
- prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, numPartitions, HoodieTestDataGenerator.TRIP_SCHEMA);
+ prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, numPartitions, HoodieTestDataGenerator.TRIP_SCHEMA, System.nanoTime());
}
- private void prepareJsonKafkaDFSFilesWithSchema(int numRecords, boolean createTopic, String topicName, String schemaStr) {
- prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, 2, schemaStr);
- }
-
- private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName, int numPartitions, String schemaStr) {
+ private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName, int numPartitions, String schemaStr, long seed) {
if (createTopic) {
try {
testUtils.createTopic(topicName, numPartitions);
@@ -2848,7 +2887,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
// no op
}
}
- HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(seed);
testUtils.sendMessages(topicName,
UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions(
dataGenerator.generateInsertsAsPerSchema(