This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f875edaecd [HUDI-7139] Fix operation type for bulk insert with row 
writer in Hudi Streamer (#10175)
4f875edaecd is described below

commit 4f875edaecd495eaa8996fa8d81c102a971c599f
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sat Nov 25 15:10:37 2023 -0800

    [HUDI-7139] Fix operation type for bulk insert with row writer in Hudi 
Streamer (#10175)
    
    This commit fixes a bug that causes the `operationType` to be null in 
the commit metadata of a bulk insert operation with the row writer enabled in 
Hudi Streamer (`hoodie.datasource.write.row.writer.enable=true`).  
`HoodieStreamerDatasetBulkInsertCommitActionExecutor` is updated so that 
`#preExecute` and `#afterExecute` run the same logic as a regular bulk 
insert operation without the row writer.
---
 ...odieStreamerDatasetBulkInsertCommitActionExecutor.java | 10 ++--------
 .../utilities/deltastreamer/TestHoodieDeltaStreamer.java  | 15 ++++++++++++---
 2 files changed, 14 insertions(+), 11 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
index 5593a95ca39..2a5113538e4 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
@@ -26,9 +26,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.action.HoodieWriteMetadata;
 
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
@@ -44,12 +42,8 @@ public class 
HoodieStreamerDatasetBulkInsertCommitActionExecutor extends BaseDat
 
   @Override
   protected void preExecute() {
-    // no op
-  }
-
-  @Override
-  protected void afterExecute(HoodieWriteMetadata<JavaRDD<WriteStatus>> 
result) {
-    // no op
+    table.validateInsertSchema();
+    writeClient.preWrite(instantTime, getWriteOperationType(), 
table.getMetaClient());
   }
 
   @Override
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index f5304cce808..62aa7328fbb 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1377,7 +1377,10 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         if (i == 2 || i == 4) { // this validation reloads the timeline. So, 
we are validating only for first and last batch.
           // validate commit metadata for all completed commits to have valid 
schema in extra metadata.
           HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
-          
metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().forEach(entry
 -> assertValidSchemaInCommitMetadata(entry, metaClient));
+          metaClient.reloadActiveTimeline().getCommitsTimeline()
+              .filterCompletedInstants().getInstants()
+              .forEach(entry -> 
assertValidSchemaAndOperationTypeInCommitMetadata(
+                  entry, metaClient, WriteOperationType.BULK_INSERT));
         }
       }
     } finally {
@@ -1743,15 +1746,21 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     assertRecordCount(parquetRecordsCount + 100, tableBasePath, sqlContext);
     // validate commit metadata for all completed commits to have valid schema 
in extra metadata.
     HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
-    
metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().forEach(entry
 -> assertValidSchemaInCommitMetadata(entry, metaClient));
+    metaClient.reloadActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants().getInstants()
+        .forEach(entry -> assertValidSchemaAndOperationTypeInCommitMetadata(
+            entry, metaClient, WriteOperationType.INSERT));
     testNum++;
   }
 
-  private void assertValidSchemaInCommitMetadata(HoodieInstant instant, 
HoodieTableMetaClient metaClient) {
+  private void assertValidSchemaAndOperationTypeInCommitMetadata(HoodieInstant 
instant,
+                                                                 
HoodieTableMetaClient metaClient,
+                                                                 
WriteOperationType operationType) {
     try {
       HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
           
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(instant).get(), 
HoodieCommitMetadata.class);
       
assertFalse(StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY)));
+      assertEquals(operationType, commitMetadata.getOperationType());
     } catch (IOException ioException) {
       throw new HoodieException("Failed to parse commit metadata for " + 
instant.toString());
     }

Reply via email to