shangxinli opened a new issue, #18067: URL: https://github.com/apache/hudi/issues/18067
### Feature Description
**What the feature achieves:**
Creates an engine-agnostic pre-commit validation framework that gives
validators access to commit metadata, the timeline, and write statistics.
Introduces streaming offset validators that detect data loss by comparing
the source offset difference against the actual record count written.
**Why this feature is needed:**
Problem: Hudi has no built-in data-loss detection for streaming pipelines.
Real scenario:
- DeltaStreamer reads 10,000 Kafka messages (offsets 5000 → 15000)
- Due to a software bug, only 8,000 records are written
- The Kafka offset 15000 is committed anyway
- Result: 2,000 records are silently lost (an illustrative check is sketched after the list below)
Current gaps:
- Existing validators only work with Spark (they are Dataset-based)
- Validators have no access to commit metadata (checkpoints, write stats)
- Flink has no validation framework
- Users rely on manual reconciliation jobs, which catch the loss too late
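To make the scenario concrete, the core of the check a Kafka offset validator would perform is roughly the following. This is a minimal illustrative sketch only; the class and method names are hypothetical, not part of the proposed API:

```java
import java.util.Map;

// Illustrative only: compare the Kafka offset range consumed in this batch with
// the number of records actually written in the commit. Names are hypothetical.
public class OffsetReconciliationSketch {

  static void checkForDataLoss(Map<Integer, Long> startOffsets,
                               Map<Integer, Long> endOffsets,
                               long recordsWritten) {
    long expected = 0;
    for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
      expected += e.getValue() - startOffsets.getOrDefault(e.getKey(), 0L);
    }
    if (recordsWritten < expected) {
      // In the scenario above: expected = 15000 - 5000 = 10000, but only 8000 were written
      throw new IllegalStateException("Possible data loss: expected " + expected
          + " records from the consumed offsets, but only " + recordsWritten + " were written");
    }
  }
}
```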
### User Experience
**How users will use this feature:**
**Configuration**
```properties
# Enable the validator
hoodie.precommit.validators=org.apache.hudi.client.validator.SparkKafkaOffsetValidator

# Tolerance: 0.0 = strict, >0 = allowed percentage difference (default: 0.0)
hoodie.precommit.validators.streaming.offset.tolerance.percentage=0.0

# Warn-only mode: log violations but don't block the commit (default: false)
hoodie.precommit.validators.warn.only=false
```
**Usage Examples**
DeltaStreamer with strict validation:
```bash
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  hoodie-utilities.jar \
  --target-base-path /data/events \
  --hoodie-conf hoodie.precommit.validators=org.apache.hudi.client.validator.SparkKafkaOffsetValidator
```
Gradual rollout (warn-only):
```properties
hoodie.precommit.validators.warn.only=true   # Weeks 1-2: monitor only
hoodie.precommit.validators.warn.only=false  # Week 3+: enforce
```
With tolerance (for deduplication):
```properties
hoodie.precommit.validators.streaming.offset.tolerance.percentage=10.0
```
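Applied to the offset check sketched earlier, a non-zero tolerance would presumably relax the strict comparison along these lines (illustrative only; the helper name is hypothetical):

```java
// Hypothetical: with tolerance = 10.0, a shortfall of up to 10% of the expected
// record count is accepted (e.g. to absorb upstream deduplication).
final class ToleranceSketch {
  static boolean withinTolerance(long expected, long actual, double tolerancePct) {
    long allowedShortfall = (long) Math.floor(expected * tolerancePct / 100.0);
    return actual >= expected - allowedShortfall;
  }
}
```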
Flink streaming (Phase 2):
```java
Configuration conf = new Configuration();
conf.setString("hoodie.precommit.validators",
    "org.apache.hudi.flink.client.validator.FlinkKafkaOffsetValidator");
stream.sinkTo(HoodieSink.builder().withConfiguration(conf).build());
```
**API Changes**
New public APIs (hudi-common):
```java
public abstract class BasePreCommitValidator {
  protected boolean supportsMetadataValidation();
  protected void validateWithMetadata(ValidationContext context);
}

public interface ValidationContext {
  String getInstantTime();
  Option<HoodieCommitMetadata> getCommitMetadata();
  Map<String, String> getExtraMetadata();
  long getTotalRecordsWritten();
}

public abstract class StreamingOffsetValidator extends BasePreCommitValidator {
  // Common offset validation logic
}

public class CheckpointUtils {
  public static Map<Integer, Long> parseCheckpoint(CheckpointFormat format, String checkpoint);
  public static long calculateOffsetDifference(CheckpointFormat format, String prev, String curr);
}
```
### Hudi RFC Requirements
**RFC PR link:** (if applicable)
**Why an RFC is needed**
Does this change public interfaces/APIs? Yes
- New public APIs: BasePreCommitValidator, ValidationContext,
StreamingOffsetValidator, CheckpointUtils
- Extends existing SparkPreCommitValidator with new methods
- External users will extend these for custom validators
Does this change the storage format? No
- Uses existing extraMetadata mechanism for checkpoints
- No new metadata files
- Backward compatible with all tables
Justification:
- New public API contracts need a stability commitment
- The multi-engine architecture requires design review
- Modifies the commit flow (commit metadata is built before validation runs)
- The 3-phase implementation needs coordination
- Extensibility design (future sources: Pulsar, Kinesis)
**Task Breakdown**
Phase 1: Core Framework (hudi-common)
- Create BasePreCommitValidator, ValidationContext,
StreamingOffsetValidator
- Enhance CheckpointUtils with multi-format support
- Add configuration properties and unit tests
Phase 2: Flink Implementation (hudi-flink-client)
- Create Flink validator classes and context implementation
- Integrate with Flink checkpoint state extraction
- Write tests and documentation
Phase 3: DeltaStreamer Implementation (hudi-spark-client)
- Extend SparkPreCommitValidator with metadata validation
- Create Spark validator and context implementations
- Modify commit flow, update existing validators
- Write tests and documentation