samserpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1530412949

   I'm facing a **very similar** error/stack trace when trying to use 
DeltaStreamer with `PostgresDebeziumSource`. In my case, I'm sure it's 
not caused by DELETE/tombstone records, because I'm testing the end-to-end data 
flow with some dummy tables and have only done INSERTs into them.
   
   Here's the command I'm executing to submit the DeltaStreamer job:
   
   ```shell
   spark-submit \
     --jars "opt/spark/jars/hudi-utilities-bundle.jar,..." \
     --master <spark_master_url> \
     --total-executor-cores <executor_cnt> \
     --executor-memory <mem> \
     --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
     --conf spark.hadoop.fs.s3a.connection.maximum=10000 \
     --conf spark.scheduler.mode=FAIR \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer opt/spark/jars/hudi-utilities-bundle.jar \
     --table-type COPY_ON_WRITE \
     --target-base-path s3a://***/data_lake/cdc/<table_name> \
     --target-table <table_name> \
     --min-sync-interval-seconds 60 \
     --source-ordering-field _event_lsn \
     --source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
     --payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \
     --op UPSERT \
     --continuous \
     --source-limit 5000 \
     --hoodie-conf bootstrap.servers=<kafka_bootstrap_server> \
     --hoodie-conf schema.registry.url=<schema_registry> \
     --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=<schema_registry>/api/artifacts/<kafka_topic>-value/versions/<version_no> \
     --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer \
     --hoodie-conf hoodie.deltastreamer.source.kafka.topic=<kafka_topic> \
     --hoodie-conf auto.offset.reset=earliest \
     --hoodie-conf hoodie.datasource.write.recordkey.field=id \
     --hoodie-conf hoodie.datasource.write.partitionpath.field=name \
     --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
     --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
     --hoodie-conf hoodie.datasource.write.precombine.field=_event_lsn \
     --hoodie-conf hoodie.metadata.enable=true \
     --hoodie-conf hoodie.metadata.index.bloom.filter.column.list=id \
     --hoodie-conf hoodie.metadata.index.column.stats.enable=true \
     --hoodie-conf hoodie.parquet.small.file.limit=134217728
   ```
   
   Am I missing a configuration property (or something similar) that leads to 
this issue?
   
   ### One Potential Issue, Maybe?!
   
   One thing I'm suspicious of is the combination of **Schema Registry**, 
**Serializer**, and **Deserializer** in this data flow:
   
   - I use 
[Apicurio](https://debezium.io/documentation/reference/stable/configuration/avro.html#apicurio-registry)
 as the Schema Registry, with its `AvroConverter` serializer in the Debezium 
Connector settings
   - On the DeltaStreamer side, I'm using 
`org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer`, as you can see above
   
   Could this ^^ mismatch lead to issues? Or, given that the exception we're 
seeing comes from `DeltaSync`, is it unrelated to this entirely?
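   
   To illustrate what I mean by a possible mismatch: as far as I understand, 
Confluent-style Avro deserializers (which Hudi's `KafkaAvroSchemaDeserializer` 
builds on) expect each message to start with a 5-byte wire-format header (magic 
byte `0x00` followed by a big-endian 4-byte schema ID), while Apicurio's 
serializers may write a different header depending on configuration. A quick 
sketch of a check one could run against raw bytes pulled off the topic (the 
payloads below are hypothetical, just for illustration):
   
   ```python
   import struct
   
   def parse_confluent_header(value: bytes):
       """Return the 4-byte schema ID if `value` starts with the Confluent
       wire-format header (magic byte 0x00 + big-endian schema ID),
       otherwise None."""
       if len(value) >= 5 and value[0] == 0x00:
           return struct.unpack(">I", value[1:5])[0]
       return None
   
   # Hypothetical message values, for illustration only:
   confluent_style = b"\x00\x00\x00\x00\x2a" + b"...avro-encoded-bytes..."
   other_format    = b"\x01" + b"...something-else..."
   
   print(parse_confluent_header(confluent_style))  # 42 (schema ID)
   print(parse_confluent_header(other_format))     # None
   ```
   
   If the messages on the topic don't carry the header the deserializer 
expects, that could plausibly surface as a deserialization failure inside the 
sync loop, though I'm not certain that's what's happening here.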
   
   Thank you very much in advance; I appreciate your help.

