shangxinli opened a new pull request, #18362:
URL: https://github.com/apache/hudi/pull/18362

   Summary and Changelog
   
     Summary:
     Adds Flink-specific pre-commit validation that integrates with the 
StreamWriteOperatorCoordinator commit path. Includes a concrete 
FlinkKafkaOffsetValidator that compares Kafka offset deltas against written 
record counts to detect data loss in streaming ingestion. The Flink Kafka 
checkpoint parser is aligned with the production 
StreamerUtil.parseKafkaOffsets() implementation, using the lastIndexOf 
approach.
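
     The offset-vs-count check can be sketched as follows. This is a minimal 
illustration of the idea, not the PR's actual code; the class, method, and 
parameter names are hypothetical:

```java
// Hypothetical sketch of the offset-delta vs. record-count check performed
// by a validator such as FlinkKafkaOffsetValidator. Names and the exact
// comparison are illustrative, not the PR's actual code.
public class OffsetCountCheck {

  /**
   * Returns true if the number of records written is consistent with the
   * advance in Kafka offsets since the previous commit, within the
   * configured tolerance percentage.
   */
  static boolean isConsistent(long previousEndOffset, long currentEndOffset,
                              long recordsWritten, double tolerancePct) {
    long expected = currentEndOffset - previousEndOffset;
    if (expected <= 0) {
      return true; // nothing new consumed, nothing to validate
    }
    long missing = expected - recordsWritten;
    if (missing <= 0) {
      return true; // at least as many records written as offsets advanced
    }
    // Allow a bounded shortfall, e.g. for deduplicating upsert workloads.
    return missing <= expected * (tolerancePct / 100.0);
  }

  public static void main(String[] args) {
    System.out.println(isConsistent(0, 1000, 1000, 0.0)); // true: exact match
    System.out.println(isConsistent(0, 1000, 990, 0.0));  // false: 10 records missing
    System.out.println(isConsistent(0, 1000, 990, 5.0));  // true: within 5% tolerance
  }
}
```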
   
     Changelog:
     - Added FlinkValidationContext implementing ValidationContext for the 
Flink commit flow
     - Added FlinkKafkaOffsetValidator extending StreamingOffsetValidator for 
Flink Kafka checkpoint format
     - Added FlinkValidatorUtils orchestrator that instantiates and runs 
configured validators via reflection
     - Added Flink Kafka checkpoint parsing in 
CheckpointUtils.parseFlinkKafkaCheckpoint(), aligned with production 
StreamerUtil.parseKafkaOffsets()
     - Wired FlinkValidatorUtils.runValidators() into 
StreamWriteOperatorCoordinator.doCommit() between write status collection and 
commit finalization
     - Changed BasePreCommitValidator.validateWithMetadata() from protected to 
public for cross-package invocation
     - Added FlinkKafkaOffsetValidator FQCN reference to 
HoodiePreCommitValidatorConfig documentation
     - Added comprehensive unit tests: 52 test cases across 4 test classes
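
     The reflection-based orchestration can be sketched roughly as below. The 
Validator interface and all names here are invented for the example; 
FlinkValidatorUtils' real API may differ:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of reflection-based validator loading, in the spirit
// of FlinkValidatorUtils. The Validator interface is invented for this
// example and is not Hudi's actual API.
public class ValidatorLoader {

  public interface Validator {
    void validate();
  }

  /** A trivial validator used to demonstrate loading by class name. */
  public static class NoOp implements Validator {
    @Override
    public void validate() {
      // no-op
    }
  }

  /** Instantiates each comma-separated class name via its no-arg constructor. */
  static List<Validator> loadValidators(String configuredClasses) {
    List<Validator> validators = new ArrayList<>();
    if (configuredClasses == null || configuredClasses.trim().isEmpty()) {
      return validators; // opt-in: nothing configured, nothing to run
    }
    for (String className : configuredClasses.split(",")) {
      try {
        Class<?> clazz = Class.forName(className.trim());
        validators.add((Validator) clazz.getDeclaredConstructor().newInstance());
      } catch (ReflectiveOperationException e) {
        throw new RuntimeException("Failed to instantiate validator: " + className, e);
      }
    }
    return validators;
  }

  public static void main(String[] args) {
    List<Validator> validators = loadValidators(NoOp.class.getName());
    validators.forEach(Validator::validate);
    System.out.println(validators.size()); // prints 1
  }
}
```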
   
     Configuration:

     hoodie.precommit.validators=org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator
     hoodie.precommit.validators.streaming.offset.tolerance.percentage=0.0
     hoodie.precommit.validators.failure.policy=FAIL
   
     This is Phase 2 of a 3-phase implementation:
     - Phase 1 (merged, #18068): Core framework in hudi-common and 
hudi-client-common
     - Phase 2 (this PR): Flink-specific implementation in hudi-flink
     - Phase 3 (future): Spark/HoodieStreamer implementation
   
     Impact
   
     Public API Changes:
     - BasePreCommitValidator.validateWithMetadata() visibility changed from 
protected to public
     - StreamingOffsetValidator.validateWithMetadata() visibility changed from 
protected to public (to match base class)
   
     New classes (Flink-specific, not public API):
     - org.apache.hudi.sink.validator.FlinkValidationContext
     - org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator
     - org.apache.hudi.sink.validator.FlinkValidatorUtils
   
     User-Facing Changes:
     Users can now configure pre-commit validators for Flink streaming jobs by 
setting hoodie.precommit.validators in their Flink configuration. The 
FlinkKafkaOffsetValidator is the first available validator, primarily intended 
for append-only
     Kafka ingestion. For upsert workloads with deduplication, configure a 
higher tolerance or use the WARN_LOG failure policy.
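
     For example, an upsert pipeline that deduplicates records might relax 
the check like this (the 5.0 tolerance value is illustrative, not a 
recommendation):

```properties
hoodie.precommit.validators=org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator
# Tolerate up to 5% fewer written records than consumed offsets
# (illustrative value; tune to the pipeline's expected duplicate rate).
hoodie.precommit.validators.streaming.offset.tolerance.percentage=5.0
# Log a warning instead of failing the commit on a mismatch.
hoodie.precommit.validators.failure.policy=WARN_LOG
```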
   
     Performance Impact:
     None when no validators are configured (default). When validators are 
enabled, validation runs once per commit with negligible overhead (checkpoint 
string parsing and arithmetic comparison).
   
     Risk Level
   
     Risk Level: low
   
     Justification:
     - Validation is opt-in: no impact when hoodie.precommit.validators is 
empty (default)
     - The validation hook is placed before writeClient.commit() — if 
validation fails, the commit is not finalized
     - Previous commit metadata is read from the timeline with graceful error 
handling (falls back to empty, skipping previous-commit checks)
     - Flink Kafka checkpoint parser aligned with production 
StreamerUtil.parseKafkaOffsets() for consistency
     - 52 unit tests covering all code paths including edge cases
   
     Verification:
     - Compiled with -Pflink1.20: BUILD SUCCESS
     - Checkstyle: 0 violations across all modified modules
     - RAT (license headers): 0 violations
     - Unit tests: 52/52 pass
   
     Design Decisions
   
     - FlinkValidationContext.getActiveTimeline() throws 
UnsupportedOperationException — the active timeline is not available at 
pre-commit time in Flink because the commit has not been written yet. 
Validators should use
     getPreviousCommitMetadata() instead.
     - FlinkValidationContext.getPreviousCommitInstant() also throws 
UnsupportedOperationException — isFirstCommit() is overridden to check 
previousCommitMetadata presence directly, avoiding the need to construct a 
HoodieInstant.
     - parseFlinkKafkaCheckpoint() uses the lastIndexOf approach (matching 
production StreamerUtil.parseKafkaOffsets()) instead of split("%3A") for 
robustness. Invalid entries are skipped with a warning log rather than 
throwing exceptions.
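
     The lastIndexOf idea can be sketched as below. The "partition:offset" 
entry format is assumed for illustration; the real 
CheckpointUtils.parseFlinkKafkaCheckpoint() may differ in detail:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of lastIndexOf-based parsing of "partition:offset"
// checkpoint entries. The entry format is assumed for illustration; the
// real CheckpointUtils.parseFlinkKafkaCheckpoint() may differ in detail.
public class KafkaCheckpointParser {

  /** Parses entries like "0:100,1:200" into a partition -> offset map. */
  static Map<Integer, Long> parseOffsets(String checkpoint) {
    Map<Integer, Long> offsets = new HashMap<>();
    for (String entry : checkpoint.split(",")) {
      // lastIndexOf tolerates stray delimiter characters earlier in the
      // token, unlike splitting the whole entry on the delimiter.
      int sep = entry.lastIndexOf(':');
      if (sep <= 0 || sep == entry.length() - 1) {
        System.err.println("Skipping malformed checkpoint entry: " + entry);
        continue; // skip with a warning rather than throwing
      }
      try {
        int partition = Integer.parseInt(entry.substring(0, sep).trim());
        long offset = Long.parseLong(entry.substring(sep + 1).trim());
        offsets.put(partition, offset);
      } catch (NumberFormatException e) {
        System.err.println("Skipping malformed checkpoint entry: " + entry);
      }
    }
    return offsets;
  }

  public static void main(String[] args) {
    System.out.println(parseOffsets("0:100,1:200,bad")); // the "bad" entry is skipped
  }
}
```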
   
     Documentation Update
   
     Configuration guide for the new validator will be added when Phase 3 
(Spark) is complete to provide comprehensive documentation covering all engines.
   
   
   ### Contributor's checklist
   
   - [x] Read through [contributor's 
guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [x] Enough context is provided in the sections above
   - [x] Adequate tests were added if applicable
   - [x] Commits are signed and follow 
[conventions](https://www.conventionalcommits.org/)
   - [x] All existing tests pass (checkstyle: 0 violations, build: success)
   

