nsivabalan commented on code in PR #13307:
URL: https://github.com/apache/hudi/pull/13307#discussion_r2112186058


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -1339,4 +1298,112 @@ public JavaRDD<WriteStatus> getWriteStatusRDD() {
       return writeStatusRDD;
     }
   }
+
+  static class HoodieStreamerWriteStatusHandlerCallback implements 
WriteStatusHandlerCallback {
+
+    private final boolean commitOnErrors;
+    private final String instantTime;
+    private final HoodieStreamer.Config cfg;
+    private final Option<BaseErrorTableWriter> errorTableWriter;
+    private final Option<JavaRDD<WriteStatus>> errorTableWriteStatusRDDOpt;
+    private final HoodieErrorTableConfig.ErrorWriteFailureStrategy 
errorWriteFailureStrategy;
+    private final boolean isErrorTableWriteUnificationEnabled;
+    private final String errorTableInstantTime;
+    private final SparkRDDWriteClient writeClient;
+    private final Option<String> latestCommittedInstant;
+    private final AtomicLong totalSuccessfulRecords;
+
+    HoodieStreamerWriteStatusHandlerCallback(boolean commitOnErrors,
+                                             String instantTime,
+                                             HoodieStreamer.Config cfg,
+                                             Option<BaseErrorTableWriter> 
errorTableWriter,
+                                             Option<JavaRDD<WriteStatus>> 
errorTableWriteStatusRDDOpt,
+                                             
HoodieErrorTableConfig.ErrorWriteFailureStrategy errorWriteFailureStrategy,
+                                             boolean 
isErrorTableWriteUnificationEnabled,
+                                             String errorTableInstantTime,
+                                             SparkRDDWriteClient writeClient,
+                                             Option<String> 
latestCommittedInstant,
+                                             AtomicLong 
totalSuccessfulRecords) {
+      this.commitOnErrors = commitOnErrors;
+      this.instantTime = instantTime;
+      this.cfg = cfg;
+      this.errorTableWriter = errorTableWriter;
+      this.errorTableWriteStatusRDDOpt = errorTableWriteStatusRDDOpt;
+      this.errorWriteFailureStrategy = errorWriteFailureStrategy;
+      this.isErrorTableWriteUnificationEnabled = 
isErrorTableWriteUnificationEnabled;
+      this.errorTableInstantTime = errorTableInstantTime;
+      this.writeClient = writeClient;
+      this.latestCommittedInstant = latestCommittedInstant;
+      this.totalSuccessfulRecords = totalSuccessfulRecords;
+    }
+
+    @Override
+    public boolean processWriteStatuses(long tableTotalRecords, long 
tableTotalErroredRecords, HoodieData<WriteStatus> leanWriteStatuses) {
+
+      long totalRecords = tableTotalRecords;
+      long totalErroredRecords = tableTotalErroredRecords;
+      // TODO: Remove flag isErrorTableWriteUnificationEnabled; it should not be 
required anymore

Review Comment:
   Can we remove the TODOs here? 



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -75,23 +84,67 @@ public SparkRDDWriteClient(HoodieEngineContext context, 
HoodieWriteConfig writeC
                              Option<EmbeddedTimelineService> timelineService) {
     super(context, writeConfig, timelineService, 
SparkUpgradeDowngradeHelper.getInstance());
     this.tableServiceClient = new SparkRDDTableServiceClient<T>(context, 
writeConfig, getTimelineServer());
+    isMetadataTable = 
HoodieTableMetadata.isMetadataTable(config.getBasePath());
   }
 
   @Override
   protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
     return SparkHoodieIndexFactory.createIndex(config);
   }
 
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> 
writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> 
partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc) {
+    return commit(instantTime, writeStatuses, extraMetadata, commitActionType, 
partitionToReplacedFileIds, extraPreCommitFunc, new 
NoOpWriteStatusHandlerCallback());
+  }
+
   /**
    * Complete changes performed at the given instantTime marker with specified 
action.
    */
   @Override
   public boolean commit(String instantTime, JavaRDD<WriteStatus> 
writeStatuses, Option<Map<String, String>> extraMetadata,
                         String commitActionType, Map<String, List<String>> 
partitionToReplacedFileIds,
-                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc) {
+                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc,
+                        WriteStatusHandlerCallback writeStatusHandlerCallback) 
{
     context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " 
+ config.getTableName());
-    List<HoodieWriteStat> writeStats = 
writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, writeStats, extraMetadata, 
commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
+    // Triggering the dag for writes.
+    // If streaming writes are enabled, writes to both the data table and the 
metadata table get triggered at this juncture.
+    // If not, writes to the data table get triggered here.
+    // When streaming writes are enabled, WriteStatus is expected to contain 
all stats required to generate metadata table records and so it could be fatter.

Review Comment:
   Fix the documentation — we don't have any class named LeanWriteStatus. 
   Also, consider renaming the variable on L115. 



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -75,23 +84,67 @@ public SparkRDDWriteClient(HoodieEngineContext context, 
HoodieWriteConfig writeC
                              Option<EmbeddedTimelineService> timelineService) {
     super(context, writeConfig, timelineService, 
SparkUpgradeDowngradeHelper.getInstance());
     this.tableServiceClient = new SparkRDDTableServiceClient<T>(context, 
writeConfig, getTimelineServer());
+    isMetadataTable = 
HoodieTableMetadata.isMetadataTable(config.getBasePath());
   }
 
   @Override
   protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
     return SparkHoodieIndexFactory.createIndex(config);
   }
 
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> 
writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> 
partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc) {
+    return commit(instantTime, writeStatuses, extraMetadata, commitActionType, 
partitionToReplacedFileIds, extraPreCommitFunc, new 
NoOpWriteStatusHandlerCallback());
+  }
+
   /**
    * Complete changes performed at the given instantTime marker with specified 
action.
    */
   @Override
   public boolean commit(String instantTime, JavaRDD<WriteStatus> 
writeStatuses, Option<Map<String, String>> extraMetadata,
                         String commitActionType, Map<String, List<String>> 
partitionToReplacedFileIds,
-                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc) {
+                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc,
+                        WriteStatusHandlerCallback writeStatusHandlerCallback) 
{
     context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " 
+ config.getTableName());
-    List<HoodieWriteStat> writeStats = 
writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, writeStats, extraMetadata, 
commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
+    // Triggering the dag for writes.
+    // If streaming writes are enabled, writes to both the data table and the 
metadata table get triggered at this juncture.
+    // If not, writes to the data table get triggered here.
+    // When streaming writes are enabled, WriteStatus is expected to contain 
all stats required to generate metadata table records and so it could be fatter.

Review Comment:
   Also, can you add a Javadoc describing what's expected when streaming writes 
are not enabled? 
   Is there simpler processing we can do when streaming writes are disabled? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to