This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 967e456cc456 feat(common): add core pre-commit validation framework - Phase 1  (#18068)
967e456cc456 is described below

commit 967e456cc4569e85d20724f49631eac73561c422
Author: Xinli Shang <[email protected]>
AuthorDate: Thu Mar 19 20:06:54 2026 -0700

    feat(common): add core pre-commit validation framework - Phase 1  (#18068)
    
    Co-authored-by: Xinli Shang <[email protected]>
---
 .../client/validator/StreamingOffsetValidator.java | 213 ++++++++
 .../config/HoodiePreCommitValidatorConfig.java     |  33 ++
 .../validator/TestStreamingOffsetValidator.java    | 552 +++++++++++++++++++++
 .../client/validator/BasePreCommitValidator.java   |  80 +++
 .../hudi/client/validator/ValidationContext.java   | 183 +++++++
 .../apache/hudi/common/util/CheckpointUtils.java   | 257 ++++++++++
 .../hudi/common/util/TestCheckpointUtils.java      | 229 +++++++++
 7 files changed, 1547 insertions(+)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java
new file mode 100644
index 000000000000..71844f3c1fad
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.validator;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.CheckpointUtils;
+import org.apache.hudi.common.util.CheckpointUtils.CheckpointFormat;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig.ValidationFailurePolicy;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Arrays;
+import java.util.Locale;
+
+/**
+ * Abstract base class for streaming offset validators.
+ * Handles common offset validation logic across all streaming sources (Kafka, Pulsar, Kinesis).
+ *
+ * <p>This validator compares source offset differences with actual record counts written to
+ * detect data loss.</p>
+ *
+ * <p><b>Note:</b> This validator is primarily intended for append-only ingestion scenarios.
+ * For upsert workloads with deduplication or event-time ordering, the deviation between
+ * source offsets and records written can be legitimately large (e.g., duplicate keys are
+ * deduplicated, late-arriving events are dropped). In such cases, configure an appropriate
+ * tolerance percentage or use WARN_LOG failure policy.</p>
+ *
+ * <p>Algorithm:</p>
+ * <ol>
+ *   <li>Extract current and previous checkpoints from commit metadata.</li>
+ *   <li>Calculate the offset difference using the source-specific checkpoint format.</li>
+ *   <li>Get the actual record count (inserts + updates) from write statistics.</li>
+ *   <li>Calculate deviation percentage: |offsetDiff - recordCount| / offsetDiff * 100.</li>
+ *   <li>If deviation &gt; tolerance: fail or warn based on the failure policy.</li>
+ * </ol>
+ *
+ * <p>Subclasses specify:</p>
+ * <ul>
+ *   <li>Checkpoint format (SPARK_KAFKA, FLINK_KAFKA, etc.)</li>
+ *   <li>Checkpoint metadata key</li>
+ *   <li>Source-specific parsing logic (if needed)</li>
+ * </ul>
+ *
+ * <p>Configuration:</p>
+ * <ul>
+ *   <li>{@code hoodie.precommit.validators.streaming.offset.tolerance.percentage}
+ *       (default: 0.0). A non-negative percentage on a 0-100 scale.</li>
+ *   <li>{@code hoodie.precommit.validators.failure.policy} (default: FAIL). Matched
+ *       case-insensitively against the {@link ValidationFailurePolicy} constants.</li>
+ * </ul>
+ */
+@Slf4j
+public abstract class StreamingOffsetValidator extends BasePreCommitValidator {
+
+  /** Key under which the checkpoint string is stored in the commit's extra metadata. */
+  protected final String checkpointKey;
+  /** Maximum allowed deviation in percent (0-100 scale); validated to be non-negative. */
+  protected final double tolerancePercentage;
+  /** Action taken when the deviation exceeds {@link #tolerancePercentage}. */
+  protected final ValidationFailurePolicy failurePolicy;
+  /** Format used to validate checkpoint strings and compute offset differences. */
+  protected final CheckpointFormat checkpointFormat;
+
+  /**
+   * Create a streaming offset validator.
+   *
+   * @param config Validator configuration
+   * @param checkpointKey Key to extract checkpoint from extraMetadata
+   * @param checkpointFormat Format of the checkpoint string
+   * @throws IllegalArgumentException if the configured tolerance is not a non-negative number,
+   *     or the configured failure policy is not a {@link ValidationFailurePolicy} constant
+   */
+  protected StreamingOffsetValidator(TypedProperties config,
+                                     String checkpointKey,
+                                     CheckpointFormat checkpointFormat) {
+    super(config);
+    this.checkpointKey = checkpointKey;
+    this.checkpointFormat = checkpointFormat;
+    this.tolerancePercentage = parseTolerancePercentage(config);
+    this.failurePolicy = parseFailurePolicy(config);
+  }
+
+  /**
+   * Reads and validates the tolerance percentage from the configuration.
+   *
+   * <p>NaN is rejected because every {@code deviation > NaN} comparison is false, which would
+   * silently disable validation. Negative values are rejected because they would fail even
+   * exact matches.</p>
+   *
+   * @param config Validator configuration
+   * @return the configured tolerance (0-100 scale)
+   * @throws IllegalArgumentException if the value is not a number, is NaN, or is negative
+   */
+  private static double parseTolerancePercentage(TypedProperties config) {
+    String key = HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key();
+    String rawValue = config.getString(key,
+        HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.defaultValue());
+    double tolerance;
+    try {
+      // Double.parseDouble ignores leading/trailing whitespace itself.
+      tolerance = Double.parseDouble(rawValue);
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException(
+          "Invalid value '" + rawValue + "' for " + key + ": expected a non-negative number", e);
+    }
+    if (Double.isNaN(tolerance) || tolerance < 0) {
+      throw new IllegalArgumentException(
+          "Invalid value '" + rawValue + "' for " + key + ": expected a non-negative number");
+    }
+    return tolerance;
+  }
+
+  /**
+   * Reads the failure policy from the configuration, ignoring case and surrounding whitespace.
+   *
+   * @param config Validator configuration
+   * @return the configured policy
+   * @throws IllegalArgumentException if the value does not name a policy constant
+   */
+  private static ValidationFailurePolicy parseFailurePolicy(TypedProperties config) {
+    String key = HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key();
+    String rawValue = config.getString(key,
+        HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.defaultValue());
+    try {
+      // Locale.ROOT keeps upper-casing stable regardless of the JVM default locale.
+      return ValidationFailurePolicy.valueOf(rawValue.trim().toUpperCase(Locale.ROOT));
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException("Invalid value '" + rawValue + "' for " + key
+          + ": expected one of " + Arrays.toString(ValidationFailurePolicy.values()), e);
+    }
+  }
+
+  /**
+   * Compares the checkpoint advance between the previous and current commit with the number of
+   * records written by the current commit.
+   *
+   * <p>Validation is skipped (with a log message) when this is the first commit, when either
+   * checkpoint is missing or has an invalid format, or when the commit is empty (no offset
+   * change and no records written).</p>
+   *
+   * @param context Validation context for the commit being validated
+   * @throws HoodieValidationException if the deviation exceeds the tolerance and the policy is FAIL
+   */
+  @Override
+  protected void validateWithMetadata(ValidationContext context) throws HoodieValidationException {
+    // Skip validation for first commit (no previous checkpoint)
+    if (context.isFirstCommit()) {
+      log.info("Skipping offset validation for first commit");
+      return;
+    }
+
+    // Extract current checkpoint
+    Option<String> currentCheckpointOpt = context.getExtraMetadata(checkpointKey);
+    if (!currentCheckpointOpt.isPresent()) {
+      log.warn("Current checkpoint not found with key: {}. Skipping validation.", checkpointKey);
+      return;
+    }
+    String currentCheckpoint = currentCheckpointOpt.get();
+
+    // Validate current checkpoint format
+    if (!CheckpointUtils.isValidCheckpointFormat(checkpointFormat, currentCheckpoint)) {
+      log.warn("Current checkpoint has invalid format: {}. Skipping validation.", currentCheckpoint);
+      return;
+    }
+
+    // Extract previous checkpoint
+    Option<String> previousCheckpointOpt = context.getPreviousCommitMetadata()
+        .flatMap(metadata -> Option.ofNullable(metadata.getMetadata(checkpointKey)));
+
+    if (!previousCheckpointOpt.isPresent()) {
+      log.info("Previous checkpoint not found. May be first streaming commit. Skipping validation.");
+      return;
+    }
+    String previousCheckpoint = previousCheckpointOpt.get();
+
+    // Validate previous checkpoint format
+    if (!CheckpointUtils.isValidCheckpointFormat(checkpointFormat, previousCheckpoint)) {
+      log.warn("Previous checkpoint has invalid format: {}. Skipping validation.", previousCheckpoint);
+      return;
+    }
+
+    // Calculate offset difference using format-specific logic.
+    // Note: calculateOffsetDifference always returns >= 0 because negative per-partition
+    // diffs (offset resets) are handled internally by using the current offset.
+    long offsetDifference = CheckpointUtils.calculateOffsetDifference(
+        checkpointFormat, previousCheckpoint, currentCheckpoint);
+
+    // Get actual new record count from write stats.
+    // Use numInserts + numUpdateWrites instead of numWrites to avoid counting records
+    // re-written due to small file handling. With INSERT operation and small file handling,
+    // numWrites includes all records in the merged file (existing + new), which would
+    // inflate the count and mask real data loss.
+    long recordsWritten = context.getTotalInsertRecordsWritten()
+        + context.getTotalUpdateRecordsWritten();
+
+    // For empty commits (e.g., no new data from source), both offsetDiff and recordsWritten
+    // can be zero. This is a valid scenario — skip validation to avoid false positives.
+    if (offsetDifference == 0 && recordsWritten == 0) {
+      log.info("Empty commit detected (no offset change, no records written). Skipping offset validation.");
+      return;
+    }
+
+    // Validate offset vs record consistency
+    validateOffsetConsistency(offsetDifference, recordsWritten,
+        currentCheckpoint, previousCheckpoint);
+  }
+
+  /**
+   * Validate that offset difference matches record count within tolerance.
+   *
+   * @param offsetDiff Expected records based on offset difference
+   * @param recordsWritten Actual records written
+   * @param currentCheckpoint Current checkpoint string (for error messages)
+   * @param previousCheckpoint Previous checkpoint string (for error messages)
+   * @throws HoodieValidationException if validation fails and policy is FAIL
+   */
+  protected void validateOffsetConsistency(long offsetDiff, long recordsWritten,
+                                           String currentCheckpoint, String previousCheckpoint)
+      throws HoodieValidationException {
+
+    double deviation = calculateDeviation(offsetDiff, recordsWritten);
+
+    if (deviation > tolerancePercentage) {
+      // Locale.ROOT so numbers are rendered identically on every JVM.
+      String errorMsg = String.format(Locale.ROOT,
+          "Streaming offset validation failed. "
+              + "Offset difference: %d, Records written: %d, Deviation: %.2f%%, Tolerance: %.2f%%. "
+              + "This may indicate data loss or filtering. "
+              + "Previous checkpoint: %s, Current checkpoint: %s",
+          offsetDiff, recordsWritten, deviation, tolerancePercentage,
+          previousCheckpoint, currentCheckpoint);
+
+      if (failurePolicy == ValidationFailurePolicy.WARN_LOG) {
+        log.warn("{} (failure policy is WARN_LOG, commit will proceed)", errorMsg);
+      } else {
+        throw new HoodieValidationException(errorMsg);
+      }
+    } else {
+      log.info("Offset validation passed. Offset diff: {}, Records: {}, Deviation: {}% (within {}%)",
+          offsetDiff, recordsWritten, String.format(Locale.ROOT, "%.2f", deviation),
+          tolerancePercentage);
+    }
+  }
+
+  /**
+   * Calculate percentage deviation between expected (offset diff) and actual (record count).
+   *
+   * <p>Formula:</p>
+   * <ul>
+   *   <li>If both are zero: 0% (perfect match, no data).</li>
+   *   <li>If exactly one is zero: 100%.</li>
+   *   <li>Otherwise: |offsetDiff - recordsWritten| / offsetDiff * 100.</li>
+   * </ul>
+   *
+   * <p>The result is relative to {@code offsetDiff}, so it exceeds 100% when more records were
+   * written than offsets consumed (e.g. 100 offsets vs. 300 records gives 200%). The
+   * {@code offsetDiff == 0} case is pinned to 100% because no relative value is defined.
+   * NOTE(review): with a tolerance of 100 or more, records written without any offset advance
+   * therefore pass validation.</p>
+   *
+   * @param offsetDiff Expected records from offset difference
+   * @param recordsWritten Actual records written
+   * @return Deviation percentage (0.0 = exact match; may exceed 100.0)
+   */
+  private double calculateDeviation(long offsetDiff, long recordsWritten) {
+    // Handle edge cases
+    if (offsetDiff == 0 && recordsWritten == 0) {
+      return 0.0;  // Both zero - perfect match (no data processed)
+    }
+    if (offsetDiff == 0 || recordsWritten == 0) {
+      return 100.0;  // One is zero - complete mismatch
+    }
+
+    long difference = Math.abs(offsetDiff - recordsWritten);
+    return (100.0 * difference) / offsetDiff;
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
index 384fc555c090..b1faa5153d22 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
@@ -64,6 +64,39 @@ public class HoodiePreCommitValidatorConfig extends HoodieConfig {
           + "Expected result is included as part of query separated by '#'. 
Example query: 'query1#result1:query2#result2'"
           + "Note \\<TABLE_NAME\\> variable is expected to be present in 
query.");
 
+  /**
+   * Maximum allowed deviation, in percent on a 0-100 scale, between the source offset
+   * difference and the number of records written, as checked by StreamingOffsetValidator.
+   */
+  public static final ConfigProperty<String> STREAMING_OFFSET_TOLERANCE_PERCENTAGE = ConfigProperty
+      .key("hoodie.precommit.validators.streaming.offset.tolerance.percentage")
+      .defaultValue("0.0")
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Tolerance percentage for streaming offset validation "
+          + "(used by org.apache.hudi.client.validator.StreamingOffsetValidator). "
+          + "The validator compares the offset difference (expected records from source) "
+          + "with actual records written. If the deviation exceeds this percentage, "
+          + "the commit is rejected or warned depending on the validation failure policy. "
+          + "The value is a percentage on a 0-100 scale relative to the offset difference, "
+          + "e.g. 5.0 allows a 5% deviation (not 0.05). "
+          + "For upsert workloads with deduplication, set a higher tolerance. "
+          + "Default is 0.0 (strict mode, exact match required).");
+
+  /**
+   * Reaction to a failed pre-commit validation check.
+   */
+  public enum ValidationFailurePolicy {
+    /**
+     * Reject the commit by throwing an exception.
+     */
+    FAIL,
+    /**
+     * Log a warning and let the commit go ahead.
+     */
+    WARN_LOG;
+  }
+
+  /**
+   * Name of the {@link ValidationFailurePolicy} to apply when a validation check fails;
+   * defaults to {@code FAIL}.
+   */
+  public static final ConfigProperty<String> VALIDATION_FAILURE_POLICY = ConfigProperty
+      .key("hoodie.precommit.validators.failure.policy")
+      .defaultValue(ValidationFailurePolicy.FAIL.name())
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Policy for handling pre-commit validation failures. FAIL (default): "
+          + "validation failures block the commit with an exception. WARN_LOG: validation "
+          + "failures emit a warning log but allow the commit to proceed. Useful for monitoring "
+          + "data quality without impacting write availability.");
+
   /**
    * Spark SQL queries to run on table before committing new data to validate 
state before and after commit.
    * Multiple queries separated by ';' delimiter are supported.
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/validator/TestStreamingOffsetValidator.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/validator/TestStreamingOffsetValidator.java
new file mode 100644
index 000000000000..818bc2087a3c
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/validator/TestStreamingOffsetValidator.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.validator;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
+import org.apache.hudi.common.util.CheckpointUtils.CheckpointFormat;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import 
org.apache.hudi.config.HoodiePreCommitValidatorConfig.ValidationFailurePolicy;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for StreamingOffsetValidator, ValidationContext, and 
BasePreCommitValidator.
+ */
+public class TestStreamingOffsetValidator {
+
+  private static final String CHECKPOINT_KEY = "test.checkpoint.key";
+
+  /**
+   * Mock implementation of StreamingOffsetValidator for testing.
+   */
+  private static class MockOffsetValidator extends StreamingOffsetValidator {
+    public MockOffsetValidator(TypedProperties config) {
+      super(config, CHECKPOINT_KEY, CheckpointFormat.SPARK_KAFKA);
+    }
+
+    // Expose protected method for testing
+    public void testValidateOffsetConsistency(long offsetDiff, long 
recordsWritten,
+                                                String current, String 
previous) {
+      validateOffsetConsistency(offsetDiff, recordsWritten, current, previous);
+    }
+
+    // Expose validateWithMetadata for testing
+    public void testValidateWithMetadata(ValidationContext context) {
+      validateWithMetadata(context);
+    }
+  }
+
+  /**
+   * Simple test implementation of ValidationContext for testing.
+   */
+  private static class TestValidationContext implements ValidationContext {
+    private final String instantTime;
+    private final Option<HoodieCommitMetadata> commitMetadata;
+    private final Option<List<HoodieWriteStat>> writeStats;
+    private final boolean firstCommit;
+    private final Option<HoodieCommitMetadata> previousCommitMetadata;
+
+    TestValidationContext(String instantTime,
+                          Option<HoodieCommitMetadata> commitMetadata,
+                          Option<List<HoodieWriteStat>> writeStats,
+                          boolean firstCommit,
+                          Option<HoodieCommitMetadata> previousCommitMetadata) 
{
+      this.instantTime = instantTime;
+      this.commitMetadata = commitMetadata;
+      this.writeStats = writeStats;
+      this.firstCommit = firstCommit;
+      this.previousCommitMetadata = previousCommitMetadata;
+    }
+
+    @Override
+    public String getInstantTime() {
+      return instantTime;
+    }
+
+    @Override
+    public Option<HoodieCommitMetadata> getCommitMetadata() {
+      return commitMetadata;
+    }
+
+    @Override
+    public Option<List<HoodieWriteStat>> getWriteStats() {
+      return writeStats;
+    }
+
+    @Override
+    public HoodieActiveTimeline getActiveTimeline() {
+      return null; // Not needed for these tests
+    }
+
+    @Override
+    public Option<HoodieInstant> getPreviousCommitInstant() {
+      if (firstCommit) {
+        return Option.empty();
+      }
+      return Option.of(new HoodieInstant(HoodieInstant.State.COMPLETED, 
"commit",
+          "20260309110000", 
InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR));
+    }
+
+    @Override
+    public Option<HoodieCommitMetadata> getPreviousCommitMetadata() {
+      return previousCommitMetadata;
+    }
+  }
+
+  private MockOffsetValidator createValidator(double tolerance, 
ValidationFailurePolicy policy) {
+    TypedProperties config = new TypedProperties();
+    
config.setProperty(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(),
+        String.valueOf(tolerance));
+    
config.setProperty(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(),
+        policy.name());
+    return new MockOffsetValidator(config);
+  }
+
+  private HoodieWriteStat createWriteStat(long numInserts, long 
numUpdateWrites) {
+    HoodieWriteStat stat = new HoodieWriteStat();
+    stat.setNumInserts(numInserts);
+    stat.setNumUpdateWrites(numUpdateWrites);
+    stat.setNumWrites(numInserts + numUpdateWrites + 50); // numWrites 
includes small file overhead
+    return stat;
+  }
+
+  // ========== validateOffsetConsistency tests ==========
+
+  @Test
+  public void testExactMatchValidation() {
+    MockOffsetValidator validator = createValidator(0.0, 
ValidationFailurePolicy.FAIL);
+    assertDoesNotThrow(() ->
+        validator.testValidateOffsetConsistency(1000, 1000, "cur", "prev"));
+  }
+
+  @Test
+  public void testStrictModeFailure() {
+    MockOffsetValidator validator = createValidator(0.0, 
ValidationFailurePolicy.FAIL);
+    assertThrows(HoodieValidationException.class, () ->
+        validator.testValidateOffsetConsistency(1000, 900, "cur", "prev"));
+  }
+
+  @Test
+  public void testValidationWithTolerance() {
+    MockOffsetValidator validator = createValidator(10.0, 
ValidationFailurePolicy.FAIL);
+
+    // 5% deviation - within 10% tolerance
+    assertDoesNotThrow(() ->
+        validator.testValidateOffsetConsistency(1000, 950, "cur", "prev"));
+
+    // 10% deviation - exactly at tolerance
+    assertDoesNotThrow(() ->
+        validator.testValidateOffsetConsistency(1000, 900, "cur", "prev"));
+
+    // 15% deviation - exceeds tolerance
+    assertThrows(HoodieValidationException.class, () ->
+        validator.testValidateOffsetConsistency(1000, 850, "cur", "prev"));
+  }
+
+  @Test
+  public void testWarnLogPolicy() {
+    MockOffsetValidator validator = createValidator(0.0, 
ValidationFailurePolicy.WARN_LOG);
+
+    assertDoesNotThrow(() ->
+        validator.testValidateOffsetConsistency(1000, 500, "cur", "prev"));
+    assertDoesNotThrow(() ->
+        validator.testValidateOffsetConsistency(1000, 0, "cur", "prev"));
+  }
+
+  @Test
+  public void testEdgeCaseZeroBoth() {
+    MockOffsetValidator validator = createValidator(0.0, 
ValidationFailurePolicy.FAIL);
+    assertDoesNotThrow(() ->
+        validator.testValidateOffsetConsistency(0, 0, "cur", "prev"));
+  }
+
+  @Test
+  public void testEdgeCaseOneZero() {
+    MockOffsetValidator validator = createValidator(0.0, 
ValidationFailurePolicy.FAIL);
+
+    assertThrows(HoodieValidationException.class, () ->
+        validator.testValidateOffsetConsistency(0, 1000, "cur", "prev"));
+    assertThrows(HoodieValidationException.class, () ->
+        validator.testValidateOffsetConsistency(1000, 0, "cur", "prev"));
+  }
+
+  @Test
+  public void testDefaultConfiguration() {
+    TypedProperties config = new TypedProperties();
+    MockOffsetValidator validator = new MockOffsetValidator(config);
+
+    assertDoesNotThrow(() ->
+        validator.testValidateOffsetConsistency(1000, 1000, "cur", "prev"));
+    assertThrows(HoodieValidationException.class, () ->
+        validator.testValidateOffsetConsistency(1000, 999, "cur", "prev"));
+  }
+
+  // ========== validateWithMetadata end-to-end tests ==========
+
+  @Test
+  public void testValidateWithMetadataSkipsFirstCommit() {
+    MockOffsetValidator validator = createValidator(0.0, 
ValidationFailurePolicy.FAIL);
+
+    TestValidationContext context = new TestValidationContext(
+        "20260309120000",
+        Option.empty(),
+        Option.empty(),
+        true, // first commit
+        Option.empty());
+
+    assertTrue(context.isFirstCommit());
+    // Should not throw - skips validation for first commit
+    assertDoesNotThrow(() -> validator.testValidateWithMetadata(context));
+  }
+
+  @Test
+  public void testValidateWithMetadataSkipsMissingCurrentCheckpoint() {
+    MockOffsetValidator validator = createValidator(0.0, 
ValidationFailurePolicy.FAIL);
+
+    // Current commit has no checkpoint metadata
+    HoodieCommitMetadata currentMetadata = new HoodieCommitMetadata();
+    // No checkpoint key added
+
+    TestValidationContext context = new TestValidationContext(
+        "20260309120000",
+        Option.of(currentMetadata),
+        Option.empty(),
+        false,
+        Option.empty());
+
+    assertFalse(context.isFirstCommit());
+    assertDoesNotThrow(() -> validator.testValidateWithMetadata(context));
+  }
+
+  @Test
+  public void testValidateWithMetadataSkipsInvalidCurrentCheckpoint() {
+    MockOffsetValidator validator = createValidator(0.0, 
ValidationFailurePolicy.FAIL);
+
+    HoodieCommitMetadata currentMetadata = new HoodieCommitMetadata();
+    currentMetadata.addMetadata(CHECKPOINT_KEY, "invalid_format");
+
+    TestValidationContext context = new TestValidationContext(
+        "20260309120000",
+        Option.of(currentMetadata),
+        Option.empty(),
+        false,
+        Option.empty());
+
+    assertDoesNotThrow(() -> validator.testValidateWithMetadata(context));
+  }
+
+  @Test
+  public void testValidateWithMetadataSkipsMissingPreviousCheckpoint() {
+    MockOffsetValidator validator = createValidator(0.0, 
ValidationFailurePolicy.FAIL);
+
+    HoodieCommitMetadata currentMetadata = new HoodieCommitMetadata();
+    currentMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:200,1:300");
+
+    // Previous metadata exists but has no checkpoint key
+    HoodieCommitMetadata prevMetadata = new HoodieCommitMetadata();
+
+    TestValidationContext context = new TestValidationContext(
+        "20260309120000",
+        Option.of(currentMetadata),
+        Option.empty(),
+        false,
+        Option.of(prevMetadata));
+
+    assertDoesNotThrow(() -> validator.testValidateWithMetadata(context));
+  }
+
+  @Test
+  public void testValidateWithMetadataSkipsNoPreviousMetadata() {
+    MockOffsetValidator validator = createValidator(0.0, 
ValidationFailurePolicy.FAIL);
+
+    HoodieCommitMetadata currentMetadata = new HoodieCommitMetadata();
+    currentMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:200,1:300");
+
+    TestValidationContext context = new TestValidationContext(
+        "20260309120000",
+        Option.of(currentMetadata),
+        Option.empty(),
+        false,
+        Option.empty()); // no previous commit metadata at all
+
+    assertDoesNotThrow(() -> validator.testValidateWithMetadata(context));
+  }
+
+  @Test
+  public void testValidateWithMetadataSkipsInvalidPreviousCheckpoint() {
+    MockOffsetValidator validator = createValidator(0.0, 
ValidationFailurePolicy.FAIL);
+
+    HoodieCommitMetadata currentMetadata = new HoodieCommitMetadata();
+    currentMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:200,1:300");
+
+    HoodieCommitMetadata prevMetadata = new HoodieCommitMetadata();
+    prevMetadata.addMetadata(CHECKPOINT_KEY, "bad_format");
+
+    TestValidationContext context = new TestValidationContext(
+        "20260309120000",
+        Option.of(currentMetadata),
+        Option.empty(),
+        false,
+        Option.of(prevMetadata));
+
+    assertDoesNotThrow(() -> validator.testValidateWithMetadata(context));
+  }
+
+  @Test
+  public void testValidateWithMetadataSkipsEmptyCommit() {
+    MockOffsetValidator validator = createValidator(0.0, 
ValidationFailurePolicy.FAIL);
+
+    // Same checkpoint = 0 offset diff, 0 records written
+    HoodieCommitMetadata currentMetadata = new HoodieCommitMetadata();
+    currentMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:100,1:200");
+
+    HoodieCommitMetadata prevMetadata = new HoodieCommitMetadata();
+    prevMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:100,1:200");
+
+    TestValidationContext context = new TestValidationContext(
+        "20260309120000",
+        Option.of(currentMetadata),
+        Option.of(Collections.emptyList()), // no write stats
+        false,
+        Option.of(prevMetadata));
+
+    assertDoesNotThrow(() -> validator.testValidateWithMetadata(context));
+  }
+
+  @Test
+  public void testValidateWithMetadataPassesValidCommit() {
+    MockOffsetValidator validator = createValidator(5.0, 
ValidationFailurePolicy.FAIL);
+
+    HoodieCommitMetadata currentMetadata = new HoodieCommitMetadata();
+    currentMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:200,1:400");
+
+    HoodieCommitMetadata prevMetadata = new HoodieCommitMetadata();
+    prevMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:100,1:200");
+
+    // Offset diff = (200-100) + (400-200) = 300
+    // Records: 150 inserts + 150 updates = 300
+    HoodieWriteStat stat1 = createWriteStat(150, 0);
+    HoodieWriteStat stat2 = createWriteStat(0, 150);
+    List<HoodieWriteStat> stats = Arrays.asList(stat1, stat2);
+
+    TestValidationContext context = new TestValidationContext(
+        "20260309120000",
+        Option.of(currentMetadata),
+        Option.of(stats),
+        false,
+        Option.of(prevMetadata));
+
+    assertDoesNotThrow(() -> validator.testValidateWithMetadata(context));
+  }
+
+  @Test
+  public void testValidateWithMetadataFailsOnMismatch() {
+    MockOffsetValidator validator = createValidator(0.0, 
ValidationFailurePolicy.FAIL);
+
+    HoodieCommitMetadata currentMetadata = new HoodieCommitMetadata();
+    currentMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:200,1:400");
+
+    HoodieCommitMetadata prevMetadata = new HoodieCommitMetadata();
+    prevMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:100,1:200");
+
+    // Offset diff = 300, but only 100 records written
+    HoodieWriteStat stat = createWriteStat(100, 0);
+    List<HoodieWriteStat> stats = Collections.singletonList(stat);
+
+    TestValidationContext context = new TestValidationContext(
+        "20260309120000",
+        Option.of(currentMetadata),
+        Option.of(stats),
+        false,
+        Option.of(prevMetadata));
+
+    assertThrows(HoodieValidationException.class,
+        () -> validator.testValidateWithMetadata(context));
+  }
+
+  @Test
+  public void testValidateWithMetadataWarnsOnMismatchWithWarnPolicy() {
+    MockOffsetValidator validator = createValidator(0.0, 
ValidationFailurePolicy.WARN_LOG);
+
+    HoodieCommitMetadata currentMetadata = new HoodieCommitMetadata();
+    currentMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:200,1:400");
+
+    HoodieCommitMetadata prevMetadata = new HoodieCommitMetadata();
+    prevMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:100,1:200");
+
+    // Large mismatch but WARN_LOG policy
+    HoodieWriteStat stat = createWriteStat(50, 0);
+
+    TestValidationContext context = new TestValidationContext(
+        "20260309120000",
+        Option.of(currentMetadata),
+        Option.of(Collections.singletonList(stat)),
+        false,
+        Option.of(prevMetadata));
+
+    assertDoesNotThrow(() -> validator.testValidateWithMetadata(context));
+  }
+
+  // ========== ValidationContext default method tests ==========
+
+  @Test
+  public void testValidationContextGetExtraMetadata() {
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+    metadata.addMetadata("key1", "value1");
+    metadata.addMetadata("key2", "value2");
+
+    TestValidationContext context = new TestValidationContext(
+        "20260309120000", Option.of(metadata), Option.empty(),
+        true, Option.empty());
+
+    assertEquals("value1", context.getExtraMetadata("key1").get());
+    assertEquals("value2", context.getExtraMetadata("key2").get());
+    assertFalse(context.getExtraMetadata("missing").isPresent());
+
+    assertEquals(2, context.getExtraMetadata().size());
+  }
+
+  @Test
+  public void testValidationContextGetExtraMetadataEmpty() {
+    TestValidationContext context = new TestValidationContext(
+        "20260309120000", Option.empty(), Option.empty(),
+        true, Option.empty());
+
+    assertTrue(context.getExtraMetadata().isEmpty());
+    assertFalse(context.getExtraMetadata("any").isPresent());
+  }
+
+  @Test
+  public void testValidationContextRecordCounts() {
+    HoodieWriteStat stat1 = new HoodieWriteStat();
+    stat1.setNumInserts(100);
+    stat1.setNumUpdateWrites(20);
+    stat1.setNumWrites(200);
+
+    HoodieWriteStat stat2 = new HoodieWriteStat();
+    stat2.setNumInserts(50);
+    stat2.setNumUpdateWrites(30);
+    stat2.setNumWrites(150);
+
+    TestValidationContext context = new TestValidationContext(
+        "20260309120000", Option.empty(),
+        Option.of(Arrays.asList(stat1, stat2)),
+        true, Option.empty());
+
+    // getTotalRecordsWritten uses numInserts + numUpdateWrites (not numWrites)
+    // stat1: 100 + 20 = 120, stat2: 50 + 30 = 80, total = 200
+    assertEquals(200, context.getTotalRecordsWritten());
+    assertEquals(150, context.getTotalInsertRecordsWritten());
+    assertEquals(50, context.getTotalUpdateRecordsWritten());
+  }
+
+  @Test
+  public void testValidationContextRecordCountsEmpty() {
+    TestValidationContext context = new TestValidationContext(
+        "20260309120000", Option.empty(), Option.empty(),
+        true, Option.empty());
+
+    assertEquals(0, context.getTotalRecordsWritten());
+    assertEquals(0, context.getTotalInsertRecordsWritten());
+    assertEquals(0, context.getTotalUpdateRecordsWritten());
+  }
+
+  @Test
+  public void testValidationContextIsFirstCommit() {
+    TestValidationContext firstCommit = new TestValidationContext(
+        "20260309120000", Option.empty(), Option.empty(),
+        true, Option.empty());
+    assertTrue(firstCommit.isFirstCommit());
+
+    TestValidationContext nonFirstCommit = new TestValidationContext(
+        "20260309120000", Option.empty(), Option.empty(),
+        false, Option.empty());
+    assertFalse(nonFirstCommit.isFirstCommit());
+  }
+
+  // ========== BasePreCommitValidator tests ==========
+
+  @Test
+  public void testBasePreCommitValidatorConfig() {
+    TypedProperties config = new TypedProperties();
+    config.setProperty("test.key", "test.value");
+
+    MockOffsetValidator validator = new MockOffsetValidator(config);
+    // config is a protected field accessible from subclasses
+    assertNotNull(validator.config);
+    assertEquals("test.value", validator.config.getString("test.key"));
+  }
+
+  @Test
+  public void testBasePreCommitValidatorDefaultNoOp() {
+    TypedProperties config = new TypedProperties();
+    BasePreCommitValidator validator = new BasePreCommitValidator(config) {};
+
+    TestValidationContext context = new TestValidationContext(
+        "20260309120000", Option.empty(), Option.empty(),
+        true, Option.empty());
+
+    // Default implementation is no-op and should not throw
+    assertDoesNotThrow(() -> validator.validateWithMetadata(context));
+  }
+
+  @Test
+  public void testValidateWithMetadataOffsetDiffZeroRecordsNonZero() {
+    // Covers the branch where offsetDiff == 0 but recordsWritten > 0
+    MockOffsetValidator validator = createValidator(0.0, 
ValidationFailurePolicy.FAIL);
+
+    HoodieCommitMetadata currentMetadata = new HoodieCommitMetadata();
+    currentMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:100,1:200");
+
+    HoodieCommitMetadata prevMetadata = new HoodieCommitMetadata();
+    prevMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:100,1:200");
+
+    // Same offsets (diff=0) but records were written — should fail
+    HoodieWriteStat stat = createWriteStat(100, 0);
+
+    TestValidationContext context = new TestValidationContext(
+        "20260309120000",
+        Option.of(currentMetadata),
+        Option.of(Collections.singletonList(stat)),
+        false,
+        Option.of(prevMetadata));
+
+    assertThrows(HoodieValidationException.class,
+        () -> validator.testValidateWithMetadata(context));
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/client/validator/BasePreCommitValidator.java b/hudi-common/src/main/java/org/apache/hudi/client/validator/BasePreCommitValidator.java
new file mode 100644
index 000000000000..b1c718246608
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/client/validator/BasePreCommitValidator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.validator;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieValidationException;
+
+/**
+ * Base class for all pre-commit validators across all engines (Spark, Flink, 
Java).
+ * Engine-specific implementations extend this class and implement 
ValidationContext.
+ *
+ * <p>This is the foundation for engine-agnostic validation logic that can 
access
+ * commit metadata, timeline, and write statistics.</p>
+ *
+ * <p>Integration with existing framework:
+ * In Phase 3, the existing {@code SparkPreCommitValidator} will be refactored 
to extend
+ * this class, and {@code SparkValidatorUtils.runValidators()} will be updated 
to invoke
+ * {@link #validateWithMetadata(ValidationContext)} for validators that extend 
this class.
+ * The existing validator class names configured via
+ * {@code HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES} will continue 
to work.</p>
+ *
+ * <p>Phase 1: Core framework in hudi-common</p>
+ * <p>Phase 2: Flink-specific implementations in hudi-flink-datasource</p>
+ * <p>Phase 3: Spark-specific implementations in 
hudi-client/hudi-spark-client</p>
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public abstract class BasePreCommitValidator {
+
+  protected final TypedProperties config;
+
+  /**
+   * Create a pre-commit validator with configuration.
+   *
+   * @param config Typed properties containing validator configuration
+   */
+  protected BasePreCommitValidator(TypedProperties config) {
+    this.config = config;
+  }
+
+  /**
+   * Perform validation using commit metadata, timeline, and write statistics.
+   * This method is called by the engine-specific orchestration layer.
+   *
+   * <p>Subclasses should override this method to implement validation logic 
that:</p>
+   * <ul>
+   *   <li>Accesses commit metadata (checkpoints, custom metadata)</li>
+   *   <li>Navigates timeline (previous commits)</li>
+   *   <li>Analyzes write statistics (record counts, partition info)</li>
+   * </ul>
+   *
+   * @param context Validation context providing access to metadata 
(engine-specific implementation)
+   * @throws HoodieValidationException if validation fails
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  protected void validateWithMetadata(ValidationContext context) throws 
HoodieValidationException {
+    // Default no-op implementation
+    // Concrete validators override this to implement validation logic
+  }
+
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/client/validator/ValidationContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/client/validator/ValidationContext.java
new file mode 100644
index 000000000000..30fdbb3ba3b4
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/client/validator/ValidationContext.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.validator;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides validators with access to commit information.
+ * Engine-specific implementations (Spark, Flink, Java) provide concrete 
implementations
+ * for the core methods; computed convenience methods are provided as defaults.
+ *
+ * <p>This interface abstracts away engine-specific details while providing 
consistent
+ * access to validation data across all write engines.</p>
+ *
+ * <p>Example implementations:</p>
+ * <ul>
+ *   <li>SparkValidationContext (Phase 3): Accesses Spark RDD write 
metadata</li>
+ *   <li>FlinkValidationContext (Phase 2): Accesses Flink checkpoint state</li>
+ *   <li>JavaValidationContext (Future): Accesses Java client write 
metadata</li>
+ * </ul>
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface ValidationContext {
+
+  /**
+   * Get the current commit instant time being validated.
+   *
+   * @return Instant time string (format: yyyyMMddHHmmss)
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  String getInstantTime();
+
+  /**
+   * Get commit metadata for the current commit being validated.
+   * Contains extraMetadata (checkpoints, custom metadata), operation type, 
write stats, etc.
+   *
+   * @return Optional commit metadata
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  Option<HoodieCommitMetadata> getCommitMetadata();
+
+  /**
+   * Get write statistics for the current commit.
+   * Contains record counts, partition info, file info, bytes written, etc.
+   *
+   * @return Optional list of write statistics per partition
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  Option<List<HoodieWriteStat>> getWriteStats();
+
+  /**
+   * Get the active timeline for accessing previous commits.
+   * Used to navigate commit history and extract previous checkpoints.
+   *
+   * @return Active timeline
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  HoodieActiveTimeline getActiveTimeline();
+
+  /**
+   * Get the previous completed commit instant.
+   * Used to access previous checkpoint for delta validation.
+   *
+   * @return Optional previous instant
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  Option<HoodieInstant> getPreviousCommitInstant();
+
+  /**
+   * Get commit metadata for the previous commit.
+   * Used to extract previous checkpoint for comparison.
+   *
+   * @return Optional previous commit metadata
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  Option<HoodieCommitMetadata> getPreviousCommitMetadata();
+
+  // ========== Default convenience methods derived from core methods 
==========
+
+  /**
+   * Get all extra metadata from the current commit.
+   * Derived from {@link #getCommitMetadata()}.
+   *
+   * @return Map of metadata key to value, or empty map if no metadata
+   */
+  default Map<String, String> getExtraMetadata() {
+    return getCommitMetadata()
+        .map(HoodieCommitMetadata::getExtraMetadata)
+        .orElse(Collections.emptyMap());
+  }
+
+  /**
+   * Get a specific extra metadata value by key.
+   * Derived from {@link #getCommitMetadata()}.
+   *
+   * @param key Metadata key
+   * @return Optional metadata value
+   */
+  default Option<String> getExtraMetadata(String key) {
+    return getCommitMetadata()
+        .flatMap(metadata -> Option.ofNullable(metadata.getMetadata(key)));
+  }
+
+  /**
+   * Calculate total records written in the current commit.
+   * Uses {@code numInserts + numUpdateWrites} instead of {@code numWrites} 
because
+   * for COW upserts, {@code numWrites} is the total number of records in the 
file
+   * (not just newly written ones), which would make deviation calculations 
meaningless.
+   *
+   * @return Total newly written record count (inserts + updates)
+   */
+  default long getTotalRecordsWritten() {
+    return getWriteStats()
+        .map(stats -> stats.stream()
+            .mapToLong(s -> s.getNumInserts() + s.getNumUpdateWrites())
+            .sum())
+        .orElse(0L);
+  }
+
+  /**
+   * Calculate total insert records written.
+   * Derived from {@link #getWriteStats()} by summing {@code numInserts} 
across all partitions.
+   *
+   * @return Total insert count
+   */
+  default long getTotalInsertRecordsWritten() {
+    return getWriteStats()
+        .map(stats -> 
stats.stream().mapToLong(HoodieWriteStat::getNumInserts).sum())
+        .orElse(0L);
+  }
+
+  /**
+   * Calculate total update records written.
+   * Derived from {@link #getWriteStats()} by summing {@code numUpdateWrites} 
across all partitions.
+   *
+   * @return Total update count
+   */
+  default long getTotalUpdateRecordsWritten() {
+    return getWriteStats()
+        .map(stats -> 
stats.stream().mapToLong(HoodieWriteStat::getNumUpdateWrites).sum())
+        .orElse(0L);
+  }
+
+  /**
+   * Check if this is the first commit (no previous commits exist).
+   * Derived from {@link #getPreviousCommitInstant()}.
+   * Validators should skip validation for first commit since there's
+   * no previous checkpoint to compare against.
+   *
+   * @return true if first commit
+   */
+  default boolean isFirstCommit() {
+    return !getPreviousCommitInstant().isPresent();
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/CheckpointUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/CheckpointUtils.java
new file mode 100644
index 000000000000..5e0fc862b605
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CheckpointUtils.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility methods for parsing and working with streaming checkpoints.
+ * Supports multiple checkpoint formats used by different engines and sources.
+ *
+ * Checkpoint formats:
+ * - SPARK_KAFKA: "topic,partition:offset,partition:offset,..."
+ *   Example: "events,0:1000,1:2000,2:1500"
+ *   Used by: HoodieStreamer (Spark)
+ *   Note: Both offset-based and timestamp-based Kafka sources produce this 
same string
+ *   format (see {@code KafkaOffsetGen}). The offsets may originate from either
+ *   {@code consumer.endOffsets()} or {@code consumer.offsetsForTimes()}, but 
the
+ *   checkpoint string format is identical.
+ *
+ * - FLINK_KAFKA: Base64-encoded serialized Map (TopicPartition → Long)
+ *   Example: "eyJ0b3BpY..." (base64)
+ *   Used by: Flink streaming connector
+ *   Note: Actual implementation requires Flink checkpoint deserialization 
(Phase 2)
+ *
+ * - PULSAR: "partition:ledgerId:entryId,partition:ledgerId:entryId,..."
+ *   Example: "0:123:45,1:234:56"
+ *   Used by: Pulsar sources (engine-agnostic)
+ *   Note: To be implemented in Phase 4
+ *
+ * - KINESIS: "shardId:sequenceNumber,shardId:sequenceNumber,..."
+ *   Example: 
"shardId-000000000000:49590338271490256608559692538361571095921575989136588898"
+ *   Used by: Kinesis sources (engine-agnostic)
+ *   Note: To be implemented in Phase 4
+ */
+@Slf4j
+public class CheckpointUtils {
+
+  /**
+   * Supported checkpoint formats across engines and sources.
+   *
+   * <p>Engine-specific formats (Kafka) are prefixed with the engine name 
because different
+   * engines use different checkpoint serialization for the same source. 
Source-specific formats
+   * (Pulsar, Kinesis) are not engine-prefixed because they use the same 
format across engines.</p>
+   */
+  public enum CheckpointFormat {
+    /** HoodieStreamer (Spark) Kafka format: "topic,0:1000,1:2000" */
+    SPARK_KAFKA,
+
+    /** Flink Kafka format: base64-encoded Map&lt;TopicPartition, Long&gt; */
+    FLINK_KAFKA,
+
+    /** Pulsar format: "0:123:45,1:234:56" (ledgerId:entryId). 
Engine-agnostic. */
+    PULSAR,
+
+    /** Kinesis format: "shard-0:12345,shard-1:67890". Engine-agnostic. */
+    KINESIS,
+
+    /** Custom user-defined format */
+    CUSTOM
+  }
+
+  /**
+   * Parse checkpoint string into partition → offset mapping.
+   *
+   * @param format Checkpoint format
+   * @param checkpointStr Checkpoint string
+   * @return Map from partition number to offset/sequence number
+   * @throws IllegalArgumentException if format is invalid
+   */
+  public static Map<Integer, Long> parseCheckpoint(CheckpointFormat format, 
String checkpointStr) {
+    switch (format) {
+      case SPARK_KAFKA:
+        return parseSparkKafkaCheckpoint(checkpointStr);
+      case FLINK_KAFKA:
+        throw new UnsupportedOperationException(
+            "Flink Kafka checkpoint parsing not yet implemented. "
+                + "This will be added in Phase 2 with Flink checkpoint 
deserialization support.");
+      case PULSAR:
+        throw new UnsupportedOperationException(
+            "Pulsar checkpoint parsing not yet implemented. Planned for Phase 
4.");
+      case KINESIS:
+        throw new UnsupportedOperationException(
+            "Kinesis checkpoint parsing not yet implemented. Planned for Phase 
4.");
+      default:
+        throw new IllegalArgumentException("Unsupported checkpoint format: " + 
format);
+    }
+  }
+
+  /**
+   * Calculate offset difference between two checkpoints.
+   * Handles partition additions, removals, and resets.
+   *
+   * Algorithm:
+   * 1. For each partition in current checkpoint:
+   *    - If partition exists in previous: diff = current - previous
+   *    - If partition is new: skip (start offset unknown, would overcount)
+   *    - If diff is negative (reset): skip (start offset unknown, would 
overcount)
+   * 2. Sum all partition diffs
+   *
+   * @param format Checkpoint format
+   * @param previousCheckpoint Previous checkpoint string
+   * @param currentCheckpoint Current checkpoint string
+   * @return Total offset difference across all partitions
+   */
+  public static long calculateOffsetDifference(CheckpointFormat format,
+                                                String previousCheckpoint,
+                                                String currentCheckpoint) {
+    Map<Integer, Long> previousOffsets = parseCheckpoint(format, 
previousCheckpoint);
+    Map<Integer, Long> currentOffsets = parseCheckpoint(format, 
currentCheckpoint);
+
+    long totalDiff = 0;
+
+    for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
+      int partition = entry.getKey();
+      long currentOffset = entry.getValue();
+      Long previousOffset = previousOffsets.get(partition);
+
+      if (previousOffset != null) {
+        // Partition exists in both checkpoints
+        long diff = currentOffset - previousOffset;
+
+        if (diff < 0) {
+          // Offset reset detected (topic/partition recreated or compaction).
+          // We cannot reliably determine how many records were processed since
+          // the start offset of the new topic may not be 0. Log a warning and
+          // skip this partition to avoid overcounting.
+          log.warn("Detected offset reset for partition {}. Previous offset: 
{}, current offset: {}. "
+              + "Skipping partition from diff calculation to avoid 
overcounting.", partition, previousOffset, currentOffset);
+        } else {
+          totalDiff += diff;
+        }
+      } else {
+        // New partition appeared. The start offset may not be 0 (e.g., 
compacted
+        // topics), so counting from 0 would overcount. Log a warning and skip
+        // this partition to avoid inflating the diff.
+        log.warn("New partition {} detected (not in previous checkpoint). "
+            + "Skipping partition from diff calculation (start offset unknown, 
"
+            + "current offset: {}).", partition, currentOffset);
+      }
+    }
+
+    return totalDiff;
+  }
+
+  /**
+   * Validate checkpoint format.
+   *
+   * @param format Expected checkpoint format
+   * @param checkpointStr Checkpoint string to validate
+   * @return true if valid format
+   */
+  public static boolean isValidCheckpointFormat(CheckpointFormat format, 
String checkpointStr) {
+    if (checkpointStr == null || checkpointStr.trim().isEmpty()) {
+      return false;
+    }
+
+    try {
+      parseCheckpoint(format, checkpointStr);
+      return true;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
+  /**
+   * Extract topic name from Spark Kafka checkpoint.
+   * Format: "topic,partition:offset,..."
+   *
+   * @param checkpointStr Spark Kafka checkpoint
+   * @return Topic name
+   * @throws IllegalArgumentException if invalid format
+   */
+  static String extractTopicName(String checkpointStr) {
+    if (checkpointStr == null || checkpointStr.trim().isEmpty()) {
+      throw new IllegalArgumentException("Checkpoint string cannot be null or 
empty");
+    }
+
+    String[] splits = checkpointStr.split(",");
+    if (splits.length < 2) {
+      throw new IllegalArgumentException(
+          "Invalid checkpoint format. Expected: topic,partition:offset,... 
Got: " + checkpointStr);
+    }
+
+    return splits[0];
+  }
+
+  // ========== Format-Specific Parsers ==========
+
+  /**
+   * Parse HoodieStreamer (Spark) Kafka checkpoint.
+   * Format: "topic,partition:offset,partition:offset,..."
+   * Example: "events,0:1000,1:2000,2:1500"
+   *
+   * <p>Note: {@code KafkaOffsetGen.CheckpointUtils#strToOffsets} in 
hudi-utilities
+   * parses the same format but returns {@code Map<TopicPartition, Long>} 
(Kafka-specific).
+   * This method returns {@code Map<Integer, Long>} to avoid Kafka client 
dependencies
+   * in hudi-common. TODO: consolidate with KafkaOffsetGen.CheckpointUtils once
+   * https://github.com/apache/hudi/pull/18125 lands.</p>
+   *
+   * @param checkpointStr Checkpoint string
+   * @return Map of partition → offset
+   * @throws IllegalArgumentException if format is invalid
+   */
+  private static Map<Integer, Long> parseSparkKafkaCheckpoint(String 
checkpointStr) {
+    if (checkpointStr == null || checkpointStr.trim().isEmpty()) {
+      throw new IllegalArgumentException("Checkpoint string cannot be null or 
empty");
+    }
+
+    Map<Integer, Long> offsetMap = new HashMap<>();
+    String[] splits = checkpointStr.split(",");
+
+    if (splits.length < 2) {
+      throw new IllegalArgumentException(
+          "Invalid Spark Kafka checkpoint. Expected: 
topic,partition:offset,... Got: " + checkpointStr);
+    }
+
+    // First element is topic name, skip it
+    for (int i = 1; i < splits.length; i++) {
+      String[] partitionOffset = splits[i].split(":");
+      if (partitionOffset.length != 2) {
+        throw new IllegalArgumentException(
+            "Invalid partition:offset format in checkpoint: " + splits[i]);
+      }
+
+      try {
+        int partition = Integer.parseInt(partitionOffset[0]);
+        long offset = Long.parseLong(partitionOffset[1]);
+        offsetMap.put(partition, offset);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException(
+            "Invalid number format in checkpoint: " + splits[i], e);
+      }
+    }
+
+    return offsetMap;
+  }
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCheckpointUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCheckpointUtils.java
new file mode 100644
index 000000000000..4f43926a7a48
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCheckpointUtils.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.util.CheckpointUtils.CheckpointFormat;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for CheckpointUtils - Phase 1 core functionality.
+ */
+public class TestCheckpointUtils {
+
+  @Test
+  public void testParseSparkKafkaCheckpoint() {
+    String checkpoint = "test_topic,0:100,1:200,2:150";
+    Map<Integer, Long> offsets = CheckpointUtils.parseCheckpoint(
+        CheckpointFormat.SPARK_KAFKA, checkpoint);
+
+    assertEquals(3, offsets.size());
+    assertEquals(100L, offsets.get(0));
+    assertEquals(200L, offsets.get(1));
+    assertEquals(150L, offsets.get(2));
+  }
+
+  @Test
+  public void testParseSinglePartition() {
+    String checkpoint = "my_topic,0:1000";
+    Map<Integer, Long> offsets = CheckpointUtils.parseCheckpoint(
+        CheckpointFormat.SPARK_KAFKA, checkpoint);
+
+    assertEquals(1, offsets.size());
+    assertEquals(1000L, offsets.get(0));
+  }
+
+  @Test
+  public void testParseInvalidFormat() {
+    assertThrows(IllegalArgumentException.class, () ->
+        CheckpointUtils.parseCheckpoint(CheckpointFormat.SPARK_KAFKA, 
"invalid"));
+
+    assertThrows(IllegalArgumentException.class, () ->
+        CheckpointUtils.parseCheckpoint(CheckpointFormat.SPARK_KAFKA, 
"topic"));
+
+    assertThrows(IllegalArgumentException.class, () ->
+        CheckpointUtils.parseCheckpoint(CheckpointFormat.SPARK_KAFKA, ""));
+
+    assertThrows(IllegalArgumentException.class, () ->
+        CheckpointUtils.parseCheckpoint(CheckpointFormat.SPARK_KAFKA, null));
+  }
+
+  @Test
+  public void testParseInvalidPartitionOffset() {
+    assertThrows(IllegalArgumentException.class, () ->
+        CheckpointUtils.parseCheckpoint(CheckpointFormat.SPARK_KAFKA, 
"topic,0:abc"));
+
+    assertThrows(IllegalArgumentException.class, () ->
+        CheckpointUtils.parseCheckpoint(CheckpointFormat.SPARK_KAFKA, 
"topic,abc:100"));
+
+    assertThrows(IllegalArgumentException.class, () ->
+        CheckpointUtils.parseCheckpoint(CheckpointFormat.SPARK_KAFKA, 
"topic,0-100"));
+  }
+
+  @Test
+  public void testExtractTopicName() {
+    assertEquals("test_topic",
+        CheckpointUtils.extractTopicName("test_topic,0:100,1:200"));
+    assertEquals("my.topic.name",
+        CheckpointUtils.extractTopicName("my.topic.name,0:1000"));
+  }
+
+  @Test
+  public void testExtractTopicNameInvalid() {
+    assertThrows(IllegalArgumentException.class, () ->
+        CheckpointUtils.extractTopicName(""));
+
+    assertThrows(IllegalArgumentException.class, () ->
+        CheckpointUtils.extractTopicName(null));
+
+    // Topic-only without partition data should be invalid
+    assertThrows(IllegalArgumentException.class, () ->
+        CheckpointUtils.extractTopicName("just_topic"));
+  }
+
+  @Test
+  public void testCalculateOffsetDifferenceBasic() {
+    String previous = "topic,0:100,1:200";
+    String current = "topic,0:150,1:300";
+
+    // (150-100) + (300-200) = 50 + 100 = 150
+    long diff = CheckpointUtils.calculateOffsetDifference(
+        CheckpointFormat.SPARK_KAFKA, previous, current);
+    assertEquals(150L, diff);
+  }
+
+  @Test
+  public void testCalculateOffsetDifferenceNoChange() {
+    String previous = "topic,0:100,1:200";
+    String current = "topic,0:100,1:200";
+
+    long diff = CheckpointUtils.calculateOffsetDifference(
+        CheckpointFormat.SPARK_KAFKA, previous, current);
+    assertEquals(0L, diff);
+  }
+
+  @Test
+  public void testCalculateOffsetDifferenceNewPartition() {
+    String previous = "topic,0:100";
+    String current = "topic,0:150,1:200";
+
+    // Partition 0: 150-100 = 50
+    // Partition 1: new partition, skipped (start offset unknown)
+    // Total: 50
+    long diff = CheckpointUtils.calculateOffsetDifference(
+        CheckpointFormat.SPARK_KAFKA, previous, current);
+    assertEquals(50L, diff);
+  }
+
+  @Test
+  public void testCalculateOffsetDifferenceRemovedPartition() {
+    String previous = "topic,0:100,1:200";
+    String current = "topic,0:150";
+
+    // Only partition 0 exists in both: 150-100 = 50
+    // Partition 1 ignored (not in current)
+    long diff = CheckpointUtils.calculateOffsetDifference(
+        CheckpointFormat.SPARK_KAFKA, previous, current);
+    assertEquals(50L, diff);
+  }
+
+  @Test
+  public void testCalculateOffsetDifferenceNegativeOffset() {
+    // Simulate topic reset where offset goes back to 0
+    String previous = "topic,0:1000";
+    String current = "topic,0:100";
+
+    // When current < previous (offset reset), partition is skipped
+    // to avoid overcounting since start offset is unknown
+    long diff = CheckpointUtils.calculateOffsetDifference(
+        CheckpointFormat.SPARK_KAFKA, previous, current);
+    assertEquals(0L, diff);
+  }
+
+  @Test
+  public void testCalculateOffsetDifferenceMultiplePartitionsWithReset() {
+    String previous = "topic,0:1000,1:2000";
+    String current = "topic,0:100,1:2500";
+
+    // Partition 0: reset, skipped to avoid overcounting
+    // Partition 1: normal increment = 2500-2000 = 500
+    // Total: 500
+    long diff = CheckpointUtils.calculateOffsetDifference(
+        CheckpointFormat.SPARK_KAFKA, previous, current);
+    assertEquals(500L, diff);
+  }
+
+  @Test
+  public void testIsValidCheckpointFormat() {
+    assertTrue(CheckpointUtils.isValidCheckpointFormat(
+        CheckpointFormat.SPARK_KAFKA, "topic,0:100"));
+    assertTrue(CheckpointUtils.isValidCheckpointFormat(
+        CheckpointFormat.SPARK_KAFKA, "topic,0:100,1:200,2:300"));
+    assertTrue(CheckpointUtils.isValidCheckpointFormat(
+        CheckpointFormat.SPARK_KAFKA, "my.topic.name,0:1000"));
+
+    assertFalse(CheckpointUtils.isValidCheckpointFormat(
+        CheckpointFormat.SPARK_KAFKA, ""));
+    assertFalse(CheckpointUtils.isValidCheckpointFormat(
+        CheckpointFormat.SPARK_KAFKA, null));
+    assertFalse(CheckpointUtils.isValidCheckpointFormat(
+        CheckpointFormat.SPARK_KAFKA, "just_topic"));
+    assertFalse(CheckpointUtils.isValidCheckpointFormat(
+        CheckpointFormat.SPARK_KAFKA, "topic,invalid"));
+    assertFalse(CheckpointUtils.isValidCheckpointFormat(
+        CheckpointFormat.SPARK_KAFKA, "topic,0:abc"));
+  }
+
+  @Test
+  public void testUnsupportedFormats() {
+    // Flink format not yet implemented (Phase 2)
+    assertThrows(UnsupportedOperationException.class, () ->
+        CheckpointUtils.parseCheckpoint(CheckpointFormat.FLINK_KAFKA, 
"anystring"));
+
+    // Pulsar format not yet implemented (Phase 4)
+    assertThrows(UnsupportedOperationException.class, () ->
+        CheckpointUtils.parseCheckpoint(CheckpointFormat.PULSAR, "anystring"));
+
+    // Kinesis format not yet implemented (Phase 4)
+    assertThrows(UnsupportedOperationException.class, () ->
+        CheckpointUtils.parseCheckpoint(CheckpointFormat.KINESIS, 
"anystring"));
+  }
+
+  @Test
+  public void testCustomFormatThrows() {
+    assertThrows(IllegalArgumentException.class, () ->
+        CheckpointUtils.parseCheckpoint(CheckpointFormat.CUSTOM, "anystring"));
+  }
+
+  @Test
+  public void testIsValidCheckpointFormatUnsupported() {
+    // Unsupported formats should return false (caught internally)
+    assertFalse(CheckpointUtils.isValidCheckpointFormat(
+        CheckpointFormat.FLINK_KAFKA, "anystring"));
+    assertFalse(CheckpointUtils.isValidCheckpointFormat(
+        CheckpointFormat.CUSTOM, "anystring"));
+  }
+}

Reply via email to