danny0405 commented on code in PR #18765:
URL: https://github.com/apache/hudi/pull/18765#discussion_r3263704513
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -872,38 +869,110 @@ private Pair<Option<String>, JavaRDD<WriteStatus>>
writeToSinkAndDoMetaSync(Hood
Map<String, String> checkpointCommitMetadata =
extractCheckpointMetadata(inputBatch, props,
writeClient.getConfig().getWriteVersion().versionCode(), cfg);
AtomicLong totalSuccessfulRecords = new AtomicLong(0);
Option<String> latestCommittedInstant = getLatestCommittedInstant();
- WriteStatusValidator writeStatusValidator = new
HoodieStreamerWriteStatusValidator(cfg.commitOnErrors, instantTime,
- cfg, errorTableWriter, errorTableWriteStatusRDDOpt,
errorWriteFailureStrategy, isErrorTableWriteUnificationEnabled, writeClient,
latestCommittedInstant,
- totalSuccessfulRecords);
String commitActionType = CommitUtils.getCommitActionType(cfg.operation,
HoodieTableType.valueOf(cfg.tableType));
- // Cache the RDD only when pre-commit validators are configured.
Validators collect the RDD
- // before commit, so without caching the same DAG would re-evaluate
inside writeClient.commit().
- // When no validators are configured, commit consumes the RDD once and
caching adds no value.
- // shouldUnpersist is true only when we created the cache here
(validators present and storage
- // level was NONE), so the finally block knows to release it.
+ // Pre-commit orchestration (issue #18750): the legacy
HoodieStreamerWriteStatusValidator
+ // ran inside writeClient.commit() via the WriteStatusValidator callback
and combined three
+ // concerns — count records, commit the error table, and gate on write
errors. Each is now
+ // an explicit step here before writeClient.commit(), so the writer no
longer receives a
+ // callback. Step order is deliberate (see comments below).
+ //
+ // We collect the write statuses only when user-configured pre-commit
validators are present,
+ // because the runValidators() entry point requires a materialized
List<WriteStatus>. When no
+ // validators are configured we count via distributed Spark aggregation,
preserving the
+ // pre-#18750 no-overhead behavior for the default path. The cache is
engaged in both paths
+ // so that the count action (or collect) and the later
writeClient.commit() consume the same
+ // materialization rather than re-evaluating the upstream DAG.
boolean validatorsConfigured =
!StringUtils.isNullOrEmpty(props.getString(
HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue()));
- boolean shouldUnpersist = validatorsConfigured &&
writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());
+ boolean shouldUnpersist =
writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());
if (shouldUnpersist) {
writeStatusRDD.cache();
}
boolean success;
try {
- if (validatorsConfigured) {
- List<WriteStatus> writeStatuses = writeStatusRDD.collect();
+ // Collect only when validators need the materialized list. Null
marker triggers the
+ // RDD-based code paths for the writeStatuses-on-demand checks below.
+ List<WriteStatus> writeStatuses = validatorsConfigured ?
writeStatusRDD.collect() : null;
+
+ // Step 1: Commit the error table BEFORE running validators or the
write-error gate.
+ // Error records captured here are a genuine artifact of the write
attempt and should
+ // survive even when a validator later blocks the data-table commit
(otherwise the
+ // operator loses the captured errors and the next run has nothing to
triage against).
+ // Note: success here means the error-table commit landed; no rollback
is needed on
+ // failure because the data-table writeClient.commit() has not been
called yet.
+ // Latent design quirk (preserved from HSWSV): if error-table commit
succeeds and any
+ // subsequent step (Step 4 gate, validator, or writeClient.commit)
fails, the error
+ // table will have a committed instant for a data-table instant that
never lands.
+ // Downstream consumers of the error table should tolerate this
divergence.
+ if (errorTableWriter.isPresent()) {
+ boolean errorTableSuccess =
ErrorTableCommitter.commit(errorTableWriter.get(),
+ errorTableWriteStatusRDDOpt,
isErrorTableWriteUnificationEnabled, instantTime,
+ latestCommittedInstant);
+ if (!errorTableSuccess) {
+ switch (errorWriteFailureStrategy) {
+ case ROLLBACK_COMMIT:
+ throw new HoodieStreamerWriteException("Error table commit
failed for instant " + instantTime);
+ case LOG_ERROR:
+ LOG.error("Error table write failed for instant {}",
instantTime);
+ break;
+ default:
+ throw new HoodieStreamerWriteException("Write failure strategy
not implemented for " + errorWriteFailureStrategy);
+ }
+ }
+ }
- // Run pre-commit streaming offset validators (if configured).
- // Placement before writeClient.commit() is intentional: offset
validation is a stronger
- // guard than commitOnErrors — if offset deviation indicates
potential data loss, the commit
- // must be prevented regardless of the commitOnErrors policy.
+ // Step 2: Run user-configured pre-commit validators (offset, custom,
and the opt-in
+ // SparkWriteErrorValidator). Validators are intentionally stronger
than commitOnErrors
+ // — a failure here aborts the data-table commit regardless of the
gate in Step 4.
+ if (validatorsConfigured) {
SparkStreamerValidatorUtils.runValidators(props, instantTime,
writeStatuses,
checkpointCommitMetadata, metaClient);
}
- success = writeClient.commit(instantTime, writeStatusRDD,
Option.of(checkpointCommitMetadata), commitActionType,
partitionToReplacedFileIds, Option.empty(),
- Option.of(writeStatusValidator));
+ // Step 3: Count records. Drives the runMetaSync() decision below the
try/finally.
+ SuccessfulRecordCounter.Counts counts = validatorsConfigured
+ ? SuccessfulRecordCounter.compute(writeStatuses,
errorTableWriteStatusRDDOpt,
+ isErrorTableWriteUnificationEnabled)
+ : SuccessfulRecordCounter.computeFromRdd(writeStatusRDD,
errorTableWriteStatusRDDOpt,
+ isErrorTableWriteUnificationEnabled);
+ totalSuccessfulRecords.set(counts.getTotalSuccessfulRecords());
+ LOG.info("instantTime={}, totalRecords={}, totalErrorRecords={},
totalSuccessfulRecords={}",
+ instantTime, counts.getTotalRecords(),
counts.getTotalErroredRecords(),
+ counts.getTotalSuccessfulRecords());
+ if (counts.getTotalRecords() == 0) {
+ LOG.info("No new data, perform empty commit.");
+ }
+
+ // Step 4: Apply the legacy HSWSV write-error gate.
+ // commitOnErrors=false (default): any error -> log top N + fail.
+ // commitOnErrors=true: log a warning, proceed to commit.
+ // This gate is redundant with SparkWriteErrorValidator when that
validator is configured
+ // with failure.policy=FAIL — both will reject the same commits. The
redundancy is
+ // intentional: the gate preserves HSWSV's default behavior for users
who do not configure
+ // any validators, while the validator gives users running multiple
validators a unified
+ // failure-policy story.
+ if (counts.hasErrors()) {
+ if (cfg.commitOnErrors) {
+ LOG.warn("Some records failed to be merged but forcing commit
since commitOnErrors set. Errors/Total={}/{}",
+ counts.getTotalErroredRecords(), counts.getTotalRecords());
+ } else {
+ LOG.error("Delta Sync found errors when writing.
Errors/Total={}/{}",
+ counts.getTotalErroredRecords(), counts.getTotalRecords());
+ // Use the already-collected list when available to avoid an extra
Spark action.
+ if (writeStatuses != null) {
+ WriteErrorReporter.logTopErrors(writeStatuses);
+ } else {
+ WriteErrorReporter.logTopErrors(writeStatusRDD);
+ }
+ throw new HoodieStreamerWriteException("Commit " + instantTime + "
has write errors and commitOnErrors=false");
+ }
+ }
+
+ // Step 5: Commit. No WriteStatusValidator callback — all checks are
above.
+ success = writeClient.commit(instantTime, writeStatusRDD,
Option.of(checkpointCommitMetadata),
+ commitActionType, partitionToReplacedFileIds, Option.empty(),
Option.empty());
Review Comment:
We could remove the last parameter (the trailing `Option.empty()`), since no WriteStatusValidator callback is passed anymore.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]