This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 967e456cc456 feat(common): add core pre-commit validation framework -
Phase 1 (#18068)
967e456cc456 is described below
commit 967e456cc4569e85d20724f49631eac73561c422
Author: Xinli Shang <[email protected]>
AuthorDate: Thu Mar 19 20:06:54 2026 -0700
feat(common): add core pre-commit validation framework - Phase 1 (#18068)
Co-authored-by: Xinli Shang <[email protected]>
---
.../client/validator/StreamingOffsetValidator.java | 213 ++++++++
.../config/HoodiePreCommitValidatorConfig.java | 33 ++
.../validator/TestStreamingOffsetValidator.java | 552 +++++++++++++++++++++
.../client/validator/BasePreCommitValidator.java | 80 +++
.../hudi/client/validator/ValidationContext.java | 183 +++++++
.../apache/hudi/common/util/CheckpointUtils.java | 257 ++++++++++
.../hudi/common/util/TestCheckpointUtils.java | 229 +++++++++
7 files changed, 1547 insertions(+)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java
new file mode 100644
index 000000000000..71844f3c1fad
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.validator;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.CheckpointUtils;
+import org.apache.hudi.common.util.CheckpointUtils.CheckpointFormat;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig.ValidationFailurePolicy;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Arrays;
+import java.util.Locale;
+
+/**
+ * Abstract base class for streaming offset validators.
+ * Handles common offset validation logic across all streaming sources (Kafka,
Pulsar, Kinesis).
+ *
+ * This validator compares source offset differences with actual record counts
written to detect data loss.
+ *
+ * <p><b>Note:</b> This validator is primarily intended for append-only
ingestion scenarios.
+ * For upsert workloads with deduplication or event-time ordering, the
deviation between
+ * source offsets and records written can be legitimately large (e.g.,
duplicate keys are
+ * deduplicated, late-arriving events are dropped). In such cases, configure
an appropriate
+ * tolerance percentage or use WARN_LOG failure policy.</p>
+ *
+ * Algorithm:
+ * 1. Extract current and previous checkpoints from commit metadata
+ * 2. Calculate offset difference using source-specific format
+ * 3. Get actual record count from write statistics
+ * 4. Calculate deviation percentage: |offsetDiff - recordCount| / offsetDiff
* 100
+ * 5. If deviation > tolerance: fail or warn based on failure policy
+ *
+ * Subclasses specify:
+ * - Checkpoint format (SPARK_KAFKA, FLINK_KAFKA, etc.)
+ * - Checkpoint metadata key
+ * - Source-specific parsing logic (if needed)
+ *
+ * Configuration:
+ * - hoodie.precommit.validators.streaming.offset.tolerance.percentage
(default: 0.0)
+ * - hoodie.precommit.validators.failure.policy (default: FAIL)
+ */
+@Slf4j
+public abstract class StreamingOffsetValidator extends BasePreCommitValidator {
+
+ protected final String checkpointKey;
+ protected final double tolerancePercentage;
+ protected final ValidationFailurePolicy failurePolicy;
+ protected final CheckpointFormat checkpointFormat;
+
+ /**
+ * Create a streaming offset validator.
+ *
+ * @param config Validator configuration
+ * @param checkpointKey Key to extract checkpoint from extraMetadata
+ * @param checkpointFormat Format of the checkpoint string
+ */
+ protected StreamingOffsetValidator(TypedProperties config,
+ String checkpointKey,
+ CheckpointFormat checkpointFormat) {
+ super(config);
+ this.checkpointKey = checkpointKey;
+ this.checkpointFormat = checkpointFormat;
+ this.tolerancePercentage = Double.parseDouble(
+
config.getString(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(),
+
HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.defaultValue()));
+ String policyStr = config.getString(
+ HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(),
+
HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.defaultValue());
+ this.failurePolicy = ValidationFailurePolicy.valueOf(policyStr);
+ }
+
+ @Override
+ protected void validateWithMetadata(ValidationContext context) throws
HoodieValidationException {
+ // Skip validation for first commit (no previous checkpoint)
+ if (context.isFirstCommit()) {
+ log.info("Skipping offset validation for first commit");
+ return;
+ }
+
+ // Extract current checkpoint
+ Option<String> currentCheckpointOpt =
context.getExtraMetadata(checkpointKey);
+ if (!currentCheckpointOpt.isPresent()) {
+ log.warn("Current checkpoint not found with key: {}. Skipping
validation.", checkpointKey);
+ return;
+ }
+ String currentCheckpoint = currentCheckpointOpt.get();
+
+ // Validate current checkpoint format
+ if (!CheckpointUtils.isValidCheckpointFormat(checkpointFormat,
currentCheckpoint)) {
+ log.warn("Current checkpoint has invalid format: {}. Skipping
validation.", currentCheckpoint);
+ return;
+ }
+
+ // Extract previous checkpoint
+ Option<String> previousCheckpointOpt = context.getPreviousCommitMetadata()
+ .flatMap(metadata ->
Option.ofNullable(metadata.getMetadata(checkpointKey)));
+
+ if (!previousCheckpointOpt.isPresent()) {
+ log.info("Previous checkpoint not found. May be first streaming commit.
Skipping validation.");
+ return;
+ }
+ String previousCheckpoint = previousCheckpointOpt.get();
+
+ // Validate previous checkpoint format
+ if (!CheckpointUtils.isValidCheckpointFormat(checkpointFormat,
previousCheckpoint)) {
+ log.warn("Previous checkpoint has invalid format: {}. Skipping
validation.", previousCheckpoint);
+ return;
+ }
+
+ // Calculate offset difference using format-specific logic.
+ // Note: calculateOffsetDifference always returns >= 0 because negative
per-partition
+ // diffs (offset resets) are handled internally by using the current
offset.
+ long offsetDifference = CheckpointUtils.calculateOffsetDifference(
+ checkpointFormat, previousCheckpoint, currentCheckpoint);
+
+ // Get actual new record count from write stats.
+ // Use numInserts + numUpdateWrites instead of numWrites to avoid counting
records
+ // re-written due to small file handling. With INSERT operation and small
file handling,
+ // numWrites includes all records in the merged file (existing + new),
which would
+ // inflate the count and mask real data loss.
+ long recordsWritten = context.getTotalInsertRecordsWritten()
+ + context.getTotalUpdateRecordsWritten();
+
+ // For empty commits (e.g., no new data from source), both offsetDiff and
recordsWritten
+ // can be zero. This is a valid scenario — skip validation to avoid false
positives.
+ if (offsetDifference == 0 && recordsWritten == 0) {
+ log.info("Empty commit detected (no offset change, no records written).
Skipping offset validation.");
+ return;
+ }
+
+ // Validate offset vs record consistency
+ validateOffsetConsistency(offsetDifference, recordsWritten,
+ currentCheckpoint, previousCheckpoint);
+ }
+
+ /**
+ * Validate that offset difference matches record count within tolerance.
+ *
+ * @param offsetDiff Expected records based on offset difference
+ * @param recordsWritten Actual records written
+ * @param currentCheckpoint Current checkpoint string (for error messages)
+ * @param previousCheckpoint Previous checkpoint string (for error messages)
+ * @throws HoodieValidationException if validation fails and policy is FAIL
+ */
+ protected void validateOffsetConsistency(long offsetDiff, long
recordsWritten,
+ String currentCheckpoint, String
previousCheckpoint)
+ throws HoodieValidationException {
+
+ double deviation = calculateDeviation(offsetDiff, recordsWritten);
+
+ if (deviation > tolerancePercentage) {
+ String errorMsg = String.format(
+ "Streaming offset validation failed. "
+ + "Offset difference: %d, Records written: %d, Deviation:
%.2f%%, Tolerance: %.2f%%. "
+ + "This may indicate data loss or filtering. "
+ + "Previous checkpoint: %s, Current checkpoint: %s",
+ offsetDiff, recordsWritten, deviation, tolerancePercentage,
+ previousCheckpoint, currentCheckpoint);
+
+ if (failurePolicy == ValidationFailurePolicy.WARN_LOG) {
+ log.warn(errorMsg + " (failure policy is WARN_LOG, commit will
proceed)");
+ } else {
+ throw new HoodieValidationException(errorMsg);
+ }
+ } else {
+ log.info("Offset validation passed. Offset diff: {}, Records: {},
Deviation: {}% (within {}%)",
+ offsetDiff, recordsWritten, String.format("%.2f", deviation),
tolerancePercentage);
+ }
+ }
+
+ /**
+ * Calculate percentage deviation between expected (offset diff) and actual
(record count).
+ *
+ * Formula:
+ * - If both are zero: 0% (perfect match, no data)
+ * - If one is zero: 100% (complete mismatch)
+ * - Otherwise: |offsetDiff - recordsWritten| / offsetDiff * 100
+ *
+ * @param offsetDiff Expected records from offset difference
+ * @param recordsWritten Actual records written
+ * @return Deviation percentage (0.0 = exact match, 100.0 = complete
mismatch)
+ */
+ private double calculateDeviation(long offsetDiff, long recordsWritten) {
+ // Handle edge cases
+ if (offsetDiff == 0 && recordsWritten == 0) {
+ return 0.0; // Both zero - perfect match (no data processed)
+ }
+ if (offsetDiff == 0 || recordsWritten == 0) {
+ return 100.0; // One is zero - complete mismatch
+ }
+
+ long difference = Math.abs(offsetDiff - recordsWritten);
+ return (100.0 * difference) / offsetDiff;
+ }
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
index 384fc555c090..b1faa5153d22 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
@@ -64,6 +64,39 @@ public class HoodiePreCommitValidatorConfig extends
HoodieConfig {
+ "Expected result is included as part of query separated by '#'.
Example query: 'query1#result1:query2#result2'"
+ "Note \\<TABLE_NAME\\> variable is expected to be present in
query.");
+ public static final ConfigProperty<String>
STREAMING_OFFSET_TOLERANCE_PERCENTAGE = ConfigProperty
+ .key("hoodie.precommit.validators.streaming.offset.tolerance.percentage")
+ .defaultValue("0.0")
+ .sinceVersion("1.2.0")
+ .markAdvanced()
+ .withDocumentation("Tolerance percentage for streaming offset validation
"
+ + "(used by
org.apache.hudi.client.validator.StreamingOffsetValidator). "
+ + "The validator compares the offset difference (expected records
from source) "
+ + "with actual records written. If the deviation exceeds this
percentage, "
+ + "the commit is rejected or warned depending on the validation
failure policy. "
+ + "For upsert workloads with deduplication, set a higher tolerance. "
+ + "Default is 0.0 (strict mode, exact match required).");
+
+ /**
+ * Policy for handling pre-commit validation failures.
+ */
+ public enum ValidationFailurePolicy {
+ /** Validation failures block the commit with an exception. */
+ FAIL,
+ /** Validation failures emit a warning log but allow the commit to
proceed. */
+ WARN_LOG
+ }
+
+ public static final ConfigProperty<String> VALIDATION_FAILURE_POLICY =
ConfigProperty
+ .key("hoodie.precommit.validators.failure.policy")
+ .defaultValue(ValidationFailurePolicy.FAIL.name())
+ .sinceVersion("1.2.0")
+ .markAdvanced()
+ .withDocumentation("Policy for handling pre-commit validation failures. "
+ + "FAIL (default): validation failures block the commit with an
exception. "
+ + "WARN_LOG: validation failures emit a warning log but allow the
commit to proceed. "
+ + "Useful for monitoring data quality without impacting write
availability.");
+
/**
* Spark SQL queries to run on table before committing new data to validate
state before and after commit.
* Multiple queries separated by ';' delimiter are supported.
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/validator/TestStreamingOffsetValidator.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/validator/TestStreamingOffsetValidator.java
new file mode 100644
index 000000000000..818bc2087a3c
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/validator/TestStreamingOffsetValidator.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.validator;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
+import org.apache.hudi.common.util.CheckpointUtils.CheckpointFormat;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import
org.apache.hudi.config.HoodiePreCommitValidatorConfig.ValidationFailurePolicy;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for StreamingOffsetValidator, ValidationContext, and
BasePreCommitValidator.
+ */
+public class TestStreamingOffsetValidator {
+
+ private static final String CHECKPOINT_KEY = "test.checkpoint.key";
+
+ /**
+ * Mock implementation of StreamingOffsetValidator for testing.
+ */
+ private static class MockOffsetValidator extends StreamingOffsetValidator {
+ public MockOffsetValidator(TypedProperties config) {
+ super(config, CHECKPOINT_KEY, CheckpointFormat.SPARK_KAFKA);
+ }
+
+ // Expose protected method for testing
+ public void testValidateOffsetConsistency(long offsetDiff, long
recordsWritten,
+ String current, String
previous) {
+ validateOffsetConsistency(offsetDiff, recordsWritten, current, previous);
+ }
+
+ // Expose validateWithMetadata for testing
+ public void testValidateWithMetadata(ValidationContext context) {
+ validateWithMetadata(context);
+ }
+ }
+
+ /**
+ * Simple test implementation of ValidationContext for testing.
+ */
+ private static class TestValidationContext implements ValidationContext {
+ private final String instantTime;
+ private final Option<HoodieCommitMetadata> commitMetadata;
+ private final Option<List<HoodieWriteStat>> writeStats;
+ private final boolean firstCommit;
+ private final Option<HoodieCommitMetadata> previousCommitMetadata;
+
+ TestValidationContext(String instantTime,
+ Option<HoodieCommitMetadata> commitMetadata,
+ Option<List<HoodieWriteStat>> writeStats,
+ boolean firstCommit,
+ Option<HoodieCommitMetadata> previousCommitMetadata)
{
+ this.instantTime = instantTime;
+ this.commitMetadata = commitMetadata;
+ this.writeStats = writeStats;
+ this.firstCommit = firstCommit;
+ this.previousCommitMetadata = previousCommitMetadata;
+ }
+
+ @Override
+ public String getInstantTime() {
+ return instantTime;
+ }
+
+ @Override
+ public Option<HoodieCommitMetadata> getCommitMetadata() {
+ return commitMetadata;
+ }
+
+ @Override
+ public Option<List<HoodieWriteStat>> getWriteStats() {
+ return writeStats;
+ }
+
+ @Override
+ public HoodieActiveTimeline getActiveTimeline() {
+ return null; // Not needed for these tests
+ }
+
+ @Override
+ public Option<HoodieInstant> getPreviousCommitInstant() {
+ if (firstCommit) {
+ return Option.empty();
+ }
+ return Option.of(new HoodieInstant(HoodieInstant.State.COMPLETED,
"commit",
+ "20260309110000",
InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR));
+ }
+
+ @Override
+ public Option<HoodieCommitMetadata> getPreviousCommitMetadata() {
+ return previousCommitMetadata;
+ }
+ }
+
+ private MockOffsetValidator createValidator(double tolerance,
ValidationFailurePolicy policy) {
+ TypedProperties config = new TypedProperties();
+
config.setProperty(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(),
+ String.valueOf(tolerance));
+
config.setProperty(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(),
+ policy.name());
+ return new MockOffsetValidator(config);
+ }
+
+ private HoodieWriteStat createWriteStat(long numInserts, long
numUpdateWrites) {
+ HoodieWriteStat stat = new HoodieWriteStat();
+ stat.setNumInserts(numInserts);
+ stat.setNumUpdateWrites(numUpdateWrites);
+ stat.setNumWrites(numInserts + numUpdateWrites + 50); // numWrites
includes small file overhead
+ return stat;
+ }
+
+ // ========== validateOffsetConsistency tests ==========
+
+ @Test
+ public void testExactMatchValidation() {
+ MockOffsetValidator validator = createValidator(0.0,
ValidationFailurePolicy.FAIL);
+ assertDoesNotThrow(() ->
+ validator.testValidateOffsetConsistency(1000, 1000, "cur", "prev"));
+ }
+
+ @Test
+ public void testStrictModeFailure() {
+ MockOffsetValidator validator = createValidator(0.0,
ValidationFailurePolicy.FAIL);
+ assertThrows(HoodieValidationException.class, () ->
+ validator.testValidateOffsetConsistency(1000, 900, "cur", "prev"));
+ }
+
+ @Test
+ public void testValidationWithTolerance() {
+ MockOffsetValidator validator = createValidator(10.0,
ValidationFailurePolicy.FAIL);
+
+ // 5% deviation - within 10% tolerance
+ assertDoesNotThrow(() ->
+ validator.testValidateOffsetConsistency(1000, 950, "cur", "prev"));
+
+ // 10% deviation - exactly at tolerance
+ assertDoesNotThrow(() ->
+ validator.testValidateOffsetConsistency(1000, 900, "cur", "prev"));
+
+ // 15% deviation - exceeds tolerance
+ assertThrows(HoodieValidationException.class, () ->
+ validator.testValidateOffsetConsistency(1000, 850, "cur", "prev"));
+ }
+
+ @Test
+ public void testWarnLogPolicy() {
+ MockOffsetValidator validator = createValidator(0.0,
ValidationFailurePolicy.WARN_LOG);
+
+ assertDoesNotThrow(() ->
+ validator.testValidateOffsetConsistency(1000, 500, "cur", "prev"));
+ assertDoesNotThrow(() ->
+ validator.testValidateOffsetConsistency(1000, 0, "cur", "prev"));
+ }
+
+ @Test
+ public void testEdgeCaseZeroBoth() {
+ MockOffsetValidator validator = createValidator(0.0,
ValidationFailurePolicy.FAIL);
+ assertDoesNotThrow(() ->
+ validator.testValidateOffsetConsistency(0, 0, "cur", "prev"));
+ }
+
+ @Test
+ public void testEdgeCaseOneZero() {
+ MockOffsetValidator validator = createValidator(0.0,
ValidationFailurePolicy.FAIL);
+
+ assertThrows(HoodieValidationException.class, () ->
+ validator.testValidateOffsetConsistency(0, 1000, "cur", "prev"));
+ assertThrows(HoodieValidationException.class, () ->
+ validator.testValidateOffsetConsistency(1000, 0, "cur", "prev"));
+ }
+
+ @Test
+ public void testDefaultConfiguration() {
+ TypedProperties config = new TypedProperties();
+ MockOffsetValidator validator = new MockOffsetValidator(config);
+
+ assertDoesNotThrow(() ->
+ validator.testValidateOffsetConsistency(1000, 1000, "cur", "prev"));
+ assertThrows(HoodieValidationException.class, () ->
+ validator.testValidateOffsetConsistency(1000, 999, "cur", "prev"));
+ }
+
+ // ========== validateWithMetadata end-to-end tests ==========
+
+ @Test
+ public void testValidateWithMetadataSkipsFirstCommit() {
+ MockOffsetValidator validator = createValidator(0.0,
ValidationFailurePolicy.FAIL);
+
+ TestValidationContext context = new TestValidationContext(
+ "20260309120000",
+ Option.empty(),
+ Option.empty(),
+ true, // first commit
+ Option.empty());
+
+ assertTrue(context.isFirstCommit());
+ // Should not throw - skips validation for first commit
+ assertDoesNotThrow(() -> validator.testValidateWithMetadata(context));
+ }
+
+ @Test
+ public void testValidateWithMetadataSkipsMissingCurrentCheckpoint() {
+ MockOffsetValidator validator = createValidator(0.0,
ValidationFailurePolicy.FAIL);
+
+ // Current commit has no checkpoint metadata
+ HoodieCommitMetadata currentMetadata = new HoodieCommitMetadata();
+ // No checkpoint key added
+
+ TestValidationContext context = new TestValidationContext(
+ "20260309120000",
+ Option.of(currentMetadata),
+ Option.empty(),
+ false,
+ Option.empty());
+
+ assertFalse(context.isFirstCommit());
+ assertDoesNotThrow(() -> validator.testValidateWithMetadata(context));
+ }
+
+ @Test
+ public void testValidateWithMetadataSkipsInvalidCurrentCheckpoint() {
+ MockOffsetValidator validator = createValidator(0.0,
ValidationFailurePolicy.FAIL);
+
+ HoodieCommitMetadata currentMetadata = new HoodieCommitMetadata();
+ currentMetadata.addMetadata(CHECKPOINT_KEY, "invalid_format");
+
+ TestValidationContext context = new TestValidationContext(
+ "20260309120000",
+ Option.of(currentMetadata),
+ Option.empty(),
+ false,
+ Option.empty());
+
+ assertDoesNotThrow(() -> validator.testValidateWithMetadata(context));
+ }
+
+ @Test
+ public void testValidateWithMetadataSkipsMissingPreviousCheckpoint() {
+ MockOffsetValidator validator = createValidator(0.0,
ValidationFailurePolicy.FAIL);
+
+ HoodieCommitMetadata currentMetadata = new HoodieCommitMetadata();
+ currentMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:200,1:300");
+
+ // Previous metadata exists but has no checkpoint key
+ HoodieCommitMetadata prevMetadata = new HoodieCommitMetadata();
+
+ TestValidationContext context = new TestValidationContext(
+ "20260309120000",
+ Option.of(currentMetadata),
+ Option.empty(),
+ false,
+ Option.of(prevMetadata));
+
+ assertDoesNotThrow(() -> validator.testValidateWithMetadata(context));
+ }
+
+ @Test
+ public void testValidateWithMetadataSkipsNoPreviousMetadata() {
+ MockOffsetValidator validator = createValidator(0.0,
ValidationFailurePolicy.FAIL);
+
+ HoodieCommitMetadata currentMetadata = new HoodieCommitMetadata();
+ currentMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:200,1:300");
+
+ TestValidationContext context = new TestValidationContext(
+ "20260309120000",
+ Option.of(currentMetadata),
+ Option.empty(),
+ false,
+ Option.empty()); // no previous commit metadata at all
+
+ assertDoesNotThrow(() -> validator.testValidateWithMetadata(context));
+ }
+
+ @Test
+ public void testValidateWithMetadataSkipsInvalidPreviousCheckpoint() {
+ MockOffsetValidator validator = createValidator(0.0,
ValidationFailurePolicy.FAIL);
+
+ HoodieCommitMetadata currentMetadata = new HoodieCommitMetadata();
+ currentMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:200,1:300");
+
+ HoodieCommitMetadata prevMetadata = new HoodieCommitMetadata();
+ prevMetadata.addMetadata(CHECKPOINT_KEY, "bad_format");
+
+ TestValidationContext context = new TestValidationContext(
+ "20260309120000",
+ Option.of(currentMetadata),
+ Option.empty(),
+ false,
+ Option.of(prevMetadata));
+
+ assertDoesNotThrow(() -> validator.testValidateWithMetadata(context));
+ }
+
+ @Test
+ public void testValidateWithMetadataSkipsEmptyCommit() {
+ MockOffsetValidator validator = createValidator(0.0,
ValidationFailurePolicy.FAIL);
+
+ // Same checkpoint = 0 offset diff, 0 records written
+ HoodieCommitMetadata currentMetadata = new HoodieCommitMetadata();
+ currentMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:100,1:200");
+
+ HoodieCommitMetadata prevMetadata = new HoodieCommitMetadata();
+ prevMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:100,1:200");
+
+ TestValidationContext context = new TestValidationContext(
+ "20260309120000",
+ Option.of(currentMetadata),
+ Option.of(Collections.emptyList()), // no write stats
+ false,
+ Option.of(prevMetadata));
+
+ assertDoesNotThrow(() -> validator.testValidateWithMetadata(context));
+ }
+
+ @Test
+ public void testValidateWithMetadataPassesValidCommit() {
+ MockOffsetValidator validator = createValidator(5.0,
ValidationFailurePolicy.FAIL);
+
+ HoodieCommitMetadata currentMetadata = new HoodieCommitMetadata();
+ currentMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:200,1:400");
+
+ HoodieCommitMetadata prevMetadata = new HoodieCommitMetadata();
+ prevMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:100,1:200");
+
+ // Offset diff = (200-100) + (400-200) = 300
+ // Records: 150 inserts + 150 updates = 300
+ HoodieWriteStat stat1 = createWriteStat(150, 0);
+ HoodieWriteStat stat2 = createWriteStat(0, 150);
+ List<HoodieWriteStat> stats = Arrays.asList(stat1, stat2);
+
+ TestValidationContext context = new TestValidationContext(
+ "20260309120000",
+ Option.of(currentMetadata),
+ Option.of(stats),
+ false,
+ Option.of(prevMetadata));
+
+ assertDoesNotThrow(() -> validator.testValidateWithMetadata(context));
+ }
+
+ @Test
+ public void testValidateWithMetadataFailsOnMismatch() {
+ MockOffsetValidator validator = createValidator(0.0,
ValidationFailurePolicy.FAIL);
+
+ HoodieCommitMetadata currentMetadata = new HoodieCommitMetadata();
+ currentMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:200,1:400");
+
+ HoodieCommitMetadata prevMetadata = new HoodieCommitMetadata();
+ prevMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:100,1:200");
+
+ // Offset diff = 300, but only 100 records written
+ HoodieWriteStat stat = createWriteStat(100, 0);
+ List<HoodieWriteStat> stats = Collections.singletonList(stat);
+
+ TestValidationContext context = new TestValidationContext(
+ "20260309120000",
+ Option.of(currentMetadata),
+ Option.of(stats),
+ false,
+ Option.of(prevMetadata));
+
+ assertThrows(HoodieValidationException.class,
+ () -> validator.testValidateWithMetadata(context));
+ }
+
+ @Test
+ public void testValidateWithMetadataWarnsOnMismatchWithWarnPolicy() {
+ MockOffsetValidator validator = createValidator(0.0,
ValidationFailurePolicy.WARN_LOG);
+
+ HoodieCommitMetadata currentMetadata = new HoodieCommitMetadata();
+ currentMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:200,1:400");
+
+ HoodieCommitMetadata prevMetadata = new HoodieCommitMetadata();
+ prevMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:100,1:200");
+
+ // Large mismatch but WARN_LOG policy
+ HoodieWriteStat stat = createWriteStat(50, 0);
+
+ TestValidationContext context = new TestValidationContext(
+ "20260309120000",
+ Option.of(currentMetadata),
+ Option.of(Collections.singletonList(stat)),
+ false,
+ Option.of(prevMetadata));
+
+ assertDoesNotThrow(() -> validator.testValidateWithMetadata(context));
+ }
+
+ // ========== ValidationContext default method tests ==========
+
+ @Test
+ public void testValidationContextGetExtraMetadata() {
+ HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+ metadata.addMetadata("key1", "value1");
+ metadata.addMetadata("key2", "value2");
+
+ TestValidationContext context = new TestValidationContext(
+ "20260309120000", Option.of(metadata), Option.empty(),
+ true, Option.empty());
+
+ assertEquals("value1", context.getExtraMetadata("key1").get());
+ assertEquals("value2", context.getExtraMetadata("key2").get());
+ assertFalse(context.getExtraMetadata("missing").isPresent());
+
+ assertEquals(2, context.getExtraMetadata().size());
+ }
+
+ @Test
+ public void testValidationContextGetExtraMetadataEmpty() {
+ TestValidationContext context = new TestValidationContext(
+ "20260309120000", Option.empty(), Option.empty(),
+ true, Option.empty());
+
+ assertTrue(context.getExtraMetadata().isEmpty());
+ assertFalse(context.getExtraMetadata("any").isPresent());
+ }
+
+ @Test
+ public void testValidationContextRecordCounts() {
+ HoodieWriteStat stat1 = new HoodieWriteStat();
+ stat1.setNumInserts(100);
+ stat1.setNumUpdateWrites(20);
+ stat1.setNumWrites(200);
+
+ HoodieWriteStat stat2 = new HoodieWriteStat();
+ stat2.setNumInserts(50);
+ stat2.setNumUpdateWrites(30);
+ stat2.setNumWrites(150);
+
+ TestValidationContext context = new TestValidationContext(
+ "20260309120000", Option.empty(),
+ Option.of(Arrays.asList(stat1, stat2)),
+ true, Option.empty());
+
+ // getTotalRecordsWritten uses numInserts + numUpdateWrites (not numWrites)
+ // stat1: 100 + 20 = 120, stat2: 50 + 30 = 80, total = 200
+ assertEquals(200, context.getTotalRecordsWritten());
+ assertEquals(150, context.getTotalInsertRecordsWritten());
+ assertEquals(50, context.getTotalUpdateRecordsWritten());
+ }
+
+ @Test
+ public void testValidationContextRecordCountsEmpty() {
+ TestValidationContext context = new TestValidationContext(
+ "20260309120000", Option.empty(), Option.empty(),
+ true, Option.empty());
+
+ assertEquals(0, context.getTotalRecordsWritten());
+ assertEquals(0, context.getTotalInsertRecordsWritten());
+ assertEquals(0, context.getTotalUpdateRecordsWritten());
+ }
+
+ @Test
+ public void testValidationContextIsFirstCommit() {
+ TestValidationContext firstCommit = new TestValidationContext(
+ "20260309120000", Option.empty(), Option.empty(),
+ true, Option.empty());
+ assertTrue(firstCommit.isFirstCommit());
+
+ TestValidationContext nonFirstCommit = new TestValidationContext(
+ "20260309120000", Option.empty(), Option.empty(),
+ false, Option.empty());
+ assertFalse(nonFirstCommit.isFirstCommit());
+ }
+
+ // ========== BasePreCommitValidator tests ==========
+
+ @Test
+ public void testBasePreCommitValidatorConfig() {
+ TypedProperties config = new TypedProperties();
+ config.setProperty("test.key", "test.value");
+
+ MockOffsetValidator validator = new MockOffsetValidator(config);
+ // config is a protected field accessible from subclasses
+ assertNotNull(validator.config);
+ assertEquals("test.value", validator.config.getString("test.key"));
+ }
+
+ @Test
+ public void testBasePreCommitValidatorDefaultNoOp() {
+ TypedProperties config = new TypedProperties();
+ BasePreCommitValidator validator = new BasePreCommitValidator(config) {};
+
+ TestValidationContext context = new TestValidationContext(
+ "20260309120000", Option.empty(), Option.empty(),
+ true, Option.empty());
+
+ // Default implementation is no-op and should not throw
+ assertDoesNotThrow(() -> validator.validateWithMetadata(context));
+ }
+
+ @Test
+ public void testValidateWithMetadataOffsetDiffZeroRecordsNonZero() {
+ // Covers the branch where offsetDiff == 0 but recordsWritten > 0
+ MockOffsetValidator validator = createValidator(0.0,
ValidationFailurePolicy.FAIL);
+
+ HoodieCommitMetadata currentMetadata = new HoodieCommitMetadata();
+ currentMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:100,1:200");
+
+ HoodieCommitMetadata prevMetadata = new HoodieCommitMetadata();
+ prevMetadata.addMetadata(CHECKPOINT_KEY, "topic,0:100,1:200");
+
+ // Same offsets (diff=0) but records were written — should fail
+ HoodieWriteStat stat = createWriteStat(100, 0);
+
+ TestValidationContext context = new TestValidationContext(
+ "20260309120000",
+ Option.of(currentMetadata),
+ Option.of(Collections.singletonList(stat)),
+ false,
+ Option.of(prevMetadata));
+
+ assertThrows(HoodieValidationException.class,
+ () -> validator.testValidateWithMetadata(context));
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/client/validator/BasePreCommitValidator.java
b/hudi-common/src/main/java/org/apache/hudi/client/validator/BasePreCommitValidator.java
new file mode 100644
index 000000000000..b1c718246608
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/client/validator/BasePreCommitValidator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.validator;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieValidationException;
+
+/**
+ * Base class for all pre-commit validators across all engines (Spark, Flink,
Java).
+ * Engine-specific implementations extend this class and implement
ValidationContext.
+ *
+ * <p>This is the foundation for engine-agnostic validation logic that can
access
+ * commit metadata, timeline, and write statistics.</p>
+ *
+ * <p>Integration with existing framework:
+ * In Phase 3, the existing {@code SparkPreCommitValidator} will be refactored
to extend
+ * this class, and {@code SparkValidatorUtils.runValidators()} will be updated
to invoke
+ * {@link #validateWithMetadata(ValidationContext)} for validators that extend
this class.
+ * The existing validator class names configured via
+ * {@code HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES} will continue
to work.</p>
+ *
+ * <p>Phase 1: Core framework in hudi-common</p>
+ * <p>Phase 2: Flink-specific implementations in hudi-flink-datasource</p>
+ * <p>Phase 3: Spark-specific implementations in
hudi-client/hudi-spark-client</p>
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public abstract class BasePreCommitValidator {
+
+ protected final TypedProperties config;
+
+ /**
+ * Create a pre-commit validator with configuration.
+ *
+ * @param config Typed properties containing validator configuration
+ */
+ protected BasePreCommitValidator(TypedProperties config) {
+ this.config = config;
+ }
+
+ /**
+ * Perform validation using commit metadata, timeline, and write statistics.
+ * This method is called by the engine-specific orchestration layer.
+ *
+ * <p>Subclasses should override this method to implement validation logic
that:</p>
+ * <ul>
+ * <li>Accesses commit metadata (checkpoints, custom metadata)</li>
+ * <li>Navigates timeline (previous commits)</li>
+ * <li>Analyzes write statistics (record counts, partition info)</li>
+ * </ul>
+ *
+ * @param context Validation context providing access to metadata
(engine-specific implementation)
+ * @throws HoodieValidationException if validation fails
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ protected void validateWithMetadata(ValidationContext context) throws
HoodieValidationException {
+ // Default no-op implementation
+ // Concrete validators override this to implement validation logic
+ }
+
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/client/validator/ValidationContext.java
b/hudi-common/src/main/java/org/apache/hudi/client/validator/ValidationContext.java
new file mode 100644
index 000000000000..30fdbb3ba3b4
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/client/validator/ValidationContext.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.validator;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides validators with access to commit information.
+ * Engine-specific implementations (Spark, Flink, Java) provide concrete
implementations
+ * for the core methods; computed convenience methods are provided as defaults.
+ *
+ * <p>This interface abstracts away engine-specific details while providing
consistent
+ * access to validation data across all write engines.</p>
+ *
+ * <p>Example implementations:</p>
+ * <ul>
+ * <li>SparkValidationContext (Phase 3): Accesses Spark RDD write
metadata</li>
+ * <li>FlinkValidationContext (Phase 2): Accesses Flink checkpoint state</li>
+ * <li>JavaValidationContext (Future): Accesses Java client write
metadata</li>
+ * </ul>
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface ValidationContext {
+
+ /**
+ * Get the current commit instant time being validated.
+ *
+ * @return Instant time string (format: yyyyMMddHHmmss)
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ String getInstantTime();
+
+ /**
+ * Get commit metadata for the current commit being validated.
+ * Contains extraMetadata (checkpoints, custom metadata), operation type,
write stats, etc.
+ *
+ * @return Optional commit metadata
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ Option<HoodieCommitMetadata> getCommitMetadata();
+
+ /**
+ * Get write statistics for the current commit.
+ * Contains record counts, partition info, file info, bytes written, etc.
+ *
+ * @return Optional list of write statistics per partition
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ Option<List<HoodieWriteStat>> getWriteStats();
+
+ /**
+ * Get the active timeline for accessing previous commits.
+ * Used to navigate commit history and extract previous checkpoints.
+ *
+ * @return Active timeline
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ HoodieActiveTimeline getActiveTimeline();
+
+ /**
+ * Get the previous completed commit instant.
+ * Used to access previous checkpoint for delta validation.
+ *
+ * @return Optional previous instant
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ Option<HoodieInstant> getPreviousCommitInstant();
+
+ /**
+ * Get commit metadata for the previous commit.
+ * Used to extract previous checkpoint for comparison.
+ *
+ * @return Optional previous commit metadata
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ Option<HoodieCommitMetadata> getPreviousCommitMetadata();
+
+ // ========== Default convenience methods derived from core methods
==========
+
+ /**
+ * Get all extra metadata from the current commit.
+ * Derived from {@link #getCommitMetadata()}.
+ *
+ * @return Map of metadata key to value, or empty map if no metadata
+ */
+ default Map<String, String> getExtraMetadata() {
+ return getCommitMetadata()
+ .map(HoodieCommitMetadata::getExtraMetadata)
+ .orElse(Collections.emptyMap());
+ }
+
+ /**
+ * Get a specific extra metadata value by key.
+ * Derived from {@link #getCommitMetadata()}.
+ *
+ * @param key Metadata key
+ * @return Optional metadata value
+ */
+ default Option<String> getExtraMetadata(String key) {
+ return getCommitMetadata()
+ .flatMap(metadata -> Option.ofNullable(metadata.getMetadata(key)));
+ }
+
+ /**
+ * Calculate total records written in the current commit.
+ * Uses {@code numInserts + numUpdateWrites} instead of {@code numWrites}
because
+ * for COW upserts, {@code numWrites} is the total number of records in the
file
+ * (not just newly written ones), which would make deviation calculations
meaningless.
+ *
+ * @return Total newly written record count (inserts + updates)
+ */
+ default long getTotalRecordsWritten() {
+ return getWriteStats()
+ .map(stats -> stats.stream()
+ .mapToLong(s -> s.getNumInserts() + s.getNumUpdateWrites())
+ .sum())
+ .orElse(0L);
+ }
+
+ /**
+ * Calculate total insert records written.
+ * Derived from {@link #getWriteStats()} by summing {@code numInserts}
across all partitions.
+ *
+ * @return Total insert count
+ */
+ default long getTotalInsertRecordsWritten() {
+ return getWriteStats()
+ .map(stats ->
stats.stream().mapToLong(HoodieWriteStat::getNumInserts).sum())
+ .orElse(0L);
+ }
+
+ /**
+ * Calculate total update records written.
+ * Derived from {@link #getWriteStats()} by summing {@code numUpdateWrites}
across all partitions.
+ *
+ * @return Total update count
+ */
+ default long getTotalUpdateRecordsWritten() {
+ return getWriteStats()
+ .map(stats ->
stats.stream().mapToLong(HoodieWriteStat::getNumUpdateWrites).sum())
+ .orElse(0L);
+ }
+
+ /**
+ * Check if this is the first commit (no previous commits exist).
+ * Derived from {@link #getPreviousCommitInstant()}.
+ * Validators should skip validation for first commit since there's
+ * no previous checkpoint to compare against.
+ *
+ * @return true if first commit
+ */
+ default boolean isFirstCommit() {
+ return !getPreviousCommitInstant().isPresent();
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/CheckpointUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/CheckpointUtils.java
new file mode 100644
index 000000000000..5e0fc862b605
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CheckpointUtils.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility methods for parsing and working with streaming checkpoints.
+ * Supports multiple checkpoint formats used by different engines and sources.
+ *
+ * Checkpoint formats:
+ * - SPARK_KAFKA: "topic,partition:offset,partition:offset,..."
+ * Example: "events,0:1000,1:2000,2:1500"
+ * Used by: HoodieStreamer (Spark)
+ * Note: Both offset-based and timestamp-based Kafka sources produce this
same string
+ * format (see {@code KafkaOffsetGen}). The offsets may originate from either
+ * {@code consumer.endOffsets()} or {@code consumer.offsetsForTimes()}, but
the
+ * checkpoint string format is identical.
+ *
+ * - FLINK_KAFKA: Base64-encoded serialized Map (TopicPartition → Long)
+ * Example: "eyJ0b3BpY..." (base64)
+ * Used by: Flink streaming connector
+ * Note: Actual implementation requires Flink checkpoint deserialization
(Phase 2)
+ *
+ * - PULSAR: "partition:ledgerId:entryId,partition:ledgerId:entryId,..."
+ * Example: "0:123:45,1:234:56"
+ * Used by: Pulsar sources (engine-agnostic)
+ * Note: To be implemented in Phase 4
+ *
+ * - KINESIS: "shardId:sequenceNumber,shardId:sequenceNumber,..."
+ * Example:
"shardId-000000000000:49590338271490256608559692538361571095921575989136588898"
+ * Used by: Kinesis sources (engine-agnostic)
+ * Note: To be implemented in Phase 4
+ */
+@Slf4j
+public class CheckpointUtils {
+
+ /**
+ * Supported checkpoint formats across engines and sources.
+ *
+ * <p>Engine-specific formats (Kafka) are prefixed with the engine name
because different
+ * engines use different checkpoint serialization for the same source.
Source-specific formats
+ * (Pulsar, Kinesis) are not engine-prefixed because they use the same
format across engines.</p>
+ */
+ public enum CheckpointFormat {
+ /** HoodieStreamer (Spark) Kafka format: "topic,0:1000,1:2000" */
+ SPARK_KAFKA,
+
+ /** Flink Kafka format: base64-encoded Map<TopicPartition, Long> */
+ FLINK_KAFKA,
+
+ /** Pulsar format: "0:123:45,1:234:56" (ledgerId:entryId).
Engine-agnostic. */
+ PULSAR,
+
+ /** Kinesis format: "shard-0:12345,shard-1:67890". Engine-agnostic. */
+ KINESIS,
+
+ /** Custom user-defined format */
+ CUSTOM
+ }
+
+ /**
+ * Parse checkpoint string into partition → offset mapping.
+ *
+ * @param format Checkpoint format
+ * @param checkpointStr Checkpoint string
+ * @return Map from partition number to offset/sequence number
+ * @throws IllegalArgumentException if format is invalid
+ */
+ public static Map<Integer, Long> parseCheckpoint(CheckpointFormat format,
String checkpointStr) {
+ switch (format) {
+ case SPARK_KAFKA:
+ return parseSparkKafkaCheckpoint(checkpointStr);
+ case FLINK_KAFKA:
+ throw new UnsupportedOperationException(
+ "Flink Kafka checkpoint parsing not yet implemented. "
+ + "This will be added in Phase 2 with Flink checkpoint
deserialization support.");
+ case PULSAR:
+ throw new UnsupportedOperationException(
+ "Pulsar checkpoint parsing not yet implemented. Planned for Phase
4.");
+ case KINESIS:
+ throw new UnsupportedOperationException(
+ "Kinesis checkpoint parsing not yet implemented. Planned for Phase
4.");
+ default:
+ throw new IllegalArgumentException("Unsupported checkpoint format: " +
format);
+ }
+ }
+
+ /**
+ * Calculate offset difference between two checkpoints.
+ * Handles partition additions, removals, and resets.
+ *
+ * Algorithm:
+ * 1. For each partition in current checkpoint:
+ * - If partition exists in previous: diff = current - previous
+ * - If partition is new: skip (start offset unknown, would overcount)
+ * - If diff is negative (reset): skip (start offset unknown, would
overcount)
+ * 2. Sum all partition diffs
+ *
+ * @param format Checkpoint format
+ * @param previousCheckpoint Previous checkpoint string
+ * @param currentCheckpoint Current checkpoint string
+ * @return Total offset difference across all partitions
+ */
+ public static long calculateOffsetDifference(CheckpointFormat format,
+ String previousCheckpoint,
+ String currentCheckpoint) {
+ Map<Integer, Long> previousOffsets = parseCheckpoint(format,
previousCheckpoint);
+ Map<Integer, Long> currentOffsets = parseCheckpoint(format,
currentCheckpoint);
+
+ long totalDiff = 0;
+
+ for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
+ int partition = entry.getKey();
+ long currentOffset = entry.getValue();
+ Long previousOffset = previousOffsets.get(partition);
+
+ if (previousOffset != null) {
+ // Partition exists in both checkpoints
+ long diff = currentOffset - previousOffset;
+
+ if (diff < 0) {
+ // Offset reset detected (topic/partition recreated or compaction).
+ // We cannot reliably determine how many records were processed since
+ // the start offset of the new topic may not be 0. Log a warning and
+ // skip this partition to avoid overcounting.
+ log.warn("Detected offset reset for partition {}. Previous offset:
{}, current offset: {}. "
+ + "Skipping partition from diff calculation to avoid
overcounting.", partition, previousOffset, currentOffset);
+ } else {
+ totalDiff += diff;
+ }
+ } else {
+ // New partition appeared. The start offset may not be 0 (e.g.,
compacted
+ // topics), so counting from 0 would overcount. Log a warning and skip
+ // this partition to avoid inflating the diff.
+ log.warn("New partition {} detected (not in previous checkpoint). "
+ + "Skipping partition from diff calculation (start offset unknown,
"
+ + "current offset: {}).", partition, currentOffset);
+ }
+ }
+
+ return totalDiff;
+ }
+
+ /**
+ * Validate checkpoint format.
+ *
+ * @param format Expected checkpoint format
+ * @param checkpointStr Checkpoint string to validate
+ * @return true if valid format
+ */
+ public static boolean isValidCheckpointFormat(CheckpointFormat format,
String checkpointStr) {
+ if (checkpointStr == null || checkpointStr.trim().isEmpty()) {
+ return false;
+ }
+
+ try {
+ parseCheckpoint(format, checkpointStr);
+ return true;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ /**
+ * Extract topic name from Spark Kafka checkpoint.
+ * Format: "topic,partition:offset,..."
+ *
+ * @param checkpointStr Spark Kafka checkpoint
+ * @return Topic name
+ * @throws IllegalArgumentException if invalid format
+ */
+ static String extractTopicName(String checkpointStr) {
+ if (checkpointStr == null || checkpointStr.trim().isEmpty()) {
+ throw new IllegalArgumentException("Checkpoint string cannot be null or
empty");
+ }
+
+ String[] splits = checkpointStr.split(",");
+ if (splits.length < 2) {
+ throw new IllegalArgumentException(
+ "Invalid checkpoint format. Expected: topic,partition:offset,...
Got: " + checkpointStr);
+ }
+
+ return splits[0];
+ }
+
+ // ========== Format-Specific Parsers ==========
+
+ /**
+ * Parse HoodieStreamer (Spark) Kafka checkpoint.
+ * Format: "topic,partition:offset,partition:offset,..."
+ * Example: "events,0:1000,1:2000,2:1500"
+ *
+ * <p>Note: {@code KafkaOffsetGen.CheckpointUtils#strToOffsets} in
hudi-utilities
+ * parses the same format but returns {@code Map<TopicPartition, Long>}
(Kafka-specific).
+ * This method returns {@code Map<Integer, Long>} to avoid Kafka client
dependencies
+ * in hudi-common. TODO: consolidate with KafkaOffsetGen.CheckpointUtils once
+ * https://github.com/apache/hudi/pull/18125 lands.</p>
+ *
+ * @param checkpointStr Checkpoint string
+ * @return Map of partition → offset
+ * @throws IllegalArgumentException if format is invalid
+ */
+ private static Map<Integer, Long> parseSparkKafkaCheckpoint(String
checkpointStr) {
+ if (checkpointStr == null || checkpointStr.trim().isEmpty()) {
+ throw new IllegalArgumentException("Checkpoint string cannot be null or
empty");
+ }
+
+ Map<Integer, Long> offsetMap = new HashMap<>();
+ String[] splits = checkpointStr.split(",");
+
+ if (splits.length < 2) {
+ throw new IllegalArgumentException(
+ "Invalid Spark Kafka checkpoint. Expected:
topic,partition:offset,... Got: " + checkpointStr);
+ }
+
+ // First element is topic name, skip it
+ for (int i = 1; i < splits.length; i++) {
+ String[] partitionOffset = splits[i].split(":");
+ if (partitionOffset.length != 2) {
+ throw new IllegalArgumentException(
+ "Invalid partition:offset format in checkpoint: " + splits[i]);
+ }
+
+ try {
+ int partition = Integer.parseInt(partitionOffset[0]);
+ long offset = Long.parseLong(partitionOffset[1]);
+ offsetMap.put(partition, offset);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(
+ "Invalid number format in checkpoint: " + splits[i], e);
+ }
+ }
+
+ return offsetMap;
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCheckpointUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCheckpointUtils.java
new file mode 100644
index 000000000000..4f43926a7a48
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCheckpointUtils.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.util.CheckpointUtils.CheckpointFormat;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for CheckpointUtils - Phase 1 core functionality.
+ */
+public class TestCheckpointUtils {
+
+ @Test
+ public void testParseSparkKafkaCheckpoint() {
+ String checkpoint = "test_topic,0:100,1:200,2:150";
+ Map<Integer, Long> offsets = CheckpointUtils.parseCheckpoint(
+ CheckpointFormat.SPARK_KAFKA, checkpoint);
+
+ assertEquals(3, offsets.size());
+ assertEquals(100L, offsets.get(0));
+ assertEquals(200L, offsets.get(1));
+ assertEquals(150L, offsets.get(2));
+ }
+
+ @Test
+ public void testParseSinglePartition() {
+ String checkpoint = "my_topic,0:1000";
+ Map<Integer, Long> offsets = CheckpointUtils.parseCheckpoint(
+ CheckpointFormat.SPARK_KAFKA, checkpoint);
+
+ assertEquals(1, offsets.size());
+ assertEquals(1000L, offsets.get(0));
+ }
+
+ @Test
+ public void testParseInvalidFormat() {
+ assertThrows(IllegalArgumentException.class, () ->
+ CheckpointUtils.parseCheckpoint(CheckpointFormat.SPARK_KAFKA,
"invalid"));
+
+ assertThrows(IllegalArgumentException.class, () ->
+ CheckpointUtils.parseCheckpoint(CheckpointFormat.SPARK_KAFKA,
"topic"));
+
+ assertThrows(IllegalArgumentException.class, () ->
+ CheckpointUtils.parseCheckpoint(CheckpointFormat.SPARK_KAFKA, ""));
+
+ assertThrows(IllegalArgumentException.class, () ->
+ CheckpointUtils.parseCheckpoint(CheckpointFormat.SPARK_KAFKA, null));
+ }
+
+ @Test
+ public void testParseInvalidPartitionOffset() {
+ assertThrows(IllegalArgumentException.class, () ->
+ CheckpointUtils.parseCheckpoint(CheckpointFormat.SPARK_KAFKA,
"topic,0:abc"));
+
+ assertThrows(IllegalArgumentException.class, () ->
+ CheckpointUtils.parseCheckpoint(CheckpointFormat.SPARK_KAFKA,
"topic,abc:100"));
+
+ assertThrows(IllegalArgumentException.class, () ->
+ CheckpointUtils.parseCheckpoint(CheckpointFormat.SPARK_KAFKA,
"topic,0-100"));
+ }
+
+ @Test
+ public void testExtractTopicName() {
+ assertEquals("test_topic",
+ CheckpointUtils.extractTopicName("test_topic,0:100,1:200"));
+ assertEquals("my.topic.name",
+ CheckpointUtils.extractTopicName("my.topic.name,0:1000"));
+ }
+
+ @Test
+ public void testExtractTopicNameInvalid() {
+ assertThrows(IllegalArgumentException.class, () ->
+ CheckpointUtils.extractTopicName(""));
+
+ assertThrows(IllegalArgumentException.class, () ->
+ CheckpointUtils.extractTopicName(null));
+
+ // Topic-only without partition data should be invalid
+ assertThrows(IllegalArgumentException.class, () ->
+ CheckpointUtils.extractTopicName("just_topic"));
+ }
+
+ @Test
+ public void testCalculateOffsetDifferenceBasic() {
+ String previous = "topic,0:100,1:200";
+ String current = "topic,0:150,1:300";
+
+ // (150-100) + (300-200) = 50 + 100 = 150
+ long diff = CheckpointUtils.calculateOffsetDifference(
+ CheckpointFormat.SPARK_KAFKA, previous, current);
+ assertEquals(150L, diff);
+ }
+
+ @Test
+ public void testCalculateOffsetDifferenceNoChange() {
+ String previous = "topic,0:100,1:200";
+ String current = "topic,0:100,1:200";
+
+ long diff = CheckpointUtils.calculateOffsetDifference(
+ CheckpointFormat.SPARK_KAFKA, previous, current);
+ assertEquals(0L, diff);
+ }
+
+ @Test
+ public void testCalculateOffsetDifferenceNewPartition() {
+ String previous = "topic,0:100";
+ String current = "topic,0:150,1:200";
+
+ // Partition 0: 150-100 = 50
+ // Partition 1: new partition, skipped (start offset unknown)
+ // Total: 50
+ long diff = CheckpointUtils.calculateOffsetDifference(
+ CheckpointFormat.SPARK_KAFKA, previous, current);
+ assertEquals(50L, diff);
+ }
+
+ @Test
+ public void testCalculateOffsetDifferenceRemovedPartition() {
+ String previous = "topic,0:100,1:200";
+ String current = "topic,0:150";
+
+ // Only partition 0 exists in both: 150-100 = 50
+ // Partition 1 ignored (not in current)
+ long diff = CheckpointUtils.calculateOffsetDifference(
+ CheckpointFormat.SPARK_KAFKA, previous, current);
+ assertEquals(50L, diff);
+ }
+
+ @Test
+ public void testCalculateOffsetDifferenceNegativeOffset() {
+ // Simulate topic reset where offset goes back to 0
+ String previous = "topic,0:1000";
+ String current = "topic,0:100";
+
+ // When current < previous (offset reset), partition is skipped
+ // to avoid overcounting since start offset is unknown
+ long diff = CheckpointUtils.calculateOffsetDifference(
+ CheckpointFormat.SPARK_KAFKA, previous, current);
+ assertEquals(0L, diff);
+ }
+
+ @Test
+ public void testCalculateOffsetDifferenceMultiplePartitionsWithReset() {
+ String previous = "topic,0:1000,1:2000";
+ String current = "topic,0:100,1:2500";
+
+ // Partition 0: reset, skipped to avoid overcounting
+ // Partition 1: normal increment = 2500-2000 = 500
+ // Total: 500
+ long diff = CheckpointUtils.calculateOffsetDifference(
+ CheckpointFormat.SPARK_KAFKA, previous, current);
+ assertEquals(500L, diff);
+ }
+
+ @Test
+ public void testIsValidCheckpointFormat() {
+ assertTrue(CheckpointUtils.isValidCheckpointFormat(
+ CheckpointFormat.SPARK_KAFKA, "topic,0:100"));
+ assertTrue(CheckpointUtils.isValidCheckpointFormat(
+ CheckpointFormat.SPARK_KAFKA, "topic,0:100,1:200,2:300"));
+ assertTrue(CheckpointUtils.isValidCheckpointFormat(
+ CheckpointFormat.SPARK_KAFKA, "my.topic.name,0:1000"));
+
+ assertFalse(CheckpointUtils.isValidCheckpointFormat(
+ CheckpointFormat.SPARK_KAFKA, ""));
+ assertFalse(CheckpointUtils.isValidCheckpointFormat(
+ CheckpointFormat.SPARK_KAFKA, null));
+ assertFalse(CheckpointUtils.isValidCheckpointFormat(
+ CheckpointFormat.SPARK_KAFKA, "just_topic"));
+ assertFalse(CheckpointUtils.isValidCheckpointFormat(
+ CheckpointFormat.SPARK_KAFKA, "topic,invalid"));
+ assertFalse(CheckpointUtils.isValidCheckpointFormat(
+ CheckpointFormat.SPARK_KAFKA, "topic,0:abc"));
+ }
+
+ @Test
+ public void testUnsupportedFormats() {
+ // Flink format not yet implemented (Phase 2)
+ assertThrows(UnsupportedOperationException.class, () ->
+ CheckpointUtils.parseCheckpoint(CheckpointFormat.FLINK_KAFKA,
"anystring"));
+
+ // Pulsar format not yet implemented (Phase 4)
+ assertThrows(UnsupportedOperationException.class, () ->
+ CheckpointUtils.parseCheckpoint(CheckpointFormat.PULSAR, "anystring"));
+
+ // Kinesis format not yet implemented (Phase 4)
+ assertThrows(UnsupportedOperationException.class, () ->
+ CheckpointUtils.parseCheckpoint(CheckpointFormat.KINESIS,
"anystring"));
+ }
+
+ @Test
+ public void testCustomFormatThrows() {
+ assertThrows(IllegalArgumentException.class, () ->
+ CheckpointUtils.parseCheckpoint(CheckpointFormat.CUSTOM, "anystring"));
+ }
+
+ @Test
+ public void testIsValidCheckpointFormatUnsupported() {
+ // Unsupported formats should return false (caught internally)
+ assertFalse(CheckpointUtils.isValidCheckpointFormat(
+ CheckpointFormat.FLINK_KAFKA, "anystring"));
+ assertFalse(CheckpointUtils.isValidCheckpointFormat(
+ CheckpointFormat.CUSTOM, "anystring"));
+ }
+}