This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 236801be01ee fix(ingest): Fix Timestamp Conversions, Add legacy api support (#14076)
236801be01ee is described below

commit 236801be01ee4d9cbb2bca5e0105cdf1516c861e
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Oct 30 05:19:00 2025 -0400

    fix(ingest): Fix Timestamp Conversions, Add legacy api support (#14076)
    
    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../apache/hudi/stats/SparkValueMetadataUtils.java |   2 +-
 .../common/testutils/HoodieTestDataGenerator.java  |  25 +++++-
 .../helpers/MercifulJsonToRowConverter.java        |  18 ++--
 .../utilities/sources/helpers/RowConverter.java    |   6 +-
 .../utilities/streamer/SourceFormatAdapter.java    |  11 ++-
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 100 ++++++++++++++++++++-
 .../hudi/utilities/sources/TestJsonDFSSource.java  |   7 ++
 ...ava => TestMercifulJsonToRowConverterBase.java} |  90 ++++++++++---------
 .../TestMercifulJsonToRowConverterJava8Api.java    |  50 +++++++++++
 .../TestMercifulJsonToRowConverterLegacyApi.java   |  49 ++++++++++
 .../sources/AbstractDFSSourceTestBase.java         |   9 +-
 11 files changed, 308 insertions(+), 59 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/stats/SparkValueMetadataUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/stats/SparkValueMetadataUtils.java
index 0a689a7e31b0..986a1c29ec92 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/stats/SparkValueMetadataUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/stats/SparkValueMetadataUtils.java
@@ -144,7 +144,7 @@ public class SparkValueMetadataUtils {
    * we need to return java.sql.Timestamp and java.sql.Date
    *
    */
-  public static Object convertJavaTypeToSparkType(Comparable<?> javaVal, 
boolean useJava8api) {
+  public static Object convertJavaTypeToSparkType(Object javaVal, boolean 
useJava8api) {
     if (!useJava8api) {
       if (javaVal instanceof Instant) {
         return Timestamp.from((Instant) javaVal);
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 4aab20e0c2c0..e9db289b8164 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -179,6 +179,12 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
       + 
"{\"name\":\"dec_fixed_small\",\"type\":{\"type\":\"fixed\",\"name\":\"decFixedSmall\",\"size\":3,\"logicalType\":\"decimal\",\"precision\":5,\"scale\":2}},"
       + 
"{\"name\":\"dec_fixed_large\",\"type\":{\"type\":\"fixed\",\"name\":\"decFixedLarge\",\"size\":8,\"logicalType\":\"decimal\",\"precision\":18,\"scale\":9}},";
 
+  public static final String EXTENDED_LOGICAL_TYPES_SCHEMA_NO_LTS_V6 = 
"{\"name\":\"ts_millis\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},"
+      + 
"{\"name\":\"ts_micros\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}},"
+      + 
"{\"name\":\"event_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},"
+      + 
"{\"name\":\"dec_fixed_small\",\"type\":{\"type\":\"fixed\",\"name\":\"decFixedSmall\",\"size\":3,\"logicalType\":\"decimal\",\"precision\":5,\"scale\":2}},"
+      + 
"{\"name\":\"dec_fixed_large\",\"type\":{\"type\":\"fixed\",\"name\":\"decFixedLarge\",\"size\":8,\"logicalType\":\"decimal\",\"precision\":18,\"scale\":9}},";
+
   public static final String EXTRA_COL_SCHEMA1 = "{\"name\": 
\"extra_column1\", \"type\": [\"null\", \"string\"], \"default\": null },";
   public static final String EXTRA_COL_SCHEMA2 = "{\"name\": 
\"extra_column2\", \"type\": [\"null\", \"string\"], \"default\": null},";
   public static final String EXTRA_COL_SCHEMA_FOR_AWS_DMS_PAYLOAD = 
"{\"name\": \"Op\", \"type\": [\"null\", \"string\"], \"default\": null},";
@@ -202,6 +208,9 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
   public static final String TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS =
       TRIP_SCHEMA_PREFIX + EXTENDED_LOGICAL_TYPES_SCHEMA_NO_LTS + 
TRIP_SCHEMA_SUFFIX;
 
+  public static final String TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6 =
+      TRIP_SCHEMA_PREFIX + EXTENDED_LOGICAL_TYPES_SCHEMA_NO_LTS_V6 + 
TRIP_SCHEMA_SUFFIX;
+
 
   public static final String TRIP_NESTED_EXAMPLE_SCHEMA =
       TRIP_SCHEMA_PREFIX + FARE_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
@@ -233,6 +242,7 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
   public static final Schema AVRO_TRIP_LOGICAL_TYPES_SCHEMA = new 
Schema.Parser().parse(TRIP_LOGICAL_TYPES_SCHEMA);
   public static final Schema AVRO_TRIP_LOGICAL_TYPES_SCHEMA_V6 = new 
Schema.Parser().parse(TRIP_LOGICAL_TYPES_SCHEMA_V6);
   public static final Schema AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS = new 
Schema.Parser().parse(TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS);
+  public static final Schema AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6 = new 
Schema.Parser().parse(TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6);
   public static final Schema AVRO_TRIP_SCHEMA = new 
Schema.Parser().parse(TRIP_SCHEMA);
   public static final Schema FLATTENED_AVRO_SCHEMA = new 
Schema.Parser().parse(TRIP_FLATTENED_SCHEMA);
 
@@ -380,6 +390,8 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
         return generatePayloadForLogicalTypesSchemaV6(key, commitTime, false, 
timestamp);
       } else if (TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS.equals(schemaStr)) {
         return generatePayloadForLogicalTypesSchemaNoLTS(key, commitTime, 
false, timestamp);
+      } else if (TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6.equals(schemaStr)) {
+        return generatePayloadForLogicalTypesSchemaNoLTSV6(key, commitTime, 
false, timestamp);
       }
     } else {
       if (TRIP_EXAMPLE_SCHEMA.equals(schemaStr)) {
@@ -390,6 +402,8 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
         return generatePayloadForLogicalTypesSchemaV6(key, commitTime, true, 
timestamp);
       } else if (TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS.equals(schemaStr)) {
         return generatePayloadForLogicalTypesSchemaNoLTS(key, commitTime, 
true, timestamp);
+      } else if (TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6.equals(schemaStr)) {
+        return generatePayloadForLogicalTypesSchemaNoLTSV6(key, commitTime, 
true, timestamp);
       }
     }
 
@@ -451,6 +465,10 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
   public IndexedRecord generatePayloadForLogicalTypesSchemaNoLTS(HoodieKey 
key, String commitTime, boolean isDelete, long timestamp) {
     return generateRecordForTripLogicalTypesSchema(key, "rider-" + commitTime, 
"driver-" + commitTime, timestamp, isDelete, false, false);
   }
+  
+  public IndexedRecord generatePayloadForLogicalTypesSchemaNoLTSV6(HoodieKey 
key, String commitTime, boolean isDelete, long timestamp) {
+    return generateRecordForTripLogicalTypesSchema(key, "rider-" + commitTime, 
"driver-" + commitTime, timestamp, isDelete, true, false);
+  }
 
   public IndexedRecord generatePayloadForLogicalTypesSchema(HoodieKey key, 
String commitTime, boolean isDelete, long timestamp) {
     return generateRecordForTripLogicalTypesSchema(key, "rider-" + commitTime, 
"driver-" + commitTime, timestamp, isDelete, false, true);
@@ -679,8 +697,11 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
                                                                long timestamp, 
boolean isDeleteRecord, boolean v6, boolean hasLTS) {
     GenericRecord rec;
     if (!hasLTS) {
-      // LTS = Local Timestamp
-      rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS);
+      if (v6) {
+        rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6);
+      } else {
+        rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS);
+      }
     } else if (v6) {
       rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA_V6);
     } else {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java
index 2c6e18897e68..2d9aca6bf526 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java
@@ -30,8 +30,10 @@ import 
org.apache.hudi.avro.processors.LocalTimestampMilliLogicalTypeProcessor;
 import org.apache.hudi.avro.processors.Parser;
 import org.apache.hudi.avro.processors.TimestampMicroLogicalTypeProcessor;
 import org.apache.hudi.avro.processors.TimestampMilliLogicalTypeProcessor;
+import org.apache.hudi.common.util.DateTimeUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.stats.SparkValueMetadataUtils;
 import org.apache.hudi.stats.ValueType;
 import org.apache.hudi.utilities.exception.HoodieJsonToRowConversionException;
 
@@ -47,7 +49,6 @@ import org.apache.spark.sql.RowFactory;
 
 import java.io.IOException;
 import java.math.BigDecimal;
-import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.temporal.ChronoField;
@@ -63,19 +64,21 @@ import scala.collection.JavaConverters;
  * Converts Json record to Row Record.
  */
 public class MercifulJsonToRowConverter extends MercifulJsonConverter {
+  private final boolean useJava8api;
 
   /**
    * Allows enabling sanitization and allows choice of invalidCharMask for 
sanitization
    */
-  public MercifulJsonToRowConverter(boolean shouldSanitize, String 
invalidCharMask) {
-    this(new 
ObjectMapper().enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS), 
shouldSanitize, invalidCharMask);
+  public MercifulJsonToRowConverter(boolean shouldSanitize, String 
invalidCharMask, boolean useJava8api) {
+    this(new 
ObjectMapper().enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS), 
shouldSanitize, invalidCharMask, useJava8api);
   }
 
   /**
    * Allows a configured ObjectMapper to be passed for converting json records 
to row.
    */
-  public MercifulJsonToRowConverter(ObjectMapper mapper, boolean 
shouldSanitize, String invalidCharMask) {
+  public MercifulJsonToRowConverter(ObjectMapper mapper, boolean 
shouldSanitize, String invalidCharMask, boolean useJava8api) {
     super(mapper, shouldSanitize, invalidCharMask);
+    this.useJava8api = useJava8api;
   }
 
   /**
@@ -102,7 +105,7 @@ public class MercifulJsonToRowConverter extends 
MercifulJsonConverter {
     for (Schema.Field f : fields) {
       Object val = shouldSanitize ? getFieldFromJson(f, inputJson, 
schema.getFullName(), invalidCharMask) : inputJson.get(f.name());
       if (val != null) {
-        values.set(f.pos(), convertJsonField(val, f.name(), f.schema()));
+        values.set(f.pos(), 
SparkValueMetadataUtils.convertJavaTypeToSparkType(convertJsonField(val, 
f.name(), f.schema()), useJava8api));
       }
     }
     return RowFactory.create(values.toArray());
@@ -238,7 +241,7 @@ public class MercifulJsonToRowConverter extends 
MercifulJsonConverter {
           },
           value, schema);
       if (result.getLeft()) {
-        return Pair.of(true, new Timestamp((Long) result.getRight()));
+        return Pair.of(true, Instant.ofEpochMilli((Long) result.getRight()));
       }
       return Pair.of(false, null);
     }
@@ -275,8 +278,7 @@ public class MercifulJsonToRowConverter extends 
MercifulJsonConverter {
           },
           value, schema);
       if (result.getLeft()) {
-        // timestamp in spark sql doesn't support precision to the micro.
-        return Pair.of(true, new Timestamp(((Long) result.getRight()) / 1000));
+        return Pair.of(true, DateTimeUtils.microsToInstant((Long) 
result.getRight()));
       }
       return Pair.of(false, null);
     }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/RowConverter.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/RowConverter.java
index 7ed174b38a5f..51dec7fcdff5 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/RowConverter.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/RowConverter.java
@@ -43,17 +43,19 @@ public class RowConverter implements Serializable {
   private final String schemaStr;
   private final String invalidCharMask;
   private final boolean shouldSanitize;
+  private final boolean useJava8api;
 
   /**
    * To be lazily initialized on executors.
    */
   private transient MercifulJsonToRowConverter jsonConverter;
 
-  public RowConverter(Schema schema, boolean shouldSanitize, String 
invalidCharMask) {
+  public RowConverter(Schema schema, boolean shouldSanitize, String 
invalidCharMask, boolean useJava8api) {
     this.schemaStr = schema.toString();
     this.schema = schema;
     this.shouldSanitize = shouldSanitize;
     this.invalidCharMask = invalidCharMask;
+    this.useJava8api = useJava8api;
   }
 
   private void initSchema() {
@@ -65,7 +67,7 @@ public class RowConverter implements Serializable {
 
   private void initJsonConvertor() {
     if (jsonConverter == null) {
-      jsonConverter = new MercifulJsonToRowConverter(this.shouldSanitize, 
this.invalidCharMask);
+      jsonConverter = new MercifulJsonToRowConverter(this.shouldSanitize, 
this.invalidCharMask, this.useJava8api);
     }
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
index f326c98826ef..3a33f4c06f75 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
@@ -47,6 +47,7 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
@@ -77,6 +78,9 @@ public class SourceFormatAdapter implements Closeable {
   private  boolean wrapWithException = 
ROW_THROW_EXPLICIT_EXCEPTIONS.defaultValue();
   private String invalidCharMask = 
SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue();
 
+  private boolean useJava8api = (boolean) 
SQLConf.DATETIME_JAVA8API_ENABLED().defaultValue().get();
+
+
   private Option<BaseErrorTableWriter> errorTableWriter = Option.empty();
 
   public SourceFormatAdapter(Source source) {
@@ -90,6 +94,7 @@ public class SourceFormatAdapter implements Closeable {
       this.shouldSanitize = SanitizationUtils.shouldSanitize(props.get());
       this.invalidCharMask = SanitizationUtils.getInvalidCharMask(props.get());
       this.wrapWithException = ConfigUtils.getBooleanWithAltKeys(props.get(), 
ROW_THROW_EXPLICIT_EXCEPTIONS);
+      this.useJava8api = (boolean) 
getSource().getSparkSession().conf().get(SQLConf.DATETIME_JAVA8API_ENABLED());
     }
     if (this.shouldSanitize && source.getSourceType() == 
Source.SourceType.PROTO) {
       throw new IllegalArgumentException("PROTO cannot be sanitized");
@@ -112,6 +117,10 @@ public class SourceFormatAdapter implements Closeable {
     return invalidCharMask;
   }
 
+  private boolean getUseJava8api() {
+    return useJava8api;
+  }
+
   /**
    * transform input rdd of json string to generic records with support for 
adding error events to error table
    * @param inputBatch
@@ -134,7 +143,7 @@ public class SourceFormatAdapter implements Closeable {
 
   private JavaRDD<Row> transformJsonToRowRdd(InputBatch<JavaRDD<String>> 
inputBatch) {
     
MercifulJsonConverter.clearCache(inputBatch.getSchemaProvider().getSourceSchema().getFullName());
-    RowConverter convertor = new 
RowConverter(inputBatch.getSchemaProvider().getSourceSchema(), 
isFieldNameSanitizingEnabled(), getInvalidCharMask());
+    RowConverter convertor = new 
RowConverter(inputBatch.getSchemaProvider().getSourceSchema(), 
isFieldNameSanitizingEnabled(), getInvalidCharMask(), getUseJava8api());
     return inputBatch.getBatch().map(rdd -> {
       if (errorTableWriter.isPresent()) {
         JavaRDD<Either<Row, String>> javaRDD = 
rdd.map(convertor::fromJsonToRowWithError);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index a5c393af1a0a..9556d70b8602 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -111,7 +111,7 @@ import org.apache.hudi.utilities.sources.ParquetDFSSource;
 import org.apache.hudi.utilities.sources.SqlSource;
 import org.apache.hudi.utilities.sources.TestDataSource;
 import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch;
-import 
org.apache.hudi.utilities.sources.helpers.TestMercifulJsonToRowConverter;
+import 
org.apache.hudi.utilities.sources.helpers.TestMercifulJsonToRowConverterBase;
 import org.apache.hudi.utilities.streamer.HoodieStreamer;
 import org.apache.hudi.utilities.streamer.NoNewDataTerminationStrategy;
 import org.apache.hudi.utilities.streamer.StreamSync;
@@ -788,10 +788,92 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testLogicalTypesWithJsonSource(boolean hasTransformer) throws Exception 
{
+    try {
+      // Use the v6 schema because ISO-8859-1 decimal parsing is not currently supported.
+      String schemaStr;
+      if (HoodieSparkUtils.isSpark3_3()) {
+        
TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.sourceSchema =
+            HoodieTestDataGenerator.AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6;
+        
TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.targetSchema =
+            HoodieTestDataGenerator.AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6;
+        schemaStr = 
HoodieTestDataGenerator.TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6;
+      } else {
+        
TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.sourceSchema =
+            HoodieTestDataGenerator.AVRO_TRIP_LOGICAL_TYPES_SCHEMA_V6;
+        
TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.targetSchema =
+            HoodieTestDataGenerator.AVRO_TRIP_LOGICAL_TYPES_SCHEMA_V6;
+        schemaStr = HoodieTestDataGenerator.TRIP_LOGICAL_TYPES_SCHEMA_V6;
+      }
+      defaultSchemaProviderClassName =
+          
TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.class.getName();
+      String tableBasePath = basePath + "testTimestampMillis";
+      prepareJsonKafkaDFSSource(
+          PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName);
+
+      // Insert records generated with the logical-types schema; the schema provider supplies the same schema.
+      prepareJsonKafkaDFSFilesWithSchema(
+          1000, true, topicName, schemaStr);
+      HoodieDeltaStreamer.Config cfg = 
getConfigForLogicalTypesWithJsonSource(tableBasePath, 
WriteOperationType.INSERT, hasTransformer);
+      new HoodieDeltaStreamer(cfg, jsc).sync();
+      // Validate.
+      assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
+      assertRecordCount(1000, tableBasePath, sqlContext);
+      TestHelpers.assertCommitMetadata(topicName + ",0:500,1:500", 
tableBasePath, 1);
+      TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
+          HoodieTestUtils.createMetaClient(storage, tableBasePath));
+      Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+      Map<String, String> hudiOpts = new HashMap<>();
+      hudiOpts.put("hoodie.datasource.write.recordkey.field", "id");
+      logicalAssertions(tableSchema, tableBasePath, hudiOpts, 
HoodieTableVersion.EIGHT.versionCode());
+
+      // Update data.
+      prepareJsonKafkaDFSFilesWithSchema(
+          1000, false, topicName, schemaStr);
+      cfg = getConfigForLogicalTypesWithJsonSource(tableBasePath, 
WriteOperationType.UPSERT, hasTransformer);
+      new HoodieDeltaStreamer(cfg, jsc).sync();
+      // Validate.
+      assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
+      assertRecordCount(2000, tableBasePath, sqlContext);
+      TestHelpers.assertCommitMetadata(topicName + ",0:1000,1:1000", 
tableBasePath, 2);
+      tableSchemaResolver = new TableSchemaResolver(
+          HoodieTestUtils.createMetaClient(storage, tableBasePath));
+      tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+      logicalAssertions(tableSchema, tableBasePath, hudiOpts, 
HoodieTableVersion.EIGHT.versionCode());
+    } finally {
+      defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
+    }
+  }
+
+  private static HoodieDeltaStreamer.Config 
getConfigForLogicalTypesWithJsonSource(String tableBasePath, WriteOperationType 
operationType, boolean hasTransformer) {
+    List<String> transformerClassNames;
+    if (hasTransformer) {
+      transformerClassNames = 
Collections.singletonList(TestIdentityTransformer.class.getName());
+    } else {
+      transformerClassNames = Collections.emptyList();
+    }
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(
+        tableBasePath, operationType, JsonKafkaSource.class.getName(),
+        transformerClassNames,
+        PROPS_FILENAME_TEST_JSON_KAFKA,
+        false, true, 100000, false, null,
+        HoodieTableType.MERGE_ON_READ.name(),
+        "timestamp", null);
+    cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
+    cfg.recordMergeStrategyId = 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
+    cfg.recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+    cfg.configs.add(String.format("%s=%s", 
HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
+    cfg.configs.add("hoodie.datasource.write.row.writer.enable=false");
+    
cfg.configs.add("hoodie.streamer.source.sanitize.invalid.schema.field.names=true");
+    return cfg;
+  }
+
   @ParameterizedTest
   @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "EIGHT"})
   public void testBackwardsCompatibility(HoodieTableVersion version) throws 
Exception {
-    TestMercifulJsonToRowConverter.timestampNTZCompatibility(() -> {
+    TestMercifulJsonToRowConverterBase.timestampNTZCompatibility(() -> {
       String dirName = "colstats-upgrade-test-v" + version.versionCode();
       String dataPath = basePath + "/" + dirName;
       java.nio.file.Path zipOutput = Paths.get(new URI(dataPath));
@@ -2379,10 +2461,18 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   }
 
   private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, 
String topicName) {
-    prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, 2);
+    prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, 2, 
HoodieTestDataGenerator.TRIP_SCHEMA);
   }
 
   private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, 
String topicName, int numPartitions) {
+    prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, 
numPartitions, HoodieTestDataGenerator.TRIP_SCHEMA);
+  }
+
+  private void prepareJsonKafkaDFSFilesWithSchema(int numRecords, boolean 
createTopic, String topicName, String schemaStr) {
+    prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, 2, schemaStr);
+  }
+
+  private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, 
String topicName, int numPartitions, String schemaStr) {
     if (createTopic) {
       try {
         testUtils.createTopic(topicName, numPartitions);
@@ -2392,7 +2482,9 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     }
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     testUtils.sendMessages(topicName,
-        
UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions(dataGenerator.generateInsertsAsPerSchema("000",
 numRecords, HoodieTestDataGenerator.TRIP_SCHEMA), numPartitions));
+        UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions(
+            dataGenerator.generateInsertsAsPerSchema(
+                "000", numRecords, schemaStr), numPartitions));
   }
 
   private void testParquetDFSSource(boolean useSchemaProvider, List<String> 
transformerClassNames) throws Exception {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
index 3a64747eda5b..d67808dd1e04 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
@@ -61,6 +61,13 @@ public class TestJsonDFSSource extends 
AbstractDFSSourceTestBase {
     return new JsonDFSSource(props, jsc, sparkSession, schemaProvider);
   }
 
+  @Override
+  protected Option<TypedProperties> getSourceFormatAdapterProps() {
+    TypedProperties properties = new TypedProperties();
+    
properties.setProperty(HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.key(), 
"true");
+    return Option.of(properties);
+  }
+
   @Override
   public void writeNewDataToFile(List<HoodieRecord> records, Path path) throws 
IOException {
     UtilitiesTestBase.Helpers.saveStringsToDFS(
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverter.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterBase.java
similarity index 91%
rename from 
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverter.java
rename to 
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterBase.java
index 595132eaacf4..81066aed4a9e 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverter.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterBase.java
@@ -22,6 +22,7 @@ import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.avro.MercifulJsonConverterTestBase;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.DateTimeUtils;
 import org.apache.hudi.stats.ValueType;
 import org.apache.hudi.utilities.exception.HoodieJsonToRowConversionException;
 
@@ -37,7 +38,6 @@ import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -49,6 +49,7 @@ import java.math.RoundingMode;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.sql.Timestamp;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.Collections;
@@ -62,23 +63,16 @@ import static 
org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
-public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBase {
+public abstract class TestMercifulJsonToRowConverterBase extends 
MercifulJsonConverterTestBase {
   private static final ObjectMapper MAPPER = new ObjectMapper();
-  private static final MercifulJsonToRowConverter CONVERTER = new 
MercifulJsonToRowConverter(true, "__");
 
   private static final String SIMPLE_AVRO_WITH_DEFAULT = 
"/schema/simple-test-with-default-value.avsc";
 
-  protected static SparkSession spark;
+  protected abstract MercifulJsonToRowConverter getConverter();
 
-  @BeforeAll
-  public static void start() {
-    spark = SparkSession
-        .builder()
-        .master("local[*]")
-        .appName(TestMercifulJsonToRowConverter.class.getName())
-        .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
-        .getOrCreate();
-  }
+  protected abstract boolean isJava8ApiEnabled();
+
+  protected static SparkSession spark;
 
   @AfterAll
   public static void clear() throws IOException {
@@ -102,7 +96,7 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
     values.set(1, number);
     values.set(3, color);
     Row recRow = RowFactory.create(values.toArray());
-    Row realRow = CONVERTER.convertToRow(json, simpleSchema);
+    Row realRow = getConverter().convertToRow(json, simpleSchema);
     validateSchemaCompatibility(Collections.singletonList(realRow), 
simpleSchema);
     assertEquals(recRow, realRow);
   }
@@ -126,7 +120,7 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
     values.set(2, color);
     Row recRow = RowFactory.create(values.toArray());
 
-    Assertions.assertEquals(recRow, CONVERTER.convertToRow(json, 
simpleSchema));
+    Assertions.assertEquals(recRow, getConverter().convertToRow(json, 
simpleSchema));
   }
 
   private static final String DECIMAL_AVRO_FILE_PATH = 
"/decimal-logical-type.avsc";
@@ -149,7 +143,7 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
     Schema schema = SchemaTestUtil.getSchema(DECIMAL_AVRO_FILE_PATH);
 
     Row expectRow = RowFactory.create(bigDecimal);
-    Row realRow = CONVERTER.convertToRow(json, schema);
+    Row realRow = getConverter().convertToRow(json, schema);
     validateSchemaCompatibility(Collections.singletonList(realRow), schema);
     assertEquals(expectRow, realRow);
   }
@@ -178,7 +172,7 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
 
     // Schedule with timestamp same as that of committed instant
     assertThrows(HoodieJsonToRowConversionException.class, () -> {
-      CONVERTER.convertToRow(json, schema);
+      getConverter().convertToRow(json, schema);
     });
   }
 
@@ -222,7 +216,7 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
     String json = MAPPER.writeValueAsString(data);
 
     Row expectRow = RowFactory.create(bigDecimal);
-    Row realRow = CONVERTER.convertToRow(json, schema);
+    Row realRow = getConverter().convertToRow(json, schema);
     validateSchemaCompatibility(Collections.singletonList(realRow), schema);
     assertEquals(expectRow, realRow);
   }
@@ -241,11 +235,11 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
     if (shouldConvert) {
       BigDecimal bigDecimal = new BigDecimal(expected);
       Row expectedRow = RowFactory.create(bigDecimal);
-      Row actualRow = CONVERTER.convertToRow(json, schema);
+      Row actualRow = getConverter().convertToRow(json, schema);
       validateSchemaCompatibility(Collections.singletonList(actualRow), 
schema);
       assertEquals(expectedRow, actualRow);
     } else {
-      assertThrows(HoodieJsonToRowConversionException.class, () -> 
CONVERTER.convertToRow(json, schema));
+      assertThrows(HoodieJsonToRowConversionException.class, () -> 
getConverter().convertToRow(json, schema));
     }
   }
 
@@ -277,7 +271,7 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
 
     // Duration type is not supported in Row object.
     assertThrows(HoodieJsonToRowConversionException.class, () -> {
-      CONVERTER.convertToRow(json, schema);
+      getConverter().convertToRow(json, schema);
     });
   }
 
@@ -293,7 +287,7 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
     Schema schema = SchemaTestUtil.getSchemaFromResourceFilePath(schemaFile);
     // Schedule with timestamp same as that of committed instant
     assertThrows(HoodieJsonToRowConversionException.class, () -> {
-      CONVERTER.convertToRow(json, schema);
+      getConverter().convertToRow(json, schema);
     });
   }
 
@@ -320,10 +314,20 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
     if (groundTruthRow == null) {
       return;
     }
-    Row rec = 
RowFactory.create(java.sql.Date.valueOf(groundTruthRow).toLocalDate());
-    Row realRow = CONVERTER.convertToRow(json, schema);
+    Row rec;
+    if (isJava8ApiEnabled()) {
+      rec = 
RowFactory.create(java.sql.Date.valueOf(groundTruthRow).toLocalDate());
+    } else {
+      rec = RowFactory.create(java.sql.Date.valueOf(groundTruthRow));
+    }
+
+    Row realRow = getConverter().convertToRow(json, schema);
     validateSchemaCompatibility(Collections.singletonList(realRow), schema);
-    assertEquals(rec.getLocalDate(0).toString(), 
realRow.getLocalDate(0).toString());
+    if (isJava8ApiEnabled()) {
+      assertEquals(rec.getLocalDate(0).toString(), 
realRow.getLocalDate(0).toString());
+    } else {
+      assertEquals(rec.getDate(0).toString(), realRow.getDate(0).toString());
+    }
   }
 
   /**
@@ -340,7 +344,7 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
     data.put("dateField", 1);
     String json = MAPPER.writeValueAsString(data);
     assertThrows(HoodieJsonToRowConversionException.class, () -> {
-      CONVERTER.convertToRow(json, schema);
+      getConverter().convertToRow(json, schema);
     });
   }
 
@@ -398,7 +402,7 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
       String json = MAPPER.writeValueAsString(data);
 
       Row rec = 
RowFactory.create(ValueType.castToLocalTimestampMillis(milliSecOfDay, null), 
ValueType.castToLocalTimestampMicros(microSecOfDay, null));
-      Row actualRow = CONVERTER.convertToRow(json, schema);
+      Row actualRow = getConverter().convertToRow(json, schema);
       validateSchemaCompatibility(Collections.singletonList(actualRow), 
schema);
       assertEquals(rec, actualRow);
     });
@@ -415,7 +419,7 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
     String json = MAPPER.writeValueAsString(data);
 
     assertThrows(HoodieJsonToRowConversionException.class, () -> {
-      CONVERTER.convertToRow(json, schema);
+      getConverter().convertToRow(json, schema);
     });
   }
 
@@ -444,8 +448,14 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
     data.put("timestampMicrosField", timeMicro);
     String json = MAPPER.writeValueAsString(data);
 
-    Row rec = RowFactory.create(new Timestamp(milliSecOfDay), new 
Timestamp(microSecOfDay / 1000));
-    Row actualRow = CONVERTER.convertToRow(json, schema);
+    Row rec;
+    if (isJava8ApiEnabled()) {
+      rec = RowFactory.create(Instant.ofEpochMilli(milliSecOfDay), 
DateTimeUtils.microsToInstant(microSecOfDay));
+    } else {
+      rec = 
RowFactory.create(Timestamp.from(Instant.ofEpochMilli(milliSecOfDay)), 
Timestamp.from(DateTimeUtils.microsToInstant(microSecOfDay)));
+    }
+
+    Row actualRow = getConverter().convertToRow(json, schema);
     validateSchemaCompatibility(Collections.singletonList(actualRow), schema);
     assertEquals(rec, actualRow);
   }
@@ -462,7 +472,7 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
     // Schedule with timestamp same as that of committed instant
 
     assertThrows(HoodieJsonToRowConversionException.class, () -> {
-      CONVERTER.convertToRow(json, schema);
+      getConverter().convertToRow(json, schema);
     });
   }
 
@@ -491,7 +501,7 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
     String json = MAPPER.writeValueAsString(data);
 
     Row rec = RowFactory.create(microSecOfDay, milliSecOfDay);
-    Row realRow = CONVERTER.convertToRow(json, schema);
+    Row realRow = getConverter().convertToRow(json, schema);
     validateSchemaCompatibility(Collections.singletonList(realRow), schema);
     assertEquals(rec.get(0).toString(), realRow.get(0).toString());
     assertEquals(rec.get(1).toString(), realRow.get(1).toString());
@@ -510,7 +520,7 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
     data.put("timeMillisField", invalidInput);
     // Schedule with timestamp same as that of committed instant
     assertThrows(HoodieJsonToRowConversionException.class, () -> {
-      CONVERTER.convertToRow(MAPPER.writeValueAsString(data), schema);
+      getConverter().convertToRow(MAPPER.writeValueAsString(data), schema);
     });
 
     data.clear();
@@ -518,7 +528,7 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
     data.put("timeMillisField", validInput);
     // Schedule with timestamp same as that of committed instant
     assertThrows(HoodieJsonToRowConversionException.class, () -> {
-      CONVERTER.convertToRow(MAPPER.writeValueAsString(data), schema);
+      getConverter().convertToRow(MAPPER.writeValueAsString(data), schema);
     });
   }
 
@@ -542,7 +552,7 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
     String json = MAPPER.writeValueAsString(data);
 
     Row rec = RowFactory.create(uuid);
-    Row real = CONVERTER.convertToRow(json, schema);
+    Row real = getConverter().convertToRow(json, schema);
     validateSchemaCompatibility(Collections.singletonList(real), schema);
     assertEquals(rec, real);
   }
@@ -559,7 +569,7 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
     Schema nestedSchema = new Schema.Parser().parse(nestedSchemaStr);
 
     Row expected = RowFactory.create("Jane Smith", 
RowFactory.create(contactInput));
-    Row real = CONVERTER.convertToRow(json, nestedSchema);
+    Row real = getConverter().convertToRow(json, nestedSchema);
     validateSchemaCompatibility(Collections.singletonList(real), nestedSchema);
     assertEquals(expected, real);
   }
@@ -585,7 +595,7 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
     values.set(1, number);
     values.set(2, color);
     Row expected = RowFactory.create(values.toArray());
-    Row actual = CONVERTER.convertToRow(json, sanitizedSchema);
+    Row actual = getConverter().convertToRow(json, sanitizedSchema);
     validateSchemaCompatibility(Collections.singletonList(actual), 
sanitizedSchema);
     assertEquals(expected, actual);
   }
@@ -613,7 +623,7 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
     values.set(2, color);
     values.set(3, unmatched);
     Row recRow = RowFactory.create(values.toArray());
-    Row realRow = CONVERTER.convertToRow(json, sanitizedSchema);
+    Row realRow = getConverter().convertToRow(json, sanitizedSchema);
     validateSchemaCompatibility(Collections.singletonList(realRow), 
sanitizedSchema);
     assertEquals(recRow, realRow);
   }
@@ -638,7 +648,7 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
     String json = MAPPER.writeValueAsString(data);
     Schema tripSchema = new Schema.Parser().parse(
         TRIP_ENCODED_DECIMAL_SCHEMA.replace("6", 
Integer.toString(scale)).replace("10", Integer.toString(precision)));
-    Row rec = CONVERTER.convertToRow(json, tripSchema);
+    Row rec = getConverter().convertToRow(json, tripSchema);
     validateSchemaCompatibility(Collections.singletonList(rec), tripSchema);
     BigDecimal actualField = 
rec.getDecimal(tripSchema.getField("decfield").pos());
     assertEquals(decfield, actualField);
@@ -669,7 +679,7 @@ public class TestMercifulJsonToRowConverter extends 
MercifulJsonConverterTestBas
     data.put("fare", rand.nextDouble() * 100);
     data.put("_hoodie_is_deleted", false);
     String json = MAPPER.writeValueAsString(data);
-    Row rec = CONVERTER.convertToRow(json, postProcessSchema);
+    Row rec = getConverter().convertToRow(json, postProcessSchema);
     BigDecimal actualField = 
rec.getDecimal(postProcessSchema.getField("decfield").pos());
     validateSchemaCompatibility(Collections.singletonList(rec), 
postProcessSchema);
     assertEquals(decfield, actualField);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterJava8Api.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterJava8Api.java
new file mode 100644
index 000000000000..5063dedc7cc1
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterJava8Api.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.BeforeAll;
+
+public class TestMercifulJsonToRowConverterJava8Api extends 
TestMercifulJsonToRowConverterBase {
+
+
+  private static final MercifulJsonToRowConverter CONVERTER = new 
MercifulJsonToRowConverter(true, "__", true);
+
+  @BeforeAll
+  public static void start() {
+    spark = SparkSession
+        .builder()
+        .master("local[*]")
+        .appName(TestMercifulJsonToRowConverterBase.class.getName())
+        .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
+        .config("spark.sql.datetime.java8API.enabled", "true")
+        .getOrCreate();
+  }
+
+  @Override
+  protected MercifulJsonToRowConverter getConverter() {
+    return CONVERTER;
+  }
+
+  @Override
+  protected boolean isJava8ApiEnabled() {
+    return true;
+  }
+}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterLegacyApi.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterLegacyApi.java
new file mode 100644
index 000000000000..8268bbb37b2b
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterLegacyApi.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.BeforeAll;
+
+public class TestMercifulJsonToRowConverterLegacyApi extends 
TestMercifulJsonToRowConverterBase {
+
+  private static final MercifulJsonToRowConverter CONVERTER = new 
MercifulJsonToRowConverter(true, "__", false);
+
+  @BeforeAll
+  public static void start() {
+    spark = SparkSession
+        .builder()
+        .master("local[*]")
+        .appName(TestMercifulJsonToRowConverterBase.class.getName())
+        .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
+        .config("spark.sql.datetime.java8API.enabled", "false")
+        .getOrCreate();
+  }
+
+  @Override
+  protected MercifulJsonToRowConverter getConverter() {
+    return CONVERTER;
+  }
+
+  @Override
+  protected boolean isJava8ApiEnabled() {
+    return false;
+  }
+}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
index 76a1a6453670..46e9979bfbc5 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
@@ -81,6 +81,10 @@ public abstract class AbstractDFSSourceTestBase extends 
UtilitiesTestBase {
 
   protected abstract Source prepareDFSSource(TypedProperties props);
 
+  protected Option<TypedProperties> getSourceFormatAdapterProps() {
+    return Option.empty();
+  }
+
   /**
    * Writes test data, i.e., a {@link List} of {@link HoodieRecord}, to a file 
on DFS.
    *
@@ -111,7 +115,7 @@ public abstract class AbstractDFSSourceTestBase extends 
UtilitiesTestBase {
   @Test
   public void testReadingFromSource() throws IOException {
     fs.mkdirs(new Path(dfsRoot));
-    SourceFormatAdapter sourceFormatAdapter = new 
SourceFormatAdapter(prepareDFSSource());
+    SourceFormatAdapter sourceFormatAdapter = new 
SourceFormatAdapter(prepareDFSSource(), Option.empty(), 
getSourceFormatAdapterProps());
 
     // 1. Extract without any checkpoint => get all the data, respecting 
sourceLimit
     assertEquals(Option.empty(),
@@ -136,6 +140,9 @@ public abstract class AbstractDFSSourceTestBase extends 
UtilitiesTestBase {
         .createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()),
             schemaProvider.getSourceSchema().toString(), sparkSession);
     assertEquals(100, fetch1Rows.count());
+    // city_to_state can't be in except because it is a map
+    assertEquals(0, fetch1AsRows.getBatch().get().drop("city_to_state")
+        .except(fetch1Rows.drop("city_to_state")).count());
 
     // 2. Produce new data, extract new data
     generateOneFile("2", "001", 10000);


Reply via email to