samserpoosh opened a new issue, #8761: URL: https://github.com/apache/hudi/issues/8761
### Describe The Problem You Faced
I'm trying to ingest Postgres CDC events, published to Kafka by Debezium,
into a partitioned Hudi Table in S3. I'm currently testing this E2E
Data Flow with a dummy, pretty simple DB Table. When I submit the
DeltaStreamer job, it throws an "**Illegal Lambda Deserialization**" exception.
### To Reproduce
- My `spark-submit` Command:
```
spark-submit \
--jars "opt/spark/jars/hudi-utils-bundle.jar,..." \
--master spark://<SPARK_MASTER_URL>:7077 \
--total-executor-cores 1 \
--executor-memory 4g \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.connection.maximum=10000 \
--conf spark.scheduler.mode=FAIR \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
opt/spark/jars/hudi-utils-bundle.jar \
--table-type COPY_ON_WRITE \
--target-base-path s3a://path/to/samser_customers \
--target-table samser_customers \
--min-sync-interval-seconds 30 \
--source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
--payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \
--source-ordering-field _event_lsn \
--op UPSERT \
--continuous \
--source-limit 5000 \
--hoodie-conf bootstrap.servers=<KAFKA_BOOTSTRAP_SERVER>:9092 \
--hoodie-conf group.id=<GROUP_ID> \
--hoodie-conf schema.registry.url=http://<REGISTRY_URL>:8081 \
--hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://<REGISTRY_URL>:8081/subjects/<TOPIC>-value/versions/1 \
--hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer \
--hoodie-conf hoodie.deltastreamer.source.kafka.topic=<TOPIC> \
--hoodie-conf auto.offset.reset=earliest \
--hoodie-conf hoodie.datasource.write.recordkey.field=id \
--hoodie-conf hoodie.datasource.write.partitionpath.field=name \
--hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
--hoodie-conf hoodie.datasource.write.precombine.field=_event_lsn \
--hoodie-conf hoodie.metadata.enable=true \
--hoodie-conf hoodie.metadata.index.column.stats.enable=true \
--hoodie-conf hoodie.parquet.small.file.limit=134217728
```
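For reference, each `--hoodie-conf key=value` flag in the command above ends up as one entry in a flat properties map. A minimal sketch of that parsing behavior (my own hypothetical helper, not DeltaStreamer's actual parser), splitting only on the first `=` since values such as registry URLs may themselves contain `=`:

```python
def parse_hoodie_confs(args):
    """Collect the value following each --hoodie-conf flag into a dict.

    Splits on the first '=' only, so values containing '=' stay intact.
    """
    props = {}
    it = iter(args)
    for arg in it:
        if arg == "--hoodie-conf":
            key, _, value = next(it).partition("=")
            props[key] = value
    return props

confs = parse_hoodie_confs([
    "--hoodie-conf", "hoodie.datasource.write.recordkey.field=id",
    "--hoodie-conf", "hoodie.datasource.write.partitionpath.field=name",
])
</imports>
</imports>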
- Relevant Debezium-PG Configuration:
```
class: io.debezium.connector.postgresql.PostgresConnector
plugin.name: pgoutput
database.hostname: <DB_HOST>
database.port: 5432
database.user: <DB_USER>
database.password: <DB_PWD>
database.dbname: <DB_NAME>
topic.prefix: <TOPIC_PREFIX>
schema.include.list: public
key.converter: io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url: http://<REGISTRY_URL>:8081
value.converter: io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url: http://<REGISTRY_URL>:8081
table.include.list: public.samser_customers
topic.creation.enable: true
topic.creation.default.replication.factor: 1
topic.creation.default.partitions: 1
topic.creation.default.cleanup.policy: compact
topic.creation.default.compression.type: lz4
decimal.handling.mode: double
tombstones.on.delete: false
```
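The YAML-style properties above map onto the JSON body that the Kafka Connect REST API (`POST /connectors`) expects. A hedged sketch of assembling that body (the connector name is made up, and only a few of the properties are shown):

```python
import json

# A subset of the connector properties listed above (placeholders kept as-is).
connector_config = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "<DB_HOST>",
    "database.port": "5432",
    "table.include.list": "public.samser_customers",
    "tombstones.on.delete": "false",
    # ... remaining properties elided
}

# Kafka Connect expects {"name": ..., "config": {...}} on POST /connectors.
payload = json.dumps({"name": "samser-customers-connector",
                      "config": connector_config})
```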
- My DB Table's Schema:
```
\d samser_customers
                                    Table "public.samser_customers"
   Column   |            Type             | Collation | Nullable |                   Default
------------+-----------------------------+-----------+----------+----------------------------------------------
 id         | integer                     |           | not null | nextval('samser_customers_id_seq'::regclass)
 name       | character varying(50)       |           | not null |
 age        | integer                     |           | not null |
 created_at | timestamp without time zone |           |          |
 event_ts   | bigint                      |           |          |
Indexes:
    "samser_customers_pkey" PRIMARY KEY, btree (id)
Referenced by:
    TABLE "samser_orders" CONSTRAINT "fk_customer" FOREIGN KEY (customer_id) REFERENCES samser_customers(id)
Publications:
    "dbz_publication"
```
## Expected Behavior
I expected Hudi's `PostgresDebeziumSource` to properly **deserialize** the
Kafka CDC events and ingest them into a **partitioned** Hudi Table in S3,
partitioned by the `name` field specified in the command. However, the
exception mentioned above was thrown.
## Environment Description
- Hudi Version: **0.13.0**
  - Forked the repository, applied the changes laid out in [this patch](https://github.com/sydneyhoran/hudi/commit/b864a69e27d50424b6984f28a31c3bd99a025762) to `DebeziumSource`, built a JAR, and used that one.
- Spark Version: **3.1.2**
- Hive Version: N/A
- Hadoop Version: **3.2.0**
- Storage (HDFS/S3/GCS..): **S3**
- Running on Docker? (yes/no): YES ... Docker & Kubernetes
## Additional Context
I previously ran into some challenges, detailed across several comments
(such as [this one](https://github.com/apache/hudi/issues/8519#issuecomment-1550062066)) in
#8519. After syncing with @ad1happy2go, it turns out that in `0.13.0` I should make two
key changes to my previous `spark-submit` command:
- REMOVE `--schemaprovider-class`, since it's **not** needed
- Use `io.confluent.kafka.serializers.KafkaAvroDeserializer` instead of
`org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer`

That got me further, judging by the logs emitted by these lines:
https://github.com/apache/hudi/blob/a3f0615c857bed2d6783b700b2505938ee8a1bf2/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java#L124-L125
The **schema** is captured accurately, and so are the **offsets**:
```
23/05/19 02:57:56 INFO DebeziumSource: Spark schema of Kafka Payload for topic <TOPIC_PREFIX>.public.samser_customers:
root
|-- _change_operation_type: string (nullable = true)
|-- _upstream_event_processed_ts_ms: long (nullable = true)
|-- db_shard_source_partition: string (nullable = true)
|-- db_schema_source_partition: string (nullable = true)
|-- _event_origin_ts_ms: long (nullable = true)
|-- _event_tx_id: long (nullable = true)
|-- _event_lsn: long (nullable = true)
|-- _event_xmin: long (nullable = true)
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- created_at: long (nullable = true)
|-- event_ts: long (nullable = true)
23/05/19 02:57:56 INFO DebeziumSource: New checkpoint string: <TOPIC_PREFIX>.public.samser_customers,0:13
```
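The checkpoint string in that log line follows a `topic,partition:offset` shape. A small sketch (a hypothetical parser, just to illustrate the format) of splitting it into its parts:

```python
def parse_checkpoint(checkpoint):
    """Split a 'topic,partition:offset' checkpoint string into its parts.

    rpartition guards against commas that may appear in the topic name.
    """
    topic, _, rest = checkpoint.rpartition(",")
    partition, _, offset = rest.partition(":")
    return topic, int(partition), int(offset)

result = parse_checkpoint("<TOPIC_PREFIX>.public.samser_customers,0:13")
```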
However, the exception shown below is then thrown.
## Stacktrace
```
23/05/19 02:57:56 INFO CodeGenerator: Code generated in 13.51554 ms
23/05/19 02:57:56 INFO SparkContext: Starting job: isEmpty at DeltaSync.java:545
23/05/19 02:57:56 INFO DAGScheduler: Got job 1 (isEmpty at DeltaSync.java:545) with 1 output partitions
23/05/19 02:57:56 INFO DAGScheduler: Final stage: ResultStage 1 (isEmpty at DeltaSync.java:545)
23/05/19 02:57:56 INFO DAGScheduler: Parents of final stage: List()
23/05/19 02:57:56 INFO DAGScheduler: Missing parents: List()
23/05/19 02:57:56 INFO DAGScheduler: Submitting ResultStage 1 (SQLConfInjectingRDD[15] at SQLConfInjectingRDD at HoodieSparkUtils.scala:124), which has no missing parents
23/05/19 02:57:56 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 85.1 KiB, free 413.8 MiB)
23/05/19 02:57:56 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 22.3 KiB, free 413.8 MiB)
23/05/19 02:57:56 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on spark-deltastreamer-driver.spark-deltastreamer-driver-headless.simian-dev8.svc.cluster.local:35567 (size: 22.3 KiB, free: 413.9 MiB)
23/05/19 02:57:56 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1388
23/05/19 02:57:56 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (SQLConfInjectingRDD[15] at SQLConfInjectingRDD at HoodieSparkUtils.scala:124) (first 15 tasks are for partitions Vector(0))
23/05/19 02:57:56 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks resource profile 0
23/05/19 02:57:56 INFO FairSchedulableBuilder: Added task set TaskSet_1.0 tasks to pool default
23/05/19 02:57:56 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1) (10.10.231.141, executor 0, partition 0, PROCESS_LOCAL, 4492 bytes) taskResourceAssignments Map()
23/05/19 02:57:56 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.10.231.141:40357 (size: 22.3 KiB, free: 2004.6 MiB)
23/05/19 02:57:57 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1) (10.10.231.141 executor 0): java.io.IOException: unexpected exception type
    at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1750)
    at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1280)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2222)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2322)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
    at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274)
    ... 47 more
Caused by: java.lang.IllegalArgumentException: Illegal lambda deserialization
    at scala.runtime.LambdaDeserializer$.makeCallSite$1(LambdaDeserializer.scala:89)
    at scala.runtime.LambdaDeserializer$.deserializeLambda(LambdaDeserializer.scala:114)
    at scala.runtime.LambdaDeserialize.deserializeLambda(LambdaDeserialize.java:38)
    at org.apache.hudi.HoodieSparkUtils$.$deserializeLambda$(HoodieSparkUtils.scala)
    ... 56 more
```
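For what it's worth, `Illegal lambda deserialization` during Spark task deserialization is commonly associated with a Scala binary-version mismatch between the submitted jar and the Spark runtime (e.g. a bundle built for Scala 2.12 on a 2.11 cluster). A hedged sanity-check sketch of my own (not anything from Hudi) for reading the Scala suffix off artifact names:

```python
import re

def scala_suffix(artifact_name):
    """Extract the Scala binary-version suffix (e.g. '2.12') from a jar
    name such as 'hudi-utilities-bundle_2.12-0.13.0.jar'. Returns None
    when the name carries no suffix, like the plain bundle jar above."""
    m = re.search(r"_(2\.1[123])\b", artifact_name)
    return m.group(1) if m else None
```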
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
