This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c88df86a0120 fix: Ensure that MercifulJsonConverter returns Avro Utf8 string (#17953)
c88df86a0120 is described below

commit c88df86a012037a0e822740e2e4d4b7bb470b7cb
Author: voonhous <[email protected]>
AuthorDate: Thu Jan 22 11:30:33 2026 +0800

    fix: Ensure that MercifulJsonConverter returns Avro Utf8 string (#17953)
---
 .../apache/hudi/avro/MercifulJsonConverter.java    |  7 +-
 .../helpers/MercifulJsonToRowConverter.java        | 26 +++++++
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 85 ++++++++++++++++------
 3 files changed, 92 insertions(+), 26 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
index 0c79bc2bd5b2..afa9259326ce 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
@@ -49,6 +49,7 @@ import org.apache.avro.LogicalTypes;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
 
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -434,7 +435,7 @@ public class MercifulJsonConverter {
     };
   }
 
-  private static JsonFieldProcessor generateStringTypeHandler() {
+  protected JsonFieldProcessor generateStringTypeHandler() {
     return new StringProcessor();
   }
 
@@ -444,10 +445,10 @@ public class MercifulJsonConverter {
     @Override
     public Pair<Boolean, Object> convert(Object value, String name, 
HoodieSchema schema) {
       if (value instanceof String) {
-        return Pair.of(true, value);
+        return Pair.of(true, new Utf8((String) value));
       } else {
         try {
-          return Pair.of(true, STRING_MAPPER.writeValueAsString(value));
+          return Pair.of(true, new 
Utf8(STRING_MAPPER.writeValueAsString(value)));
         } catch (IOException ex) {
           return Pair.of(false, null);
         }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java
index 031850073074..ed9bef9adbef 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java
@@ -189,6 +189,32 @@ public class MercifulJsonToRowConverter extends 
MercifulJsonConverter {
     };
   }
 
+  /**
+   * Override to return java.lang.String instead of Avro Utf8.
+   * Spark's encoder expects String, not Avro Utf8.
+   */
+  @Override
+  protected JsonFieldProcessor generateStringTypeHandler() {
+    return new StringToRowProcessor();
+  }
+
+  private static class StringToRowProcessor extends JsonFieldProcessor {
+    private static final ObjectMapper STRING_MAPPER = new ObjectMapper();
+
+    @Override
+    public Pair<Boolean, Object> convert(Object value, String name, 
HoodieSchema schema) {
+      if (value instanceof String) {
+        return Pair.of(true, value);
+      } else {
+        try {
+          return Pair.of(true, STRING_MAPPER.writeValueAsString(value));
+        } catch (IOException ex) {
+          return Pair.of(false, null);
+        }
+      }
+    }
+  }
+
   @Override
   protected JsonFieldProcessor generateFixedTypeHandler() {
     return new FixedToRowTypeProcessor();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 95c4b75f1181..94c2d1f7056f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -797,9 +797,40 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     }
   }
 
+  /**
+   * Arguments for testLogicalTypesWithJsonSource.
+   * Parameters: hasTransformer, orderingField, recordType, tableType
+   */
+  private static Stream<Arguments> logicalTypesWithJsonSourceArgs() {
+    return Stream.of(
+        // Test with timestamp ordering field (long type)
+        Arguments.of(true, "timestamp", HoodieRecordType.AVRO, 
HoodieTableType.MERGE_ON_READ),
+        Arguments.of(false, "timestamp", HoodieRecordType.AVRO, 
HoodieTableType.MERGE_ON_READ),
+        Arguments.of(true, "timestamp", HoodieRecordType.SPARK, 
HoodieTableType.MERGE_ON_READ),
+        Arguments.of(false, "timestamp", HoodieRecordType.SPARK, 
HoodieTableType.MERGE_ON_READ),
+        Arguments.of(true, "timestamp", HoodieRecordType.AVRO, 
HoodieTableType.COPY_ON_WRITE),
+        Arguments.of(false, "timestamp", HoodieRecordType.AVRO, 
HoodieTableType.COPY_ON_WRITE),
+        Arguments.of(true, "timestamp", HoodieRecordType.SPARK, 
HoodieTableType.COPY_ON_WRITE),
+        Arguments.of(false, "timestamp", HoodieRecordType.SPARK, 
HoodieTableType.COPY_ON_WRITE),
+        // Test with rider ordering field (string type)
+        Arguments.of(true, "rider", HoodieRecordType.AVRO, 
HoodieTableType.MERGE_ON_READ),
+        Arguments.of(false, "rider", HoodieRecordType.AVRO, 
HoodieTableType.MERGE_ON_READ),
+        Arguments.of(true, "rider", HoodieRecordType.SPARK, 
HoodieTableType.MERGE_ON_READ),
+        Arguments.of(false, "rider", HoodieRecordType.SPARK, 
HoodieTableType.MERGE_ON_READ),
+        Arguments.of(true, "rider", HoodieRecordType.AVRO, 
HoodieTableType.COPY_ON_WRITE),
+        Arguments.of(false, "rider", HoodieRecordType.AVRO, 
HoodieTableType.COPY_ON_WRITE),
+        Arguments.of(true, "rider", HoodieRecordType.SPARK, 
HoodieTableType.COPY_ON_WRITE),
+        Arguments.of(false, "rider", HoodieRecordType.SPARK, 
HoodieTableType.COPY_ON_WRITE));
+  }
+
   @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  void testLogicalTypesWithJsonSource(boolean hasTransformer) throws Exception 
{
+  @MethodSource("logicalTypesWithJsonSourceArgs")
+  void testLogicalTypesWithJsonSource(boolean hasTransformer, String 
orderingField,
+                                      HoodieRecordType recordType, 
HoodieTableType tableType) throws Exception {
+    // Fix a seed so we can generate repeated rows for updates
+    final long seed = 123L;
+    final int numPartitions = 2;
+
     try {
       //use v6 schema because decimal parsing iso 8859-1 support not available 
currently
       String schemaStr;
@@ -818,14 +849,14 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       }
       defaultSchemaProviderClassName =
           
TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.class.getName();
-      String tableBasePath = basePath + "testTimestampMillis";
+      String tableBasePath = basePath + "testTimestampMillis_" + orderingField 
+ "_" + recordType + "_" + tableType.name();
       prepareJsonKafkaDFSSource(
           PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName);
 
       // Insert data produced with Schema A, pass Schema A
-      prepareJsonKafkaDFSFilesWithSchema(
-          1000, true, topicName, schemaStr);
-      HoodieDeltaStreamer.Config cfg = 
getConfigForLogicalTypesWithJsonSource(tableBasePath, 
WriteOperationType.INSERT, hasTransformer);
+      prepareJsonKafkaDFSFiles(1000, true, topicName, numPartitions, 
schemaStr, seed);
+      HoodieDeltaStreamer.Config cfg = getConfigForLogicalTypesWithJsonSource(
+          tableBasePath, WriteOperationType.INSERT, hasTransformer, 
orderingField, recordType, tableType);
       syncOnce(cfg);
       // Validate.
       assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
@@ -838,15 +869,15 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       hudiOpts.put("hoodie.datasource.write.recordkey.field", "id");
       logicalAssertions(tableSchema, tableBasePath, hudiOpts, 
HoodieTableVersion.EIGHT.versionCode());
 
-      // Update data.
-      prepareJsonKafkaDFSFilesWithSchema(
-          1000, false, topicName, schemaStr);
-      cfg = getConfigForLogicalTypesWithJsonSource(tableBasePath, 
WriteOperationType.UPSERT, hasTransformer);
+      // Update data, since we are using the same seed, 1000 rows will be 
updated, and 500 new rows will be inserted
+      prepareJsonKafkaDFSFiles(1500, false, topicName, numPartitions, 
schemaStr, seed);
+      cfg = getConfigForLogicalTypesWithJsonSource(
+          tableBasePath, WriteOperationType.UPSERT, hasTransformer, 
orderingField, recordType, tableType);
       syncOnce(cfg);
       // Validate.
       assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
-      assertRecordCount(2000, tableBasePath, sqlContext);
-      TestHelpers.assertCommitMetadata(topicName + ",0:1000,1:1000", 
tableBasePath, 2);
+      assertRecordCount(1500, tableBasePath, sqlContext);
+      TestHelpers.assertCommitMetadata(topicName + ",0:1250,1:1250", 
tableBasePath, 2);
       tableSchemaResolver = new TableSchemaResolver(
           HoodieTestUtils.createMetaClient(storage, tableBasePath));
       tableSchema = tableSchemaResolver.getTableSchema(false);
@@ -856,7 +887,9 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     }
   }
 
-  private static HoodieDeltaStreamer.Config 
getConfigForLogicalTypesWithJsonSource(String tableBasePath, WriteOperationType 
operationType, boolean hasTransformer) {
+  private static HoodieDeltaStreamer.Config 
getConfigForLogicalTypesWithJsonSource(
+      String tableBasePath, WriteOperationType operationType, boolean 
hasTransformer,
+      String orderingField, HoodieRecordType recordType, HoodieTableType 
tableType) {
     List<String> transformerClassNames;
     if (hasTransformer) {
       transformerClassNames = 
Collections.singletonList(TestIdentityTransformer.class.getName());
@@ -868,14 +901,24 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         transformerClassNames,
         PROPS_FILENAME_TEST_JSON_KAFKA,
         false, true, 100000, false, null,
-        HoodieTableType.MERGE_ON_READ.name(),
-        "timestamp", null);
+        tableType.name(),
+        orderingField, null);
     cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
     cfg.recordMergeStrategyId = 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
     cfg.recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
     cfg.configs.add(String.format("%s=%s", 
HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
     cfg.configs.add("hoodie.datasource.write.row.writer.enable=false");
     
cfg.configs.add("hoodie.streamer.source.sanitize.invalid.schema.field.names=true");
+    // Configure record merger based on record type
+    if (recordType == HoodieRecordType.SPARK) {
+      cfg.configs.add(String.format("%s=%s", 
HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(),
+          DefaultSparkRecordMerger.class.getName()));
+      cfg.configs.add(String.format("%s=%s", 
HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"));
+    } else {
+      cfg.configs.add(String.format("%s=%s", 
HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(),
+          HoodieAvroRecordMerger.class.getName()));
+      cfg.configs.add(String.format("%s=%s", 
HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "avro"));
+    }
     return cfg;
   }
 
@@ -2829,18 +2872,14 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   }
 
   private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, 
String topicName) {
-    prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, 2, 
HoodieTestDataGenerator.TRIP_SCHEMA);
+    prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, 2, 
HoodieTestDataGenerator.TRIP_SCHEMA, System.nanoTime());
   }
 
   private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, 
String topicName, int numPartitions) {
-    prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, 
numPartitions, HoodieTestDataGenerator.TRIP_SCHEMA);
+    prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, 
numPartitions, HoodieTestDataGenerator.TRIP_SCHEMA, System.nanoTime());
   }
 
-  private void prepareJsonKafkaDFSFilesWithSchema(int numRecords, boolean 
createTopic, String topicName, String schemaStr) {
-    prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, 2, schemaStr);
-  }
-
-  private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, 
String topicName, int numPartitions, String schemaStr) {
+  private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, 
String topicName, int numPartitions, String schemaStr, long seed) {
     if (createTopic) {
       try {
         testUtils.createTopic(topicName, numPartitions);
@@ -2848,7 +2887,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         // no op
       }
     }
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(seed);
     testUtils.sendMessages(topicName,
         UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions(
             dataGenerator.generateInsertsAsPerSchema(

Reply via email to