shangxinli commented on code in PR #18405:
URL: https://github.com/apache/hudi/pull/18405#discussion_r3213866759


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -874,8 +876,30 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Hood
           totalSuccessfulRecords);
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
 
-      boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds, Option.empty(),
-          Option.of(writeStatusValidator));
+      // Cache the RDD if not already persisted, so both validators (collect) and
+      // writeClient.commit() share the same materialized result without re-evaluation.
+      boolean weOwnCache = writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());
+      if (weOwnCache) {
+        writeStatusRDD.cache();
+      }
+      List<WriteStatus> writeStatuses = writeStatusRDD.collect();
+
+      // Run pre-commit streaming offset validators (if configured).
+      // Placement before writeClient.commit() is intentional: offset validation is a stronger
+      // guard than commitOnErrors — if offset deviation indicates potential data loss, the commit
+      // must be prevented regardless of the commitOnErrors policy.
+      SparkStreamerValidatorUtils.runValidators(props, instantTime, writeStatuses,
+          checkpointCommitMetadata, metaClient);
+
+      boolean success;

Review Comment:
   Fixed



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java:
##########
@@ -84,9 +83,27 @@ public static void runValidators(HoodieWriteConfig config,
       Dataset<Row> beforeState = getRecordsFromCommittedFiles(sqlContext, partitionsModified, table, afterState.schema());
 
       Stream<SparkPreCommitValidator> validators = Arrays.stream(config.getPreCommitValidators().split(","))
-          .map(validatorClass -> ((SparkPreCommitValidator) ReflectionUtils.loadClass(validatorClass,
-              new Class<?>[] {HoodieSparkTable.class, HoodieEngineContext.class, HoodieWriteConfig.class},
-              table, context, config)));
+          .map(String::trim)
+          .filter(s -> !s.isEmpty())
+          .flatMap(validatorClass -> {
+            try {
+              Class<?> clazz = Class.forName(validatorClass);
+              if (!SparkPreCommitValidator.class.isAssignableFrom(clazz)) {
+                LOG.warn("Skipping validator {} — it does not implement SparkPreCommitValidator. "
+                    + "If this is a streaming offset validator (e.g. SparkKafkaOffsetValidator), "
+                    + "it will be invoked by SparkStreamerValidatorUtils instead.", validatorClass);
+                return Stream.empty();
+              }

Review Comment:
   Fixed
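
   For readers following the hunk above, the trim / filter / flatMap skip pattern can be tried in isolation. This is a minimal plain-Java sketch, not the Hudi code: `ValidatorFilterSketch` and `loadAssignable` are hypothetical names, `Runnable` stands in for `SparkPreCommitValidator`, and rethrowing `ClassNotFoundException` as `IllegalArgumentException` is an assumption, since the catch branch is not visible in the quoted diff.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for the pattern in the hunk; this is not Hudi code.
class ValidatorFilterSketch {

  // Returns the class names from a comma-separated list whose classes are
  // assignable to `expected`; other loadable classes are skipped with a message.
  static List<String> loadAssignable(String commaSeparated, Class<?> expected) {
    return Arrays.stream(commaSeparated.split(","))
        .map(String::trim)
        .filter(s -> !s.isEmpty())
        .flatMap(name -> {
          try {
            Class<?> clazz = Class.forName(name);
            if (!expected.isAssignableFrom(clazz)) {
              // Skip instead of failing: an empty stream drops this entry.
              System.err.println("Skipping " + name + ": not a " + expected.getSimpleName());
              return Stream.<String>empty();
            }
            return Stream.of(name);
          } catch (ClassNotFoundException e) {
            // Assumption: an unknown class name is treated as a configuration error.
            throw new IllegalArgumentException("Cannot load class " + name, e);
          }
        })
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Thread implements Runnable, String does not; the blank entry is filtered out.
    System.out.println(loadAssignable("java.lang.Thread, ,java.lang.String", Runnable.class));
    // prints [java.lang.Thread]
  }
}
```

   Returning `Stream.empty()` from `flatMap` lets one entry be dropped without a separate filter pass, which is why the hunk switches from `map` to `flatMap`.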



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -874,8 +876,30 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Hood
           totalSuccessfulRecords);
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
 
-      boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds, Option.empty(),
-          Option.of(writeStatusValidator));
+      // Cache the RDD if not already persisted, so both validators (collect) and
+      // writeClient.commit() share the same materialized result without re-evaluation.

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

