This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b934633f066a feat(utilities): add Spark/HoodieStreamer validators for pre-commit validation - Phase 3 (#18405)
b934633f066a is described below

commit b934633f066a6efa57e04837b2536baa66b4f2f4
Author: Xinli Shang <[email protected]>
AuthorDate: Sat May 16 20:37:56 2026 -0700

    feat(utilities): add Spark/HoodieStreamer validators for pre-commit validation - Phase 3 (#18405)
    
    * feat: Add Spark streamer validators for phase 3 precommit validation
    
    Implements phase 3 of the precommit validation framework by adding:
    - SparkKafkaOffsetValidator: Validates Kafka offset consistency
    - SparkValidationContext: Provides Spark-specific validation context
    - SparkStreamerValidatorUtils: Utility functions for Spark streamer validation
    - Comprehensive test coverage for all validator components
    - Integration with StreamSync and HoodiePreCommitValidatorConfig
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    * fix: address code review and fix checkstyle violations
    
    - Remove unused imports (java.io.IOException, HoodieCommitMetadata,
      HoodieTestTable, Option) that caused checkstyle build failures
    - Remove accidentally committed bootstrap_register_only_issue.md
    - Cache writeStatusRDD before collect() to prevent second DAG evaluation
      and potential driver OOM
    - Add comment explaining why validator runs before writeClient.commit():
      offset validation is a stronger guard than commitOnErrors and must
      prevent the commit when data loss is detected
    - Clarify buildCommitMetadata() produces a pre-commit preview object,
      not a fully-constructed commit record
    - Add Javadoc to SparkKafkaOffsetValidator and SparkStreamerValidatorUtils
      explaining incompatibility with SparkValidatorUtils (different interface
      and constructor signature) to prevent misconfiguration
    - Add two-commit integration tests (testSecondCommitMatchingOffsetsPasses,
      testSecondCommitDataLossDetected) using HoodieTestTable to exercise the
      real offset comparison path, not just the first-commit skip path
    
    * fix: skip non-SparkPreCommitValidator classes in SparkValidatorUtils
    
    SparkKafkaOffsetValidator (and similar streaming validators) extend
    BasePreCommitValidator with a (TypedProperties) constructor, not the
    (HoodieSparkTable, HoodieEngineContext, HoodieWriteConfig) constructor
    that SparkValidatorUtils expects. Listing such a validator in
    hoodie.precommit.validators previously caused a reflection error in the
    Spark table write path.
    
    Add a Class.isAssignableFrom check to filter out classes that don't
    implement SparkPreCommitValidator before attempting instantiation, with
    a clear warning pointing users to SparkStreamerValidatorUtils for
    streaming validators.
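
The filtering described above can be illustrated without any Hudi classes. This is
only a sketch: ValidatorClassFilter and compatibleClasses are hypothetical names, and
java.lang.Runnable stands in for SparkPreCommitValidator. The real change also
instantiates the validator and logs through the project logger.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ValidatorClassFilter {

  // Returns the classes named in a comma-separated list that are assignable to 'expected'.
  // Blank entries (for example from a trailing comma) are dropped before Class.forName.
  static List<Class<?>> compatibleClasses(String commaSeparated, Class<?> expected) {
    return Arrays.stream(commaSeparated.split(","))
        .map(String::trim)
        .filter(name -> !name.isEmpty())
        .flatMap(name -> {
          try {
            Class<?> clazz = Class.forName(name);
            if (!expected.isAssignableFrom(clazz)) {
              // Incompatible classes are skipped with a warning instead of failing later in reflection.
              System.err.println("Skipping " + name + ": does not implement " + expected.getSimpleName());
              return Stream.<Class<?>>empty();
            }
            return Stream.<Class<?>>of(clazz);
          } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Cannot find validator class: " + name, e);
          }
        })
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Thread implements Runnable, String does not, and the blank entries are ignored.
    System.out.println(compatibleClasses("java.lang.Thread, ,java.lang.String,", Runnable.class));
  }
}
```

Checking assignability on the loaded Class object before instantiating keeps a
mismatched constructor signature from surfacing as a reflection error.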
    
    * ci: trigger CI re-run for flaky trino test
    
    * fix: address reviewer comments on pre-commit streaming offset validator
    
    - Unpersist cached RDD in finally block to prevent executor memory leak
    - Let IOException propagate from loadPreviousCommitMetadata instead of
      silently swallowing it
    - Filter empty validator class names before Class.forName to handle
      trailing comma in config
    - Add write error count to validation message to distinguish write failures
      from silent data loss
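
A rough sketch of how the deviation and the write-error hint fit together, using the
formula and message fragments visible in the diff. OffsetDeviationSketch and its
method names are hypothetical, and the handling of a zero offset difference is an
assumption of this sketch, since that branch is not part of the diff shown here.

```java
public class OffsetDeviationSketch {

  // Percentage deviation between the offset delta (expected records) and records written.
  static double deviationPercent(long offsetDiff, long recordsWritten) {
    if (offsetDiff == 0) {
      // Assumption for this sketch only: nothing consumed and nothing written is 0%,
      // anything written without consumed offsets is treated as 100%.
      return recordsWritten == 0 ? 0.0 : 100.0;
    }
    long difference = Math.abs(offsetDiff - recordsWritten);
    return (100.0 * difference) / offsetDiff;
  }

  // Picks the hint appended to the failure message, based on the write error count.
  static String hint(long writeErrors) {
    return writeErrors > 0
        ? "Non-zero write errors suggest records failed to write rather than silent data loss. "
        : "This may indicate data loss or filtering. ";
  }

  public static void main(String[] args) {
    long offsetDiff = 100;
    long recordsWritten = 90;
    long writeErrors = 10;
    double tolerancePercent = 0.0;
    double deviation = deviationPercent(offsetDiff, recordsWritten);
    if (deviation > tolerancePercent) {
      System.out.println(String.format(
          "Offset difference: %d, Records written: %d, Write errors: %d, Deviation: %.2f%%. %s",
          offsetDiff, recordsWritten, writeErrors, deviation, hint(writeErrors)));
    }
  }
}
```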
    
    * fix: address reviewer follow-up comments on pre-commit streaming validator
    
    - Change runValidators to accept List<WriteStatus> instead of JavaRDD
      to fix RDD unpersist-before-commit bug; StreamSync now caches the RDD,
      collects to list for validators, passes RDD to commit, then unpersists
    
    - Remove generic catch(Exception) in loadPreviousCommitMetadata so
      non-IOException failures propagate instead of silently skipping validation
    
    - Implement getPreviousCommitInstant() in SparkValidationContext via
      timeline lookup instead of throwing UnsupportedOperationException
    
    - Add Objects::nonNull filter when building writeStats list
    
    - Add BasePreCommitValidator assignability guard in SparkStreamerValidatorUtils
      to warn and skip SparkPreCommitValidator classes (reverse-direction guard)
    
    - Eliminate double class loading in SparkValidatorUtils by combining
      filter+map into a single flatMap; remove unused ReflectionUtils import
    
    - Remove trivial constructor Javadoc from SparkKafkaOffsetValidator
    
    - Add HoodieTestUtils import in test; remove Spark context boilerplate
      now that runValidators accepts List<WriteStatus> directly
    
    * fix: guard cache() call when writeStatusRDD is already persisted
    
    Calling cache() on an RDD that already has a storage level assigned throws
    SparkUnsupportedOperationException. The write path may cache the RDD
    internally before returning it. Track whether we own the cache and only
    call cache()/unpersist() when the RDD was not already persisted.
    
    * Ensure writeStatusRDD is always unpersisted via try/finally
    
    * fix: use proper import for StorageLevel instead of fully-qualified class reference
    
    🤖 Generated with [Claude Code](https://claude.ai/code)
    
    Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
    
    * fix: address final reviewer feedback on pre-commit streaming validator
    
    - Move runValidators() inside the try/finally so writeStatusRDD.unpersist()
      always runs, including on validator exceptions (FAIL policy or
      HoodieIOException from loadPreviousCommitMetadata).
    - Use ReflectionUtils.loadClass in SparkValidatorUtils for instantiation,
      matching SparkStreamerValidatorUtils and the rest of the codebase.
    - Rename weOwnCache to shouldUnpersist to read in the direction of its
      actual use (gating unpersist in finally).
    
    * fix: only cache writeStatusRDD when pre-commit validators are configured
    
    Address danny0405's review comment on PR #18405: skip the .cache()/.unpersist()
    cycle when no pre-commit validators are configured, since without validators the
    RDD is consumed exactly once by writeClient.commit() and caching adds no value.
    
    Guards both the cache call and the validator collect+run on a single
    validatorsConfigured boolean derived from hoodie.precommit.validators.
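
The resulting control flow can be sketched with a stand-in for the RDD. FakeDataset,
CacheOwnershipSketch and validateAndCommit are hypothetical names for illustration
only; the real code works on writeStatusRDD and compares its storage level with
StorageLevel.NONE().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

public class CacheOwnershipSketch {

  // Hypothetical stand-in for an RDD: tracks persistence and records the calls made on it.
  static class FakeDataset {
    boolean persisted;
    final List<String> events = new ArrayList<>();

    FakeDataset(boolean persisted) {
      this.persisted = persisted;
    }

    void cache() {
      persisted = true;
      events.add("cache");
    }

    void unpersist() {
      persisted = false;
      events.add("unpersist");
    }

    List<String> collect() {
      events.add("collect");
      return Collections.singletonList("status");
    }
  }

  static boolean validateAndCommit(FakeDataset data, String validatorsConfig,
                                   Consumer<List<String>> validator) {
    boolean validatorsConfigured = validatorsConfig != null && !validatorsConfig.isEmpty();
    // Only take ownership of the cache when validators will run and nobody cached it before us.
    boolean shouldUnpersist = validatorsConfigured && !data.persisted;
    if (shouldUnpersist) {
      data.cache();
    }
    try {
      if (validatorsConfigured) {
        // Validators run before the commit and may throw to block it.
        validator.accept(data.collect());
      }
      data.events.add("commit");
      return true;
    } finally {
      // Runs on success and on validator failure, but only releases a cache created here.
      if (shouldUnpersist) {
        data.unpersist();
      }
    }
  }
}
```

Keeping both the cache call and the validator run behind the same boolean means the
no-validator path touches the dataset exactly once, at commit.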
    
    * address codope review: V2-then-V1 checkpoint key resolution + V2 test coverage
    
    Comment 1 (SparkKafkaOffsetValidator hardcoded V1 key):
    - StreamingOffsetValidator base class now exposes a no-key constructor that
      auto-resolves the checkpoint via CheckpointUtils.getCheckpoint(metadata),
      which prefers V2 and falls back to V1. The explicit-key constructor stays
      for subclasses that read a custom non-streamer key (e.g. Flink's
      HOODIE_METADATA_KEY).
    - SparkKafkaOffsetValidator switches to the no-key constructor.
    
    Comment 2 (tests only cover V1 path):
    - testSecondCommitMatchingOffsetsPasses and testSecondCommitDataLossDetected
      are now parameterized over both V1 and V2 checkpoint keys.
    - Added testV2CheckpointKeyOnTableVersionEightFires on a HoodieTableVersion.EIGHT
      table with V2 keys, asserting the validator fires on data loss.
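
The V2-then-V1 resolution can be pictured as a lookup over the commit's extra
metadata. This is a simplified sketch, not the CheckpointUtils implementation:
CheckpointKeyFallbackSketch and resolve are hypothetical names, a plain Map stands in
for HoodieCommitMetadata, and the two key strings are the ones quoted in the
SparkKafkaOffsetValidator Javadoc of this commit.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class CheckpointKeyFallbackSketch {

  // Key names as quoted in the SparkKafkaOffsetValidator Javadoc.
  static final String V1_KEY = "deltastreamer.checkpoint.key";
  static final String V2_KEY = "streamer.checkpoint.key.v2";

  // explicitKey == null means "auto-resolve": prefer the V2 key, then fall back to V1.
  static Optional<String> resolve(Map<String, String> extraMetadata, String explicitKey) {
    if (extraMetadata == null) {
      return Optional.empty();
    }
    if (explicitKey != null) {
      // Custom writers read their own key directly and never fall back.
      return Optional.ofNullable(extraMetadata.get(explicitKey));
    }
    String v2 = extraMetadata.get(V2_KEY);
    if (v2 != null) {
      return Optional.of(v2);
    }
    return Optional.ofNullable(extraMetadata.get(V1_KEY));
  }

  public static void main(String[] args) {
    Map<String, String> metadata = new HashMap<>();
    metadata.put(V1_KEY, "topic,0:100");
    System.out.println(resolve(metadata, null));
  }
}
```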
    
    ---------
    
    Co-authored-by: Xinli Shang <[email protected]>
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../client/validator/StreamingOffsetValidator.java |  93 +++++-
 .../config/HoodiePreCommitValidatorConfig.java     |   8 +-
 .../validator/TestStreamingOffsetValidator.java    |   2 +-
 .../hudi/client/utils/SparkValidatorUtils.java     |  25 +-
 .../hudi/client/validator/ValidationContext.java   |  14 +
 .../apache/hudi/utilities/streamer/StreamSync.java |  37 ++-
 .../validator/SparkKafkaOffsetValidator.java       |  60 ++++
 .../validator/SparkStreamerValidatorUtils.java     | 194 +++++++++++++
 .../streamer/validator/SparkValidationContext.java | 139 +++++++++
 .../validator/TestSparkKafkaOffsetValidator.java   | 322 +++++++++++++++++++++
 .../validator/TestSparkStreamerValidatorUtils.java | 290 +++++++++++++++++++
 .../validator/TestSparkValidationContext.java      | 156 ++++++++++
 12 files changed, 1317 insertions(+), 23 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java
index ce577d84ca01..40e7a4f1635f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java
@@ -20,11 +20,13 @@
 package org.apache.hudi.client.validator;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.util.CheckpointUtils;
 import org.apache.hudi.common.util.CheckpointUtils.CheckpointFormat;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
 import org.apache.hudi.config.HoodiePreCommitValidatorConfig.ValidationFailurePolicy;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieValidationException;
 
 import lombok.extern.slf4j.Slf4j;
@@ -50,7 +52,11 @@ import lombok.extern.slf4j.Slf4j;
  *
  * Subclasses specify:
  * - Checkpoint format (SPARK_KAFKA, FLINK_KAFKA, etc.)
- * - Checkpoint metadata key
+ * - Checkpoint metadata key (optional — when omitted, the validator 
auto-resolves the
+ *   active streamer key from commit metadata using
+ *   {@link org.apache.hudi.common.table.checkpoint.CheckpointUtils#getCheckpoint(HoodieCommitMetadata)},
+ *   which prefers V2 and falls back to V1. Subclasses that read a custom non-streamer key
+ *   (e.g. Flink's HOODIE_METADATA_KEY) must pass it explicitly.)
  * - Source-specific parsing logic (if needed)
  *
  * Configuration:
@@ -66,7 +72,26 @@ public abstract class StreamingOffsetValidator extends BasePreCommitValidator {
   protected final CheckpointFormat checkpointFormat;
 
   /**
-   * Create a streaming offset validator.
+   * Create a streaming offset validator that auto-resolves the checkpoint key from commit
+   * metadata using {@link org.apache.hudi.common.table.checkpoint.CheckpointUtils#getCheckpoint(HoodieCommitMetadata)}.
+   *
+   * <p>Use this constructor for streamer pipelines (V1 or V2 checkpoint keys). The validator
+   * will prefer V2 (table version 8+) and fall back to V1 transparently, so subclasses don't
+   * need to know which key the writer used.</p>
+   *
+   * @param config Validator configuration
+   * @param checkpointFormat Format of the checkpoint string
+   */
+  protected StreamingOffsetValidator(TypedProperties config,
+                                      CheckpointFormat checkpointFormat) {
+    this(config, null, checkpointFormat);
+  }
+
+  /**
+   * Create a streaming offset validator with an explicit checkpoint metadata key.
+   *
+   * <p>Use this constructor when the writer stores its checkpoint under a custom key that
+   * is not the standard streamer V1/V2 key (e.g. Flink's HOODIE_METADATA_KEY).</p>
    *
    * @param config Validator configuration
    * @param checkpointKey Key to extract checkpoint from extraMetadata
@@ -95,10 +120,12 @@ public abstract class StreamingOffsetValidator extends BasePreCommitValidator {
       return;
     }
 
-    // Extract current checkpoint
-    Option<String> currentCheckpointOpt = context.getExtraMetadata(checkpointKey);
+    // Extract current checkpoint — either from the explicit key (custom 
writers like Flink) or
+    // by auto-resolving from commit metadata (streamer pipelines, V2-then-V1 
fallback).
+    Option<String> currentCheckpointOpt = resolveCheckpoint(context.getCommitMetadata());
     if (!currentCheckpointOpt.isPresent()) {
-      log.warn("Current checkpoint not found with key: {}. Skipping validation.", checkpointKey);
+      log.warn("Current checkpoint not found (key: {}). Skipping validation.",
+          checkpointKey == null ? "<auto-resolved>" : checkpointKey);
       return;
     }
     String currentCheckpoint = currentCheckpointOpt.get();
@@ -110,8 +137,7 @@ public abstract class StreamingOffsetValidator extends BasePreCommitValidator {
     }
 
     // Extract previous checkpoint
-    Option<String> previousCheckpointOpt = context.getPreviousCommitMetadata()
-        .flatMap(metadata -> Option.ofNullable(metadata.getMetadata(checkpointKey)));
+    Option<String> previousCheckpointOpt = resolveCheckpoint(context.getPreviousCommitMetadata());
 
     if (!previousCheckpointOpt.isPresent()) {
       log.info("Previous checkpoint not found. May be first streaming commit. Skipping validation.");
@@ -139,6 +165,10 @@ public abstract class StreamingOffsetValidator extends BasePreCommitValidator {
     long recordsWritten = context.getTotalInsertRecordsWritten()
         + context.getTotalUpdateRecordsWritten();
 
+    // Track write errors so callers can distinguish write-failure deviation (write errors > 0)
+    // from silent data loss (write errors == 0) when the validator fires.
+    long writeErrors = context.getTotalWriteErrors();
+
     // For empty commits (e.g., no new data from source), both offsetDiff and recordsWritten
     // can be zero. This is a valid scenario — skip validation to avoid false 
positives.
     if (offsetDifference == 0 && recordsWritten == 0) {
@@ -147,7 +177,7 @@ public abstract class StreamingOffsetValidator extends BasePreCommitValidator {
     }
 
     // Validate offset vs record consistency
-    validateOffsetConsistency(offsetDifference, recordsWritten,
+    validateOffsetConsistency(offsetDifference, recordsWritten, writeErrors,
         currentCheckpoint, previousCheckpoint);
   }
 
@@ -155,12 +185,13 @@ public abstract class StreamingOffsetValidator extends BasePreCommitValidator {
    * Validate that offset difference matches record count within tolerance.
    *
    * @param offsetDiff Expected records based on offset difference
-   * @param recordsWritten Actual records written
+   * @param recordsWritten Actual records written (inserts + updates)
+   * @param writeErrors Records that failed to write (tracked in write status errors)
    * @param currentCheckpoint Current checkpoint string (for error messages)
    * @param previousCheckpoint Previous checkpoint string (for error messages)
    * @throws HoodieValidationException if validation fails and policy is FAIL
    */
-  protected void validateOffsetConsistency(long offsetDiff, long recordsWritten,
+  protected void validateOffsetConsistency(long offsetDiff, long recordsWritten, long writeErrors,
                                             String currentCheckpoint, String previousCheckpoint)
       throws HoodieValidationException {
 
@@ -169,10 +200,13 @@ public abstract class StreamingOffsetValidator extends BasePreCommitValidator {
     if (deviation > tolerancePercentage) {
       String errorMsg = String.format(
           "Streaming offset validation failed. "
-              + "Offset difference: %d, Records written: %d, Deviation: %.2f%%, Tolerance: %.2f%%. "
-              + "This may indicate data loss or filtering. "
+              + "Offset difference: %d, Records written: %d, Write errors: %d, Deviation: %.2f%%, Tolerance: %.2f%%. "
+              + "%s"
               + "Previous checkpoint: %s, Current checkpoint: %s",
-          offsetDiff, recordsWritten, deviation, tolerancePercentage,
+          offsetDiff, recordsWritten, writeErrors, deviation, tolerancePercentage,
+          writeErrors > 0
+              ? "Non-zero write errors suggest records failed to write rather than silent data loss. "
+              : "This may indicate data loss or filtering. ",
           previousCheckpoint, currentCheckpoint);
 
       if (failurePolicy == ValidationFailurePolicy.WARN_LOG) {
@@ -181,8 +215,8 @@ public abstract class StreamingOffsetValidator extends BasePreCommitValidator {
         throw new HoodieValidationException(errorMsg);
       }
     } else {
-      log.info("Offset validation passed. Offset diff: {}, Records: {}, Deviation: {}% (within {}%)",
-          offsetDiff, recordsWritten, String.format("%.2f", deviation), tolerancePercentage);
+      log.info("Offset validation passed. Offset diff: {}, Records: {}, Write errors: {}, Deviation: {}% (within {}%)",
+          offsetDiff, recordsWritten, writeErrors, String.format("%.2f", deviation), tolerancePercentage);
     }
   }
 
@@ -210,4 +244,33 @@ public abstract class StreamingOffsetValidator extends BasePreCommitValidator {
     long difference = Math.abs(offsetDiff - recordsWritten);
     return (100.0 * difference) / offsetDiff;
   }
+
+  /**
+   * Resolve the checkpoint string from commit metadata.
+   *
+   * <p>When the validator was constructed with an explicit {@code checkpointKey}, that key
+   * is read directly. Otherwise, {@link org.apache.hudi.common.table.checkpoint.CheckpointUtils#getCheckpoint(HoodieCommitMetadata)}
+   * is used to locate the active streamer checkpoint (V2 first, V1 fallback), so callers
+   * don't need to know which key the writer used.</p>
+   *
+   * @param commitMetadataOpt Optional commit metadata containing extraMetadata
+   * @return Optional checkpoint string (empty if metadata is absent or no checkpoint key matches)
+   */
+  private Option<String> resolveCheckpoint(Option<HoodieCommitMetadata> commitMetadataOpt) {
+    if (!commitMetadataOpt.isPresent()) {
+      return Option.empty();
+    }
+    HoodieCommitMetadata metadata = commitMetadataOpt.get();
+    if (checkpointKey != null) {
+      return Option.ofNullable(metadata.getMetadata(checkpointKey));
+    }
+    try {
+      return Option.ofNullable(
+          org.apache.hudi.common.table.checkpoint.CheckpointUtils.getCheckpoint(metadata)
+              .getCheckpointKey());
+    } catch (HoodieException e) {
+      // No V1 or V2 streamer checkpoint key present in extraMetadata.
+      return Option.empty();
+    }
+  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
index f85cc44120d4..169494b7244a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
@@ -43,7 +43,10 @@ public class HoodiePreCommitValidatorConfig extends HoodieConfig {
       .key("hoodie.precommit.validators")
       .defaultValue("")
       .markAdvanced()
-      .withDocumentation("Comma separated list of class names that can be invoked to validate commit");
+      .withDocumentation("Comma separated list of class names that can be invoked to validate commit. "
+          + "Available streaming offset validators: "
+          + "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator (Flink Kafka), "
+          + "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator (Spark/HoodieStreamer Kafka)");
   public static final String VALIDATOR_TABLE_VARIABLE = "<TABLE_NAME>";
 
   public static final ConfigProperty<String> EQUALITY_SQL_QUERIES = ConfigProperty
@@ -71,7 +74,8 @@ public class HoodiePreCommitValidatorConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("Tolerance percentage for streaming offset validation "
           + "(used by org.apache.hudi.client.validator.StreamingOffsetValidator "
-          + "and org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator). "
+          + "and org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator "
+          + "and org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator). "
           + "The validator compares the offset difference (expected records from source) "
           + "with actual records written. If the deviation exceeds this percentage, "
           + "the commit is rejected or warned depending on the validation failure policy. "
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/validator/TestStreamingOffsetValidator.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/validator/TestStreamingOffsetValidator.java
index 818bc2087a3c..bc402e5ee5a0 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/validator/TestStreamingOffsetValidator.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/validator/TestStreamingOffsetValidator.java
@@ -62,7 +62,7 @@ public class TestStreamingOffsetValidator {
     // Expose protected method for testing
     public void testValidateOffsetConsistency(long offsetDiff, long recordsWritten,
                                                 String current, String previous) {
-      validateOffsetConsistency(offsetDiff, recordsWritten, current, previous);
+      validateOffsetConsistency(offsetDiff, recordsWritten, 0L, current, previous);
     }
 
     // Expose validateWithMetadata for testing
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
index c7804631d3f6..40ee7d8cefff 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
@@ -84,9 +84,28 @@ public class SparkValidatorUtils {
       Dataset<Row> beforeState = getRecordsFromCommittedFiles(sqlContext, partitionsModified, table, afterState.schema());
 
       Stream<SparkPreCommitValidator> validators = Arrays.stream(config.getPreCommitValidators().split(","))
-          .map(validatorClass -> ((SparkPreCommitValidator) ReflectionUtils.loadClass(validatorClass,
-              new Class<?>[] {HoodieSparkTable.class, HoodieEngineContext.class, HoodieWriteConfig.class},
-              table, context, config)));
+          .map(String::trim)
+          .filter(s -> !s.isEmpty())
+          .flatMap(validatorClass -> {
+            try {
+              Class<?> clazz = Class.forName(validatorClass);
+              if (!SparkPreCommitValidator.class.isAssignableFrom(clazz)) {
+                LOG.warn("Skipping validator {} — it does not implement 
SparkPreCommitValidator. "
+                    + "If this is a streaming offset validator (e.g. 
SparkKafkaOffsetValidator), "
+                    + "it will be invoked by SparkStreamerValidatorUtils 
instead.", validatorClass);
+                return Stream.empty();
+              }
+              SparkPreCommitValidator validator = (SparkPreCommitValidator) ReflectionUtils.loadClass(
+                  validatorClass,
+                  new Class<?>[] {HoodieSparkTable.class, HoodieEngineContext.class, HoodieWriteConfig.class},
+                  table, context, config);
+              return Stream.of(validator);
+            } catch (ClassNotFoundException e) {
+              throw new HoodieValidationException("Cannot find validator class: " + validatorClass, e);
+            } catch (ReflectiveOperationException e) {
+              throw new HoodieValidationException("Failed to instantiate validator: " + validatorClass, e);
+            }
+          });
 
       boolean allSuccess = validators.map(v -> runValidatorAsync(v, writeMetadata, beforeState, afterState, instantTime)).map(CompletableFuture::join)
           .reduce(true, Boolean::logicalAnd);
diff --git a/hudi-common/src/main/java/org/apache/hudi/client/validator/ValidationContext.java b/hudi-common/src/main/java/org/apache/hudi/client/validator/ValidationContext.java
index 30fdbb3ba3b4..b85218e587e8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/client/validator/ValidationContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/client/validator/ValidationContext.java
@@ -169,6 +169,20 @@ public interface ValidationContext {
         .orElse(0L);
   }
 
+  /**
+   * Calculate total write errors in the current commit.
+   * Records that failed to write are tracked in {@link org.apache.hudi.common.model.HoodieWriteStat#getTotalWriteErrors()}.
+   * A non-zero error count alongside a deviation in offset validation 
indicates write failures
+   * rather than silent data loss — useful context for distinguishing the two 
failure modes.
+   *
+   * @return Total count of records that failed to write
+   */
+  default long getTotalWriteErrors() {
+    return getWriteStats()
+        .map(stats -> stats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum())
+        .orElse(0L);
+  }
+
   /**
    * Check if this is the first commit (no previous commits exist).
    * Derived from {@link #getPreviousCommitInstant()}.
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 600891c85dff..2b1406c7deab 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -74,6 +74,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieErrorTableConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodiePayloadConfig;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
@@ -115,6 +116,7 @@ import org.apache.hudi.utilities.schema.SimpleSchemaProvider;
 import org.apache.hudi.utilities.sources.InputBatch;
 import org.apache.hudi.utilities.sources.Source;
 import org.apache.hudi.utilities.streamer.HoodieStreamer.Config;
+import org.apache.hudi.utilities.streamer.validator.SparkStreamerValidatorUtils;
 import org.apache.hudi.utilities.transform.Transformer;
 
 import com.codahale.metrics.Timer;
@@ -128,6 +130,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.storage.StorageLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -874,8 +877,38 @@ public class StreamSync implements Serializable, Closeable {
           totalSuccessfulRecords);
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
 
-      boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds, Option.empty(),
-          Option.of(writeStatusValidator));
+      // Cache the RDD only when pre-commit validators are configured. Validators collect the RDD
+      // before commit, so without caching the same DAG would re-evaluate inside writeClient.commit().
+      // When no validators are configured, commit consumes the RDD once and caching adds no value.
+      // shouldUnpersist is true only when we created the cache here (validators present and storage
+      // level was NONE), so the finally block knows to release it.
+      boolean validatorsConfigured = !StringUtils.isNullOrEmpty(props.getString(
+          HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+          HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue()));
+      boolean shouldUnpersist = validatorsConfigured && writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());
+      if (shouldUnpersist) {
+        writeStatusRDD.cache();
+      }
+      boolean success;
+      try {
+        if (validatorsConfigured) {
+          List<WriteStatus> writeStatuses = writeStatusRDD.collect();
+
+          // Run pre-commit streaming offset validators (if configured).
+          // Placement before writeClient.commit() is intentional: offset 
validation is a stronger
+          // guard than commitOnErrors — if offset deviation indicates 
potential data loss, the commit
+          // must be prevented regardless of the commitOnErrors policy.
+          SparkStreamerValidatorUtils.runValidators(props, instantTime, writeStatuses,
+              checkpointCommitMetadata, metaClient);
+        }
+
+        success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds, Option.empty(),
+            Option.of(writeStatusValidator));
+      } finally {
+        if (shouldUnpersist) {
+          writeStatusRDD.unpersist();
+        }
+      }
       releaseResourcesInvoked = true;
       if (success) {
         LOG.info("Commit " + instantTime + " successful!");
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkKafkaOffsetValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkKafkaOffsetValidator.java
new file mode 100644
index 000000000000..589e09e96de8
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkKafkaOffsetValidator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.validator.StreamingOffsetValidator;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.CheckpointUtils.CheckpointFormat;
+
+/**
+ * Spark/HoodieStreamer-specific Kafka offset validator.
+ *
+ * <p>Validates that the number of records written matches the Kafka offset difference
+ * between the current and previous HoodieStreamer checkpoints. The active checkpoint key
+ * (V1 {@code deltastreamer.checkpoint.key} or V2 {@code streamer.checkpoint.key.v2}) is
+ * resolved at validation time via {@link org.apache.hudi.common.table.checkpoint.CheckpointUtils#getCheckpoint},
+ * so this validator works against tables written with either checkpoint key version.</p>
+ *
+ * <p>Configuration:
+ * <ul>
+ *   <li>{@code hoodie.precommit.validators}: Include
+ *       {@code org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator}</li>
+ *   <li>{@code hoodie.precommit.validators.streaming.offset.tolerance.percentage}:
+ *       Acceptable deviation (default: 0.0 = strict)</li>
+ *   <li>{@code hoodie.precommit.validators.failure.policy}:
+ *       FAIL (default) or WARN_LOG</li>
+ * </ul></p>
+ *
+ * <p>This validator is primarily intended for append-only ingestion from Kafka via HoodieStreamer.
+ * For upsert workloads with deduplication, configure a higher tolerance or use WARN_LOG.</p>
+ *
+ * <p><b>Important:</b> This class extends {@link org.apache.hudi.client.validator.BasePreCommitValidator}
+ * and is invoked by {@link SparkStreamerValidatorUtils}, NOT by {@code SparkValidatorUtils}
+ * (which expects {@code SparkPreCommitValidator} with a different constructor signature).
+ * Listing this class in {@code hoodie.precommit.validators} while also using the standard
+ * Spark table write-path validators will cause an instantiation failure in {@code SparkValidatorUtils}.
+ * Use this validator exclusively with HoodieStreamer pipelines.</p>
+ */
+public class SparkKafkaOffsetValidator extends StreamingOffsetValidator {
+
+  public SparkKafkaOffsetValidator(TypedProperties config) {
+    super(config, CheckpointFormat.SPARK_KAFKA);
+  }
+}
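
Reviewer note: the check described in the Javadoc above (record count versus Kafka offset difference, with a tolerance percentage) can be sketched as follows. This is a minimal illustration, not the code in StreamingOffsetValidator, which is not shown in this part of the diff. The class name OffsetDeviationSketch and both method names are hypothetical. It assumes the deviation is |offsetDiff - recordsWritten| / offsetDiff * 100 and that a deviation equal to the tolerance still passes, which is what the comments in the accompanying tests suggest.

```java
final class OffsetDeviationSketch {

  /** Percentage by which the written record count deviates from the offset difference. */
  public static double deviationPercent(long offsetDiff, long recordsWritten) {
    if (offsetDiff == 0) {
      // Assumption of this sketch: with no offsets consumed, any written
      // record counts as a 100% deviation and zero records as no deviation.
      return recordsWritten == 0 ? 0.0 : 100.0;
    }
    return Math.abs(offsetDiff - recordsWritten) * 100.0 / offsetDiff;
  }

  /** True when the deviation does not exceed the tolerance (boundary inclusive). */
  public static boolean withinTolerance(long offsetDiff, long recordsWritten, double tolerancePercent) {
    return deviationPercent(offsetDiff, recordsWritten) <= tolerancePercent;
  }

  public static void main(String[] args) {
    // Inputs taken from the test comments in this commit.
    System.out.println(withinTolerance(1000, 950, 10.0)); // 5% deviation
    System.out.println(withinTolerance(1000, 899, 10.0)); // 10.1% deviation
    System.out.println(withinTolerance(200, 200, 0.0));   // exact match, strict tolerance
  }
}
```

With the default tolerance of 0.0 only an exact match passes, which is why the Javadoc recommends a higher tolerance or WARN_LOG for upsert workloads with deduplication.
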
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java
new file mode 100644
index 000000000000..474215ec0f3c
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.validator.BasePreCommitValidator;
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for running pre-commit validators in the HoodieStreamer commit flow.
+ *
+ * <p>Instantiates and executes validators configured via
+ * {@code hoodie.precommit.validators}. Each validator must extend
+ * {@link BasePreCommitValidator} and have a constructor that accepts
+ * {@link TypedProperties}.</p>
+ *
+ * <p>Called from {@code StreamSync.writeToSinkAndDoMetaSync()} before
+ * the commit is finalized.</p>
+ *
+ * <p><b>Note on validator compatibility:</b> This utility uses a different instantiation
+ * mechanism than {@code SparkValidatorUtils} (used by the Spark table write path).
+ * {@code SparkValidatorUtils} expects validators implementing {@code SparkPreCommitValidator}
+ * with a {@code (HoodieSparkTable, HoodieEngineContext, HoodieWriteConfig)} constructor.
+ * Validators registered here (e.g. {@link SparkKafkaOffsetValidator}) extend
+ * {@link BasePreCommitValidator} with a {@code (TypedProperties)} constructor and
+ * are NOT compatible with {@code SparkValidatorUtils}. Do not mix them under the same
+ * {@code hoodie.precommit.validators} config if both paths are active.</p>
+ */
+public class SparkStreamerValidatorUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkStreamerValidatorUtils.class);
+
+  /**
+   * Run all configured pre-commit validators.
+   *
+   * <p>The caller is responsible for caching and unpersisting the source RDD if needed.
+   * This method accepts pre-collected write statuses to avoid a second DAG evaluation —
+   * the caller should cache the RDD, collect to this list, call this method, then pass
+   * the same RDD to {@code writeClient.commit()}, and unpersist after commit completes.</p>
+   *
+   * @param props Configuration properties containing validator class names
+   * @param instantTime Commit instant time
+   * @param writeStatuses Pre-collected write statuses from Spark write operations
+   * @param checkpointCommitMetadata Extra metadata being committed (contains checkpoint info)
+   * @param metaClient Table meta client for timeline access and previous commit lookup
+   * @throws HoodieValidationException if any validator fails with FAIL policy
+   */
+  public static void runValidators(TypedProperties props,
+                                   String instantTime,
+                                   List<WriteStatus> writeStatuses,
+                                   Map<String, String> checkpointCommitMetadata,
+                                   HoodieTableMetaClient metaClient) {
+    String validatorClassNames = props.getString(
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue());
+
+    if (StringUtils.isNullOrEmpty(validatorClassNames)) {
+      return;
+    }
+
+    HoodieCommitMetadata currentMetadata = buildCommitMetadata(writeStatuses, checkpointCommitMetadata);
+    List<HoodieWriteStat> writeStats = writeStatuses.stream()
+        .map(WriteStatus::getStat)
+        .filter(Objects::nonNull)
+        .collect(Collectors.toList());
+
+    Option<HoodieCommitMetadata> previousCommitMetadata = loadPreviousCommitMetadata(metaClient);
+
+    ValidationContext context = new SparkValidationContext(
+        instantTime,
+        Option.of(currentMetadata),
+        Option.of(writeStats),
+        previousCommitMetadata,
+        metaClient);
+
+    List<String> classNames = Arrays.stream(validatorClassNames.split(","))
+        .map(String::trim)
+        .filter(s -> !s.isEmpty())
+        .collect(Collectors.toList());
+
+    for (String className : classNames) {
+      try {
+        Class<?> clazz = Class.forName(className);
+        if (!BasePreCommitValidator.class.isAssignableFrom(clazz)) {
+          LOG.warn("Skipping validator {} in HoodieStreamer path — it does not extend BasePreCommitValidator. "
+              + "If this is a SparkPreCommitValidator (e.g. SqlQueryEqualityPreCommitValidator), "
+              + "it must be invoked via SparkValidatorUtils in the standard Spark write path instead.", className);
+          continue;
+        }
+        BasePreCommitValidator validator = (BasePreCommitValidator)
+            ReflectionUtils.loadClass(className, new Class<?>[] {TypedProperties.class}, props);
+        LOG.info("Running pre-commit validator: {} for instant: {}", className, instantTime);
+        validator.validateWithMetadata(context);
+        LOG.info("Pre-commit validator {} passed for instant: {}", className, instantTime);
+      } catch (HoodieValidationException e) {
+        LOG.error("Pre-commit validator {} failed for instant: {}", className, instantTime, e);
+        throw e;
+      } catch (Exception e) {
+        LOG.error("Failed to instantiate or run validator: {}", className, e);
+        throw new HoodieValidationException(
+            "Failed to run pre-commit validator: " + className, e);
+      }
+    }
+  }
+
+  /**
+   * Build a pre-commit snapshot of {@link HoodieCommitMetadata} from write statuses and extra metadata.
+   *
+   * <p>This is intentionally a partial/preview object used only for validation — it contains
+   * write stats and checkpoint extra-metadata, but omits fields that are not available before the
+   * commit (e.g. schema, operation type). Validators should treat this as a read-only snapshot
+   * of what will be committed, not a fully-constructed commit record.</p>
+   */
+  private static HoodieCommitMetadata buildCommitMetadata(
+      List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+
+    // Add write stats
+    for (WriteStatus status : writeStatuses) {
+      HoodieWriteStat stat = status.getStat();
+      if (stat != null) {
+        metadata.addWriteStat(stat.getPartitionPath(), stat);
+      }
+    }
+
+    // Add extra metadata (includes checkpoint info like deltastreamer.checkpoint.key)
+    if (extraMetadata != null) {
+      extraMetadata.forEach(metadata::addMetadata);
+    }
+
+    return metadata;
+  }
+
+  /**
+   * Load the previous completed commit metadata from the timeline.
+   */
+  private static Option<HoodieCommitMetadata> loadPreviousCommitMetadata(HoodieTableMetaClient metaClient) {
+    try {
+      HoodieTimeline completedTimeline = metaClient.reloadActiveTimeline()
+          .getWriteTimeline()
+          .filterCompletedInstants();
+      Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
+      if (lastInstant.isPresent()) {
+        return Option.of(completedTimeline.readCommitMetadata(lastInstant.get()));
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to load previous commit metadata", e);
+    }
+    return Option.empty();
+  }
+
+  private SparkStreamerValidatorUtils() {
+    // Utility class
+  }
+}
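
Reviewer note: the loading loop in runValidators() above (split the comma-separated class list, skip classes of the wrong base type, instantiate through a single-argument constructor, wrap reflection failures) can be sketched with plain JDK reflection. This is an illustration under stated assumptions, not Hudi's implementation: BaseValidator, AlwaysPassValidator, and runAll are hypothetical stand-ins, java.util.Properties replaces TypedProperties, and Constructor.newInstance replaces ReflectionUtils.loadClass.

```java
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class ValidatorLoadingSketch {

  /** Hypothetical stand-in for BasePreCommitValidator. */
  abstract static class BaseValidator {
    protected final Properties config;

    protected BaseValidator(Properties config) {
      this.config = config;
    }

    abstract void validate();
  }

  /** Hypothetical validator exposing the expected (Properties) constructor. */
  static class AlwaysPassValidator extends BaseValidator {
    public AlwaysPassValidator(Properties config) {
      super(config);
    }

    @Override
    void validate() {
      // Nothing to check in this sketch.
    }
  }

  /** Runs every listed class that extends BaseValidator and returns the names that ran. */
  public static List<String> runAll(String classNames, Properties config) {
    List<String> ran = new ArrayList<>();
    for (String raw : classNames.split(",")) {
      String name = raw.trim();
      if (name.isEmpty()) {
        continue;
      }
      try {
        Class<?> clazz = Class.forName(name);
        if (!BaseValidator.class.isAssignableFrom(clazz)) {
          continue; // wrong base type: skip rather than fail
        }
        Constructor<?> ctor = clazz.getConstructor(Properties.class);
        ((BaseValidator) ctor.newInstance(config)).validate();
        ran.add(name);
      } catch (ReflectiveOperationException e) {
        // Mirrors the diff's behaviour of wrapping load failures in one exception type.
        throw new IllegalStateException("Failed to run validator: " + name, e);
      }
    }
    return ran;
  }

  public static void main(String[] args) {
    // java.lang.String is listed only to show the skip path.
    String names = "java.lang.String, " + AlwaysPassValidator.class.getName();
    System.out.println(runAll(names, new Properties()));
  }
}
```

Checking the type with isAssignableFrom before instantiation is what lets a shared validator list tolerate classes meant for the other write path: they are skipped here instead of failing on a missing constructor.
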
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java
new file mode 100644
index 000000000000..1d46e13aeedb
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+
+import java.util.List;
+
+/**
+ * Spark/HoodieStreamer implementation of {@link ValidationContext}.
+ *
+ * <p>Constructed from data available in {@code StreamSync.writeToSinkAndDoMetaSync()}
+ * before the commit is finalized. Provides validators with access to commit metadata,
+ * write statistics, and previous commit information for streaming offset validation.</p>
+ *
+ * <p>Unlike Flink's implementation, Spark can optionally provide active timeline access
+ * via {@link HoodieTableMetaClient} for richer validation patterns.</p>
+ */
+public class SparkValidationContext implements ValidationContext {
+
+  private final String instantTime;
+  private final Option<HoodieCommitMetadata> commitMetadata;
+  private final Option<List<HoodieWriteStat>> writeStats;
+  private final Option<HoodieCommitMetadata> previousCommitMetadata;
+  private final HoodieTableMetaClient metaClient;
+
+  /**
+   * Create a Spark validation context with full timeline access.
+   *
+   * @param instantTime Current commit instant time
+   * @param commitMetadata Current commit metadata (with extraMetadata including checkpoints)
+   * @param writeStats Write statistics from write operations
+   * @param previousCommitMetadata Metadata from the previous completed commit
+   * @param metaClient Table meta client for timeline access (may be null for testing)
+   */
+  public SparkValidationContext(String instantTime,
+                                Option<HoodieCommitMetadata> commitMetadata,
+                                Option<List<HoodieWriteStat>> writeStats,
+                                Option<HoodieCommitMetadata> previousCommitMetadata,
+                                HoodieTableMetaClient metaClient) {
+    this.instantTime = instantTime;
+    this.commitMetadata = commitMetadata;
+    this.writeStats = writeStats;
+    this.previousCommitMetadata = previousCommitMetadata;
+    this.metaClient = metaClient;
+  }
+
+  /**
+   * Create a Spark validation context without timeline access (for testing).
+   *
+   * @param instantTime Current commit instant time
+   * @param commitMetadata Current commit metadata (with extraMetadata including checkpoints)
+   * @param writeStats Write statistics from write operations
+   * @param previousCommitMetadata Metadata from the previous completed commit
+   */
+  public SparkValidationContext(String instantTime,
+                                Option<HoodieCommitMetadata> commitMetadata,
+                                Option<List<HoodieWriteStat>> writeStats,
+                                Option<HoodieCommitMetadata> previousCommitMetadata) {
+    this(instantTime, commitMetadata, writeStats, previousCommitMetadata, null);
+  }
+
+  @Override
+  public String getInstantTime() {
+    return instantTime;
+  }
+
+  @Override
+  public Option<HoodieCommitMetadata> getCommitMetadata() {
+    return commitMetadata;
+  }
+
+  @Override
+  public Option<List<HoodieWriteStat>> getWriteStats() {
+    return writeStats;
+  }
+
+  /**
+   * Get the active timeline. Available when metaClient is provided.
+   *
+   * @throws UnsupportedOperationException if metaClient was not provided
+   */
+  @Override
+  public HoodieActiveTimeline getActiveTimeline() {
+    if (metaClient == null) {
+      throw new UnsupportedOperationException(
+          "Active timeline is not available without HoodieTableMetaClient.");
+    }
+    return metaClient.getActiveTimeline();
+  }
+
+  /**
+   * Get the previous completed commit instant by querying the timeline.
+   * Returns {@link Option#empty()} if this is the first commit or metaClient is unavailable.
+   */
+  @Override
+  public Option<HoodieInstant> getPreviousCommitInstant() {
+    if (metaClient == null) {
+      return Option.empty();
+    }
+    return metaClient.getActiveTimeline()
+        .getWriteTimeline()
+        .filterCompletedInstants()
+        .lastInstant();
+  }
+
+  @Override
+  public boolean isFirstCommit() {
+    return !previousCommitMetadata.isPresent();
+  }
+
+  @Override
+  public Option<HoodieCommitMetadata> getPreviousCommitMetadata() {
+    return previousCommitMetadata;
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkKafkaOffsetValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkKafkaOffsetValidator.java
new file mode 100644
index 000000000000..d109aa3246f6
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkKafkaOffsetValidator.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests for {@link SparkKafkaOffsetValidator}.
+ */
+public class TestSparkKafkaOffsetValidator {
+
+  // ========== Helper methods ==========
+
+  private static TypedProperties defaultConfig() {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    props.setProperty(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+    return props;
+  }
+
+  private static TypedProperties configWithTolerance(double tolerance) {
+    TypedProperties props = defaultConfig();
+    props.setProperty(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(),
+        String.valueOf(tolerance));
+    return props;
+  }
+
+  private static TypedProperties configWithWarnPolicy() {
+    TypedProperties props = defaultConfig();
+    props.setProperty(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "WARN_LOG");
+    return props;
+  }
+
+  /**
+   * Build a Spark Kafka checkpoint string.
+   * Format: topic,partition:offset,partition:offset,...
+   */
+  private static String buildSparkKafkaCheckpoint(String topic, int[] partitions, long[] offsets) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(topic);
+    for (int i = 0; i < partitions.length; i++) {
+      sb.append(",").append(partitions[i]).append(":").append(offsets[i]);
+    }
+    return sb.toString();
+  }
+
+  private static HoodieCommitMetadata buildMetadata(String checkpointValue) {
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+    if (checkpointValue != null) {
+      metadata.addMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, checkpointValue);
+    }
+    return metadata;
+  }
+
+  private static List<HoodieWriteStat> buildWriteStats(long numInserts, long numUpdates) {
+    HoodieWriteStat stat = new HoodieWriteStat();
+    stat.setNumInserts(numInserts);
+    stat.setNumUpdateWrites(numUpdates);
+    stat.setPartitionPath("partition1");
+    return Collections.singletonList(stat);
+  }
+
+  private static SparkValidationContext buildContext(
+      String instantTime,
+      HoodieCommitMetadata currentMetadata,
+      List<HoodieWriteStat> writeStats,
+      HoodieCommitMetadata previousMetadata) {
+    return new SparkValidationContext(
+        instantTime,
+        Option.of(currentMetadata),
+        Option.of(writeStats),
+        previousMetadata != null ? Option.of(previousMetadata) : Option.empty());
+  }
+
+  // ========== Tests ==========
+
+  @Test
+  public void testExactMatchPasses() {
+    // Previous: partition 0 at offset 100, partition 1 at offset 200
+    // Current: partition 0 at offset 200, partition 1 at offset 300
+    // Diff = (200-100) + (300-200) = 200. Records written = 200.
+    String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0, 1}, new long[]{100, 200});
+    String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0, 1}, new long[]{200, 300});
+
+    SparkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(200, 0),
+        buildMetadata(prevCheckpoint));
+
+    SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig());
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testDataLossDetected() {
+    // Diff = 1000 but only 500 records written -> 50% deviation
+    String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{0});
+    String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+    SparkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(500, 0),
+        buildMetadata(prevCheckpoint));
+
+    SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig());
+    assertThrows(HoodieValidationException.class, () -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testWithinTolerancePasses() {
+    // Diff = 1000, records = 950 -> 5% deviation, tolerance = 10%
+    String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{0});
+    String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+    SparkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(950, 0),
+        buildMetadata(prevCheckpoint));
+
+    SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(configWithTolerance(10.0));
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testWarnPolicyDoesNotThrow() {
+    // Data loss but WARN_LOG policy
+    String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{0});
+    String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+    SparkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(0, 0),
+        buildMetadata(prevCheckpoint));
+
+    SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(configWithWarnPolicy());
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testSkipsFirstCommit() {
+    String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+    // No previous commit
+    SparkValidationContext ctx = new SparkValidationContext(
+        "20260320120000000",
+        Option.of(buildMetadata(currCheckpoint)),
+        Option.of(buildWriteStats(500, 0)),
+        Option.empty());
+
+    SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig());
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testSkipsWhenNoCheckpointKey() {
+    // Current metadata has no checkpoint key
+    HoodieCommitMetadata currentMeta = new HoodieCommitMetadata();
+    String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{100});
+
+    SparkValidationContext ctx = buildContext("20260320120000000",
+        currentMeta,
+        buildWriteStats(500, 0),
+        buildMetadata(prevCheckpoint));
+
+    SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig());
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testMultiPartitionValidation() {
+    // 4 partitions, each advancing by 250 = total diff 1000
+    String prevCheckpoint = buildSparkKafkaCheckpoint("events",
+        new int[]{0, 1, 2, 3}, new long[]{0, 0, 0, 0});
+    String currCheckpoint = buildSparkKafkaCheckpoint("events",
+        new int[]{0, 1, 2, 3}, new long[]{250, 250, 250, 250});
+
+    SparkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(800, 200),  // 800 inserts + 200 updates = 1000
+        buildMetadata(prevCheckpoint));
+
+    SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig());
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testEmptyCommitSkipsValidation() {
+    // Both offsets same and no records written
+    String checkpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{100});
+
+    SparkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(checkpoint),
+        buildWriteStats(0, 0),
+        buildMetadata(checkpoint));
+
+    SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig());
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testPreviousCheckpointMissingSkipsValidation() {
+    // Previous metadata exists but has no checkpoint key
+    HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+
+    String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+    SparkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(500, 0),
+        prevMeta);
+
+    SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig());
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testOvercountingDetected() {
+    // More records written than offset diff
+    // Diff = 100, records = 200 -> |100-200|/100 = 100% deviation
+    String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{0});
+    String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{100});
+
+    SparkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(200, 0),
+        buildMetadata(prevCheckpoint));
+
+    SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig());
+    assertThrows(HoodieValidationException.class, () -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testExactToleranceBoundaryPasses() {
+    // Diff = 1000, records = 900 -> 10% deviation, tolerance = 10%
+    String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{0});
+    String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+    SparkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(900, 0),
+        buildMetadata(prevCheckpoint));
+
+    SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(configWithTolerance(10.0));
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testJustOverToleranceFails() {
+    // Diff = 1000, records = 899 -> 10.1% deviation, tolerance = 10%
+    String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{0});
+    String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+    SparkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(899, 0),
+        buildMetadata(prevCheckpoint));
+
+    SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(configWithTolerance(10.0));
+    assertThrows(HoodieValidationException.class, () -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testOnlyInsertsNoUpdates() {
+    // Pure insert workload
+    String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0, 1}, new long[]{0, 0});
+    String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0, 1}, new long[]{500, 500});
+
+    SparkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(1000, 0),
+        buildMetadata(prevCheckpoint));
+
+    SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig());
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testUpdatesCountedInRecordTotal() {
+    // Diff = 1000. 600 inserts + 400 updates = 1000 total
+    String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{0});
+    String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+    SparkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(600, 400),
+        buildMetadata(prevCheckpoint));
+
+    SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig());
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+}
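
Reviewer note: the tests above build checkpoints in the form topic,partition:offset,partition:offset,... and reason about the summed per-partition offset advance. A minimal sketch of that arithmetic follows. It is not Hudi's CheckpointUtils: the class KafkaCheckpointSketch and its methods are hypothetical, and treating a partition that is absent from the previous checkpoint as starting at offset 0 is an assumption of this sketch only.

```java
import java.util.HashMap;
import java.util.Map;

final class KafkaCheckpointSketch {

  /** Parses "topic,partition:offset,..." into a partition-to-offset map. */
  public static Map<Integer, Long> parseOffsets(String checkpoint) {
    String[] parts = checkpoint.split(",");
    Map<Integer, Long> offsets = new HashMap<>();
    // parts[0] is the topic name; the remaining entries are partition:offset pairs.
    for (int i = 1; i < parts.length; i++) {
      String[] pair = parts[i].split(":");
      offsets.put(Integer.parseInt(pair[0].trim()), Long.parseLong(pair[1].trim()));
    }
    return offsets;
  }

  /** Sums the per-partition offset advance between two checkpoints. */
  public static long offsetDiff(String previous, String current) {
    Map<Integer, Long> prev = parseOffsets(previous);
    long total = 0;
    for (Map.Entry<Integer, Long> entry : parseOffsets(current).entrySet()) {
      // Assumption: a partition missing from the previous checkpoint starts at 0.
      total += entry.getValue() - prev.getOrDefault(entry.getKey(), 0L);
    }
    return total;
  }

  public static void main(String[] args) {
    // Same numbers as testExactMatchPasses: (200-100) + (300-200).
    System.out.println(offsetDiff("events,0:100,1:200", "events,0:200,1:300")); // prints 200
  }
}
```
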
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkStreamerValidatorUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkStreamerValidatorUtils.java
new file mode 100644
index 000000000000..69d5d228dab6
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkStreamerValidatorUtils.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link SparkStreamerValidatorUtils}.
+ *
+ * <p>Tests cover orchestration logic (class loading, config passing, error handling)
+ * as well as end-to-end offset validation using a two-commit timeline to verify
+ * the real comparison path is exercised.</p>
+ */
+public class TestSparkStreamerValidatorUtils {
+
+  @TempDir
+  Path tempDir;
+
+  private static TypedProperties propsWithValidator(String validatorClassName) {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(), validatorClassName);
+    props.setProperty(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    props.setProperty(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+    return props;
+  }
+
+  private static WriteStatus buildWriteStatus(String partitionPath, long numInserts, long numUpdates) {
+    HoodieWriteStat stat = new HoodieWriteStat();
+    stat.setPartitionPath(partitionPath);
+    stat.setNumInserts(numInserts);
+    stat.setNumUpdateWrites(numUpdates);
+
+    WriteStatus ws = new WriteStatus(false, 0.0);
+    ws.setStat(stat);
+    return ws;
+  }
+
+  private HoodieTableMetaClient createMetaClient() throws IOException {
+    return HoodieTestUtils.init(tempDir.toAbsolutePath().toString());
+  }
+
+  private HoodieTableMetaClient createMetaClient(HoodieTableVersion version) throws IOException {
+    return HoodieTestUtils.init(tempDir.toAbsolutePath().toString(), HoodieTableType.COPY_ON_WRITE, version);
+  }
+
+  // ========== Tests ==========
+
+  @Test
+  public void testNoValidatorsConfigured() throws IOException {
+    TypedProperties props = new TypedProperties();
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+    assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+        props, "20260320120000000", writeStatuses,
+        new HashMap<>(), createMetaClient()));
+  }
+
+  @Test
+  public void testEmptyValidatorString() throws IOException {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(), "");
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+    assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+        props, "20260320120000000", writeStatuses,
+        new HashMap<>(), createMetaClient()));
+  }
+
+  @Test
+  public void testValidValidatorFirstCommitPasses() throws IOException {
+    TypedProperties props = propsWithValidator(
+        "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator");
+
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+    Map<String, String> extraMeta = new HashMap<>();
+    extraMeta.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:100");
+
+    // First commit (no previous metadata on timeline) — validator should skip and pass
+    assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+        props, "20260320120000000", writeStatuses, extraMeta, createMetaClient()));
+  }
+
+  @Test
+  public void testInvalidValidatorClassThrows() throws IOException {
+    TypedProperties props = propsWithValidator("com.nonexistent.FakeValidator");
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+    assertThrows(HoodieValidationException.class,
+        () -> SparkStreamerValidatorUtils.runValidators(
+            props, "20260320120000000", writeStatuses, new HashMap<>(), createMetaClient()));
+  }
+
+  @Test
+  public void testMultipleValidators() throws IOException {
+    TypedProperties props = propsWithValidator(
+        "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator,"
+            + "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator");
+
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+    Map<String, String> extraMeta = new HashMap<>();
+    extraMeta.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:100");
+
+    assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+        props, "20260320120000000", writeStatuses, extraMeta, createMetaClient()));
+  }
+
+  @Test
+  public void testValidatorWithWhitespaceInClassNames() throws IOException {
+    TypedProperties props = propsWithValidator(
+        "  org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator  , ");
+
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+    assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+        props, "20260320120000000", writeStatuses, new HashMap<>(), createMetaClient()));
+  }
+
+  @Test
+  public void testNullExtraMetadataHandled() throws IOException {
+    TypedProperties props = propsWithValidator(
+        "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator");
+
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+    assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+        props, "20260320120000000", writeStatuses, null, createMetaClient()));
+  }
+
+  @Test
+  public void testMultipleWriteStatusesAggregated() throws IOException {
+    TypedProperties props = propsWithValidator(
+        "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator");
+
+    List<WriteStatus> writeStatuses = new ArrayList<>();
+    writeStatuses.add(buildWriteStatus("p1", 60, 0));
+    writeStatuses.add(buildWriteStatus("p2", 40, 0));
+
+    Map<String, String> extraMeta = new HashMap<>();
+    extraMeta.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:100");
+
+    assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+        props, "20260320120000000", writeStatuses, extraMeta, createMetaClient()));
+  }
+
+  @Test
+  public void testEmptyWriteStatuses() throws IOException {
+    TypedProperties props = propsWithValidator(
+        "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator");
+
+    List<WriteStatus> writeStatuses = Collections.emptyList();
+    Map<String, String> extraMeta = new HashMap<>();
+    extraMeta.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:100");
+
+    assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+        props, "20260320120000000", writeStatuses, extraMeta, createMetaClient()));
+  }
+
+  @Test
+  public void testValidationExceptionPreservedAcrossValidators() throws IOException {
+    TypedProperties props = propsWithValidator(
+        "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator,"
+            + "com.nonexistent.FakeValidator");
+
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+    HoodieValidationException ex = assertThrows(HoodieValidationException.class,
+        () -> SparkStreamerValidatorUtils.runValidators(
+            props, "20260320120000000", writeStatuses, new HashMap<>(), createMetaClient()));
+    assertTrue(ex.getMessage().contains("FakeValidator"));
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = {
+      StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1,
+      StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2
+  })
+  public void testSecondCommitMatchingOffsetsPasses(String checkpointKey) throws Exception {
+    TypedProperties props = propsWithValidator(
+        "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator");
+
+    // Create table with a previous committed instant: offset 0 -> 500
+    HoodieTableMetaClient metaClient = createMetaClient();
+    HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+    prevMeta.addMetadata(checkpointKey, "events,0:500");
+    HoodieTestTable.of(metaClient).addCommit("20260320110000000", Option.of(prevMeta));
+
+    // Second commit: offset 500 -> 600, 100 records written — matches diff exactly
+    Map<String, String> extraMeta = new HashMap<>();
+    extraMeta.put(checkpointKey, "events,0:600");
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+    assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+        props, "20260320120000000", writeStatuses, extraMeta, metaClient));
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = {
+      StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1,
+      StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2
+  })
+  public void testSecondCommitDataLossDetected(String checkpointKey) throws Exception {
+    TypedProperties props = propsWithValidator(
+        "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator");
+
+    // Create table with a previous committed instant: offset 0 -> 1000
+    HoodieTableMetaClient metaClient = createMetaClient();
+    HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+    prevMeta.addMetadata(checkpointKey, "events,0:1000");
+    HoodieTestTable.of(metaClient).addCommit("20260320110000000", Option.of(prevMeta));
+
+    // Second commit: offset 1000 -> 2000 (diff=1000) but only 500 records written — data loss
+    Map<String, String> extraMeta = new HashMap<>();
+    extraMeta.put(checkpointKey, "events,0:2000");
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 500, 0));
+
+    assertThrows(HoodieValidationException.class,
+        () -> SparkStreamerValidatorUtils.runValidators(
+            props, "20260320120000000", writeStatuses, extraMeta, metaClient));
+  }
+
+  @Test
+  public void testV2CheckpointKeyOnTableVersionEightFires() throws Exception {
+    // Verifies the validator actually fires on a writeTableVersion=8 table that uses the
+    // V2 checkpoint key — i.e. the auto-resolution in StreamingOffsetValidator picks up V2
+    // and runs the comparison instead of silently skipping.
+    TypedProperties props = propsWithValidator(
+        "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator");
+
+    HoodieTableMetaClient metaClient = createMetaClient(HoodieTableVersion.EIGHT);
+    HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+    prevMeta.addMetadata(StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2, "events,0:1000");
+    HoodieTestTable.of(metaClient).addCommit("20260320110000000", Option.of(prevMeta));
+
+    // Offset diff = 1000 but only 200 records written — must fail
+    Map<String, String> extraMeta = new HashMap<>();
+    extraMeta.put(StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2, "events,0:2000");
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 200, 0));
+
+    assertThrows(HoodieValidationException.class,
+        () -> SparkStreamerValidatorUtils.runValidators(
+            props, "20260320120000000", writeStatuses, extraMeta, metaClient));
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkValidationContext.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkValidationContext.java
new file mode 100644
index 000000000000..7f94262e98c4
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkValidationContext.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link SparkValidationContext}.
+ */
+public class TestSparkValidationContext {
+
+  private static HoodieWriteStat buildStat(long inserts, long updates) {
+    HoodieWriteStat stat = new HoodieWriteStat();
+    stat.setNumInserts(inserts);
+    stat.setNumUpdateWrites(updates);
+    stat.setPartitionPath("partition1");
+    return stat;
+  }
+
+  @Test
+  public void testBasicProperties() {
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+    metadata.addMetadata("key1", "value1");
+    List<HoodieWriteStat> writeStats = Collections.singletonList(buildStat(100, 50));
+
+    SparkValidationContext ctx = new SparkValidationContext(
+        "20260320120000000",
+        Option.of(metadata),
+        Option.of(writeStats),
+        Option.empty());
+
+    assertEquals("20260320120000000", ctx.getInstantTime());
+    assertTrue(ctx.getCommitMetadata().isPresent());
+    assertTrue(ctx.getWriteStats().isPresent());
+    assertEquals(1, ctx.getWriteStats().get().size());
+  }
+
+  @Test
+  public void testRecordCounting() {
+    List<HoodieWriteStat> writeStats = Arrays.asList(
+        buildStat(100, 50),   // partition1: 100 inserts, 50 updates
+        buildStat(200, 30));  // partition2: 200 inserts, 30 updates
+
+    SparkValidationContext ctx = new SparkValidationContext(
+        "20260320120000000",
+        Option.of(new HoodieCommitMetadata()),
+        Option.of(writeStats),
+        Option.empty());
+
+    assertEquals(300, ctx.getTotalInsertRecordsWritten());
+    assertEquals(80, ctx.getTotalUpdateRecordsWritten());
+    assertEquals(380, ctx.getTotalRecordsWritten());
+  }
+
+  @Test
+  public void testFirstCommitDetection() {
+    // No previous commit metadata -> first commit
+    SparkValidationContext ctx = new SparkValidationContext(
+        "20260320120000000",
+        Option.of(new HoodieCommitMetadata()),
+        Option.of(Collections.emptyList()),
+        Option.empty());
+
+    assertTrue(ctx.isFirstCommit());
+  }
+
+  @Test
+  public void testNotFirstCommitWhenPreviousExists() {
+    HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+
+    SparkValidationContext ctx = new SparkValidationContext(
+        "20260320120000000",
+        Option.of(new HoodieCommitMetadata()),
+        Option.of(Collections.emptyList()),
+        Option.of(prevMeta));
+
+    assertFalse(ctx.isFirstCommit());
+  }
+
+  @Test
+  public void testExtraMetadataAccess() {
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+    metadata.addMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:1000");
+    metadata.addMetadata("custom.key", "custom_value");
+
+    SparkValidationContext ctx = new SparkValidationContext(
+        "20260320120000000",
+        Option.of(metadata),
+        Option.of(Collections.emptyList()),
+        Option.empty());
+
+    assertEquals("events,0:1000",
+        ctx.getExtraMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1).get());
+    assertEquals("custom_value", ctx.getExtraMetadata("custom.key").get());
+    assertFalse(ctx.getExtraMetadata("nonexistent.key").isPresent());
+  }
+
+  @Test
+  public void testPreviousCommitMetadataAccess() {
+    HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+    prevMeta.addMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:500");
+
+    SparkValidationContext ctx = new SparkValidationContext(
+        "20260320120000000",
+        Option.of(new HoodieCommitMetadata()),
+        Option.of(Collections.emptyList()),
+        Option.of(prevMeta));
+
+    assertTrue(ctx.getPreviousCommitMetadata().isPresent());
+    assertEquals("events,0:500",
+        ctx.getPreviousCommitMetadata().get().getMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1));
+  }
+
+  @Test
+  public void testEmptyWriteStats() {
+    SparkValidationContext ctx = new SparkValidationContext(
+        "20260320120000000",
+        Option.of(new HoodieCommitMetadata()),
+        Option.empty(),
+        Option.empty());
+
+    assertEquals(0, ctx.getTotalRecordsWritten());
+    assertEquals(0, ctx.getTotalInsertRecordsWritten());
+    assertEquals(0, ctx.getTotalUpdateRecordsWritten());
+  }
+}
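
Note on the offset tests above: the two-commit tests compare the Kafka offset
advance between the previous and current checkpoint against the number of
records written. The standalone sketch below illustrates that arithmetic only.
It is not the SparkKafkaOffsetValidator implementation, which is not shown in
this part of the diff. The class OffsetCheckSketch and its methods are
hypothetical, the checkpoint layout "topic,partition:offset[,partition:offset]"
is inferred from test strings such as "events,0:600", and the tolerance rule
(percentage deviation of records written from the offset diff) is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; none of these names exist in Hudi.
class OffsetCheckSketch {

  // Parses "topic,partition:offset[,partition:offset...]" into partition -> offset.
  // The layout is an assumption inferred from the test strings.
  static Map<Integer, Long> parseOffsets(String checkpoint) {
    String[] parts = checkpoint.split(",");
    Map<Integer, Long> offsets = new HashMap<>();
    for (int i = 1; i < parts.length; i++) { // parts[0] is the topic name
      String[] kv = parts[i].split(":");
      offsets.put(Integer.parseInt(kv[0].trim()), Long.parseLong(kv[1].trim()));
    }
    return offsets;
  }

  // Sums the per-partition offset advance from the previous to the current checkpoint.
  // Partitions missing from the previous checkpoint are treated as starting at 0.
  static long offsetDiff(String previous, String current) {
    Map<Integer, Long> prev = parseOffsets(previous);
    long diff = 0L;
    for (Map.Entry<Integer, Long> e : parseOffsets(current).entrySet()) {
      diff += e.getValue() - prev.getOrDefault(e.getKey(), 0L);
    }
    return diff;
  }

  // Assumed rule: pass when the deviation, as a percentage of the offset diff,
  // does not exceed the configured tolerance.
  static boolean withinTolerance(long offsetDiff, long recordsWritten, double tolerancePercent) {
    if (offsetDiff == 0L) {
      return recordsWritten == 0L;
    }
    double deviation = Math.abs(offsetDiff - recordsWritten) * 100.0 / offsetDiff;
    return deviation <= tolerancePercent;
  }

  public static void main(String[] args) {
    // Mirrors testSecondCommitMatchingOffsetsPasses: diff 100, 100 records written.
    System.out.println(withinTolerance(offsetDiff("events,0:500", "events,0:600"), 100L, 0.0));
    // Mirrors testSecondCommitDataLossDetected: diff 1000, 500 records written.
    System.out.println(withinTolerance(offsetDiff("events,0:1000", "events,0:2000"), 500L, 0.0));
  }
}
```

With the 0.0 tolerance that propsWithValidator configures, the first case in
the sketch passes and the second is rejected, matching the outcomes the tests
assert.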
