shangxinli commented on code in PR #18765:
URL: https://github.com/apache/hudi/pull/18765#discussion_r3268604051


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -872,38 +869,103 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Hood
       Map<String, String> checkpointCommitMetadata = extractCheckpointMetadata(inputBatch, props, writeClient.getConfig().getWriteVersion().versionCode(), cfg);
       AtomicLong totalSuccessfulRecords = new AtomicLong(0);
       Option<String> latestCommittedInstant = getLatestCommittedInstant();
-      WriteStatusValidator writeStatusValidator = new HoodieStreamerWriteStatusValidator(cfg.commitOnErrors, instantTime,
-          cfg, errorTableWriter, errorTableWriteStatusRDDOpt, errorWriteFailureStrategy, isErrorTableWriteUnificationEnabled, writeClient, latestCommittedInstant,
-          totalSuccessfulRecords);
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
 
-      // Cache the RDD only when pre-commit validators are configured. Validators collect the RDD
-      // before commit, so without caching the same DAG would re-evaluate inside writeClient.commit().
-      // When no validators are configured, commit consumes the RDD once and caching adds no value.
-      // shouldUnpersist is true only when we created the cache here (validators present and storage
-      // level was NONE), so the finally block knows to release it.
-      boolean validatorsConfigured = !StringUtils.isNullOrEmpty(props.getString(
-          HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
-          HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue()));
-      boolean shouldUnpersist = validatorsConfigured && writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());
+      // Pre-commit orchestration (issue #18750): the legacy HoodieStreamerWriteStatusValidator
+      // ran inside writeClient.commit() via the WriteStatusValidator callback and combined three
+      // concerns — count records, commit the error table, and gate on write errors. Each is now
+      // an explicit step here before writeClient.commit(), so the writer no longer receives a
+      // callback. Step order is deliberate (see comments below).
+      //
+      // The RDD is cached once and the write statuses are collected once on the driver. Both the
+      // count/error-logging steps and writeClient.commit() consume the materialized partitions
+      // rather than re-evaluating the upstream DAG. shouldUnpersist tracks whether we engaged the
+      // cache here so the finally block knows to release it.
+      boolean shouldUnpersist = writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());
       if (shouldUnpersist) {
         writeStatusRDD.cache();
       }
       boolean success;
       try {
-        if (validatorsConfigured) {
-          List<WriteStatus> writeStatuses = writeStatusRDD.collect();
+        List<WriteStatus> writeStatuses = writeStatusRDD.collect();
+        boolean validatorsConfigured = !StringUtils.isNullOrEmpty(props.getString(
+            HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+            HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue()));
+
+        // Step 1: Commit the error table BEFORE running validators or the write-error gate.
+        // Error records captured here are a genuine artifact of the write attempt and should
+        // survive even when a validator later blocks the data-table commit (otherwise the
+        // operator loses the captured errors and the next run has nothing to triage against).
+        // Latent design quirk (preserved from HSWSV): if error-table commit succeeds and any
+        // subsequent step fails (Step 2 validator including the offset validator, Step 4 gate,
+        // or writeClient.commit), the error table will have a committed instant for a data-table
+        // instant that never lands. Downstream consumers of the error table should tolerate this
+        // divergence.
+        if (errorTableWriter.isPresent()) {
+          boolean errorTableSuccess = ErrorTableCommitter.commit(errorTableWriter.get(),
+              errorTableWriteStatusRDDOpt, isErrorTableWriteUnificationEnabled, instantTime,
+              latestCommittedInstant);
+          if (!errorTableSuccess) {
+            switch (errorWriteFailureStrategy) {
+              case ROLLBACK_COMMIT:
+                // Roll back the inflight data-table instant so it doesn't leak under LAZY
+                // failed-writes cleanup policy (preserves HSWSV behavior).
+                writeClient.rollback(instantTime);
+                throw new HoodieStreamerWriteException("Error table commit failed for instant " + instantTime);
+              case LOG_ERROR:
+                LOG.error("Error table write failed for instant {}", instantTime);
+                break;
+              default:
+                throw new HoodieStreamerWriteException("Write failure strategy not implemented for " + errorWriteFailureStrategy);
+            }
+          }
+        }
 
-          // Run pre-commit streaming offset validators (if configured).
-          // Placement before writeClient.commit() is intentional: offset validation is a stronger
-          // guard than commitOnErrors — if offset deviation indicates potential data loss, the commit
-          // must be prevented regardless of the commitOnErrors policy.
+        // Step 2: Run user-configured pre-commit validators (offset, custom, and the opt-in
+        // SparkWriteErrorValidator). Validators are intentionally stronger than commitOnErrors
+        // — a failure here aborts the data-table commit regardless of the gate in Step 4.
+        if (validatorsConfigured) {
           SparkStreamerValidatorUtils.runValidators(props, instantTime, writeStatuses,
               checkpointCommitMetadata, metaClient);

Review Comment:
   Not intentional; fixed in b8a279b. Step 2 now wraps `runValidators` in
   try/catch and calls `writeClient.rollback(instantTime)` before rethrowing the
   `HoodieValidationException`. The same LAZY-cleanup argument applies here as
   for Step 1's ROLLBACK_COMMIT path and the Step 4 gate. The
   error-table-already-committed divergence you flagged is real but intentional,
   per Step 1's design note: the operator should be able to triage captured
   errors even when a later step blocks the data-table commit. I added an
   explicit cross-reference to that note in the new Step 2 comment.
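   The following is a minimal, self-contained sketch of the Step 1 then Step 2
   ordering with the rollback-before-rethrow fix described above. Everything in
   it is hypothetical and none of it is Hudi code: `PreCommitSketch`, its nested
   exceptions (stand-ins for `HoodieStreamerWriteException` and
   `HoodieValidationException`), and the `events` log are illustrative only. The
   real `writeClient.rollback`, `ErrorTableCommitter.commit` and
   `SparkStreamerValidatorUtils.runValidators` calls are replaced by a boolean
   flag, a local method and a `Runnable`. The later steps, including the Step 4
   gate, are not shown in this hunk and are omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, NOT Hudi code: models the Step 1 -> Step 2 ordering
// from the diff plus the rollback-before-rethrow fix for Step 2.
class PreCommitSketch {

  // Simplified stand-in for the error-table write failure strategy.
  enum ErrorWriteFailureStrategy { ROLLBACK_COMMIT, LOG_ERROR }

  // Hypothetical stand-in for HoodieValidationException.
  static class ValidationException extends RuntimeException {
    ValidationException(String msg) { super(msg); }
  }

  // Hypothetical stand-in for HoodieStreamerWriteException.
  static class StreamerWriteException extends RuntimeException {
    StreamerWriteException(String msg) { super(msg); }
  }

  // Ordered record of side effects so callers can inspect the flow.
  final List<String> events = new ArrayList<>();

  // Hypothetical stand-in for writeClient.rollback(instantTime).
  void rollback(String instant) {
    events.add("rollback:" + instant);
  }

  void preCommitAndCommit(String instant, boolean errorTableCommitOk,
                          ErrorWriteFailureStrategy strategy, Runnable validators) {
    // Step 1: the error table is committed first, so captured error records
    // survive even if a later step blocks the data-table commit.
    if (errorTableCommitOk) {
      events.add("errorTableCommitted");
    } else {
      switch (strategy) {
        case ROLLBACK_COMMIT:
          // Roll back the inflight data-table instant before failing.
          rollback(instant);
          throw new StreamerWriteException("Error table commit failed for instant " + instant);
        case LOG_ERROR:
          events.add("errorTableFailureLogged");
          break;
        default:
          throw new StreamerWriteException("Write failure strategy not implemented for " + strategy);
      }
    }

    // Step 2: a validator failure aborts the commit. Roll back first so the
    // inflight instant does not linger, then rethrow the original exception.
    try {
      validators.run();
    } catch (ValidationException e) {
      rollback(instant);
      throw e;
    }

    // Later steps (e.g. the Step 4 gate) are omitted; commit the data table.
    events.add("commit:" + instant);
  }
}
```

   When a validator throws after a successful error-table commit, `events`
   ends with the error-table commit followed by the rollback. That sequence is
   the error-table/data-table divergence that Step 1's design note accepts.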



-- 
This is an automated message from the Apache Git Service.