This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b420850430a [HUDI-7237] Hudi Streamer: Handle edge case with null
schema, minor cleanups (#10342)
b420850430a is described below
commit b420850430a19cee45abed5aedbbd8569eaecd43
Author: Tim Brown <[email protected]>
AuthorDate: Tue Jan 23 18:53:22 2024 -0600
[HUDI-7237] Hudi Streamer: Handle edge case with null schema, minor
cleanups (#10342)
---
.../schema/utils/AvroSchemaEvolutionUtils.java | 2 +-
.../schema/SchemaProviderWithPostProcessor.java | 13 ++-
.../apache/hudi/utilities/sources/InputBatch.java | 8 +-
.../apache/hudi/utilities/streamer/StreamSync.java | 89 +++++++++---------
.../deltastreamer/TestHoodieDeltaStreamer.java | 101 ++++++++++++++++-----
.../deltastreamer/TestSourceFormatAdapter.java | 2 +-
6 files changed, 139 insertions(+), 76 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
index c09b21ad3ae..e714d99f0e0 100644
---
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
@@ -144,7 +144,7 @@ public class AvroSchemaEvolutionUtils {
return sourceSchema;
}
- if (sourceSchema.getType() == Schema.Type.NULL ||
sourceSchema.getFields().isEmpty()) {
+ if (sourceSchema == null || sourceSchema.getType() == Schema.Type.NULL ||
sourceSchema.getFields().isEmpty()) {
return targetSchema;
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessor.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessor.java
index bd5bae4601d..c1965e86989 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessor.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessor.java
@@ -18,9 +18,10 @@
package org.apache.hudi.utilities.schema;
-import org.apache.avro.Schema;
import org.apache.hudi.common.util.Option;
+import org.apache.avro.Schema;
+
/**
* A schema provider which applies schema post process hook on schema.
*/
@@ -38,14 +39,16 @@ public class SchemaProviderWithPostProcessor extends
SchemaProvider {
@Override
public Schema getSourceSchema() {
- return schemaPostProcessor.map(processor ->
processor.processSchema(schemaProvider.getSourceSchema()))
- .orElse(schemaProvider.getSourceSchema());
+ Schema sourceSchema = schemaProvider.getSourceSchema();
+ return schemaPostProcessor.map(processor ->
processor.processSchema(sourceSchema))
+ .orElse(sourceSchema);
}
@Override
public Schema getTargetSchema() {
- return schemaPostProcessor.map(processor ->
processor.processSchema(schemaProvider.getTargetSchema()))
- .orElse(schemaProvider.getTargetSchema());
+ Schema targetSchema = schemaProvider.getTargetSchema();
+ return schemaPostProcessor.map(processor ->
processor.processSchema(targetSchema))
+ .orElse(targetSchema);
}
public SchemaProvider getOriginalSchemaProvider() {
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
index 04e3a574dc5..206909317fc 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
@@ -55,12 +55,16 @@ public class InputBatch<T> {
if (batch.isPresent() && schemaProvider == null) {
throw new HoodieException("Please provide a valid schema provider
class!");
}
- return Option.ofNullable(schemaProvider).orElse(new NullSchemaProvider());
+ return
Option.ofNullable(schemaProvider).orElseGet(NullSchemaProvider::getInstance);
}
public static class NullSchemaProvider extends SchemaProvider {
+ private static final NullSchemaProvider INSTANCE = new
NullSchemaProvider();
+ public static NullSchemaProvider getInstance() {
+ return INSTANCE;
+ }
- public NullSchemaProvider() {
+ private NullSchemaProvider() {
this(null, null);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 0407bea1935..8d629299684 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -274,18 +274,16 @@ public class StreamSync implements Serializable,
Closeable {
this.processedSchema = new SchemaSet();
this.autoGenerateRecordKeys =
KeyGenUtils.enableAutoGenerateRecordKeys(props);
this.keyGenClassName = getKeyGeneratorClassName(new
TypedProperties(props));
- refreshTimeline();
- // Register User Provided schema first
- registerAvroSchemas(schemaProvider);
-
-
- this.metrics = (HoodieIngestionMetrics)
ReflectionUtils.loadClass(cfg.ingestionMetricsClass,
getHoodieClientConfig(this.schemaProvider));
- this.hoodieMetrics = new
HoodieMetrics(getHoodieClientConfig(this.schemaProvider));
this.conf = conf;
+
+ HoodieWriteConfig hoodieWriteConfig = getHoodieClientConfig();
+ this.metrics = (HoodieIngestionMetrics)
ReflectionUtils.loadClass(cfg.ingestionMetricsClass, hoodieWriteConfig);
+ this.hoodieMetrics = new HoodieMetrics(hoodieWriteConfig);
if (props.getBoolean(ERROR_TABLE_ENABLED.key(),
ERROR_TABLE_ENABLED.defaultValue())) {
this.errorTableWriter = ErrorTableUtils.getErrorTableWriter(cfg,
sparkSession, props, hoodieSparkContext, fs);
this.errorWriteFailureStrategy =
ErrorTableUtils.getErrorWriteFailureStrategy(props);
}
+ refreshTimeline();
Source source = UtilHelpers.createSource(cfg.sourceClassName, props,
hoodieSparkContext.jsc(), sparkSession, schemaProvider, metrics);
this.formatAdapter = new SourceFormatAdapter(source,
this.errorTableWriter, Option.of(props));
@@ -309,7 +307,7 @@ public class StreamSync implements Serializable, Closeable {
if (fs.exists(new Path(cfg.targetBasePath))) {
try {
HoodieTableMetaClient meta = HoodieTableMetaClient.builder()
- .setConf(new Configuration(fs.getConf()))
+ .setConf(conf)
.setBasePath(cfg.targetBasePath)
.setPayloadClassName(cfg.payloadClassName)
.setRecordMergerStrategy(props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(),
HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue()))
@@ -337,7 +335,7 @@ public class StreamSync implements Serializable, Closeable {
LOG.warn("Base path exists, but table is not fully initialized.
Re-initializing again");
initializeEmptyTable();
// reload the timeline from metaClient and validate that it's an empty
table. If there are any instants found, then we should fail the pipeline, because
hoodie.properties got deleted by mistake.
- HoodieTableMetaClient metaClientToValidate =
HoodieTableMetaClient.builder().setConf(new
Configuration(fs.getConf())).setBasePath(cfg.targetBasePath).build();
+ HoodieTableMetaClient metaClientToValidate =
HoodieTableMetaClient.builder().setConf(conf).setBasePath(cfg.targetBasePath).build();
if (metaClientToValidate.reloadActiveTimeline().countInstants() >
0) {
// Deleting the recreated hoodie.properties and throwing
exception.
fs.delete(new Path(String.format("%s%s/%s",
basePathWithForwardSlash, HoodieTableMetaClient.METAFOLDER_NAME,
HoodieTableConfig.HOODIE_PROPERTIES_FILE)));
@@ -395,7 +393,7 @@ public class StreamSync implements Serializable, Closeable {
// Refresh Timeline
refreshTimeline();
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
- .setConf(new Configuration(fs.getConf()))
+ .setConf(conf)
.setBasePath(cfg.targetBasePath)
.setRecordMergerStrategy(props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(),
HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue()))
.setTimeGeneratorConfig(HoodieTimeGeneratorConfig.newBuilder().fromProperties(props).withPath(cfg.targetBasePath).build())
@@ -432,7 +430,7 @@ public class StreamSync implements Serializable, Closeable {
}
// complete the pending compaction before writing to sink
- if (cfg.retryLastPendingInlineCompactionJob &&
getHoodieClientConfig(this.schemaProvider).inlineCompactionEnabled()) {
+ if (cfg.retryLastPendingInlineCompactionJob &&
writeClient.getConfig().inlineCompactionEnabled()) {
Option<String> pendingCompactionInstant =
getLastPendingCompactionInstant(allCommitsTimelineOpt);
if (pendingCompactionInstant.isPresent()) {
HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata =
writeClient.compact(pendingCompactionInstant.get());
@@ -440,7 +438,7 @@ public class StreamSync implements Serializable, Closeable {
refreshTimeline();
reInitWriteClient(schemaProvider.getSourceSchema(),
schemaProvider.getTargetSchema(), null);
}
- } else if (cfg.retryLastPendingInlineClusteringJob &&
getHoodieClientConfig(this.schemaProvider).inlineClusteringEnabled()) {
+ } else if (cfg.retryLastPendingInlineClusteringJob &&
writeClient.getConfig().inlineClusteringEnabled()) {
// complete the pending clustering before writing to sink
Option<String> pendingClusteringInstant =
getLastPendingClusteringInstant(allCommitsTimelineOpt);
if (pendingClusteringInstant.isPresent()) {
@@ -1001,7 +999,7 @@ public class StreamSync implements Serializable, Closeable
{
* this constraint.
*/
private void setupWriteClient(Option<JavaRDD<HoodieRecord>> recordsOpt)
throws IOException {
- if ((null != schemaProvider)) {
+ if (null != schemaProvider) {
Schema sourceSchema = schemaProvider.getSourceSchema();
Schema targetSchema = schemaProvider.getTargetSchema();
reInitWriteClient(sourceSchema, targetSchema, recordsOpt);
@@ -1013,8 +1011,9 @@ public class StreamSync implements Serializable,
Closeable {
if (HoodieStreamerUtils.isDropPartitionColumns(props)) {
targetSchema = HoodieAvroUtils.removeFields(targetSchema,
HoodieStreamerUtils.getPartitionColumns(props));
}
- registerAvroSchemas(sourceSchema, targetSchema);
- final HoodieWriteConfig initialWriteConfig =
getHoodieClientConfig(targetSchema);
+ final Pair<HoodieWriteConfig, Schema> initialWriteConfigAndSchema =
getHoodieClientConfigAndWriterSchema(targetSchema, true);
+ final HoodieWriteConfig initialWriteConfig =
initialWriteConfigAndSchema.getLeft();
+ registerAvroSchemas(sourceSchema, initialWriteConfigAndSchema.getRight());
final HoodieWriteConfig writeConfig = SparkSampleWritesUtils
.getWriteConfigWithRecordSizeEstimate(hoodieSparkContext.jsc(),
recordsOpt, initialWriteConfig)
.orElse(initialWriteConfig);
@@ -1036,20 +1035,21 @@ public class StreamSync implements Serializable,
Closeable {
}
/**
- * Helper to construct Write Client config.
- *
- * @param schemaProvider Schema Provider
+ * Helper to construct Write Client config without a schema.
*/
- private HoodieWriteConfig getHoodieClientConfig(SchemaProvider
schemaProvider) {
- return getHoodieClientConfig(schemaProvider != null ?
schemaProvider.getTargetSchema() : null);
+ private HoodieWriteConfig getHoodieClientConfig() {
+ return getHoodieClientConfigAndWriterSchema(null, false).getLeft();
}
/**
* Helper to construct Write Client config.
*
- * @param schema Schema
+ * @param schema initial writer schema. If null or Avro Null type, the
schema will be fetched from previous commit metadata for the table.
+ * @param requireSchemaInConfig whether the schema should be present in the
config. This is an optimization to avoid fetching schema from previous commits
if not needed.
+ *
+ * @return Pair of HoodieWriteConfig and writer schema.
*/
- private HoodieWriteConfig getHoodieClientConfig(Schema schema) {
+ private Pair<HoodieWriteConfig, Schema>
getHoodieClientConfigAndWriterSchema(Schema schema, boolean
requireSchemaInConfig) {
final boolean combineBeforeUpsert = true;
final boolean autoCommit = false;
@@ -1075,8 +1075,13 @@ public class StreamSync implements Serializable,
Closeable {
.withAutoCommit(autoCommit)
.withProps(props);
- if (schema != null) {
- builder.withSchema(getSchemaForWriteConfig(schema).toString());
+ // If schema is required in the config, we need to handle the case where
the target schema is null and should be fetched from previous commits
+ final Schema returnSchema;
+ if (requireSchemaInConfig) {
+ returnSchema = getSchemaForWriteConfig(schema);
+ builder.withSchema(returnSchema.toString());
+ } else {
+ returnSchema = schema;
}
HoodieWriteConfig config = builder.build();
@@ -1108,30 +1113,28 @@ public class StreamSync implements Serializable,
Closeable {
String.format("%s should be set to %s", COMBINE_BEFORE_INSERT.key(),
cfg.filterDupes));
ValidationUtils.checkArgument(config.shouldCombineBeforeUpsert(),
String.format("%s should be set to %s", COMBINE_BEFORE_UPSERT.key(),
combineBeforeUpsert));
- return config;
+ return Pair.of(config, returnSchema);
}
private Schema getSchemaForWriteConfig(Schema targetSchema) {
Schema newWriteSchema = targetSchema;
try {
- if (targetSchema != null) {
- // check if targetSchema is equal to NULL schema
- if (SchemaCompatibility.checkReaderWriterCompatibility(targetSchema,
InputBatch.NULL_SCHEMA).getType() ==
SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE
- &&
SchemaCompatibility.checkReaderWriterCompatibility(InputBatch.NULL_SCHEMA,
targetSchema).getType() ==
SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE) {
- // target schema is null. fetch schema from commit metadata and use
it
- HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf()))
- .setBasePath(cfg.targetBasePath)
- .setPayloadClassName(cfg.payloadClassName)
- .build();
- int totalCompleted =
meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants();
- if (totalCompleted > 0) {
- TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
- Option<Schema> tableSchema =
schemaResolver.getTableAvroSchemaIfPresent(false);
- if (tableSchema.isPresent()) {
- newWriteSchema = tableSchema.get();
- } else {
- LOG.warn("Could not fetch schema from table. Falling back to
using target schema from schema provider");
- }
+ // check if targetSchema is equal to NULL schema
+ if (targetSchema == null ||
(SchemaCompatibility.checkReaderWriterCompatibility(targetSchema,
InputBatch.NULL_SCHEMA).getType() ==
SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE
+ &&
SchemaCompatibility.checkReaderWriterCompatibility(InputBatch.NULL_SCHEMA,
targetSchema).getType() ==
SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE)) {
+ // target schema is null. fetch schema from commit metadata and use it
+ HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(conf)
+ .setBasePath(cfg.targetBasePath)
+ .setPayloadClassName(cfg.payloadClassName)
+ .build();
+ int totalCompleted =
meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants();
+ if (totalCompleted > 0) {
+ TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
+ Option<Schema> tableSchema =
schemaResolver.getTableAvroSchemaIfPresent(false);
+ if (tableSchema.isPresent()) {
+ newWriteSchema = tableSchema.get();
+ } else {
+ LOG.warn("Could not fetch schema from table. Falling back to using
target schema from schema provider");
}
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 7217993cc9c..0674207ae8b 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -188,7 +188,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
if (type == HoodieRecordType.SPARK) {
Map<String, String> opts = new HashMap<>();
opts.put(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(),
HoodieSparkRecordMerger.class.getName());
- opts.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),"parquet");
+ opts.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
for (Map.Entry<String, String> entry : opts.entrySet()) {
hoodieConfig.add(String.format("%s=%s", entry.getKey(),
entry.getValue()));
}
@@ -206,7 +206,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
}
protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String
tableBasePath, int totalRecords, String asyncCluster, HoodieRecordType
recordType,
-
WriteOperationType writeOperationType, Set<String> customConfigs) throws
IOException {
+ WriteOperationType
writeOperationType, Set<String> customConfigs) throws IOException {
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
writeOperationType);
addRecordMerger(recordType, cfg.configs);
cfg.continuousMode = true;
@@ -465,16 +465,16 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.BULK_INSERT);
addRecordMerger(recordType, cfg.configs);
- syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
+ syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
// No new data => no commits.
cfg.sourceLimit = 0;
- syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
+ syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
// upsert() #1
cfg.sourceLimit = 2000;
cfg.operation = WriteOperationType.UPSERT;
- syncAndAssertRecordCount(cfg,1950, tableBasePath, "00001", 2);
+ syncAndAssertRecordCount(cfg, 1950, tableBasePath, "00001", 2);
List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
assertEquals(1950, counts.stream().mapToLong(entry ->
entry.getLong(1)).sum());
@@ -534,7 +534,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.sourceLimit = 2000;
cfg.operation = WriteOperationType.UPSERT;
cfg.configs.add(HoodieTableConfig.RECORDKEY_FIELDS.key() +
"=differentval");
- assertThrows(HoodieException.class, () ->
syncAndAssertRecordCount(cfg,1000,tableBasePath,"00000",1));
+ assertThrows(HoodieException.class, () -> syncAndAssertRecordCount(cfg,
1000, tableBasePath, "00000", 1));
List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
assertEquals(1000, counts.stream().mapToLong(entry ->
entry.getLong(1)).sum());
@@ -647,7 +647,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@ParameterizedTest
@EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
public void testUpsertsCOW_ContinuousModeDisabled(HoodieRecordType
recordType) throws Exception {
- String tableBasePath = basePath + "/non_continuous_cow";
+ String tableBasePath = basePath + "/non_continuous_cow";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.UPSERT);
addRecordMerger(recordType, cfg.configs);
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
@@ -678,7 +678,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@ParameterizedTest
@EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
public void testUpsertsMOR_ContinuousModeDisabled(HoodieRecordType
recordType) throws Exception {
- String tableBasePath = basePath + "/non_continuous_mor";
+ String tableBasePath = basePath + "/non_continuous_mor";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.UPSERT);
addRecordMerger(recordType, cfg.configs);
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
@@ -846,7 +846,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
prepareParquetDFSSource(false, false, "source.avsc", "target.avsc",
PROPS_FILENAME_TEST_PARQUET,
PARQUET_SOURCE_ROOT, false, "partition_path", "", extraProps);
String tableBasePath = basePath + "test_parquet_table" + testNum;
- HoodieDeltaStreamer.Config deltaCfg =
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
ParquetDFSSource.class.getName(),
+ HoodieDeltaStreamer.Config deltaCfg =
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
ParquetDFSSource.class.getName(),
null, PROPS_FILENAME_TEST_PARQUET, false,
false, 100000, false, null, "MERGE_ON_READ", "timestamp", null);
deltaCfg.retryLastPendingInlineCompactionJob = false;
@@ -995,7 +995,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
private List<String> getAllMultiWriterConfigs() {
List<String> configs = new ArrayList<>();
configs.add(String.format("%s=%s",
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
InProcessLockProvider.class.getCanonicalName()));
- configs.add(String.format("%s=%s",
LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"));
+ configs.add(String.format("%s=%s",
LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000"));
configs.add(String.format("%s=%s",
HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()));
configs.add(String.format("%s=%s",
HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
HoodieFailedWritesCleaningPolicy.LAZY.name()));
return configs;
@@ -1041,7 +1041,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
}
@ParameterizedTest
- @EnumSource(value = HoodieRecordType.class, names = {"AVRO","SPARK"})
+ @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
public void testHoodieIndexer(HoodieRecordType recordType) throws Exception {
String tableBasePath = basePath + "/asyncindexer";
HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 1000,
"false", recordType, WriteOperationType.INSERT,
@@ -1429,7 +1429,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
int counter = 2;
while (counter < 100) { // lets keep going. if the test times out, we
will cancel the future within finally. So, safe to generate 100 batches.
LOG.info("Generating data for batch " + counter);
- prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT,
Integer.toString(counter) + ".parquet", false, null, null);
+ prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT,
Integer.toString(counter) + ".parquet", false, null, null);
counter++;
Thread.sleep(2000);
}
@@ -1474,9 +1474,9 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
* 1 ===============> HUDI TABLE 2 (incr-pull with transform) (incr-pull)
Hudi Table 1 is synced with Hive.
*/
@ParameterizedTest
- @EnumSource(value = HoodieRecordType.class, names = {"AVRO","SPARK"})
+ @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
public void
testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline(HoodieRecordType
recordType) throws Exception {
- String tableBasePath = basePath + "/" + recordType.toString() +
"/test_table2";
+ String tableBasePath = basePath + "/" + recordType.toString() +
"/test_table2";
String downstreamTableBasePath = basePath + "/" + recordType.toString() +
"/test_downstream_table2";
// Initial bulk insert to ingest to first hudi table
@@ -1599,8 +1599,8 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
public void testPartialPayloadClass() throws Exception {
String dataSetBasePath = basePath + "/test_dataset_mor";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath,
WriteOperationType.BULK_INSERT,
- Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false,
- true, true, PartialUpdateAvroPayload.class.getName(),
"MERGE_ON_READ");
+ Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false,
+ true, true, PartialUpdateAvroPayload.class.getName(), "MERGE_ON_READ");
new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
assertRecordCount(1000, dataSetBasePath, sqlContext);
@@ -1831,7 +1831,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
prepareJsonKafkaDFSSource(propsFileName, autoResetValue, topicName, null,
false);
}
- private void prepareJsonKafkaDFSSource(String propsFileName, String
autoResetValue, String topicName, Map<String,String> extraProps, boolean
shouldAddOffsets) throws IOException {
+ private void prepareJsonKafkaDFSSource(String propsFileName, String
autoResetValue, String topicName, Map<String, String> extraProps, boolean
shouldAddOffsets) throws IOException {
// Properties used for testing delta-streamer with JsonKafka source
TypedProperties props = new TypedProperties();
populateAllCommonProps(props, basePath, testUtils.brokerAddress());
@@ -2032,7 +2032,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
ObjectMapper objectMapper = new ObjectMapper();
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(metaClient.getCommitsTimeline().getInstantDetails(instants.get(0)).get(),
HoodieCommitMetadata.class);
- Map<String,String> checkpointVals =
objectMapper.readValue(commitMetadata.getExtraMetadata().get(CHECKPOINT_KEY),
Map.class);
+ Map<String, String> checkpointVals =
objectMapper.readValue(commitMetadata.getExtraMetadata().get(CHECKPOINT_KEY),
Map.class);
String parquetFirstcheckpoint = checkpointVals.get("parquet");
assertNotNull(parquetFirstcheckpoint);
@@ -2048,7 +2048,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
checkpointVals =
objectMapper.readValue(commitMetadata.getExtraMetadata().get(CHECKPOINT_KEY),
Map.class);
String parquetSecondCheckpoint = checkpointVals.get("parquet");
assertNotNull(parquetSecondCheckpoint);
- assertEquals(kafkaCheckpoint,checkpointVals.get("kafka"));
+ assertEquals(kafkaCheckpoint, checkpointVals.get("kafka"));
assertTrue(Long.parseLong(parquetSecondCheckpoint) >
Long.parseLong(parquetFirstcheckpoint));
parquetDs.shutdownGracefully();
kafkaDs.shutdownGracefully();
@@ -2074,6 +2074,43 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
testParquetDFSSource(false, null, true);
}
+ @Test
+ public void testEmptyBatchWithNullSchemaValue() throws Exception {
+ PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
+ int parquetRecordsCount = 10;
+ prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT,
FIRST_PARQUET_FILE_NAME, false, null, null);
+ prepareParquetDFSSource(false, false, "source.avsc", "target.avsc",
PROPS_FILENAME_TEST_PARQUET,
+ PARQUET_SOURCE_ROOT, false, "partition_path", "0");
+
+ String tableBasePath = basePath + "/test_parquet_table" + testNum;
+ HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
+ null, PROPS_FILENAME_TEST_PARQUET, false,
+ false, 100000, false, null, null, "timestamp", null);
+ HoodieDeltaStreamer deltaStreamer1 = new HoodieDeltaStreamer(config, jsc);
+ deltaStreamer1.sync();
+ assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
+ HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
+ HoodieInstant firstCommit =
metaClient.getActiveTimeline().lastInstant().get();
+ deltaStreamer1.shutdownGracefully();
+
+ prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null,
null);
+ HoodieDeltaStreamer.Config updatedConfig = config;
+ updatedConfig.schemaProviderClassName =
NullValueSchemaProvider.class.getName();
+ updatedConfig.sourceClassName =
TestParquetDFSSourceEmptyBatch.class.getName();
+ HoodieDeltaStreamer deltaStreamer2 = new
HoodieDeltaStreamer(updatedConfig, jsc);
+ deltaStreamer2.sync();
+ // since we mimicked an empty batch, total records should be the same as
the first sync().
+ assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
+
+ // validate schema is set in commit even if target schema returns null on
empty batch
+ TableSchemaResolver tableSchemaResolver = new
TableSchemaResolver(metaClient);
+ HoodieInstant secondCommit =
metaClient.reloadActiveTimeline().lastInstant().get();
+ Schema lastCommitSchema =
tableSchemaResolver.getTableAvroSchema(secondCommit, true);
+ assertNotEquals(firstCommit, secondCommit);
+ assertNotEquals(lastCommitSchema, Schema.create(Schema.Type.NULL));
+ deltaStreamer2.shutdownGracefully();
+ }
+
@Test
public void testDeltaStreamerRestartAfterMissingHoodieProps() throws
Exception {
testDeltaStreamerRestartAfterMissingHoodieProps(true);
@@ -2311,7 +2348,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
sqlSourceProps.setProperty("hoodie.embed.timeline.server", "false");
sqlSourceProps.setProperty("hoodie.datasource.write.recordkey.field",
"_row_key");
sqlSourceProps.setProperty("hoodie.datasource.write.partitionpath.field",
"partition_path");
-
sqlSourceProps.setProperty("hoodie.deltastreamer.source.sql.sql.query","select
* from test_sql_table");
+ sqlSourceProps.setProperty("hoodie.deltastreamer.source.sql.sql.query",
"select * from test_sql_table");
UtilitiesTestBase.Helpers.savePropsToDFS(sqlSourceProps, fs, basePath +
"/" + PROPS_FILENAME_TEST_SQL_SOURCE);
@@ -2537,8 +2574,8 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(basePath +
"/testFetchPreviousCheckpoint", WriteOperationType.BULK_INSERT);
TypedProperties properties = new TypedProperties();
- properties.setProperty("hoodie.datasource.write.recordkey.field","key");
- properties.setProperty("hoodie.datasource.write.partitionpath.field","pp");
+ properties.setProperty("hoodie.datasource.write.recordkey.field", "key");
+ properties.setProperty("hoodie.datasource.write.partitionpath.field",
"pp");
TestStreamSync testDeltaSync = new TestStreamSync(cfg, sparkSession, null,
properties,
jsc, fs, jsc.hadoopConfiguration(), null);
@@ -2579,7 +2616,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
-
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(fs.getConf()).build());
+
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(fs.getConf()).build());
// get schema from data file written in the latest commit
Schema tableSchema = tableSchemaResolver.getTableAvroSchemaFromDataFile();
assertNotNull(tableSchema);
@@ -2758,7 +2795,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
}
@ParameterizedTest
- @CsvSource(value = {"COPY_ON_WRITE, AVRO", "MERGE_ON_READ, AVRO",
+ @CsvSource(value = {"COPY_ON_WRITE, AVRO", "MERGE_ON_READ, AVRO",
"COPY_ON_WRITE, SPARK", "MERGE_ON_READ, SPARK"})
public void testConfigurationHotUpdate(HoodieTableType tableType,
HoodieRecordType recordType) throws Exception {
String tableBasePath = basePath +
String.format("/configurationHotUpdate_%s_%s", tableType.name(),
recordType.name());
@@ -2913,4 +2950,20 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
arguments(true,
Collections.singletonList(TripsWithDistanceTransformer.class.getName()))
);
}
+
+ public static class NullValueSchemaProvider extends SchemaProvider {
+
+ public NullValueSchemaProvider(TypedProperties props) {
+ super(props);
+ }
+
+ public NullValueSchemaProvider(TypedProperties props, JavaSparkContext
jssc) {
+ super(props, jssc);
+ }
+
+ @Override
+ public Schema getSourceSchema() {
+ return null;
+ }
+ }
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
index 30b997e856a..1d6f2f110b2 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
@@ -130,7 +130,7 @@ public class TestSourceFormatAdapter {
@MethodSource("provideDataFiles")
public void testRowSanitization(String unsanitizedDataFile, String
sanitizedDataFile, StructType unsanitizedSchema, StructType sanitizedSchema) {
JavaRDD<String> unsanitizedRDD = jsc.textFile(unsanitizedDataFile);
- SchemaProvider schemaProvider = new InputBatch.NullSchemaProvider();
+ SchemaProvider schemaProvider =
InputBatch.NullSchemaProvider.getInstance();
verifySanitization(fetchRowData(unsanitizedRDD, unsanitizedSchema,
schemaProvider), sanitizedDataFile, sanitizedSchema);
verifySanitization(fetchRowData(unsanitizedRDD, unsanitizedSchema, null),
sanitizedDataFile, sanitizedSchema);