shangxinli opened a new pull request, #18405:
URL: https://github.com/apache/hudi/pull/18405

   ### Describe the issue this Pull Request addresses
   
   This PR implements Phase 3 of the pre-commit validation framework by adding 
Spark/HoodieStreamer-specific implementations. This enables Kafka offset 
validation for HoodieStreamer to detect data loss in streaming ingestion 
scenarios.
   
   Addresses #18067
   
   ### Summary and Changelog
   
   **Summary:**
   Adds Spark/HoodieStreamer-specific implementations of the pre-commit 
validation framework introduced in Phase 1 (#18068) and extended to Flink in 
Phase 2. This enables streaming offset validation for HoodieStreamer Kafka 
ingestion pipelines.
   
   **Stack context:**
   This PR builds on Phase 1 (#18068) which introduced the core framework in 
hudi-common and Phase 2 which added Flink implementations.
   
   **Changelog:**
   - Added `SparkKafkaOffsetValidator` extending `StreamingOffsetValidator` for 
Kafka offset validation in HoodieStreamer
   - Added `SparkValidationContext` implementing `ValidationContext` to provide 
Spark-specific validation context with access to commit metadata, write 
statistics, and timeline
   - Added `SparkStreamerValidatorUtils` utility class to instantiate and 
execute validators in the HoodieStreamer commit flow
   - Integrated validator execution into 
`StreamSync.writeToSinkAndDoMetaSync()` before commit finalization
   - Updated `HoodiePreCommitValidatorConfig` documentation to reference the 
new Spark validator class
   - Added comprehensive unit tests: `TestSparkKafkaOffsetValidator` (13 test 
cases), `TestSparkValidationContext` (5 test cases), and 
`TestSparkStreamerValidatorUtils` (9 test cases)
   - All code is new; no existing code was copied
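   The hook shape described above can be sketched as follows. This is illustrative only: apart from the class names listed in the changelog, the interface and method names below are assumptions, not Hudi APIs.

```java
// Hypothetical sketch of the pre-commit validator hook; names are assumptions.
import java.util.List;

public class ValidatorHookSketch {

  /** Stand-in for a configured pre-commit validator. */
  interface StreamerValidator {
    // Under a FAIL policy, an implementation would throw to reject the commit.
    void validate(long expectedRecords, long writtenRecords);
  }

  /**
   * Shape of a hook run before commit finalization (as the changelog describes
   * for StreamSync.writeToSinkAndDoMetaSync()): early return when no
   * validators are configured, otherwise run each validator in turn.
   * Returns the number of validators executed.
   */
  static int runPreCommitValidators(List<StreamerValidator> validators,
                                    long expectedRecords, long writtenRecords) {
    if (validators == null || validators.isEmpty()) {
      return 0; // early return: hoodie.precommit.validators is unset
    }
    int executed = 0;
    for (StreamerValidator v : validators) {
      v.validate(expectedRecords, writtenRecords);
      executed++;
    }
    return executed;
  }
}
```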
   
   **Configuration properties (reused from Phase 1):**
   - `hoodie.precommit.validators`: Add 
`org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator`
   - `hoodie.precommit.validators.streaming.offset.tolerance.percentage` 
(default: 0.0)
   - `hoodie.precommit.validators.failure.policy` (default: FAIL)
   
   **This completes the 3-phase implementation:**
   - Phase 1 (#18068): Core framework in hudi-common ✅
   - Phase 2: Flink-specific implementation ✅
   - Phase 3 (this PR): Spark/HoodieStreamer implementation ✅
   
   ### Impact
   
   **Public API Changes:**
   - New public classes in `org.apache.hudi.utilities.streamer.validator` 
package:
     - `SparkKafkaOffsetValidator` (concrete validator)
     - `SparkValidationContext` (concrete context implementation)
     - `SparkStreamerValidatorUtils` (utility class)
   - Updated documentation in 
`HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES` to reference the new 
Spark validator
   
   **User-Facing Changes:**
   Users can now enable Kafka offset validation in HoodieStreamer by 
configuring:
   ```properties
   hoodie.precommit.validators=org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator
   hoodie.precommit.validators.streaming.offset.tolerance.percentage=0.0
   hoodie.precommit.validators.failure.policy=FAIL
   ```
   
   **Performance Impact:**
   Minimal. The validator execution adds one additional step before commit:
   - Collecting write statuses (already needed for commit)
   - Building commit metadata (lightweight)
   - Comparing Kafka offsets with record counts (O(partitions) operation)
   
   For HoodieStreamer pipelines without validators configured, there is zero 
overhead (early return).
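   The O(partitions) comparison can be sketched as below. The checkpoint representation (a start/end offset pair per partition) is an assumption for illustration, not the actual Hudi checkpoint format.

```java
// Illustrative sketch of an O(partitions) expected-count computation; the
// offset-range representation here is an assumption.
import java.util.Map;

public class OffsetDeltaSketch {

  /** Sums (endOffset - startOffset) over partitions: one pass, O(partitions). */
  static long expectedRecordCount(Map<Integer, long[]> offsetRangeByPartition) {
    long expected = 0L;
    for (long[] range : offsetRangeByPartition.values()) {
      expected += range[1] - range[0]; // range = {startOffset, endOffset}
    }
    return expected;
  }
}
```

   The resulting expected count is then compared against the written record count from the commit metadata.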
   
   ### Risk Level
   
   **Risk Level: low**
   
   **Justification:**
   - Phase 3 adds opt-in functionality that only activates when explicitly 
configured
   - Integration point in `StreamSync.writeToSinkAndDoMetaSync()` is before 
commit, so failures prevent bad commits rather than leaving the table in an 
inconsistent state
   - Reuses existing framework components from Phase 1 
(`StreamingOffsetValidator`, `CheckpointUtils`)
   - Comprehensive unit test coverage (27 test cases total across 3 test 
classes)
   - All tests pass with 0 checkstyle violations
   - No modifications to existing HoodieStreamer write paths (only adds 
pre-commit validation hook)
   
   **Verification:**
   - Built successfully with Maven: `mvn clean install -pl hudi-utilities -am 
-DskipTests`
   - Checkstyle validation: 0 violations
   - Unit tests: All 27 tests pass
   - Integration tested with HoodieStreamer Kafka ingestion (manual validation)
   
   ### Documentation Update
   
   **User-facing documentation:**
   
   The following configuration guide should be added to the HoodieStreamer 
documentation:
   
   #### Enabling Kafka Offset Validation for HoodieStreamer
   
   To detect data loss in Kafka streaming ingestion, configure the Spark Kafka 
offset validator:
   
   ```properties
   # Enable Spark Kafka offset validator
   hoodie.precommit.validators=org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator
   
   # Set tolerance for acceptable deviation (default: 0.0 = strict)
   # For append-only workloads, use 0.0
   # For upsert workloads with deduplication, consider 5.0 (5%)
   hoodie.precommit.validators.streaming.offset.tolerance.percentage=0.0
   
   # Set failure policy (default: FAIL)
   # FAIL: reject commit if validation fails
   # WARN_LOG: log warning but allow commit
   hoodie.precommit.validators.failure.policy=FAIL
   ```
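   The tolerance check above can be read as simple percentage arithmetic. The exact formula the validator uses is not shown in this PR description, so the sketch below is an assumption about the check, not the actual implementation.

```java
// Illustrative tolerance arithmetic only; the real validator's formula is an
// assumption here.
public class ToleranceSketch {

  /**
   * Deviation between the Kafka offset delta (expected) and the written record
   * count, as a percentage of expected. A tolerance of 5.0 accepts up to 5%
   * deviation, e.g. for upsert workloads that deduplicate records.
   */
  static boolean withinTolerance(long expected, long written, double tolerancePct) {
    if (expected == 0L) {
      return written == 0L;
    }
    double deviationPct = Math.abs(expected - written) * 100.0 / expected;
    return deviationPct <= tolerancePct;
  }
}
```

   For example, with 1,000 expected records and 960 written, the deviation is 4%, which passes a 5.0 tolerance but fails the strict 0.0 default.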
   
   **When to use:**
   - Append-only Kafka ingestion via HoodieStreamer
   - Production pipelines where data loss must be detected immediately
   - Scenarios where Kafka offset difference should match record count
   
   **When NOT to use:**
   - Upsert workloads with deduplication (set higher tolerance or use WARN_LOG)
   - Non-Kafka sources (validator only supports Kafka checkpoints)
   - Batch ingestion (not applicable to streaming)
   
   ### Contributor's checklist
   
   - [x] Read through [contributor's 
guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [x] Enough context is provided in the sections above
   - [x] Adequate tests were added if applicable
   - [x] Commits are signed and follow 
[conventions](https://www.conventionalcommits.org/)
   - [x] All existing tests pass
   

