This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b420850430a [HUDI-7237] Hudi Streamer: Handle edge case with null 
schema, minor cleanups (#10342)
b420850430a is described below

commit b420850430a19cee45abed5aedbbd8569eaecd43
Author: Tim Brown <[email protected]>
AuthorDate: Tue Jan 23 18:53:22 2024 -0600

    [HUDI-7237] Hudi Streamer: Handle edge case with null schema, minor 
cleanups (#10342)
---
 .../schema/utils/AvroSchemaEvolutionUtils.java     |   2 +-
 .../schema/SchemaProviderWithPostProcessor.java    |  13 ++-
 .../apache/hudi/utilities/sources/InputBatch.java  |   8 +-
 .../apache/hudi/utilities/streamer/StreamSync.java |  89 +++++++++---------
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 101 ++++++++++++++++-----
 .../deltastreamer/TestSourceFormatAdapter.java     |   2 +-
 6 files changed, 139 insertions(+), 76 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
index c09b21ad3ae..e714d99f0e0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
@@ -144,7 +144,7 @@ public class AvroSchemaEvolutionUtils {
       return sourceSchema;
     }
 
-    if (sourceSchema.getType() == Schema.Type.NULL || 
sourceSchema.getFields().isEmpty()) {
+    if (sourceSchema == null || sourceSchema.getType() == Schema.Type.NULL || 
sourceSchema.getFields().isEmpty()) {
       return targetSchema;
     }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessor.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessor.java
index bd5bae4601d..c1965e86989 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessor.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessor.java
@@ -18,9 +18,10 @@
 
 package org.apache.hudi.utilities.schema;
 
-import org.apache.avro.Schema;
 import org.apache.hudi.common.util.Option;
 
+import org.apache.avro.Schema;
+
 /**
  * A schema provider which applies schema post process hook on schema.
  */
@@ -38,14 +39,16 @@ public class SchemaProviderWithPostProcessor extends 
SchemaProvider {
 
   @Override
   public Schema getSourceSchema() {
-    return schemaPostProcessor.map(processor -> 
processor.processSchema(schemaProvider.getSourceSchema()))
-        .orElse(schemaProvider.getSourceSchema());
+    Schema sourceSchema = schemaProvider.getSourceSchema();
+    return schemaPostProcessor.map(processor -> 
processor.processSchema(sourceSchema))
+        .orElse(sourceSchema);
   }
 
   @Override
   public Schema getTargetSchema() {
-    return schemaPostProcessor.map(processor -> 
processor.processSchema(schemaProvider.getTargetSchema()))
-        .orElse(schemaProvider.getTargetSchema());
+    Schema targetSchema = schemaProvider.getTargetSchema();
+    return schemaPostProcessor.map(processor -> 
processor.processSchema(targetSchema))
+        .orElse(targetSchema);
   }
 
   public SchemaProvider getOriginalSchemaProvider() {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
index 04e3a574dc5..206909317fc 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
@@ -55,12 +55,16 @@ public class InputBatch<T> {
     if (batch.isPresent() && schemaProvider == null) {
       throw new HoodieException("Please provide a valid schema provider 
class!");
     }
-    return Option.ofNullable(schemaProvider).orElse(new NullSchemaProvider());
+    return 
Option.ofNullable(schemaProvider).orElseGet(NullSchemaProvider::getInstance);
   }
 
   public static class NullSchemaProvider extends SchemaProvider {
+    private static final NullSchemaProvider INSTANCE = new 
NullSchemaProvider();
+    public static NullSchemaProvider getInstance() {
+      return INSTANCE;
+    }
 
-    public NullSchemaProvider() {
+    private NullSchemaProvider() {
       this(null, null);
     }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 0407bea1935..8d629299684 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -274,18 +274,16 @@ public class StreamSync implements Serializable, 
Closeable {
     this.processedSchema = new SchemaSet();
     this.autoGenerateRecordKeys = 
KeyGenUtils.enableAutoGenerateRecordKeys(props);
     this.keyGenClassName = getKeyGeneratorClassName(new 
TypedProperties(props));
-    refreshTimeline();
-    // Register User Provided schema first
-    registerAvroSchemas(schemaProvider);
-
-
-    this.metrics = (HoodieIngestionMetrics) 
ReflectionUtils.loadClass(cfg.ingestionMetricsClass, 
getHoodieClientConfig(this.schemaProvider));
-    this.hoodieMetrics = new 
HoodieMetrics(getHoodieClientConfig(this.schemaProvider));
     this.conf = conf;
+
+    HoodieWriteConfig hoodieWriteConfig = getHoodieClientConfig();
+    this.metrics = (HoodieIngestionMetrics) 
ReflectionUtils.loadClass(cfg.ingestionMetricsClass, hoodieWriteConfig);
+    this.hoodieMetrics = new HoodieMetrics(hoodieWriteConfig);
     if (props.getBoolean(ERROR_TABLE_ENABLED.key(), 
ERROR_TABLE_ENABLED.defaultValue())) {
       this.errorTableWriter = ErrorTableUtils.getErrorTableWriter(cfg, 
sparkSession, props, hoodieSparkContext, fs);
       this.errorWriteFailureStrategy = 
ErrorTableUtils.getErrorWriteFailureStrategy(props);
     }
+    refreshTimeline();
     Source source = UtilHelpers.createSource(cfg.sourceClassName, props, 
hoodieSparkContext.jsc(), sparkSession, schemaProvider, metrics);
     this.formatAdapter = new SourceFormatAdapter(source, 
this.errorTableWriter, Option.of(props));
 
@@ -309,7 +307,7 @@ public class StreamSync implements Serializable, Closeable {
     if (fs.exists(new Path(cfg.targetBasePath))) {
       try {
         HoodieTableMetaClient meta = HoodieTableMetaClient.builder()
-            .setConf(new Configuration(fs.getConf()))
+            .setConf(conf)
             .setBasePath(cfg.targetBasePath)
             .setPayloadClassName(cfg.payloadClassName)
             
.setRecordMergerStrategy(props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(),
 HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue()))
@@ -337,7 +335,7 @@ public class StreamSync implements Serializable, Closeable {
             LOG.warn("Base path exists, but table is not fully initialized. 
Re-initializing again");
             initializeEmptyTable();
            // reload the timeline from metaClient and validate that it's an empty 
table. If there are any instants found, then we should fail the pipeline, because 
hoodie.properties got deleted by mistake.
-            HoodieTableMetaClient metaClientToValidate = 
HoodieTableMetaClient.builder().setConf(new 
Configuration(fs.getConf())).setBasePath(cfg.targetBasePath).build();
+            HoodieTableMetaClient metaClientToValidate = 
HoodieTableMetaClient.builder().setConf(conf).setBasePath(cfg.targetBasePath).build();
             if (metaClientToValidate.reloadActiveTimeline().countInstants() > 
0) {
               // Deleting the recreated hoodie.properties and throwing 
exception.
               fs.delete(new Path(String.format("%s%s/%s", 
basePathWithForwardSlash, HoodieTableMetaClient.METAFOLDER_NAME, 
HoodieTableConfig.HOODIE_PROPERTIES_FILE)));
@@ -395,7 +393,7 @@ public class StreamSync implements Serializable, Closeable {
     // Refresh Timeline
     refreshTimeline();
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
-        .setConf(new Configuration(fs.getConf()))
+        .setConf(conf)
         .setBasePath(cfg.targetBasePath)
         
.setRecordMergerStrategy(props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(),
 HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue()))
         
.setTimeGeneratorConfig(HoodieTimeGeneratorConfig.newBuilder().fromProperties(props).withPath(cfg.targetBasePath).build())
@@ -432,7 +430,7 @@ public class StreamSync implements Serializable, Closeable {
       }
 
       // complete the pending compaction before writing to sink
-      if (cfg.retryLastPendingInlineCompactionJob && 
getHoodieClientConfig(this.schemaProvider).inlineCompactionEnabled()) {
+      if (cfg.retryLastPendingInlineCompactionJob && 
writeClient.getConfig().inlineCompactionEnabled()) {
         Option<String> pendingCompactionInstant = 
getLastPendingCompactionInstant(allCommitsTimelineOpt);
         if (pendingCompactionInstant.isPresent()) {
           HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = 
writeClient.compact(pendingCompactionInstant.get());
@@ -440,7 +438,7 @@ public class StreamSync implements Serializable, Closeable {
           refreshTimeline();
           reInitWriteClient(schemaProvider.getSourceSchema(), 
schemaProvider.getTargetSchema(), null);
         }
-      } else if (cfg.retryLastPendingInlineClusteringJob && 
getHoodieClientConfig(this.schemaProvider).inlineClusteringEnabled()) {
+      } else if (cfg.retryLastPendingInlineClusteringJob && 
writeClient.getConfig().inlineClusteringEnabled()) {
         // complete the pending clustering before writing to sink
         Option<String> pendingClusteringInstant = 
getLastPendingClusteringInstant(allCommitsTimelineOpt);
         if (pendingClusteringInstant.isPresent()) {
@@ -1001,7 +999,7 @@ public class StreamSync implements Serializable, Closeable 
{
    * this constraint.
    */
   private void setupWriteClient(Option<JavaRDD<HoodieRecord>> recordsOpt) 
throws IOException {
-    if ((null != schemaProvider)) {
+    if (null != schemaProvider) {
       Schema sourceSchema = schemaProvider.getSourceSchema();
       Schema targetSchema = schemaProvider.getTargetSchema();
       reInitWriteClient(sourceSchema, targetSchema, recordsOpt);
@@ -1013,8 +1011,9 @@ public class StreamSync implements Serializable, 
Closeable {
     if (HoodieStreamerUtils.isDropPartitionColumns(props)) {
       targetSchema = HoodieAvroUtils.removeFields(targetSchema, 
HoodieStreamerUtils.getPartitionColumns(props));
     }
-    registerAvroSchemas(sourceSchema, targetSchema);
-    final HoodieWriteConfig initialWriteConfig = 
getHoodieClientConfig(targetSchema);
+    final Pair<HoodieWriteConfig, Schema> initialWriteConfigAndSchema = 
getHoodieClientConfigAndWriterSchema(targetSchema, true);
+    final HoodieWriteConfig initialWriteConfig = 
initialWriteConfigAndSchema.getLeft();
+    registerAvroSchemas(sourceSchema, initialWriteConfigAndSchema.getRight());
     final HoodieWriteConfig writeConfig = SparkSampleWritesUtils
         .getWriteConfigWithRecordSizeEstimate(hoodieSparkContext.jsc(), 
recordsOpt, initialWriteConfig)
         .orElse(initialWriteConfig);
@@ -1036,20 +1035,21 @@ public class StreamSync implements Serializable, 
Closeable {
   }
 
   /**
-   * Helper to construct Write Client config.
-   *
-   * @param schemaProvider Schema Provider
+   * Helper to construct Write Client config without a schema.
    */
-  private HoodieWriteConfig getHoodieClientConfig(SchemaProvider 
schemaProvider) {
-    return getHoodieClientConfig(schemaProvider != null ? 
schemaProvider.getTargetSchema() : null);
+  private HoodieWriteConfig getHoodieClientConfig() {
+    return getHoodieClientConfigAndWriterSchema(null, false).getLeft();
   }
 
   /**
    * Helper to construct Write Client config.
    *
-   * @param schema Schema
+   * @param schema initial writer schema. If null or Avro Null type, the 
schema will be fetched from previous commit metadata for the table.
+   * @param requireSchemaInConfig whether the schema should be present in the 
config. This is an optimization to avoid fetching schema from previous commits 
if not needed.
+   *
+   * @return Pair of HoodieWriteConfig and writer schema.
    */
-  private HoodieWriteConfig getHoodieClientConfig(Schema schema) {
+  private Pair<HoodieWriteConfig, Schema> 
getHoodieClientConfigAndWriterSchema(Schema schema, boolean 
requireSchemaInConfig) {
     final boolean combineBeforeUpsert = true;
     final boolean autoCommit = false;
 
@@ -1075,8 +1075,13 @@ public class StreamSync implements Serializable, 
Closeable {
             .withAutoCommit(autoCommit)
             .withProps(props);
 
-    if (schema != null) {
-      builder.withSchema(getSchemaForWriteConfig(schema).toString());
+    // If schema is required in the config, we need to handle the case where 
the target schema is null and should be fetched from previous commits
+    final Schema returnSchema;
+    if (requireSchemaInConfig) {
+      returnSchema = getSchemaForWriteConfig(schema);
+      builder.withSchema(returnSchema.toString());
+    } else {
+      returnSchema = schema;
     }
 
     HoodieWriteConfig config = builder.build();
@@ -1108,30 +1113,28 @@ public class StreamSync implements Serializable, 
Closeable {
         String.format("%s should be set to %s", COMBINE_BEFORE_INSERT.key(), 
cfg.filterDupes));
     ValidationUtils.checkArgument(config.shouldCombineBeforeUpsert(),
         String.format("%s should be set to %s", COMBINE_BEFORE_UPSERT.key(), 
combineBeforeUpsert));
-    return config;
+    return Pair.of(config, returnSchema);
   }
 
   private Schema getSchemaForWriteConfig(Schema targetSchema) {
     Schema newWriteSchema = targetSchema;
     try {
-      if (targetSchema != null) {
-        // check if targetSchema is equal to NULL schema
-        if (SchemaCompatibility.checkReaderWriterCompatibility(targetSchema, 
InputBatch.NULL_SCHEMA).getType() == 
SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE
-            && 
SchemaCompatibility.checkReaderWriterCompatibility(InputBatch.NULL_SCHEMA, 
targetSchema).getType() == 
SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE) {
-          // target schema is null. fetch schema from commit metadata and use 
it
-          HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf()))
-              .setBasePath(cfg.targetBasePath)
-              .setPayloadClassName(cfg.payloadClassName)
-              .build();
-          int totalCompleted = 
meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants();
-          if (totalCompleted > 0) {
-            TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
-            Option<Schema> tableSchema = 
schemaResolver.getTableAvroSchemaIfPresent(false);
-            if (tableSchema.isPresent()) {
-              newWriteSchema = tableSchema.get();
-            } else {
-              LOG.warn("Could not fetch schema from table. Falling back to 
using target schema from schema provider");
-            }
+      // check if targetSchema is equal to NULL schema
+      if (targetSchema == null || 
(SchemaCompatibility.checkReaderWriterCompatibility(targetSchema, 
InputBatch.NULL_SCHEMA).getType() == 
SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE
+          && 
SchemaCompatibility.checkReaderWriterCompatibility(InputBatch.NULL_SCHEMA, 
targetSchema).getType() == 
SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE)) {
+        // target schema is null. fetch schema from commit metadata and use it
+        HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(conf)
+            .setBasePath(cfg.targetBasePath)
+            .setPayloadClassName(cfg.payloadClassName)
+            .build();
+        int totalCompleted = 
meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants();
+        if (totalCompleted > 0) {
+          TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
+          Option<Schema> tableSchema = 
schemaResolver.getTableAvroSchemaIfPresent(false);
+          if (tableSchema.isPresent()) {
+            newWriteSchema = tableSchema.get();
+          } else {
+            LOG.warn("Could not fetch schema from table. Falling back to using 
target schema from schema provider");
           }
         }
       }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 7217993cc9c..0674207ae8b 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -188,7 +188,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     if (type == HoodieRecordType.SPARK) {
       Map<String, String> opts = new HashMap<>();
       opts.put(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), 
HoodieSparkRecordMerger.class.getName());
-      opts.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),"parquet");
+      opts.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
       for (Map.Entry<String, String> entry : opts.entrySet()) {
         hoodieConfig.add(String.format("%s=%s", entry.getKey(), 
entry.getValue()));
       }
@@ -206,7 +206,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   }
 
   protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String 
tableBasePath, int totalRecords, String asyncCluster, HoodieRecordType 
recordType,
-                                                             
WriteOperationType writeOperationType, Set<String> customConfigs) throws 
IOException {
+                                                           WriteOperationType 
writeOperationType, Set<String> customConfigs) throws IOException {
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
writeOperationType);
     addRecordMerger(recordType, cfg.configs);
     cfg.continuousMode = true;
@@ -465,16 +465,16 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT);
     addRecordMerger(recordType, cfg.configs);
-    syncAndAssertRecordCount(cfg, 1000,  tableBasePath,  "00000",  1);
+    syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
 
     // No new data => no commits.
     cfg.sourceLimit = 0;
-    syncAndAssertRecordCount(cfg, 1000,  tableBasePath,  "00000",  1);
+    syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
 
     // upsert() #1
     cfg.sourceLimit = 2000;
     cfg.operation = WriteOperationType.UPSERT;
-    syncAndAssertRecordCount(cfg,1950, tableBasePath, "00001", 2);
+    syncAndAssertRecordCount(cfg, 1950, tableBasePath, "00001", 2);
     List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1950, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
 
@@ -534,7 +534,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.sourceLimit = 2000;
     cfg.operation = WriteOperationType.UPSERT;
     cfg.configs.add(HoodieTableConfig.RECORDKEY_FIELDS.key() + 
"=differentval");
-    assertThrows(HoodieException.class, () -> 
syncAndAssertRecordCount(cfg,1000,tableBasePath,"00000",1));
+    assertThrows(HoodieException.class, () -> syncAndAssertRecordCount(cfg, 
1000, tableBasePath, "00000", 1));
     List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1000, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
 
@@ -647,7 +647,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   @ParameterizedTest
   @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
   public void testUpsertsCOW_ContinuousModeDisabled(HoodieRecordType 
recordType) throws Exception {
-    String tableBasePath = basePath  + "/non_continuous_cow";
+    String tableBasePath = basePath + "/non_continuous_cow";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.UPSERT);
     addRecordMerger(recordType, cfg.configs);
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
@@ -678,7 +678,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   @ParameterizedTest
   @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
   public void testUpsertsMOR_ContinuousModeDisabled(HoodieRecordType 
recordType) throws Exception {
-    String tableBasePath = basePath  + "/non_continuous_mor";
+    String tableBasePath = basePath + "/non_continuous_mor";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.UPSERT);
     addRecordMerger(recordType, cfg.configs);
     cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
@@ -846,7 +846,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     prepareParquetDFSSource(false, false, "source.avsc", "target.avsc", 
PROPS_FILENAME_TEST_PARQUET,
         PARQUET_SOURCE_ROOT, false, "partition_path", "", extraProps);
     String tableBasePath = basePath + "test_parquet_table" + testNum;
-    HoodieDeltaStreamer.Config deltaCfg =  
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
ParquetDFSSource.class.getName(),
+    HoodieDeltaStreamer.Config deltaCfg = 
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
ParquetDFSSource.class.getName(),
         null, PROPS_FILENAME_TEST_PARQUET, false,
         false, 100000, false, null, "MERGE_ON_READ", "timestamp", null);
     deltaCfg.retryLastPendingInlineCompactionJob = false;
@@ -995,7 +995,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   private List<String> getAllMultiWriterConfigs() {
     List<String> configs = new ArrayList<>();
     configs.add(String.format("%s=%s", 
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), 
InProcessLockProvider.class.getCanonicalName()));
-    configs.add(String.format("%s=%s", 
LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"));
+    configs.add(String.format("%s=%s", 
LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000"));
     configs.add(String.format("%s=%s", 
HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), 
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()));
     configs.add(String.format("%s=%s", 
HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(), 
HoodieFailedWritesCleaningPolicy.LAZY.name()));
     return configs;
@@ -1041,7 +1041,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   }
 
   @ParameterizedTest
-  @EnumSource(value = HoodieRecordType.class, names = {"AVRO","SPARK"})
+  @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
   public void testHoodieIndexer(HoodieRecordType recordType) throws Exception {
     String tableBasePath = basePath + "/asyncindexer";
     HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 1000, 
"false", recordType, WriteOperationType.INSERT,
@@ -1429,7 +1429,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         int counter = 2;
         while (counter < 100) { // lets keep going. if the test times out, we 
will cancel the future within finally. So, safe to generate 100 batches.
           LOG.info("Generating data for batch " + counter);
-          prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT,  
Integer.toString(counter) + ".parquet", false, null, null);
+          prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, 
Integer.toString(counter) + ".parquet", false, null, null);
           counter++;
           Thread.sleep(2000);
         }
@@ -1474,9 +1474,9 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
    * 1 ===============> HUDI TABLE 2 (incr-pull with transform) (incr-pull) 
Hudi Table 1 is synced with Hive.
    */
   @ParameterizedTest
-  @EnumSource(value = HoodieRecordType.class, names = {"AVRO","SPARK"})
+  @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
   public void 
testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline(HoodieRecordType
 recordType) throws Exception {
-    String tableBasePath = basePath + "/" + recordType.toString() +  
"/test_table2";
+    String tableBasePath = basePath + "/" + recordType.toString() + 
"/test_table2";
     String downstreamTableBasePath = basePath + "/" + recordType.toString() + 
"/test_downstream_table2";
 
     // Initial bulk insert to ingest to first hudi table
@@ -1599,8 +1599,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   public void testPartialPayloadClass() throws Exception {
     String dataSetBasePath = basePath + "/test_dataset_mor";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, 
WriteOperationType.BULK_INSERT,
-          Collections.singletonList(SqlQueryBasedTransformer.class.getName()), 
PROPS_FILENAME_TEST_SOURCE, false,
-          true, true, PartialUpdateAvroPayload.class.getName(), 
"MERGE_ON_READ");
+        Collections.singletonList(SqlQueryBasedTransformer.class.getName()), 
PROPS_FILENAME_TEST_SOURCE, false,
+        true, true, PartialUpdateAvroPayload.class.getName(), "MERGE_ON_READ");
     new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
     assertRecordCount(1000, dataSetBasePath, sqlContext);
 
@@ -1831,7 +1831,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     prepareJsonKafkaDFSSource(propsFileName, autoResetValue, topicName, null, 
false);
   }
 
-  private void prepareJsonKafkaDFSSource(String propsFileName, String 
autoResetValue, String topicName, Map<String,String> extraProps, boolean 
shouldAddOffsets) throws IOException {
+  private void prepareJsonKafkaDFSSource(String propsFileName, String 
autoResetValue, String topicName, Map<String, String> extraProps, boolean 
shouldAddOffsets) throws IOException {
     // Properties used for testing delta-streamer with JsonKafka source
     TypedProperties props = new TypedProperties();
     populateAllCommonProps(props, basePath, testUtils.brokerAddress());
@@ -2032,7 +2032,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     ObjectMapper objectMapper = new ObjectMapper();
     HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
         
.fromBytes(metaClient.getCommitsTimeline().getInstantDetails(instants.get(0)).get(),
 HoodieCommitMetadata.class);
-    Map<String,String>  checkpointVals = 
objectMapper.readValue(commitMetadata.getExtraMetadata().get(CHECKPOINT_KEY), 
Map.class);
+    Map<String, String> checkpointVals = 
objectMapper.readValue(commitMetadata.getExtraMetadata().get(CHECKPOINT_KEY), 
Map.class);
 
     String parquetFirstcheckpoint = checkpointVals.get("parquet");
     assertNotNull(parquetFirstcheckpoint);
@@ -2048,7 +2048,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     checkpointVals = 
objectMapper.readValue(commitMetadata.getExtraMetadata().get(CHECKPOINT_KEY), 
Map.class);
     String parquetSecondCheckpoint = checkpointVals.get("parquet");
     assertNotNull(parquetSecondCheckpoint);
-    assertEquals(kafkaCheckpoint,checkpointVals.get("kafka"));
+    assertEquals(kafkaCheckpoint, checkpointVals.get("kafka"));
     assertTrue(Long.parseLong(parquetSecondCheckpoint) > 
Long.parseLong(parquetFirstcheckpoint));
     parquetDs.shutdownGracefully();
     kafkaDs.shutdownGracefully();
@@ -2074,6 +2074,43 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     testParquetDFSSource(false, null, true);
   }
 
+  @Test
+  public void testEmptyBatchWithNullSchemaValue() throws Exception {
+    PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
+    int parquetRecordsCount = 10;
+    prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, 
FIRST_PARQUET_FILE_NAME, false, null, null);
+    prepareParquetDFSSource(false, false, "source.avsc", "target.avsc", 
PROPS_FILENAME_TEST_PARQUET,
+        PARQUET_SOURCE_ROOT, false, "partition_path", "0");
+
+    String tableBasePath = basePath + "/test_parquet_table" + testNum;
+    HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
+        null, PROPS_FILENAME_TEST_PARQUET, false,
+        false, 100000, false, null, null, "timestamp", null);
+    HoodieDeltaStreamer deltaStreamer1 = new HoodieDeltaStreamer(config, jsc);
+    deltaStreamer1.sync();
+    assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
+    HoodieInstant firstCommit = 
metaClient.getActiveTimeline().lastInstant().get();
+    deltaStreamer1.shutdownGracefully();
+
+    prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, 
null);
+    HoodieDeltaStreamer.Config updatedConfig = config;
+    updatedConfig.schemaProviderClassName = 
NullValueSchemaProvider.class.getName();
+    updatedConfig.sourceClassName = 
TestParquetDFSSourceEmptyBatch.class.getName();
+    HoodieDeltaStreamer deltaStreamer2 = new 
HoodieDeltaStreamer(updatedConfig, jsc);
+    deltaStreamer2.sync();
+    // since we mimicked an empty batch, the total records should be the same as 
the first sync().
+    assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
+
+    // validate that the schema is set in the commit even if the target schema 
returns null on an empty batch
+    TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
+    HoodieInstant secondCommit = 
metaClient.reloadActiveTimeline().lastInstant().get();
+    Schema lastCommitSchema = 
tableSchemaResolver.getTableAvroSchema(secondCommit, true);
+    assertNotEquals(firstCommit, secondCommit);
+    assertNotEquals(lastCommitSchema, Schema.create(Schema.Type.NULL));
+    deltaStreamer2.shutdownGracefully();
+  }
+
   @Test
   public void testDeltaStreamerRestartAfterMissingHoodieProps() throws 
Exception {
     testDeltaStreamerRestartAfterMissingHoodieProps(true);
@@ -2311,7 +2348,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     sqlSourceProps.setProperty("hoodie.embed.timeline.server", "false");
     sqlSourceProps.setProperty("hoodie.datasource.write.recordkey.field", 
"_row_key");
     sqlSourceProps.setProperty("hoodie.datasource.write.partitionpath.field", 
"partition_path");
-    
sqlSourceProps.setProperty("hoodie.deltastreamer.source.sql.sql.query","select 
* from test_sql_table");
+    sqlSourceProps.setProperty("hoodie.deltastreamer.source.sql.sql.query", 
"select * from test_sql_table");
 
     UtilitiesTestBase.Helpers.savePropsToDFS(sqlSourceProps, fs, basePath + 
"/" + PROPS_FILENAME_TEST_SQL_SOURCE);
 
@@ -2537,8 +2574,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(basePath + 
"/testFetchPreviousCheckpoint", WriteOperationType.BULK_INSERT);
 
     TypedProperties properties = new TypedProperties();
-    properties.setProperty("hoodie.datasource.write.recordkey.field","key");
-    properties.setProperty("hoodie.datasource.write.partitionpath.field","pp");
+    properties.setProperty("hoodie.datasource.write.recordkey.field", "key");
+    properties.setProperty("hoodie.datasource.write.partitionpath.field", 
"pp");
     TestStreamSync testDeltaSync = new TestStreamSync(cfg, sparkSession, null, 
properties,
         jsc, fs, jsc.hadoopConfiguration(), null);
 
@@ -2579,7 +2616,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
 
     TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
-            
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(fs.getConf()).build());
+        
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(fs.getConf()).build());
     // get schema from data file written in the latest commit
     Schema tableSchema = tableSchemaResolver.getTableAvroSchemaFromDataFile();
     assertNotNull(tableSchema);
@@ -2758,7 +2795,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   }
 
   @ParameterizedTest
-  @CsvSource(value = {"COPY_ON_WRITE, AVRO",  "MERGE_ON_READ, AVRO",
+  @CsvSource(value = {"COPY_ON_WRITE, AVRO", "MERGE_ON_READ, AVRO",
       "COPY_ON_WRITE, SPARK", "MERGE_ON_READ, SPARK"})
   public void testConfigurationHotUpdate(HoodieTableType tableType, 
HoodieRecordType recordType) throws Exception {
     String tableBasePath = basePath + 
String.format("/configurationHotUpdate_%s_%s", tableType.name(), 
recordType.name());
@@ -2913,4 +2950,20 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         arguments(true, 
Collections.singletonList(TripsWithDistanceTransformer.class.getName()))
     );
   }
+
+  public static class NullValueSchemaProvider extends SchemaProvider {
+
+    public NullValueSchemaProvider(TypedProperties props) {
+      super(props);
+    }
+
+    public NullValueSchemaProvider(TypedProperties props, JavaSparkContext 
jssc) {
+      super(props, jssc);
+    }
+
+    @Override
+    public Schema getSourceSchema() {
+      return null;
+    }
+  }
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
index 30b997e856a..1d6f2f110b2 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
@@ -130,7 +130,7 @@ public class TestSourceFormatAdapter {
   @MethodSource("provideDataFiles")
   public void testRowSanitization(String unsanitizedDataFile, String 
sanitizedDataFile, StructType unsanitizedSchema, StructType sanitizedSchema) {
     JavaRDD<String> unsanitizedRDD = jsc.textFile(unsanitizedDataFile);
-    SchemaProvider schemaProvider = new InputBatch.NullSchemaProvider();
+    SchemaProvider schemaProvider = 
InputBatch.NullSchemaProvider.getInstance();
     verifySanitization(fetchRowData(unsanitizedRDD, unsanitizedSchema, 
schemaProvider), sanitizedDataFile, sanitizedSchema);
     verifySanitization(fetchRowData(unsanitizedRDD, unsanitizedSchema, null), 
sanitizedDataFile, sanitizedSchema);
 

Reply via email to