lokeshj1703 commented on code in PR #13307:
URL: https://github.com/apache/hudi/pull/13307#discussion_r2114539119


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -1339,4 +1298,112 @@ public JavaRDD<WriteStatus> getWriteStatusRDD() {
       return writeStatusRDD;
     }
   }
+
+  static class HoodieStreamerWriteStatusHandlerCallback implements 
WriteStatusHandlerCallback {
+
+    private final boolean commitOnErrors;
+    private final String instantTime;
+    private final HoodieStreamer.Config cfg;
+    private final Option<BaseErrorTableWriter> errorTableWriter;
+    private final Option<JavaRDD<WriteStatus>> errorTableWriteStatusRDDOpt;
+    private final HoodieErrorTableConfig.ErrorWriteFailureStrategy 
errorWriteFailureStrategy;
+    private final boolean isErrorTableWriteUnificationEnabled;
+    private final String errorTableInstantTime;
+    private final SparkRDDWriteClient writeClient;
+    private final Option<String> latestCommittedInstant;
+    private final AtomicLong totalSuccessfulRecords;
+
+    HoodieStreamerWriteStatusHandlerCallback(boolean commitOnErrors,
+                                             String instantTime,
+                                             HoodieStreamer.Config cfg,
+                                             Option<BaseErrorTableWriter> 
errorTableWriter,
+                                             Option<JavaRDD<WriteStatus>> 
errorTableWriteStatusRDDOpt,
+                                             
HoodieErrorTableConfig.ErrorWriteFailureStrategy errorWriteFailureStrategy,
+                                             boolean 
isErrorTableWriteUnificationEnabled,
+                                             String errorTableInstantTime,
+                                             SparkRDDWriteClient writeClient,
+                                             Option<String> 
latestCommittedInstant,
+                                             AtomicLong 
totalSuccessfulRecords) {
+      this.commitOnErrors = commitOnErrors;
+      this.instantTime = instantTime;
+      this.cfg = cfg;
+      this.errorTableWriter = errorTableWriter;
+      this.errorTableWriteStatusRDDOpt = errorTableWriteStatusRDDOpt;
+      this.errorWriteFailureStrategy = errorWriteFailureStrategy;
+      this.isErrorTableWriteUnificationEnabled = 
isErrorTableWriteUnificationEnabled;
+      this.errorTableInstantTime = errorTableInstantTime;
+      this.writeClient = writeClient;
+      this.latestCommittedInstant = latestCommittedInstant;
+      this.totalSuccessfulRecords = totalSuccessfulRecords;
+    }
+
+    @Override
+    public boolean processWriteStatuses(long tableTotalRecords, long 
tableTotalErroredRecords, HoodieData<WriteStatus> leanWriteStatuses) {
+
+      long totalRecords = tableTotalRecords;
+      long totalErroredRecords = tableTotalErroredRecords;
+      // TODO: Remove flag isErrorTableWriteUnificationEnabled, should not be required anymore

Review Comment:
   Addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to