This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 236801be01ee fix(ingest): Fix Timestamp Conversions, Add legacy api support (#14076)
236801be01ee is described below
commit 236801be01ee4d9cbb2bca5e0105cdf1516c861e
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Oct 30 05:19:00 2025 -0400
fix(ingest): Fix Timestamp Conversions, Add legacy api support (#14076)
Co-authored-by: Jonathan Vexler <=>
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../apache/hudi/stats/SparkValueMetadataUtils.java | 2 +-
.../common/testutils/HoodieTestDataGenerator.java | 25 +++++-
.../helpers/MercifulJsonToRowConverter.java | 18 ++--
.../utilities/sources/helpers/RowConverter.java | 6 +-
.../utilities/streamer/SourceFormatAdapter.java | 11 ++-
.../deltastreamer/TestHoodieDeltaStreamer.java | 100 ++++++++++++++++++++-
.../hudi/utilities/sources/TestJsonDFSSource.java | 7 ++
...ava => TestMercifulJsonToRowConverterBase.java} | 90 ++++++++++---------
.../TestMercifulJsonToRowConverterJava8Api.java | 50 +++++++++++
.../TestMercifulJsonToRowConverterLegacyApi.java | 49 ++++++++++
.../sources/AbstractDFSSourceTestBase.java | 9 +-
11 files changed, 308 insertions(+), 59 deletions(-)
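The change threads a useJava8api flag, read from spark.sql.datetime.java8API.enabled, from the Spark session down to the JSON-to-Row conversion, so converted rows carry java.time values when the Java 8 datetime API is enabled and legacy java.sql values otherwise. A minimal sketch of that branching, following the convertJavaTypeToSparkType hunk below (the LocalDate branch is an assumption added for illustration and is not shown in this diff):

    // Sketch only: external Spark row types under the two datetime API modes.
    static Object toSparkExternalType(Object javaVal, boolean useJava8api) {
      if (!useJava8api) {
        if (javaVal instanceof java.time.Instant) {
          return java.sql.Timestamp.from((java.time.Instant) javaVal); // legacy rows expect java.sql.Timestamp
        }
        if (javaVal instanceof java.time.LocalDate) {                  // assumed branch, for illustration only
          return java.sql.Date.valueOf((java.time.LocalDate) javaVal); // legacy rows expect java.sql.Date
        }
      }
      return javaVal; // Java 8 API enabled: java.time values pass through unchanged
    }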
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/stats/SparkValueMetadataUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/stats/SparkValueMetadataUtils.java
index 0a689a7e31b0..986a1c29ec92 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/stats/SparkValueMetadataUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/stats/SparkValueMetadataUtils.java
@@ -144,7 +144,7 @@ public class SparkValueMetadataUtils {
* we need to return java.sql.Timestamp and java.sql.Date
*
*/
-  public static Object convertJavaTypeToSparkType(Comparable<?> javaVal, boolean useJava8api) {
+  public static Object convertJavaTypeToSparkType(Object javaVal, boolean useJava8api) {
if (!useJava8api) {
if (javaVal instanceof Instant) {
return Timestamp.from((Instant) javaVal);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 4aab20e0c2c0..e9db289b8164 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -179,6 +179,12 @@ public class HoodieTestDataGenerator implements AutoCloseable {
       + "{\"name\":\"dec_fixed_small\",\"type\":{\"type\":\"fixed\",\"name\":\"decFixedSmall\",\"size\":3,\"logicalType\":\"decimal\",\"precision\":5,\"scale\":2}},"
       + "{\"name\":\"dec_fixed_large\",\"type\":{\"type\":\"fixed\",\"name\":\"decFixedLarge\",\"size\":8,\"logicalType\":\"decimal\",\"precision\":18,\"scale\":9}},";
+  public static final String EXTENDED_LOGICAL_TYPES_SCHEMA_NO_LTS_V6 = "{\"name\":\"ts_millis\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},"
+      + "{\"name\":\"ts_micros\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}},"
+      + "{\"name\":\"event_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},"
+      + "{\"name\":\"dec_fixed_small\",\"type\":{\"type\":\"fixed\",\"name\":\"decFixedSmall\",\"size\":3,\"logicalType\":\"decimal\",\"precision\":5,\"scale\":2}},"
+      + "{\"name\":\"dec_fixed_large\",\"type\":{\"type\":\"fixed\",\"name\":\"decFixedLarge\",\"size\":8,\"logicalType\":\"decimal\",\"precision\":18,\"scale\":9}},";
+
   public static final String EXTRA_COL_SCHEMA1 = "{\"name\": \"extra_column1\", \"type\": [\"null\", \"string\"], \"default\": null },";
   public static final String EXTRA_COL_SCHEMA2 = "{\"name\": \"extra_column2\", \"type\": [\"null\", \"string\"], \"default\": null},";
   public static final String EXTRA_COL_SCHEMA_FOR_AWS_DMS_PAYLOAD = "{\"name\": \"Op\", \"type\": [\"null\", \"string\"], \"default\": null},";
@@ -202,6 +208,9 @@ public class HoodieTestDataGenerator implements AutoCloseable {
   public static final String TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS = TRIP_SCHEMA_PREFIX + EXTENDED_LOGICAL_TYPES_SCHEMA_NO_LTS + TRIP_SCHEMA_SUFFIX;
+  public static final String TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6 =
+      TRIP_SCHEMA_PREFIX + EXTENDED_LOGICAL_TYPES_SCHEMA_NO_LTS_V6 + TRIP_SCHEMA_SUFFIX;
+
   public static final String TRIP_NESTED_EXAMPLE_SCHEMA = TRIP_SCHEMA_PREFIX + FARE_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
@@ -233,6 +242,7 @@ public class HoodieTestDataGenerator implements AutoCloseable {
   public static final Schema AVRO_TRIP_LOGICAL_TYPES_SCHEMA = new Schema.Parser().parse(TRIP_LOGICAL_TYPES_SCHEMA);
   public static final Schema AVRO_TRIP_LOGICAL_TYPES_SCHEMA_V6 = new Schema.Parser().parse(TRIP_LOGICAL_TYPES_SCHEMA_V6);
   public static final Schema AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS = new Schema.Parser().parse(TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS);
+  public static final Schema AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6 = new Schema.Parser().parse(TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6);
   public static final Schema AVRO_TRIP_SCHEMA = new Schema.Parser().parse(TRIP_SCHEMA);
   public static final Schema FLATTENED_AVRO_SCHEMA = new Schema.Parser().parse(TRIP_FLATTENED_SCHEMA);
@@ -380,6 +390,8 @@ public class HoodieTestDataGenerator implements AutoCloseable {
         return generatePayloadForLogicalTypesSchemaV6(key, commitTime, false, timestamp);
       } else if (TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS.equals(schemaStr)) {
         return generatePayloadForLogicalTypesSchemaNoLTS(key, commitTime, false, timestamp);
+      } else if (TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6.equals(schemaStr)) {
+        return generatePayloadForLogicalTypesSchemaNoLTSV6(key, commitTime, false, timestamp);
}
} else {
if (TRIP_EXAMPLE_SCHEMA.equals(schemaStr)) {
@@ -390,6 +402,8 @@ public class HoodieTestDataGenerator implements AutoCloseable {
         return generatePayloadForLogicalTypesSchemaV6(key, commitTime, true, timestamp);
       } else if (TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS.equals(schemaStr)) {
         return generatePayloadForLogicalTypesSchemaNoLTS(key, commitTime, true, timestamp);
+      } else if (TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6.equals(schemaStr)) {
+        return generatePayloadForLogicalTypesSchemaNoLTSV6(key, commitTime, true, timestamp);
}
}
@@ -451,6 +465,10 @@ public class HoodieTestDataGenerator implements AutoCloseable {
   public IndexedRecord generatePayloadForLogicalTypesSchemaNoLTS(HoodieKey key, String commitTime, boolean isDelete, long timestamp) {
     return generateRecordForTripLogicalTypesSchema(key, "rider-" + commitTime, "driver-" + commitTime, timestamp, isDelete, false, false);
   }
+
+  public IndexedRecord generatePayloadForLogicalTypesSchemaNoLTSV6(HoodieKey key, String commitTime, boolean isDelete, long timestamp) {
+    return generateRecordForTripLogicalTypesSchema(key, "rider-" + commitTime, "driver-" + commitTime, timestamp, isDelete, true, false);
+  }
   public IndexedRecord generatePayloadForLogicalTypesSchema(HoodieKey key, String commitTime, boolean isDelete, long timestamp) {
     return generateRecordForTripLogicalTypesSchema(key, "rider-" + commitTime, "driver-" + commitTime, timestamp, isDelete, false, true);
@@ -679,8 +697,11 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
long timestamp,
boolean isDeleteRecord, boolean v6, boolean hasLTS) {
GenericRecord rec;
if (!hasLTS) {
- // LTS = Local Timestamp
- rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS);
+ if (v6) {
+ rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6);
+ } else {
+ rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS);
+ }
} else if (v6) {
rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA_V6);
} else {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java
index 2c6e18897e68..2d9aca6bf526 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java
@@ -30,8 +30,10 @@ import org.apache.hudi.avro.processors.LocalTimestampMilliLogicalTypeProcessor;
import org.apache.hudi.avro.processors.Parser;
import org.apache.hudi.avro.processors.TimestampMicroLogicalTypeProcessor;
import org.apache.hudi.avro.processors.TimestampMilliLogicalTypeProcessor;
+import org.apache.hudi.common.util.DateTimeUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.stats.SparkValueMetadataUtils;
import org.apache.hudi.stats.ValueType;
import org.apache.hudi.utilities.exception.HoodieJsonToRowConversionException;
@@ -47,7 +49,6 @@ import org.apache.spark.sql.RowFactory;
import java.io.IOException;
import java.math.BigDecimal;
-import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.temporal.ChronoField;
@@ -63,19 +64,21 @@ import scala.collection.JavaConverters;
* Converts Json record to Row Record.
*/
public class MercifulJsonToRowConverter extends MercifulJsonConverter {
+ private final boolean useJava8api;
/**
    * Allows enabling sanitization and allows choice of invalidCharMask for sanitization
*/
-  public MercifulJsonToRowConverter(boolean shouldSanitize, String invalidCharMask) {
-    this(new ObjectMapper().enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS), shouldSanitize, invalidCharMask);
+  public MercifulJsonToRowConverter(boolean shouldSanitize, String invalidCharMask, boolean useJava8api) {
+    this(new ObjectMapper().enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS), shouldSanitize, invalidCharMask, useJava8api);
   }
/**
    * Allows a configured ObjectMapper to be passed for converting json records to row.
    */
-  public MercifulJsonToRowConverter(ObjectMapper mapper, boolean shouldSanitize, String invalidCharMask) {
+  public MercifulJsonToRowConverter(ObjectMapper mapper, boolean shouldSanitize, String invalidCharMask, boolean useJava8api) {
super(mapper, shouldSanitize, invalidCharMask);
+ this.useJava8api = useJava8api;
}
/**
@@ -102,7 +105,7 @@ public class MercifulJsonToRowConverter extends MercifulJsonConverter {
     for (Schema.Field f : fields) {
       Object val = shouldSanitize ? getFieldFromJson(f, inputJson, schema.getFullName(), invalidCharMask) : inputJson.get(f.name());
       if (val != null) {
-        values.set(f.pos(), convertJsonField(val, f.name(), f.schema()));
+        values.set(f.pos(), SparkValueMetadataUtils.convertJavaTypeToSparkType(convertJsonField(val, f.name(), f.schema()), useJava8api));
}
}
return RowFactory.create(values.toArray());
@@ -238,7 +241,7 @@ public class MercifulJsonToRowConverter extends MercifulJsonConverter {
},
value, schema);
if (result.getLeft()) {
- return Pair.of(true, new Timestamp((Long) result.getRight()));
+ return Pair.of(true, Instant.ofEpochMilli((Long) result.getRight()));
}
return Pair.of(false, null);
}
@@ -275,8 +278,7 @@ public class MercifulJsonToRowConverter extends MercifulJsonConverter {
},
value, schema);
if (result.getLeft()) {
- // timestamp in spark sql doesn't support precision to the micro.
- return Pair.of(true, new Timestamp(((Long) result.getRight()) / 1000));
+      return Pair.of(true, DateTimeUtils.microsToInstant((Long) result.getRight()));
}
return Pair.of(false, null);
}
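The timestamp-micros hunk above is the precision fix in this converter: the removed code divided the micros value by 1000 and built a millisecond java.sql.Timestamp, silently dropping sub-millisecond digits, while the new code keeps the full value as an Instant via DateTimeUtils.microsToInstant. A plain-JDK illustration of the difference (the Instant arithmetic here only approximates what the Hudi utility does):

    long micros = 1_696_000_000_123_456L;                               // example timestamp-micros value
    java.sql.Timestamp legacy = new java.sql.Timestamp(micros / 1000);  // old path: truncated to milliseconds
    java.time.Instant precise = java.time.Instant.ofEpochSecond(        // new path: keeps microseconds
        Math.floorDiv(micros, 1_000_000L), Math.floorMod(micros, 1_000_000L) * 1_000L);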
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/RowConverter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/RowConverter.java
index 7ed174b38a5f..51dec7fcdff5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/RowConverter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/RowConverter.java
@@ -43,17 +43,19 @@ public class RowConverter implements Serializable {
private final String schemaStr;
private final String invalidCharMask;
private final boolean shouldSanitize;
+ private final boolean useJava8api;
/**
* To be lazily initialized on executors.
*/
private transient MercifulJsonToRowConverter jsonConverter;
-  public RowConverter(Schema schema, boolean shouldSanitize, String invalidCharMask) {
+  public RowConverter(Schema schema, boolean shouldSanitize, String invalidCharMask, boolean useJava8api) {
this.schemaStr = schema.toString();
this.schema = schema;
this.shouldSanitize = shouldSanitize;
this.invalidCharMask = invalidCharMask;
+ this.useJava8api = useJava8api;
}
private void initSchema() {
@@ -65,7 +67,7 @@ public class RowConverter implements Serializable {
private void initJsonConvertor() {
if (jsonConverter == null) {
-      jsonConverter = new MercifulJsonToRowConverter(this.shouldSanitize, this.invalidCharMask);
+      jsonConverter = new MercifulJsonToRowConverter(this.shouldSanitize, this.invalidCharMask, this.useJava8api);
}
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
index f326c98826ef..3a33f4c06f75 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
@@ -47,6 +47,7 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
@@ -77,6 +78,9 @@ public class SourceFormatAdapter implements Closeable {
   private boolean wrapWithException = ROW_THROW_EXPLICIT_EXCEPTIONS.defaultValue();
   private String invalidCharMask = SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue();
+  private boolean useJava8api = (boolean) SQLConf.DATETIME_JAVA8API_ENABLED().defaultValue().get();
+
+
private Option<BaseErrorTableWriter> errorTableWriter = Option.empty();
public SourceFormatAdapter(Source source) {
@@ -90,6 +94,7 @@ public class SourceFormatAdapter implements Closeable {
this.shouldSanitize = SanitizationUtils.shouldSanitize(props.get());
this.invalidCharMask = SanitizationUtils.getInvalidCharMask(props.get());
       this.wrapWithException = ConfigUtils.getBooleanWithAltKeys(props.get(), ROW_THROW_EXPLICIT_EXCEPTIONS);
+      this.useJava8api = (boolean) getSource().getSparkSession().conf().get(SQLConf.DATETIME_JAVA8API_ENABLED());
}
     if (this.shouldSanitize && source.getSourceType() == Source.SourceType.PROTO) {
throw new IllegalArgumentException("PROTO cannot be sanitized");
@@ -112,6 +117,10 @@ public class SourceFormatAdapter implements Closeable {
return invalidCharMask;
}
+ private boolean getUseJava8api() {
+ return useJava8api;
+ }
+
/**
    * transform input rdd of json string to generic records with support for adding error events to error table
* @param inputBatch
@@ -134,7 +143,7 @@ public class SourceFormatAdapter implements Closeable {
   private JavaRDD<Row> transformJsonToRowRdd(InputBatch<JavaRDD<String>> inputBatch) {
     MercifulJsonConverter.clearCache(inputBatch.getSchemaProvider().getSourceSchema().getFullName());
-    RowConverter convertor = new RowConverter(inputBatch.getSchemaProvider().getSourceSchema(), isFieldNameSanitizingEnabled(), getInvalidCharMask());
+    RowConverter convertor = new RowConverter(inputBatch.getSchemaProvider().getSourceSchema(), isFieldNameSanitizingEnabled(), getInvalidCharMask(), getUseJava8api());
return inputBatch.getBatch().map(rdd -> {
if (errorTableWriter.isPresent()) {
         JavaRDD<Either<Row, String>> javaRDD = rdd.map(convertor::fromJsonToRowWithError);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index a5c393af1a0a..9556d70b8602 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -111,7 +111,7 @@ import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.sources.SqlSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch;
-import org.apache.hudi.utilities.sources.helpers.TestMercifulJsonToRowConverter;
+import org.apache.hudi.utilities.sources.helpers.TestMercifulJsonToRowConverterBase;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.hudi.utilities.streamer.NoNewDataTerminationStrategy;
import org.apache.hudi.utilities.streamer.StreamSync;
@@ -788,10 +788,92 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
}
}
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testLogicalTypesWithJsonSource(boolean hasTransformer) throws Exception {
+    try {
+      //use v6 schema because decimal parsing iso 8859-1 support not available currently
+      String schemaStr;
+      if (HoodieSparkUtils.isSpark3_3()) {
+        TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.sourceSchema =
+            HoodieTestDataGenerator.AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6;
+        TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.targetSchema =
+            HoodieTestDataGenerator.AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6;
+        schemaStr = HoodieTestDataGenerator.TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6;
+      } else {
+        TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.sourceSchema =
+            HoodieTestDataGenerator.AVRO_TRIP_LOGICAL_TYPES_SCHEMA_V6;
+        TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.targetSchema =
+            HoodieTestDataGenerator.AVRO_TRIP_LOGICAL_TYPES_SCHEMA_V6;
+        schemaStr = HoodieTestDataGenerator.TRIP_LOGICAL_TYPES_SCHEMA_V6;
+      }
+      defaultSchemaProviderClassName =
+          TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.class.getName();
+      String tableBasePath = basePath + "testTimestampMillis";
+      prepareJsonKafkaDFSSource(
+          PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName);
+
+      // Insert data produced with Schema A, pass Schema A
+      prepareJsonKafkaDFSFilesWithSchema(
+          1000, true, topicName, schemaStr);
+      HoodieDeltaStreamer.Config cfg = getConfigForLogicalTypesWithJsonSource(tableBasePath, WriteOperationType.INSERT, hasTransformer);
+      new HoodieDeltaStreamer(cfg, jsc).sync();
+      // Validate.
+      assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, tableBasePath));
+      assertRecordCount(1000, tableBasePath, sqlContext);
+      TestHelpers.assertCommitMetadata(topicName + ",0:500,1:500", tableBasePath, 1);
+      TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
+          HoodieTestUtils.createMetaClient(storage, tableBasePath));
+      Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+      Map<String, String> hudiOpts = new HashMap<>();
+      hudiOpts.put("hoodie.datasource.write.recordkey.field", "id");
+      logicalAssertions(tableSchema, tableBasePath, hudiOpts, HoodieTableVersion.EIGHT.versionCode());
+
+      // Update data.
+      prepareJsonKafkaDFSFilesWithSchema(
+          1000, false, topicName, schemaStr);
+      cfg = getConfigForLogicalTypesWithJsonSource(tableBasePath, WriteOperationType.UPSERT, hasTransformer);
+      new HoodieDeltaStreamer(cfg, jsc).sync();
+      // Validate.
+      assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, tableBasePath));
+      assertRecordCount(2000, tableBasePath, sqlContext);
+      TestHelpers.assertCommitMetadata(topicName + ",0:1000,1:1000", tableBasePath, 2);
+      tableSchemaResolver = new TableSchemaResolver(
+          HoodieTestUtils.createMetaClient(storage, tableBasePath));
+      tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+      logicalAssertions(tableSchema, tableBasePath, hudiOpts, HoodieTableVersion.EIGHT.versionCode());
+    } finally {
+      defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
+    }
+  }
+
+  private static HoodieDeltaStreamer.Config getConfigForLogicalTypesWithJsonSource(String tableBasePath, WriteOperationType operationType, boolean hasTransformer) {
+    List<String> transformerClassNames;
+    if (hasTransformer) {
+      transformerClassNames = Collections.singletonList(TestIdentityTransformer.class.getName());
+    } else {
+      transformerClassNames = Collections.emptyList();
+    }
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(
+        tableBasePath, operationType, JsonKafkaSource.class.getName(),
+        transformerClassNames,
+        PROPS_FILENAME_TEST_JSON_KAFKA,
+        false, true, 100000, false, null,
+        HoodieTableType.MERGE_ON_READ.name(),
+        "timestamp", null);
+    cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
+    cfg.recordMergeStrategyId = HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
+    cfg.recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+    cfg.configs.add(String.format("%s=%s", HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
+    cfg.configs.add("hoodie.datasource.write.row.writer.enable=false");
+    cfg.configs.add("hoodie.streamer.source.sanitize.invalid.schema.field.names=true");
+    return cfg;
+  }
+
+
@ParameterizedTest
@EnumSource(value = HoodieTableVersion.class, names = {"SIX", "EIGHT"})
   public void testBackwardsCompatibility(HoodieTableVersion version) throws Exception {
- TestMercifulJsonToRowConverter.timestampNTZCompatibility(() -> {
+ TestMercifulJsonToRowConverterBase.timestampNTZCompatibility(() -> {
String dirName = "colstats-upgrade-test-v" + version.versionCode();
String dataPath = basePath + "/" + dirName;
java.nio.file.Path zipOutput = Paths.get(new URI(dataPath));
@@ -2379,10 +2461,18 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
}
   private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) {
-    prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, 2);
+    prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, 2, HoodieTestDataGenerator.TRIP_SCHEMA);
   }
   private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName, int numPartitions) {
+    prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, numPartitions, HoodieTestDataGenerator.TRIP_SCHEMA);
+  }
+
+  private void prepareJsonKafkaDFSFilesWithSchema(int numRecords, boolean createTopic, String topicName, String schemaStr) {
+    prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, 2, schemaStr);
+  }
+
+  private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName, int numPartitions, String schemaStr) {
if (createTopic) {
try {
testUtils.createTopic(topicName, numPartitions);
@@ -2392,7 +2482,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
}
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.sendMessages(topicName,
-        UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions(dataGenerator.generateInsertsAsPerSchema("000", numRecords, HoodieTestDataGenerator.TRIP_SCHEMA), numPartitions));
+ UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions(
+ dataGenerator.generateInsertsAsPerSchema(
+ "000", numRecords, schemaStr), numPartitions));
}
   private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
index 3a64747eda5b..d67808dd1e04 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
@@ -61,6 +61,13 @@ public class TestJsonDFSSource extends AbstractDFSSourceTestBase {
return new JsonDFSSource(props, jsc, sparkSession, schemaProvider);
}
+ @Override
+ protected Option<TypedProperties> getSourceFormatAdapterProps() {
+ TypedProperties properties = new TypedProperties();
+    properties.setProperty(HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.key(), "true");
+ return Option.of(properties);
+ }
+
@Override
   public void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException {
UtilitiesTestBase.Helpers.saveStringsToDFS(
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterBase.java
similarity index 91%
rename from hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverter.java
rename to hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterBase.java
index 595132eaacf4..81066aed4a9e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterBase.java
@@ -22,6 +22,7 @@ import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.avro.MercifulJsonConverterTestBase;
import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.DateTimeUtils;
import org.apache.hudi.stats.ValueType;
import org.apache.hudi.utilities.exception.HoodieJsonToRowConversionException;
@@ -37,7 +38,6 @@ import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@@ -49,6 +49,7 @@ import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.sql.Timestamp;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
@@ -62,23 +63,16 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
-public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBase {
+public abstract class TestMercifulJsonToRowConverterBase extends MercifulJsonConverterTestBase {
   private static final ObjectMapper MAPPER = new ObjectMapper();
-  private static final MercifulJsonToRowConverter CONVERTER = new MercifulJsonToRowConverter(true, "__");
   private static final String SIMPLE_AVRO_WITH_DEFAULT = "/schema/simple-test-with-default-value.avsc";
-  protected static SparkSession spark;
+  protected abstract MercifulJsonToRowConverter getConverter();
-  @BeforeAll
-  public static void start() {
-    spark = SparkSession
-        .builder()
-        .master("local[*]")
-        .appName(TestMercifulJsonToRowConverter.class.getName())
-        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-        .getOrCreate();
-  }
+  protected abstract boolean isJava8ApiEnabled();
+
+  protected static SparkSession spark;
@AfterAll
public static void clear() throws IOException {
@@ -102,7 +96,7 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
values.set(1, number);
values.set(3, color);
Row recRow = RowFactory.create(values.toArray());
- Row realRow = CONVERTER.convertToRow(json, simpleSchema);
+ Row realRow = getConverter().convertToRow(json, simpleSchema);
     validateSchemaCompatibility(Collections.singletonList(realRow), simpleSchema);
assertEquals(recRow, realRow);
}
@@ -126,7 +120,7 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
values.set(2, color);
Row recRow = RowFactory.create(values.toArray());
-    Assertions.assertEquals(recRow, CONVERTER.convertToRow(json, simpleSchema));
+    Assertions.assertEquals(recRow, getConverter().convertToRow(json, simpleSchema));
}
   private static final String DECIMAL_AVRO_FILE_PATH = "/decimal-logical-type.avsc";
@@ -149,7 +143,7 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
Schema schema = SchemaTestUtil.getSchema(DECIMAL_AVRO_FILE_PATH);
Row expectRow = RowFactory.create(bigDecimal);
- Row realRow = CONVERTER.convertToRow(json, schema);
+ Row realRow = getConverter().convertToRow(json, schema);
validateSchemaCompatibility(Collections.singletonList(realRow), schema);
assertEquals(expectRow, realRow);
}
@@ -178,7 +172,7 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
// Schedule with timestamp same as that of committed instant
assertThrows(HoodieJsonToRowConversionException.class, () -> {
- CONVERTER.convertToRow(json, schema);
+ getConverter().convertToRow(json, schema);
});
}
@@ -222,7 +216,7 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
String json = MAPPER.writeValueAsString(data);
Row expectRow = RowFactory.create(bigDecimal);
- Row realRow = CONVERTER.convertToRow(json, schema);
+ Row realRow = getConverter().convertToRow(json, schema);
validateSchemaCompatibility(Collections.singletonList(realRow), schema);
assertEquals(expectRow, realRow);
}
@@ -241,11 +235,11 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
if (shouldConvert) {
BigDecimal bigDecimal = new BigDecimal(expected);
Row expectedRow = RowFactory.create(bigDecimal);
- Row actualRow = CONVERTER.convertToRow(json, schema);
+ Row actualRow = getConverter().convertToRow(json, schema);
       validateSchemaCompatibility(Collections.singletonList(actualRow), schema);
assertEquals(expectedRow, actualRow);
} else {
-      assertThrows(HoodieJsonToRowConversionException.class, () -> CONVERTER.convertToRow(json, schema));
+      assertThrows(HoodieJsonToRowConversionException.class, () -> getConverter().convertToRow(json, schema));
}
}
@@ -277,7 +271,7 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
// Duration type is not supported in Row object.
assertThrows(HoodieJsonToRowConversionException.class, () -> {
- CONVERTER.convertToRow(json, schema);
+ getConverter().convertToRow(json, schema);
});
}
@@ -293,7 +287,7 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
Schema schema = SchemaTestUtil.getSchemaFromResourceFilePath(schemaFile);
// Schedule with timestamp same as that of committed instant
assertThrows(HoodieJsonToRowConversionException.class, () -> {
- CONVERTER.convertToRow(json, schema);
+ getConverter().convertToRow(json, schema);
});
}
@@ -320,10 +314,20 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
if (groundTruthRow == null) {
return;
}
-    Row rec = RowFactory.create(java.sql.Date.valueOf(groundTruthRow).toLocalDate());
-    Row realRow = CONVERTER.convertToRow(json, schema);
+    Row rec;
+    if (isJava8ApiEnabled()) {
+      rec = RowFactory.create(java.sql.Date.valueOf(groundTruthRow).toLocalDate());
+    } else {
+      rec = RowFactory.create(java.sql.Date.valueOf(groundTruthRow));
+    }
+
+    Row realRow = getConverter().convertToRow(json, schema);
     validateSchemaCompatibility(Collections.singletonList(realRow), schema);
-    assertEquals(rec.getLocalDate(0).toString(), realRow.getLocalDate(0).toString());
+    if (isJava8ApiEnabled()) {
+      assertEquals(rec.getLocalDate(0).toString(), realRow.getLocalDate(0).toString());
+    } else {
+      assertEquals(rec.getDate(0).toString(), realRow.getDate(0).toString());
+    }
}
/**
@@ -340,7 +344,7 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
data.put("dateField", 1);
String json = MAPPER.writeValueAsString(data);
assertThrows(HoodieJsonToRowConversionException.class, () -> {
- CONVERTER.convertToRow(json, schema);
+ getConverter().convertToRow(json, schema);
});
}
@@ -398,7 +402,7 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
String json = MAPPER.writeValueAsString(data);
       Row rec = RowFactory.create(ValueType.castToLocalTimestampMillis(milliSecOfDay, null), ValueType.castToLocalTimestampMicros(microSecOfDay, null));
- Row actualRow = CONVERTER.convertToRow(json, schema);
+ Row actualRow = getConverter().convertToRow(json, schema);
       validateSchemaCompatibility(Collections.singletonList(actualRow), schema);
assertEquals(rec, actualRow);
});
@@ -415,7 +419,7 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
String json = MAPPER.writeValueAsString(data);
assertThrows(HoodieJsonToRowConversionException.class, () -> {
- CONVERTER.convertToRow(json, schema);
+ getConverter().convertToRow(json, schema);
});
}
@@ -444,8 +448,14 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
data.put("timestampMicrosField", timeMicro);
String json = MAPPER.writeValueAsString(data);
-    Row rec = RowFactory.create(new Timestamp(milliSecOfDay), new Timestamp(microSecOfDay / 1000));
-    Row actualRow = CONVERTER.convertToRow(json, schema);
+    Row rec;
+    if (isJava8ApiEnabled()) {
+      rec = RowFactory.create(Instant.ofEpochMilli(milliSecOfDay), DateTimeUtils.microsToInstant(microSecOfDay));
+    } else {
+      rec = RowFactory.create(Timestamp.from(Instant.ofEpochMilli(milliSecOfDay)), Timestamp.from(DateTimeUtils.microsToInstant(microSecOfDay)));
+    }
+
+    Row actualRow = getConverter().convertToRow(json, schema);
validateSchemaCompatibility(Collections.singletonList(actualRow), schema);
assertEquals(rec, actualRow);
}
@@ -462,7 +472,7 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
// Schedule with timestamp same as that of committed instant
assertThrows(HoodieJsonToRowConversionException.class, () -> {
- CONVERTER.convertToRow(json, schema);
+ getConverter().convertToRow(json, schema);
});
}
@@ -491,7 +501,7 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
String json = MAPPER.writeValueAsString(data);
Row rec = RowFactory.create(microSecOfDay, milliSecOfDay);
- Row realRow = CONVERTER.convertToRow(json, schema);
+ Row realRow = getConverter().convertToRow(json, schema);
validateSchemaCompatibility(Collections.singletonList(realRow), schema);
assertEquals(rec.get(0).toString(), realRow.get(0).toString());
assertEquals(rec.get(1).toString(), realRow.get(1).toString());
@@ -510,7 +520,7 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
data.put("timeMillisField", invalidInput);
// Schedule with timestamp same as that of committed instant
assertThrows(HoodieJsonToRowConversionException.class, () -> {
- CONVERTER.convertToRow(MAPPER.writeValueAsString(data), schema);
+ getConverter().convertToRow(MAPPER.writeValueAsString(data), schema);
});
data.clear();
@@ -518,7 +528,7 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
data.put("timeMillisField", validInput);
// Schedule with timestamp same as that of committed instant
assertThrows(HoodieJsonToRowConversionException.class, () -> {
- CONVERTER.convertToRow(MAPPER.writeValueAsString(data), schema);
+ getConverter().convertToRow(MAPPER.writeValueAsString(data), schema);
});
}
@@ -542,7 +552,7 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
String json = MAPPER.writeValueAsString(data);
Row rec = RowFactory.create(uuid);
- Row real = CONVERTER.convertToRow(json, schema);
+ Row real = getConverter().convertToRow(json, schema);
validateSchemaCompatibility(Collections.singletonList(real), schema);
assertEquals(rec, real);
}
@@ -559,7 +569,7 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
Schema nestedSchema = new Schema.Parser().parse(nestedSchemaStr);
     Row expected = RowFactory.create("Jane Smith", RowFactory.create(contactInput));
- Row real = CONVERTER.convertToRow(json, nestedSchema);
+ Row real = getConverter().convertToRow(json, nestedSchema);
validateSchemaCompatibility(Collections.singletonList(real), nestedSchema);
assertEquals(expected, real);
}
@@ -585,7 +595,7 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
values.set(1, number);
values.set(2, color);
Row expected = RowFactory.create(values.toArray());
- Row actual = CONVERTER.convertToRow(json, sanitizedSchema);
+ Row actual = getConverter().convertToRow(json, sanitizedSchema);
     validateSchemaCompatibility(Collections.singletonList(actual), sanitizedSchema);
assertEquals(expected, actual);
}
@@ -613,7 +623,7 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
values.set(2, color);
values.set(3, unmatched);
Row recRow = RowFactory.create(values.toArray());
- Row realRow = CONVERTER.convertToRow(json, sanitizedSchema);
+ Row realRow = getConverter().convertToRow(json, sanitizedSchema);
     validateSchemaCompatibility(Collections.singletonList(realRow), sanitizedSchema);
assertEquals(recRow, realRow);
}
@@ -638,7 +648,7 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
String json = MAPPER.writeValueAsString(data);
     Schema tripSchema = new Schema.Parser().parse(
         TRIP_ENCODED_DECIMAL_SCHEMA.replace("6", Integer.toString(scale)).replace("10", Integer.toString(precision)));
- Row rec = CONVERTER.convertToRow(json, tripSchema);
+ Row rec = getConverter().convertToRow(json, tripSchema);
validateSchemaCompatibility(Collections.singletonList(rec), tripSchema);
     BigDecimal actualField = rec.getDecimal(tripSchema.getField("decfield").pos());
assertEquals(decfield, actualField);
@@ -669,7 +679,7 @@ public class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBas
data.put("fare", rand.nextDouble() * 100);
data.put("_hoodie_is_deleted", false);
String json = MAPPER.writeValueAsString(data);
- Row rec = CONVERTER.convertToRow(json, postProcessSchema);
+ Row rec = getConverter().convertToRow(json, postProcessSchema);
     BigDecimal actualField = rec.getDecimal(postProcessSchema.getField("decfield").pos());
     validateSchemaCompatibility(Collections.singletonList(rec), postProcessSchema);
assertEquals(decfield, actualField);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterJava8Api.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterJava8Api.java
new file mode 100644
index 000000000000..5063dedc7cc1
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterJava8Api.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.BeforeAll;
+
+public class TestMercifulJsonToRowConverterJava8Api extends TestMercifulJsonToRowConverterBase {
+
+
+  private static final MercifulJsonToRowConverter CONVERTER = new MercifulJsonToRowConverter(true, "__", true);
+
+ @BeforeAll
+ public static void start() {
+ spark = SparkSession
+ .builder()
+ .master("local[*]")
+ .appName(TestMercifulJsonToRowConverterBase.class.getName())
+ .config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
+ .config("spark.sql.datetime.java8API.enabled", "true")
+ .getOrCreate();
+ }
+
+ @Override
+ protected MercifulJsonToRowConverter getConverter() {
+ return CONVERTER;
+ }
+
+ @Override
+ protected boolean isJava8ApiEnabled() {
+ return true;
+ }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterLegacyApi.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterLegacyApi.java
new file mode 100644
index 000000000000..8268bbb37b2b
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterLegacyApi.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.BeforeAll;
+
+public class TestMercifulJsonToRowConverterLegacyApi extends TestMercifulJsonToRowConverterBase {
+
+  private static final MercifulJsonToRowConverter CONVERTER = new MercifulJsonToRowConverter(true, "__", false);
+
+ @BeforeAll
+ public static void start() {
+ spark = SparkSession
+ .builder()
+ .master("local[*]")
+ .appName(TestMercifulJsonToRowConverterBase.class.getName())
+ .config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
+ .config("spark.sql.datetime.java8API.enabled", "false")
+ .getOrCreate();
+ }
+
+ @Override
+ protected MercifulJsonToRowConverter getConverter() {
+ return CONVERTER;
+ }
+
+ @Override
+ protected boolean isJava8ApiEnabled() {
+ return false;
+ }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
index 76a1a6453670..46e9979bfbc5 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
@@ -81,6 +81,10 @@ public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase {
protected abstract Source prepareDFSSource(TypedProperties props);
+ protected Option<TypedProperties> getSourceFormatAdapterProps() {
+ return Option.empty();
+ }
+
/**
    * Writes test data, i.e., a {@link List} of {@link HoodieRecord}, to a file on DFS.
*
@@ -111,7 +115,7 @@ public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase {
@Test
public void testReadingFromSource() throws IOException {
fs.mkdirs(new Path(dfsRoot));
-    SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(prepareDFSSource());
+    SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(prepareDFSSource(), Option.empty(), getSourceFormatAdapterProps());
     // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
assertEquals(Option.empty(),
@@ -136,6 +140,9 @@ public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase {
         .createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()), schemaProvider.getSourceSchema().toString(), sparkSession);
assertEquals(100, fetch1Rows.count());
+ // city_to_state can't be in except because it is a map
+ assertEquals(0, fetch1AsRows.getBatch().get().drop("city_to_state")
+ .except(fetch1Rows.drop("city_to_state")).count());
// 2. Produce new data, extract new data
generateOneFile("2", "001", 10000);