This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new df4cca8aa56 [HUDI-7154] Fix NPE from empty batch with row writer enabled in Hudi Streamer (#10198)
df4cca8aa56 is described below

commit df4cca8aa560d21bde1bf4c1a4079d3d2f760c6f
Author: Y Ethan Guo <[email protected]>
AuthorDate: Mon Dec 4 08:06:59 2023 -0800

    [HUDI-7154] Fix NPE from empty batch with row writer enabled in Hudi Streamer (#10198)
    
    
    Co-authored-by: sivabalan <[email protected]>
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 26 +++++++----
 .../apache/hudi/utilities/streamer/StreamSync.java |  5 ++-
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 51 ++++++++++++++++++----
 3 files changed, 62 insertions(+), 20 deletions(-)
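
The gist of the fix: with the row writer enabled, an empty incoming batch leaves Hudi Streamer without a usable writer schema, and the previous code applied schema-dependent settings unconditionally. The patch threads the schema through as org.apache.hudi.common.util.Option and only derives schema-based config when one is present. A condensed sketch of the guarded pattern (illustrative Scala, not the exact Hudi code; buildWriteOpts and baseOpts are hypothetical names):

    import org.apache.avro.Schema
    import org.apache.hudi.common.util.Option  // Hudi's Java-style Option
    import org.apache.hudi.config.HoodieWriteConfig

    // Attach the Avro schema to the write options only when one exists;
    // an empty batch leaves the schema key unset instead of failing on a null schema.
    def buildWriteOpts(writerSchema: Option[Schema],
                       baseOpts: Map[String, String]): Map[String, String] =
      if (writerSchema.isPresent) {
        baseOpts + (HoodieWriteConfig.AVRO_SCHEMA_STRING.key -> writerSchema.get().toString)
      } else {
        baseOpts
      }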

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index b8dbb18287e..e925e2a5423 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -155,19 +155,27 @@ object HoodieSparkSqlWriter {
     Metrics.shutdownAllMetrics()
   }
 
-  def getBulkInsertRowConfig(writerSchema: Schema, hoodieConfig: HoodieConfig,
+  def getBulkInsertRowConfig(writerSchema: org.apache.hudi.common.util.Option[Schema], hoodieConfig: HoodieConfig,
                              basePath: String, tblName: String): HoodieWriteConfig = {
-    val writerSchemaStr = writerSchema.toString
-
+    var writerSchemaStr : String = null
+    if ( writerSchema.isPresent) {
+      writerSchemaStr = writerSchema.get().toString
+    }
     // Make opts mutable since it could be modified by tryOverrideParquetWriteLegacyFormatProperty
-    val opts = mutable.Map() ++ hoodieConfig.getProps.toMap ++
-      Map(HoodieWriteConfig.AVRO_SCHEMA_STRING.key -> writerSchemaStr)
+    val optsWithoutSchema = mutable.Map() ++ hoodieConfig.getProps.toMap
+    val opts = if (writerSchema.isPresent) {
+      optsWithoutSchema ++ Map(HoodieWriteConfig.AVRO_SCHEMA_STRING.key -> writerSchemaStr)
+    } else {
+      optsWithoutSchema
+    }
+
+    if (writerSchema.isPresent) {
+      // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
+      tryOverrideParquetWriteLegacyFormatProperty(opts, convertAvroSchemaToStructType(writerSchema.get))
+    }
 
-    // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
-    tryOverrideParquetWriteLegacyFormatProperty(opts, convertAvroSchemaToStructType(writerSchema))
     DataSourceUtils.createHoodieConfig(writerSchemaStr, basePath, tblName, opts)
   }
-
 }
 
 class HoodieSparkSqlWriterInternal {
@@ -779,7 +787,7 @@ class HoodieSparkSqlWriterInternal {
     val sqlContext = writeClient.getEngineContext.asInstanceOf[HoodieSparkEngineContext].getSqlContext
     val jsc = writeClient.getEngineContext.asInstanceOf[HoodieSparkEngineContext].getJavaSparkContext
 
-    val writeConfig = HoodieSparkSqlWriter.getBulkInsertRowConfig(writerSchema, hoodieConfig, basePath.toString, tblName)
+    val writeConfig = HoodieSparkSqlWriter.getBulkInsertRowConfig(org.apache.hudi.common.util.Option.of(writerSchema), hoodieConfig, basePath.toString, tblName)
     val overwriteOperationType = Option(hoodieConfig.getString(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE))
       .map(WriteOperationType.fromValue)
       .orNull
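
With the new signature, an absent schema leaves writerSchemaStr as null, so DataSourceUtils.createHoodieConfig receives a null schema string and no AVRO_SCHEMA_STRING entry, while callers that do have a schema simply wrap it. A hedged sketch of the two invocation shapes (Scala; identifiers as in the diff, surrounding setup elided):

    import org.apache.avro.Schema
    import org.apache.hudi.common.util.Option

    // Schema known (datasource write path): wrap it directly.
    val withSchema = HoodieSparkSqlWriter.getBulkInsertRowConfig(
      Option.of(writerSchema), hoodieConfig, basePath.toString, tblName)

    // Schema absent (empty batch in the streamer): schema-dependent settings are skipped.
    val withoutSchema = HoodieSparkSqlWriter.getBulkInsertRowConfig(
      Option.empty[Schema](), hoodieConfig, basePath.toString, tblName)
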
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 19289e650c4..ff2debc8dcc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -757,7 +757,8 @@ public class StreamSync implements Serializable, Closeable {
     hoodieConfig.setValue(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key(), cfg.payloadClassName);
     hoodieConfig.setValue(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName(props));
     hoodieConfig.setValue("path", cfg.targetBasePath);
-    return HoodieSparkSqlWriter.getBulkInsertRowConfig(writerSchema, hoodieConfig, cfg.targetBasePath, cfg.targetTableName);
+    return HoodieSparkSqlWriter.getBulkInsertRowConfig(writerSchema != InputBatch.NULL_SCHEMA ? Option.of(writerSchema) : Option.empty(),
+        hoodieConfig, cfg.targetBasePath, cfg.targetTableName);
   }
 
   /**
@@ -899,7 +900,7 @@ public class StreamSync implements Serializable, Closeable {
     instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
 
     if (useRowWriter) {
-      Dataset<Row> df = (Dataset<Row>) inputBatch.getBatch().orElse(hoodieSparkContext.emptyRDD());
+      Dataset<Row> df = (Dataset<Row>) inputBatch.getBatch().orElse(hoodieSparkContext.getSqlContext().emptyDataFrame());
       HoodieWriteConfig hoodieWriteConfig = prepareHoodieConfigForRowWriter(inputBatch.getSchemaProvider().getTargetSchema());
       BaseDatasetBulkInsertCommitActionExecutor executor = new HoodieStreamerDatasetBulkInsertCommitActionExecutor(hoodieWriteConfig, writeClient, instantTime);
       writeClientWriteResult = new WriteClientWriteResult(executor.execute(df, !HoodieStreamerUtils.getPartitionColumns(props).isEmpty()).getWriteStatuses());
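
On the row-writer path itself, the earlier fallback handed an empty RDD to code expecting a Dataset<Row>, so an empty batch failed before any write happened; the fix substitutes an empty DataFrame. A minimal sketch of the pattern, assuming a live SparkSession (batchOrEmpty is a hypothetical helper):

    import org.apache.spark.sql.{DataFrame, SparkSession}

    // When the incoming batch is absent, fall back to an empty DataFrame,
    // which is a legitimate Dataset[Row], rather than an empty RDD.
    def batchOrEmpty(spark: SparkSession, batch: scala.Option[DataFrame]): DataFrame =
      batch.getOrElse(spark.emptyDataFrame)
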
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 62aa7328fbb..03208a0c0e5 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -47,6 +47,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
@@ -114,6 +115,7 @@ import org.apache.spark.sql.api.java.UDF4;
 import org.apache.spark.sql.functions;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -232,6 +234,11 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     return new HoodieClusteringJob(jsc, scheduleClusteringConfig);
   }
 
+  @AfterEach
+  public void perTestAfterEach() {
+    testNum++;
+  }
+
   @Test
   public void testProps() {
     TypedProperties props =
@@ -1340,7 +1347,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     boolean hasTransformer = transformerClassNames != null && !transformerClassNames.isEmpty();
     prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null);
     prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
-        PARQUET_SOURCE_ROOT, false, "partition_path", testEmptyBatch ? "1" : "");
+        PARQUET_SOURCE_ROOT, false, "partition_path", "");
 
     String tableBasePath = basePath + "/test_parquet_table" + testNum;
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT, testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName()
@@ -1351,27 +1358,34 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamer.sync();
     assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
+    deltaStreamer.shutdownGracefully();
 
     try {
       if (testEmptyBatch) {
+        prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
+            PARQUET_SOURCE_ROOT, false, "partition_path", "0");
         prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, 
null, null);
         deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
         deltaStreamer.sync();
         // since we mimic'ed empty batch, total records should be same as first sync().
-        assertRecordCount(200, tableBasePath, sqlContext);
+        assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
         HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
 
         // validate table schema fetches valid schema from last but one commit.
         TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
         assertNotEquals(tableSchemaResolver.getTableAvroSchema(), Schema.create(Schema.Type.NULL).toString());
+        // schema from latest commit and last but one commit should match
+        compareLatestTwoSchemas(metaClient);
+        prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
+            PARQUET_SOURCE_ROOT, false, "partition_path", "");
+        deltaStreamer.shutdownGracefully();
       }
 
-      int recordsSoFar = testEmptyBatch ? 200 : 100;
-
+      int recordsSoFar = 100;
+      deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
       // add 3 more batches and ensure all commits succeed.
       for (int i = 2; i < 5; i++) {
         prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, Integer.toString(i) + ".parquet", false, null, null);
-        deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
         deltaStreamer.sync();
         assertRecordCount(recordsSoFar + (i - 1) * 100, tableBasePath, sqlContext);
         if (i == 2 || i == 4) { // this validation reloads the timeline. So, we are validating only for first and last batch.
@@ -1717,20 +1731,25 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     boolean hasTransformer = transformerClassNames != null && !transformerClassNames.isEmpty();
     prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null);
     prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
-        PARQUET_SOURCE_ROOT, false, "partition_path", testEmptyBatch ? "1" : "");
+        PARQUET_SOURCE_ROOT, false, "partition_path", "");
 
     String tableBasePath = basePath + "/test_parquet_table" + testNum;
-    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+    HoodieDeltaStreamer.Config cfg =
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName()
                 : ParquetDFSSource.class.getName(),
             transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
-            useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
+            useSchemaProvider, 100000, false, null, null, "timestamp", null);
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamer.sync();
     assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
+    deltaStreamer.shutdownGracefully();
 
     if (testEmptyBatch) {
       prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, 
null, null);
-      deltaStreamer.sync();
+      prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
+          PARQUET_SOURCE_ROOT, false, "partition_path", "0");
+      HoodieDeltaStreamer deltaStreamer1 = new HoodieDeltaStreamer(cfg, jsc);
+      deltaStreamer1.sync();
       // since we mimic'ed empty batch, total records should be same as first sync().
       assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
       HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
@@ -1738,6 +1757,11 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
       // validate table schema fetches valid schema from last but one commit.
       TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
       assertNotEquals(tableSchemaResolver.getTableAvroSchema(), Schema.create(Schema.Type.NULL).toString());
+      // schema from latest commit and last but one commit should match
+      compareLatestTwoSchemas(metaClient);
+      prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
+          PARQUET_SOURCE_ROOT, false, "partition_path", "");
+      deltaStreamer1.shutdownGracefully();
     }
 
     // proceed w/ non empty batch.
@@ -1751,6 +1775,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
         .forEach(entry -> assertValidSchemaAndOperationTypeInCommitMetadata(
             entry, metaClient, WriteOperationType.INSERT));
     testNum++;
+    deltaStreamer.shutdownGracefully();
   }
 
   private void assertValidSchemaAndOperationTypeInCommitMetadata(HoodieInstant instant,
@@ -1766,6 +1791,14 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     }
   }
 
+  private void compareLatestTwoSchemas(HoodieTableMetaClient metaClient) throws IOException {
+    // schema from latest commit and last but one commit should match
+    List<HoodieInstant> completedInstants = metaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().getInstants();
+    HoodieCommitMetadata commitMetadata1 = TimelineUtils.getCommitMetadata(completedInstants.get(0), metaClient.getActiveTimeline());
+    HoodieCommitMetadata commitMetadata2 = TimelineUtils.getCommitMetadata(completedInstants.get(1), metaClient.getActiveTimeline());
+    assertEquals(commitMetadata1.getMetadata(HoodieCommitMetadata.SCHEMA_KEY), commitMetadata2.getMetadata(HoodieCommitMetadata.SCHEMA_KEY));
+  }
+
   private void testORCDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
     // prepare ORCDFSSource
     TypedProperties orcProps = new TypedProperties();
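
For reference, the new compareLatestTwoSchemas helper encodes the invariant the tests guard: an empty-batch commit must carry forward the previous commit's schema rather than recording a NULL schema. A condensed sketch of the same check (Scala; latestTwoSchemasMatch is a hypothetical name, and metaClient is assumed to point at an existing table):

    import org.apache.hudi.common.model.HoodieCommitMetadata
    import org.apache.hudi.common.table.HoodieTableMetaClient
    import org.apache.hudi.common.table.timeline.TimelineUtils

    // True when the two most recent completed commits recorded the same schema.
    def latestTwoSchemasMatch(metaClient: HoodieTableMetaClient): Boolean = {
      val timeline = metaClient.getActiveTimeline
      val instants = timeline.getWriteTimeline.filterCompletedInstants.getInstants  // java.util.List
      val schemaAt = (idx: Int) =>
        TimelineUtils.getCommitMetadata(instants.get(idx), timeline).getMetadata(HoodieCommitMetadata.SCHEMA_KEY)
      instants.size >= 2 && schemaAt(instants.size - 1) == schemaAt(instants.size - 2)
    }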
