nsivabalan commented on code in PR #13307:
URL: https://github.com/apache/hudi/pull/13307#discussion_r2119932644


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -1339,4 +1298,111 @@ public JavaRDD<WriteStatus> getWriteStatusRDD() {
       return writeStatusRDD;
     }
   }
+
+  static class HoodieStreamerWriteStatusHandlerCallback implements 
WriteStatusHandlerCallback {
+
+    private final boolean commitOnErrors;
+    private final String instantTime;
+    private final HoodieStreamer.Config cfg;
+    private final Option<BaseErrorTableWriter> errorTableWriter;
+    private final Option<JavaRDD<WriteStatus>> errorTableWriteStatusRDDOpt;
+    private final HoodieErrorTableConfig.ErrorWriteFailureStrategy 
errorWriteFailureStrategy;
+    private final boolean isErrorTableWriteUnificationEnabled;
+    private final String errorTableInstantTime;
+    private final SparkRDDWriteClient writeClient;
+    private final Option<String> latestCommittedInstant;
+    private final AtomicLong totalSuccessfulRecords;
+
+    HoodieStreamerWriteStatusHandlerCallback(boolean commitOnErrors,
+                                             String instantTime,
+                                             HoodieStreamer.Config cfg,
+                                             Option<BaseErrorTableWriter> 
errorTableWriter,
+                                             Option<JavaRDD<WriteStatus>> 
errorTableWriteStatusRDDOpt,
+                                             
HoodieErrorTableConfig.ErrorWriteFailureStrategy errorWriteFailureStrategy,
+                                             boolean 
isErrorTableWriteUnificationEnabled,
+                                             String errorTableInstantTime,
+                                             SparkRDDWriteClient writeClient,
+                                             Option<String> 
latestCommittedInstant,
+                                             AtomicLong 
totalSuccessfulRecords) {
+      this.commitOnErrors = commitOnErrors;
+      this.instantTime = instantTime;
+      this.cfg = cfg;
+      this.errorTableWriter = errorTableWriter;
+      this.errorTableWriteStatusRDDOpt = errorTableWriteStatusRDDOpt;
+      this.errorWriteFailureStrategy = errorWriteFailureStrategy;
+      this.isErrorTableWriteUnificationEnabled = 
isErrorTableWriteUnificationEnabled;
+      this.errorTableInstantTime = errorTableInstantTime;
+      this.writeClient = writeClient;
+      this.latestCommittedInstant = latestCommittedInstant;
+      this.totalSuccessfulRecords = totalSuccessfulRecords;
+    }
+
+    @Override
+    public boolean processWriteStatuses(long tableTotalRecords, long 
tableTotalErroredRecords, HoodieData<WriteStatus> leanWriteStatuses) {
+
+      long totalRecords = tableTotalRecords;
+      long totalErroredRecords = tableTotalErroredRecords;
+      if (isErrorTableWriteUnificationEnabled) {
+        totalRecords += errorTableWriteStatusRDDOpt.map(status -> 
status.mapToDouble(WriteStatus::getTotalRecords).sum().longValue()).orElse(0L);
+        totalErroredRecords += errorTableWriteStatusRDDOpt.map(status -> 
status.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue()).orElse(0L);
+      }
+      long totalSuccessfulRecords = totalRecords - totalErroredRecords;
+      this.totalSuccessfulRecords.set(totalSuccessfulRecords);
+      LOG.info("instantTime={}, totalRecords={}, totalErrorRecords={}, 
totalSuccessfulRecords={}",
+          instantTime, totalRecords, totalErroredRecords, 
totalSuccessfulRecords);
+      if (totalRecords == 0) {
+        LOG.info("No new data, perform empty commit.");
+      }
+      boolean hasErrorRecords = totalErroredRecords > 0;
+      if (!hasErrorRecords || commitOnErrors) {
+        if (hasErrorRecords) {
+          LOG.warn("Some records failed to be merged but forcing commit since 
commitOnErrors set. Errors/Total="
+              + totalErroredRecords + "/" + totalRecords);
+        }
+      }
+
+      if (errorTableWriter.isPresent()) {
+        boolean errorTableSuccess = true;
+        // Commit the error events triggered so far to the error table
+        if (isErrorTableWriteUnificationEnabled && 
errorTableWriteStatusRDDOpt.isPresent()) {
+          errorTableSuccess = 
errorTableWriter.get().commit(errorTableInstantTime, 
errorTableWriteStatusRDDOpt.get());
+        } else if (!isErrorTableWriteUnificationEnabled) {
+          errorTableSuccess = 
errorTableWriter.get().upsertAndCommit(instantTime, latestCommittedInstant);
+        }
+        if (!errorTableSuccess) {
+          switch (errorWriteFailureStrategy) {
+            case ROLLBACK_COMMIT:
+              LOG.info("Commit " + instantTime + " failed!");
+              writeClient.rollback(instantTime);
+              throw new HoodieStreamerWriteException("Error table commit 
failed");
+            case LOG_ERROR:
+              LOG.error("Error Table write failed for instant " + instantTime);
+              break;
+            default:
+              throw new HoodieStreamerWriteException("Write failure strategy 
not implemented for " + errorWriteFailureStrategy);
+          }
+        }
+      }
+      boolean canProceed = !hasErrorRecords || commitOnErrors;
+      if (canProceed) {
+        return canProceed;
+      } else {
+        LOG.error("Delta Sync found errors when writing. Errors/Total=" + 
totalErroredRecords + "/" + totalRecords);
+        LOG.error("Printing out the top 100 errors");
+
+        List<WriteStatus> erroredWriteStatueses = 
leanWriteStatuses.filter(WriteStatus::hasErrors).collectAsList();

Review Comment:
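   We should not collect these write statuses on the driver, as that may cause an OOM. Since only the top 100 errors are printed, fetch a bounded number of errored write statuses instead of all of them.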



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to