shangxinli opened a new issue, #18067:
URL: https://github.com/apache/hudi/issues/18067

   ### Feature Description
   
   **What the feature achieves:**
   Creates an engine-agnostic pre-commit validation framework that lets
   validators access commit metadata, the timeline, and write statistics.
   Introduces streaming offset validators that detect data loss by comparing
   source offset differences with actual record counts.
    
   **Why this feature is needed:**
   Problem: Hudi has no built-in data loss detection for streaming pipelines.

     Real scenario:
     - DeltaStreamer reads 10,000 Kafka messages (offsets 5000→15000)
     - Due to software bugs, only 8,000 records are written
     - Kafka offset 15000 is committed anyway
     - Result: 2,000 records are silently lost

     Current gaps:
     - Existing validators only work for Spark (Dataset-based)
     - Validators have no access to commit metadata (checkpoints, stats)
     - Flink has no validation framework
     - Users rely on manual reconciliation jobs, which catch data loss too late
   
   
   ### User Experience
   
   **How users will use this feature:**
     Configuration

     # Enable validator
     hoodie.precommit.validators=org.apache.hudi.client.validator.SparkKafkaOffsetValidator

     # Tolerance: 0.0=strict, >0=percentage (default: 0.0)
     hoodie.precommit.validators.streaming.offset.tolerance.percentage=0.0

     # Warn-only mode: log but don't block (default: false)
     hoodie.precommit.validators.warn.only=false
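
To make the tolerance and warn-only settings concrete, here is a minimal Java sketch of how such a check might be evaluated. The class `OffsetToleranceSketch`, its method names, and the deviation formula (absolute difference divided by the offset difference) are assumptions for illustration only; this issue does not specify how the validator computes or applies the percentage.

```java
/** Hypothetical sketch of the tolerance decision; not part of Hudi. */
final class OffsetToleranceSketch {

  enum Outcome { PASS, WARN, FAIL }

  /** Percentage by which the written record count deviates from the offset difference. */
  static double deviationPercentage(long offsetDifference, long recordsWritten) {
    if (offsetDifference == 0) {
      // Nothing was consumed: any written record is treated as a full deviation (assumption).
      return recordsWritten == 0 ? 0.0 : 100.0;
    }
    return Math.abs(offsetDifference - recordsWritten) * 100.0 / offsetDifference;
  }

  /** Applies the tolerance first, then downgrades a failure to a warning in warn-only mode. */
  static Outcome evaluate(long offsetDifference, long recordsWritten,
                          double tolerancePercentage, boolean warnOnly) {
    double deviation = deviationPercentage(offsetDifference, recordsWritten);
    if (deviation <= tolerancePercentage) {
      return Outcome.PASS;
    }
    return warnOnly ? Outcome.WARN : Outcome.FAIL;
  }

  public static void main(String[] args) {
    // Scenario from this issue: offsets 5000 -> 15000, only 8,000 records written.
    long offsetDifference = 15000L - 5000L;
    System.out.println(deviationPercentage(offsetDifference, 8000L));   // 20.0
    System.out.println(evaluate(offsetDifference, 8000L, 0.0, false));  // FAIL
    System.out.println(evaluate(offsetDifference, 8000L, 0.0, true));   // WARN
    System.out.println(evaluate(offsetDifference, 9500L, 10.0, false)); // PASS
  }
}
```

Under these assumptions the scenario from the feature description deviates by 20%, so strict mode blocks the commit, warn-only mode only logs, and a tolerance of 10.0 would still block it.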
                                                                                
                                                                                
                                  
                                                                                
                                                                                
                                                                                
                                  
     Usage Examples

     DeltaStreamer with strict validation:
     spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
       hoodie-utilities.jar \
       --target-base-path /data/events \
       --hoodie-conf hoodie.precommit.validators=org.apache.hudi.client.validator.SparkKafkaOffsetValidator

     Gradual rollout (warn-only):
     hoodie.precommit.validators.warn.only=true  # Week 1-2: monitor
     hoodie.precommit.validators.warn.only=false # Week 3+: enforce

     With tolerance (for deduplication):
     hoodie.precommit.validators.streaming.offset.tolerance.percentage=10.0

     Flink streaming (Phase 2):
     Configuration conf = new Configuration();
     conf.setString("hoodie.precommit.validators",
         "org.apache.hudi.flink.client.validator.FlinkKafkaOffsetValidator");

     stream.sinkTo(HoodieSink.builder().withConfiguration(conf).build());

     API Changes

     New public APIs (hudi-common):
     public abstract class BasePreCommitValidator {
       protected boolean supportsMetadataValidation();
       protected void validateWithMetadata(ValidationContext context);
     }

     public interface ValidationContext {
       String getInstantTime();
       Option<HoodieCommitMetadata> getCommitMetadata();
       Map<String, String> getExtraMetadata();
       long getTotalRecordsWritten();
     }

     public abstract class StreamingOffsetValidator extends BasePreCommitValidator {
       // Common offset validation logic
     }

     public class CheckpointUtils {
       public static Map<Integer, Long> parseCheckpoint(CheckpointFormat, String);
       public static long calculateOffsetDifference(CheckpointFormat, String prev, String curr);
     }
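
The two `CheckpointUtils` signatures can be illustrated with a small standalone sketch. It assumes a Kafka checkpoint string of the form `topic,partition:offset,partition:offset`, which to my understanding is the layout DeltaStreamer uses for Kafka sources. The class `KafkaCheckpointSketch` and its method names are hypothetical, the proposed `CheckpointFormat` parameter is left out, and treating a partition that is missing from the previous checkpoint as starting at offset 0 is a simplifying assumption.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of Kafka checkpoint parsing; not the proposed CheckpointUtils. */
final class KafkaCheckpointSketch {

  /** Parses "topic,partition:offset,..." into a partition -> offset map. */
  static Map<Integer, Long> parse(String checkpoint) {
    String[] parts = checkpoint.split(",");
    if (parts.length < 2) {
      throw new IllegalArgumentException("Expected topic,partition:offset,... but got: " + checkpoint);
    }
    Map<Integer, Long> offsets = new TreeMap<>();
    // parts[0] is the topic name; the remaining entries are partition:offset pairs.
    for (int i = 1; i < parts.length; i++) {
      String[] pair = parts[i].split(":");
      if (pair.length != 2) {
        throw new IllegalArgumentException("Malformed partition entry: " + parts[i]);
      }
      offsets.put(Integer.parseInt(pair[0].trim()), Long.parseLong(pair[1].trim()));
    }
    return offsets;
  }

  /** Sums, over all partitions in the current checkpoint, how far the offset advanced. */
  static long offsetDifference(String previous, String current) {
    Map<Integer, Long> before = parse(previous);
    Map<Integer, Long> after = parse(current);
    long total = 0L;
    for (Map.Entry<Integer, Long> entry : after.entrySet()) {
      // Assumption: a partition absent from the previous checkpoint started at offset 0.
      long start = before.getOrDefault(entry.getKey(), 0L);
      total += entry.getValue() - start;
    }
    return total;
  }

  public static void main(String[] args) {
    // Single-partition version of the scenario in this issue.
    System.out.println(offsetDifference("events,0:5000", "events,0:15000")); // 10000
  }
}
```

A streaming offset validator would then compare this difference with the record count reported for the commit.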
                                                                                
                                                                                
                                  
                                                                                
                   
   
   ### Hudi RFC Requirements
   
   **RFC PR link:** (if applicable)
     Why RFC is needed

     Does this change public interfaces/APIs? Yes
     - New public APIs: BasePreCommitValidator, ValidationContext, StreamingOffsetValidator, CheckpointUtils
     - Extends existing SparkPreCommitValidator with new methods
     - External users will extend these for custom validators
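
As a rough illustration of the extension point mentioned in the last bullet, the sketch below shows what a user-defined validator might look like. The first two types are simplified stand-ins that mirror the signatures listed in this issue (`getCommitMetadata()` is omitted to avoid depending on Hudi classes, and the no-op defaults are an assumption). `FixedContext`, `CheckpointedCommitValidator`, and the metadata key `example.checkpoint.key` are hypothetical, the rule it enforces is only an example, and `IllegalStateException` stands in for whatever exception the framework would actually use.

```java
import java.util.Map;

// Stand-in for the proposed interface, reduced to the accessors used here.
interface ValidationContext {
  String getInstantTime();
  Map<String, String> getExtraMetadata();
  long getTotalRecordsWritten();
}

// Stand-in for the proposed base class; the no-op defaults are an assumption.
abstract class BasePreCommitValidator {
  protected boolean supportsMetadataValidation() { return false; }
  protected void validateWithMetadata(ValidationContext context) { }
}

// Hypothetical immutable context, convenient for unit tests.
final class FixedContext implements ValidationContext {
  private final String instantTime;
  private final Map<String, String> extraMetadata;
  private final long totalRecordsWritten;

  FixedContext(String instantTime, Map<String, String> extraMetadata, long totalRecordsWritten) {
    this.instantTime = instantTime;
    this.extraMetadata = extraMetadata;
    this.totalRecordsWritten = totalRecordsWritten;
  }

  @Override public String getInstantTime() { return instantTime; }
  @Override public Map<String, String> getExtraMetadata() { return extraMetadata; }
  @Override public long getTotalRecordsWritten() { return totalRecordsWritten; }
}

// Hypothetical custom rule: a commit that carries a checkpoint must write at least one record.
final class CheckpointedCommitValidator extends BasePreCommitValidator {
  static final String CHECKPOINT_KEY = "example.checkpoint.key"; // hypothetical key

  @Override protected boolean supportsMetadataValidation() { return true; }

  @Override protected void validateWithMetadata(ValidationContext context) {
    boolean hasCheckpoint = context.getExtraMetadata().containsKey(CHECKPOINT_KEY);
    if (hasCheckpoint && context.getTotalRecordsWritten() == 0) {
      throw new IllegalStateException(
          "Commit " + context.getInstantTime() + " carries a checkpoint but wrote no records");
    }
  }
}
```

Because the validator only touches `ValidationContext`, the same class could in principle run under Spark or Flink, which is the engine-agnostic property this proposal is after.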
                                                                                
                                                                                
                                  
                                                                                
                                                                                
                                                                                
                                  
     Does this change storage format? No
     - Uses existing extraMetadata mechanism for checkpoints
     - No new metadata files
     - Backward compatible with all tables

     Justification:
     - New public API contracts need stability commitment
     - Multi-engine architecture requires design review
     - Modifies commit flow (metadata built before validation)
     - 3-phase implementation needs coordination
     - Extensibility design (future sources: Pulsar, Kinesis)
   
   **Task Breakdown**

     Phase 1: Core Framework (hudi-common)
     - Create BasePreCommitValidator, ValidationContext, StreamingOffsetValidator
     - Enhance CheckpointUtils with multi-format support
     - Add configuration properties and unit tests

     Phase 2: Flink Implementation (hudi-flink-client)
     - Create Flink validator classes and context implementation
     - Integrate with Flink checkpoint state extraction
     - Write tests and documentation

     Phase 3: DeltaStreamer Implementation (hudi-spark-client)
     - Extend SparkPreCommitValidator with metadata validation
     - Create Spark validator and context implementations
     - Modify commit flow, update existing validators
     - Write tests and documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
