samserpoosh opened a new issue, #8761:
URL: https://github.com/apache/hudi/issues/8761

   ### Describe The Problem You Faced
   
   I'm trying to get Postgres CDC events, published to Kafka by Debezium, ingested into a partitioned Hudi table in S3. I'm currently testing this end-to-end data flow with a dummy, very simple DB table. When I submit the DeltaStreamer job, it throws an "**Illegal lambda deserialization**" exception.
   
   ### To Reproduce
   
   - My `spark-submit` Command:
   
   ```
   spark-submit \
   --jars "opt/spark/jars/hudi-utils-bundle.jar,..." \
   --master spark://<SPARK_MASTER_URL>:7077 \
   --total-executor-cores 1 \
   --executor-memory 4g \
   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
   --conf spark.hadoop.fs.s3a.connection.maximum=10000 \
   --conf spark.scheduler.mode=FAIR \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer opt/spark/jars/hudi-utils-bundle.jar \
   --table-type COPY_ON_WRITE \
   --target-base-path s3a://path/to/samser_customers \
   --target-table samser_customers \
   --min-sync-interval-seconds 30 \
   --source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
   --payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \
   --source-ordering-field _event_lsn \
   --op UPSERT \
   --continuous \
   --source-limit 5000 \
   --hoodie-conf bootstrap.servers=<KAFKA_BOOTSTRAP_SERVER>:9092 \
   --hoodie-conf group.id=<GROUP_ID> \
   --hoodie-conf schema.registry.url=http://<REGISTRY_URL>:8081 \
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://<REGISTRY_URL>:8081/subjects/<TOPIC>-value/versions/1 \
   --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer \
   --hoodie-conf hoodie.deltastreamer.source.kafka.topic=<TOPIC> \
   --hoodie-conf auto.offset.reset=earliest \
   --hoodie-conf hoodie.datasource.write.recordkey.field=id \
   --hoodie-conf hoodie.datasource.write.partitionpath.field=name \
   --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
   --hoodie-conf hoodie.datasource.write.precombine.field=_event_lsn \
   --hoodie-conf hoodie.metadata.enable=true \
   --hoodie-conf hoodie.metadata.index.column.stats.enable=true \
   --hoodie-conf hoodie.parquet.small.file.limit=134217728
   ```
   
   - Relevant Debezium-PG Configuration:
   
   ```
   class: io.debezium.connector.postgresql.PostgresConnector
   plugin.name: pgoutput
   database.hostname: <DB_HOST>
   database.port: 5432
   database.user: <DB_USER>
   database.password: <DB_PWD>
   database.dbname: <DB_NAME>
   topic.prefix: <TOPIC_PREFIX>
   schema.include.list: public
   key.converter: io.confluent.connect.avro.AvroConverter
   key.converter.schema.registry.url: http://<REGISTRY_URL>:8081
   value.converter: io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url: http://<REGISTRY_URL>:8081
   table.include.list: public.samser_customers
   topic.creation.enable: true
   topic.creation.default.replication.factor: 1
   topic.creation.default.partitions: 1
   topic.creation.default.cleanup.policy: compact
   topic.creation.default.compression.type: lz4
   decimal.handling.mode: double
   tombstones.on.delete: false
   ```
   
   - My DB Table's Schema:
   
   ```
   \d samser_customers
                                        Table "public.samser_customers"
      Column   |            Type             | Collation | Nullable |                   Default
   ------------+-----------------------------+-----------+----------+----------------------------------------------
    id         | integer                     |           | not null | nextval('samser_customers_id_seq'::regclass)
    name       | character varying(50)       |           | not null |
    age        | integer                     |           | not null |
    created_at | timestamp without time zone |           |          |
    event_ts   | bigint                      |           |          |
   Indexes:
       "samser_customers_pkey" PRIMARY KEY, btree (id)
   Referenced by:
       TABLE "samser_orders" CONSTRAINT "fk_customer" FOREIGN KEY (customer_id) REFERENCES samser_customers(id)
   Publications:
       "dbz_publication"
   ```
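
   For reproduction convenience, here is a hypothetical `CREATE TABLE` implied by the `\d` output above (a reconstruction, not the exact original DDL):

   ```
   CREATE TABLE public.samser_customers (
       -- serial expands to: integer NOT NULL DEFAULT nextval('samser_customers_id_seq'::regclass)
       id         serial PRIMARY KEY,
       name       varchar(50) NOT NULL,
       age        integer NOT NULL,
       created_at timestamp without time zone,
       event_ts   bigint
   );
   ```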
   
   ## Expected Behavior
   
   I expected Hudi's `PostgresDebeziumSource` to properly **deserialize** the Kafka CDC events and ingest them into a **partitioned** Hudi table in S3, partitioned by the `name` field specified in the command. However, the exception mentioned above was thrown instead.
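
   Concretely, with `hoodie.datasource.write.hive_style_partitioning=true` and `name` as the partition-path field, I'd expect the target base path to end up looking roughly like this (illustrative layout, placeholder names):

   ```
   s3a://path/to/samser_customers/
     .hoodie/                 <- table metadata
     name=<SOME_NAME>/        <- one hive-style partition per distinct name
       <base-file>.parquet
     name=<OTHER_NAME>/
       <base-file>.parquet
   ```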
   
   ## Environment Description
   
   - Hudi Version: **0.13.0**
     - I forked the repository, applied the changes laid out in [this patch](https://github.com/sydneyhoran/hudi/commit/b864a69e27d50424b6984f28a31c3bd99a025762) to `DebeziumSource`, built a JAR, and used that one.
   - Spark Version: **3.1.2**
   - Hive Version: N/A
   - Hadoop Version: **3.2.0**
   - Storage (HDFS/S3/GCS..): **S3**
   - Running on Docker? (yes/no): YES (Docker & Kubernetes)
   
   ## Additional Context
   
   I previously ran into some challenges, detailed across several comments in #8519 (such as [this one](https://github.com/apache/hudi/issues/8519#issuecomment-1550062066)). I synced with @ad1happy2go, and it turns out that in `0.13.0` I should make two key changes to my previous `spark-submit` command:
   
   - REMOVE `--schemaprovider-class`, since it's **not** needed
   - Use `io.confluent.kafka.serializers.KafkaAvroDeserializer` instead of `org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer` (see the sketch below)
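
   A rough before/after of the affected flags (`<SCHEMA_PROVIDER_CLASS>` is a placeholder for whatever provider class I had been passing; it's dropped entirely in the new command):

   ```
   # before:
   --schemaprovider-class <SCHEMA_PROVIDER_CLASS> \
   --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer \

   # after (0.13.0):
   --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer \
   ```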
   
   That got me further. The logs emitted by these lines:
   
   https://github.com/apache/hudi/blob/a3f0615c857bed2d6783b700b2505938ee8a1bf2/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java#L124-L125
   
   show that the **schema** is captured accurately, and so are the **offsets**:
   
   ```
   23/05/19 02:57:56 INFO DebeziumSource: Spark schema of Kafka Payload for topic <TOPIC_PREFIX>.public.samser_customers:
   root
    |-- _change_operation_type: string (nullable = true)
    |-- _upstream_event_processed_ts_ms: long (nullable = true)
    |-- db_shard_source_partition: string (nullable = true)
    |-- db_schema_source_partition: string (nullable = true)
    |-- _event_origin_ts_ms: long (nullable = true)
    |-- _event_tx_id: long (nullable = true)
    |-- _event_lsn: long (nullable = true)
    |-- _event_xmin: long (nullable = true)
    |-- id: integer (nullable = true)
    |-- name: string (nullable = true)
    |-- age: integer (nullable = true)
    |-- created_at: long (nullable = true)
    |-- event_ts: long (nullable = true)
   
   23/05/19 02:57:56 INFO DebeziumSource: New checkpoint string: <TOPIC_PREFIX>.public.samser_customers,0:13
   ```
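
   (If I'm reading the checkpoint format right, `0:13` is `<kafka-partition>:<offset>`, so the source did consume records from the topic before failing.)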
   
   However, the exception shown below is then thrown.
   
   ## Stacktrace
   
   ```
   23/05/19 02:57:56 INFO CodeGenerator: Code generated in 13.51554 ms
   23/05/19 02:57:56 INFO SparkContext: Starting job: isEmpty at DeltaSync.java:545
   23/05/19 02:57:56 INFO DAGScheduler: Got job 1 (isEmpty at DeltaSync.java:545) with 1 output partitions
   23/05/19 02:57:56 INFO DAGScheduler: Final stage: ResultStage 1 (isEmpty at DeltaSync.java:545)
   23/05/19 02:57:56 INFO DAGScheduler: Parents of final stage: List()
   23/05/19 02:57:56 INFO DAGScheduler: Missing parents: List()
   23/05/19 02:57:56 INFO DAGScheduler: Submitting ResultStage 1 (SQLConfInjectingRDD[15] at SQLConfInjectingRDD at HoodieSparkUtils.scala:124), which has no missing parents
   23/05/19 02:57:56 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 85.1 KiB, free 413.8 MiB)
   23/05/19 02:57:56 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 22.3 KiB, free 413.8 MiB)
   23/05/19 02:57:56 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on spark-deltastreamer-driver.spark-deltastreamer-driver-headless.simian-dev8.svc.cluster.local:35567 (size: 22.3 KiB, free: 413.9 MiB)
   23/05/19 02:57:56 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1388
   23/05/19 02:57:56 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (SQLConfInjectingRDD[15] at SQLConfInjectingRDD at HoodieSparkUtils.scala:124) (first 15 tasks are for partitions Vector(0))
   23/05/19 02:57:56 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks resource profile 0
   23/05/19 02:57:56 INFO FairSchedulableBuilder: Added task set TaskSet_1.0 tasks to pool default
   23/05/19 02:57:56 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1) (10.10.231.141, executor 0, partition 0, PROCESS_LOCAL, 4492 bytes) taskResourceAssignments Map()
   23/05/19 02:57:56 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.10.231.141:40357 (size: 22.3 KiB, free: 2004.6 MiB)
   23/05/19 02:57:57 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1) (10.10.231.141 executor 0): java.io.IOException: unexpected exception type
           at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1750)
           at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1280)
           at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2222)
           at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
           at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)
           at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)
           at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
           at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
           at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
           at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
           at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
           at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
           at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
           at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
           at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
           at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
           at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
           at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
           at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
           at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
           at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
           at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2322)
           at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
           at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
           at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
           at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
           at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
           at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
           at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
           at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
           at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
           at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
           at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
           at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
           at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
           at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
           at org.apache.spark.scheduler.Task.run(Task.scala:131)
           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:750)
   Caused by: java.lang.reflect.InvocationTargetException
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
           at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274)
           ... 47 more
   Caused by: java.lang.IllegalArgumentException: Illegal lambda deserialization
           at scala.runtime.LambdaDeserializer$.makeCallSite$1(LambdaDeserializer.scala:89)
           at scala.runtime.LambdaDeserializer$.deserializeLambda(LambdaDeserializer.scala:114)
           at scala.runtime.LambdaDeserialize.deserializeLambda(LambdaDeserialize.java:38)
           at org.apache.hudi.HoodieSparkUtils$.$deserializeLambda$(HoodieSparkUtils.scala)
           ... 56 more
   ```

