lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1118697569


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -710,6 +765,26 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> 
writeToSink(JavaRDD<HoodieRec
             + totalErrorRecords + "/" + totalRecords);
       }
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, 
HoodieTableType.valueOf(cfg.tableType));
+      if (errorTableWriter.isPresent()) {
+        // Removing writeStatus events from error events, as action on 
writeStatus can cause base table DAG to reexecute
+        // if original cached dataframe get's unpersisted before this action.
+        //        
errorTableWriterInterfaceImpl.get().addErrorEvents(getErrorEventsForWriteStatus(writeStatusRDD));
+        Option<String> commitedInstantTime = 
getLatestInstantWithValidCheckpointInfo(commitTimelineOpt);
+        boolean errorTableSuccess = 
errorTableWriter.get().upsertAndCommit(instantTime, commitedInstantTime);
+        if (!errorTableSuccess) {
+          switch (errorWriteFailureStrategy) {
+            case ROLLBACK_COMMIT:
+              LOG.info("Commit " + instantTime + " failed!");
+              writeClient.rollback(instantTime);
+              throw new HoodieException("Error Table Commit failed!");
+            case LOG_ERROR:
+              LOG.error("Error Table write failed for instant " + instantTime);

Review Comment:
   Will need to change API. API doesn't throw exception right now.
   Created https://issues.apache.org/jira/browse/HUDI-5858 for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to