shangxinli opened a new pull request, #18405:
URL: https://github.com/apache/hudi/pull/18405
### Describe the issue this Pull Request addresses
This PR implements Phase 3 of the pre-commit validation framework by adding
Spark/HoodieStreamer-specific implementations. This enables Kafka offset
validation for HoodieStreamer to detect data loss in streaming ingestion
scenarios.
Addresses #18067
### Summary and Changelog
**Summary:**
Adds Spark/HoodieStreamer-specific implementations of the pre-commit
validation framework introduced in Phase 1 (#18068) and Phase 2 (Flink). This
enables streaming offset validation for HoodieStreamer Kafka ingestion
pipelines.
**Stack context:**
This PR builds on Phase 1 (#18068) which introduced the core framework in
hudi-common and Phase 2 which added Flink implementations.
**Changelog:**
- Added `SparkKafkaOffsetValidator` extending `StreamingOffsetValidator` for
Kafka offset validation in HoodieStreamer
- Added `SparkValidationContext` implementing `ValidationContext` to provide
Spark-specific validation context with access to commit metadata, write
statistics, and timeline
- Added `SparkStreamerValidatorUtils` utility class to instantiate and
execute validators in the HoodieStreamer commit flow
- Integrated validator execution into
`StreamSync.writeToSinkAndDoMetaSync()` before commit finalization
- Updated `HoodiePreCommitValidatorConfig` documentation to reference the
new Spark validator class
- Added comprehensive unit tests: `TestSparkKafkaOffsetValidator` (13 test
cases), `TestSparkValidationContext` (5 test cases), and
`TestSparkStreamerValidatorUtils` (9 test cases)
- All code is new; no existing code was copied
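The validator-wiring utility described above can be illustrated with a minimal sketch. The `Validator` interface, `NoopValidator`, and `loadValidators` helper below are hypothetical stand-ins, not the PR's actual `SparkStreamerValidatorUtils` API; they only show the common pattern of instantiating validator classes reflectively from a comma-separated config value such as `hoodie.precommit.validators`:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical validator contract; the PR's real interface lives in hudi-common.
interface Validator {
    void validate() throws IllegalStateException;
}

// Trivial validator used only to demonstrate reflective loading.
class NoopValidator implements Validator {
    public void validate() { }
}

public class ValidatorWiringSketch {
    /**
     * Instantiates each class named in a comma-separated config value
     * (e.g. the hoodie.precommit.validators property) via its no-arg
     * constructor. An empty value yields an empty list, so pipelines
     * without validators configured pay no cost (early return).
     */
    static List<Validator> loadValidators(String configValue) throws Exception {
        List<Validator> validators = new ArrayList<>();
        if (configValue == null || configValue.isBlank()) {
            return validators;  // early return: no validators configured
        }
        for (String className : configValue.split(",")) {
            Class<?> clazz = Class.forName(className.trim());
            validators.add((Validator) clazz.getDeclaredConstructor().newInstance());
        }
        return validators;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(loadValidators("").size());               // 0
        System.out.println(loadValidators("NoopValidator").size());  // 1
    }
}
```

In the real integration, each loaded validator would then be executed against the commit's validation context before finalization, with failures handled per the configured failure policy.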
**Configuration properties (reused from Phase 1):**
- `hoodie.precommit.validators`: add
`org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator` to
the validator list
- `hoodie.precommit.validators.streaming.offset.tolerance.percentage`
(default: 0.0)
- `hoodie.precommit.validators.failure.policy` (default: FAIL)
**This completes the 3-phase implementation:**
- Phase 1 (#18068): Core framework in hudi-common ✅
- Phase 2: Flink-specific implementation ✅
- Phase 3 (this PR): Spark/HoodieStreamer implementation ✅
### Impact
**Public API Changes:**
- New public classes in `org.apache.hudi.utilities.streamer.validator`
package:
- `SparkKafkaOffsetValidator` (concrete validator)
- `SparkValidationContext` (concrete context implementation)
- `SparkStreamerValidatorUtils` (utility class)
- Updated documentation in
`HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES` to reference the new
Spark validator
**User-Facing Changes:**
Users can now enable Kafka offset validation in HoodieStreamer by
configuring:
```
hoodie.precommit.validators=org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator
hoodie.precommit.validators.streaming.offset.tolerance.percentage=0.0
hoodie.precommit.validators.failure.policy=FAIL
```
**Performance Impact:**
Minimal. Validator execution adds the following lightweight work before
commit:
- Collecting write statuses (already needed for commit)
- Building commit metadata (lightweight)
- Comparing Kafka offsets with record counts (O(partitions) operation)
For HoodieStreamer pipelines without validators configured, there is zero
overhead (early return).
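The O(partitions) offset comparison can be sketched as follows. This is an illustrative, standalone sketch, not the PR's actual `SparkKafkaOffsetValidator` code; the `OffsetRange` record, the `checkOffsets` method, and the assumption that deviation is computed as `|expected - actual| / expected * 100` are all hypothetical:

```java
import java.util.List;

// Hypothetical stand-in for a Kafka per-partition offset range;
// not the actual checkpoint class used in the PR.
record OffsetRange(int partition, long startOffset, long endOffset) {}

public class OffsetCheckSketch {
    /**
     * Returns true if the written record count is within the given
     * tolerance (in percent) of the total offset delta across partitions.
     * One pass over the partitions, matching the O(partitions) cost
     * described above.
     */
    static boolean checkOffsets(List<OffsetRange> ranges, long recordsWritten,
                                double tolerancePct) {
        long expected = 0;
        for (OffsetRange r : ranges) {
            expected += r.endOffset() - r.startOffset();
        }
        if (expected == 0) {
            return recordsWritten == 0;  // nothing consumed, nothing should be written
        }
        double deviationPct = 100.0 * Math.abs(expected - recordsWritten) / expected;
        return deviationPct <= tolerancePct;
    }

    public static void main(String[] args) {
        List<OffsetRange> ranges = List.of(
            new OffsetRange(0, 100, 200),   // 100 records expected
            new OffsetRange(1, 50, 150));   // 100 records expected
        System.out.println(checkOffsets(ranges, 200, 0.0));  // true: all 200 arrived
        System.out.println(checkOffsets(ranges, 190, 0.0));  // false: 10 missing, strict
        System.out.println(checkOffsets(ranges, 190, 5.0));  // true: 5% deviation allowed
    }
}
```

Under this model, a nonzero tolerance lets upsert or deduplicating workloads pass even when the written count falls below the consumed offset delta, which is why the documentation below suggests a higher tolerance for such pipelines.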
### Risk Level
**Risk Level: low**
**Justification:**
- Phase 3 adds opt-in functionality that only activates when explicitly
configured
- Integration point in `StreamSync.writeToSinkAndDoMetaSync()` is before
commit, so failures prevent bad commits rather than leaving the table in an
inconsistent state
- Reuses battle-tested framework components from Phase 1
(`StreamingOffsetValidator`, `CheckpointUtils`)
- Comprehensive unit test coverage (27 test cases total across 3 test
classes)
- All tests pass with 0 checkstyle violations
- No modifications to existing HoodieStreamer write paths (only adds
pre-commit validation hook)
**Verification:**
- Built successfully with Maven: `mvn clean install -pl hudi-utilities -am
-DskipTests`
- Checkstyle validation: 0 violations
- Unit tests: All 27 tests pass
- Integration tested with HoodieStreamer Kafka ingestion (manual validation)
### Documentation Update
**User-facing documentation:**
The following configuration guide should be added to the HoodieStreamer
documentation:
#### Enabling Kafka Offset Validation for HoodieStreamer
To detect data loss in Kafka streaming ingestion, configure the Spark Kafka
offset validator:
```properties
# Enable Spark Kafka offset validator
hoodie.precommit.validators=org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator
# Set tolerance for acceptable deviation (default: 0.0 = strict)
# For append-only workloads, use 0.0
# For upsert workloads with deduplication, consider 5.0 (5%)
hoodie.precommit.validators.streaming.offset.tolerance.percentage=0.0
# Set failure policy (default: FAIL)
# FAIL: reject commit if validation fails
# WARN_LOG: log warning but allow commit
hoodie.precommit.validators.failure.policy=FAIL
```
**When to use:**
- Append-only Kafka ingestion via HoodieStreamer
- Production pipelines where data loss must be detected immediately
- Scenarios where Kafka offset difference should match record count
**When NOT to use:**
- Upsert workloads with deduplication (set a higher tolerance or use
WARN_LOG)
- Non-Kafka sources (validator only supports Kafka checkpoints)
- Batch ingestion (not applicable to streaming)
### Contributor's checklist
- [x] Read through [contributor's
guide](https://hudi.apache.org/contribute/how-to-contribute)
- [x] Enough context is provided in the sections above
- [x] Adequate tests were added if applicable
- [x] Commits are signed and follow
[conventions](https://www.conventionalcommits.org/)
- [x] All existing tests pass
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]