shangxinli opened a new pull request, #18362:
URL: https://github.com/apache/hudi/pull/18362
Summary and Changelog
Summary:
Adds Flink-specific pre-commit validation that integrates with the
StreamWriteOperatorCoordinator commit path. Includes a concrete
FlinkKafkaOffsetValidator that compares the Kafka offset advance per commit
against the number of records written, to detect data loss in streaming
ingestion. The Flink Kafka checkpoint parser is aligned with the production
StreamerUtil.parseKafkaOffsets() implementation using the lastIndexOf
approach.
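The core offset-vs-record-count check can be sketched as follows. This is illustrative only: the class and method names below are not the actual Hudi API, and the real validator reads offsets from checkpoint strings and commit metadata rather than plain maps.

```java
import java.util.Map;

// Hypothetical sketch: records written in a commit should match the total
// offset advance across partitions, within a configured tolerance.
final class OffsetDeltaCheck {

  /**
   * @param startOffsets   per-partition offsets at the previous checkpoint
   * @param endOffsets     per-partition offsets at the current checkpoint
   * @param recordsWritten records actually written by the commit
   * @param tolerancePct   allowed relative difference in percent (0.0 = exact)
   * @return true if the record count matches the offset delta within tolerance
   */
  static boolean withinTolerance(Map<Integer, Long> startOffsets,
                                 Map<Integer, Long> endOffsets,
                                 long recordsWritten,
                                 double tolerancePct) {
    long expected = 0L;
    for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
      long start = startOffsets.getOrDefault(e.getKey(), 0L);
      expected += e.getValue() - start;
    }
    if (expected == 0L) {
      // nothing consumed: only an empty commit is consistent
      return recordsWritten == 0L;
    }
    double diffPct = 100.0 * Math.abs(expected - recordsWritten) / expected;
    return diffPct <= tolerancePct;
  }
}
```

With the default tolerance of 0.0 any mismatch fails validation, which is the intended behavior for append-only ingestion.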
Changelog:
- Added FlinkValidationContext implementing ValidationContext for the
Flink commit flow
- Added FlinkKafkaOffsetValidator extending StreamingOffsetValidator for
Flink Kafka checkpoint format
- Added FlinkValidatorUtils orchestrator that instantiates and runs
configured validators via reflection
- Added Flink Kafka checkpoint parsing in
CheckpointUtils.parseFlinkKafkaCheckpoint(), aligned with production
StreamerUtil.parseKafkaOffsets()
- Wired FlinkValidatorUtils.runValidators() into
StreamWriteOperatorCoordinator.doCommit() between write status collection and
commit finalization
- Changed BasePreCommitValidator.validateWithMetadata() from protected to
public for cross-package invocation
- Added FlinkKafkaOffsetValidator FQCN reference to
HoodiePreCommitValidatorConfig documentation
- Added comprehensive unit tests: 52 test cases across 4 test classes
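The hook placement described in the doCommit() bullet above can be sketched in pseudocode. Only StreamWriteOperatorCoordinator, doCommit(), FlinkValidatorUtils.runValidators(), and the commit call are named in this PR; everything else below is illustrative.

```
// inside StreamWriteOperatorCoordinator (sketch, not actual signatures)
doCommit(instant, writeResults):
    metadata = collect write statuses into commit metadata   // existing step
    FlinkValidatorUtils.runValidators(config, metadata, ...) // new hook:
        // under the FAIL policy a failing validator throws here,
        // so the commit below never runs
    writeClient.commit(instant, writeResults)                // finalization
```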
Configuration:
hoodie.precommit.validators=org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator
hoodie.precommit.validators.streaming.offset.tolerance.percentage=0.0
hoodie.precommit.validators.failure.policy=FAIL
This is Phase 2 of a 3-phase implementation:
- Phase 1 (merged, #18068): Core framework in hudi-common and
hudi-client-common
- Phase 2 (this PR): Flink-specific implementation in hudi-flink
- Phase 3 (future): Spark/HoodieStreamer implementation
Impact
Public API Changes:
- BasePreCommitValidator.validateWithMetadata() visibility changed from
protected to public
- StreamingOffsetValidator.validateWithMetadata() visibility changed from
protected to public (to match base class)
New classes (Flink-specific, not public API):
- org.apache.hudi.sink.validator.FlinkValidationContext
- org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator
- org.apache.hudi.sink.validator.FlinkValidatorUtils
User-Facing Changes:
Users can now configure pre-commit validators for Flink streaming jobs by
setting hoodie.precommit.validators in their Flink configuration. The
FlinkKafkaOffsetValidator is the first available validator and is primarily
intended for append-only Kafka ingestion. For upsert workloads with
deduplication, configure a higher tolerance or use the WARN_LOG failure
policy.
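For example, a more lenient setup for an upsert pipeline could look like this (the keys are the ones shown in the Configuration section above; the tolerance value is illustrative):

```properties
# Upsert pipeline with dedup: written count may legitimately trail the
# Kafka offset delta, so relax the check and log instead of failing.
hoodie.precommit.validators=org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator
hoodie.precommit.validators.streaming.offset.tolerance.percentage=5.0
hoodie.precommit.validators.failure.policy=WARN_LOG
```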
Performance Impact:
None when no validators are configured (default). When validators are
enabled, validation runs once per commit with negligible overhead (checkpoint
string parsing and arithmetic comparison).
Risk Level
Risk Level: low
Justification:
- Validation is opt-in: no impact when hoodie.precommit.validators is
empty (default)
- The validation hook is placed before writeClient.commit() — if
validation fails, the commit is not finalized
- Previous commit metadata is read from the timeline with graceful error
handling (falls back to empty, skipping previous-commit checks)
- Flink Kafka checkpoint parser aligned with production
StreamerUtil.parseKafkaOffsets() for consistency
- 52 unit tests covering all code paths including edge cases
Verification:
- Compiled with -Pflink1.20: BUILD SUCCESS
- Checkstyle: 0 violations across all modified modules
- RAT (license headers): 0 violations
- Unit tests: 52/52 pass
Design Decisions
- FlinkValidationContext.getActiveTimeline() throws
UnsupportedOperationException — the active timeline is not available at
pre-commit time in Flink because the commit has not been written yet.
Validators should use
getPreviousCommitMetadata() instead.
- FlinkValidationContext.getPreviousCommitInstant() also throws
UnsupportedOperationException — isFirstCommit() is overridden to check
previousCommitMetadata presence directly, avoiding the need to construct a
HoodieInstant.
- parseFlinkKafkaCheckpoint() uses the lastIndexOf approach (matching
production StreamerUtil.parseKafkaOffsets()) instead of split("%3A") for
robustness. Invalid entries are skipped with a warning log rather than
throwing an exception.
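A minimal sketch of the lastIndexOf parsing strategy, assuming a checkpoint string of the form "topic,partition:offset,partition:offset,..."; the exact production format, class, and method names may differ from this illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: lastIndexOf(':') keeps parsing robust even if the
// topic name itself contains ':'; malformed entries are skipped instead of
// aborting the whole parse.
final class CheckpointParseSketch {

  static Map<Integer, Long> parse(String checkpoint) {
    Map<Integer, Long> offsets = new HashMap<>();
    String[] parts = checkpoint.split(",");
    // parts[0] is the topic name; remaining entries are partition:offset pairs
    for (int i = 1; i < parts.length; i++) {
      int sep = parts[i].lastIndexOf(':');
      if (sep < 0) {
        continue; // no separator: skip malformed entry
      }
      try {
        int partition = Integer.parseInt(parts[i].substring(0, sep));
        long offset = Long.parseLong(parts[i].substring(sep + 1));
        offsets.put(partition, offset);
      } catch (NumberFormatException ignored) {
        // non-numeric partition or offset: skip entry
      }
    }
    return offsets;
  }
}
```

By contrast, a naive split on the separator would mis-parse any entry where the delimiter also appears in the topic portion, which is the robustness concern noted above.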
Documentation Update
Configuration guide for the new validator will be added when Phase 3
(Spark) is complete to provide comprehensive documentation covering all engines.
### Contributor's checklist
- [x] Read through [contributor's
guide](https://hudi.apache.org/contribute/how-to-contribute)
- [x] Enough context is provided in the sections above
- [x] Adequate tests were added if applicable
- [x] Commits are signed and follow
[conventions](https://www.conventionalcommits.org/)
- [x] All existing tests pass (checkstyle: 0 violations, build: success)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]