Ranga Reddy created HUDI-9743:
---------------------------------

             Summary: Invalid lambda deserialization error while ingesting 
ProtoKafkaSource with Hudi Streamer
                 Key: HUDI-9743
                 URL: https://issues.apache.org/jira/browse/HUDI-9743
             Project: Apache Hudi
          Issue Type: Bug
          Components: deltastreamer, hudi-utilities
    Affects Versions: 1.0.2
            Reporter: Ranga Reddy
             Fix For: 1.1.0


*Describe the problem you faced*

I'm trying to get Protobuf records from a Confluent Cloud topic into a target 
table on Google Cloud Storage. I'm submitting Hudi Streamer jobs using Google 
Dataproc. The Hudi Streamer jobs fail with the following {{Invalid lambda 
deserialization}} error:
{code:java}
Caused by: java.lang.reflect.InvocationTargetException
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:237)
        at java.base/jdk.internal.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1136)
        ... 128 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
        at org.apache.hudi.utilities.streamer.SourceFormatAdapter.$deserializeLambda$(SourceFormatAdapter.java:72)
        ... 137 more
{code}
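For context on the failure mode: serializable Java lambdas round-trip through {{java.lang.invoke.SerializedLambda}}, whose {{readResolve}} reflectively invokes a compiler-generated {{$deserializeLambda$}} method on the capturing class, exactly as in the stack trace above. The sketch below (class and interface names are illustrative, not from Hudi) shows the round-trip succeeding when one classloader is involved; the {{Invalid lambda deserialization}} message is what {{$deserializeLambda$}} throws when the descriptor does not match any lambda known to the copy of the class doing the deserializing, which is presumably what happens here when {{userClassPathFirst}} causes a second copy of {{SourceFormatAdapter}} to be loaded.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LambdaSerDemo {
    // A serializable functional interface, analogous to the functions
    // SourceFormatAdapter captures as lambdas.
    interface SerFn extends Serializable {
        String apply(String s);
    }

    public static void main(String[] args) throws Exception {
        SerFn fn = s -> s.toUpperCase();  // compiled to a synthetic lambda method

        // Serialize: the JVM writes a SerializedLambda recording the
        // capturing class (LambdaSerDemo) and the implementation method.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(fn);
        }

        // Deserialize: SerializedLambda.readResolve reflectively calls
        // LambdaSerDemo.$deserializeLambda$ to re-create the lambda. This
        // succeeds only when the capturing class seen on this side matches
        // the one that serialized it; a mismatched copy of the class (e.g.
        // loaded from a second jar via userClassPathFirst/extraClassPath)
        // rejects the descriptor with
        // IllegalArgumentException("Invalid lambda deserialization").
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            SerFn back = (SerFn) ois.readObject();
            System.out.println(back.apply("hudi"));  // prints HUDI when loaders match
        }
    }
}
```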
 
*To Reproduce*

Environment Details:
 * Hudi version: v1.0.2
 * Spark version: 3.5.3
 * Scala version: 2.12
 * Google Dataproc version: 2.3.6-debian12
 * Storage: Google Cloud Storage

*Spark Submit Command and Protobuf Configuration*
{code:bash}
gcloud dataproc jobs submit spark --cluster <cluster-name> \
  --region us-central1 \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer \
  --project <project-id> \
  --properties spark.driver.userClassPathFirst=true,spark.executor.userClassPathFirst=true,spark.driver.extraClassPath=gs://<path>/hudi-utilities-bundle_2.12-1.0.2.jar,spark.executor.extraClassPath=gs://<path>/hudi-utilities-bundle_2.12-1.0.2.jar \
  --jars gs://<path>/jars/hudi-utilities-bundle_2.12-1.0.2.jar,gs://<path>/jars/wire-schema-3.1.0.jar,gs://<path>-storage/jars/kafka-protobuf-provider-5.5.0.jar \
  -- \
  --target-table users \
  --target-base-path gs://<path>/data/users \
  --source-ordering-field registertime \
  --min-sync-interval-seconds 60 \
  --source-limit 1000 \
  --continuous \
  --source-class org.apache.hudi.utilities.sources.ProtoKafkaSource \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --table-type COPY_ON_WRITE \
  --op UPSERT \
  --compact-scheduling-weight 3 \
  --delta-sync-scheduling-weight 4 \
  --post-write-termination-strategy-class org.apache.hudi.utilities.streamer.NoNewDataTerminationStrategy \
  --hoodie-conf hoodie.table.name=users \
  --hoodie-conf hoodie.base.path=gs://<path>/data/users \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator \
  --hoodie-conf hoodie.keygen.timebased.timestamp.type=EPOCHMILLISECONDS \
  --hoodie-conf hoodie.keygen.timebased.input.dateformat=yyyy-MM-dd \
  --hoodie-conf hoodie.keygen.timebased.output.dateformat=yyyy-MM-dd \
  --hoodie-conf max.rounds.without.new.data.to.shutdown=5 \
  --hoodie-conf sasl.mechanism=PLAIN \
  --hoodie-conf security.protocol=SASL_SSL \
  --hoodie-conf hoodie.datasource.write.reconcile.schema=true \
  --hoodie-conf bootstrap.servers=<bootstrap-server>:9092 \
  --hoodie-conf sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=<username> password=<password>;" \
  --hoodie-conf schema.registry.url=<schema-registry-url>/ \
  --hoodie-conf schema.registry.basic.auth.user.info=<username>:<password> \
  --hoodie-conf basic.auth.credentials.source=USER_INFO \
  --hoodie-conf hoodie.streamer.schemaprovider.registry.url=<username>:<password>@<schema-registry-url>/subjects/test_users_proto-value/versions/latest \
  --hoodie-conf hoodie.datasource.write.recordkey.field=userid \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=regionid \
  --hoodie-conf hoodie.datasource.write.hive_style_partitioning=True \
  --hoodie-conf hoodie.datasource.write.precombine.field=registertime \
  --hoodie-conf hoodie.datasource.write.operation=UPSERT \
  --hoodie-conf hoodie.streamer.source.kafka.topic=test_users_proto \
  --hoodie-conf hoodie.streamer.schemaprovider.registry.schemaconverter=org.apache.hudi.utilities.schema.converter.ProtoSchemaToAvroSchemaConverter \
  --hoodie-conf hoodie.streamer.source.kafka.proto.value.deserializer.class=io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer \
  --hoodie-conf group.id=hudi-deltastreamer \
  --hoodie-conf auto.offset.reset=earliest \
  --hoodie-conf hoodie.write.concurrency.mode=SINGLE_WRITER \
  --hoodie-conf hoodie.datasource.write.drop.partition.columns=True \
  --hoodie-conf hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider \
  --hoodie-conf hoodie.cleaner.policy.failed.writes=EAGER \
  --hoodie-conf hoodie.client.heartbeat.interval_in_ms=120000 \
  --hoodie-conf hoodie.client.heartbeat.tolerable.misses=10 \
  --hoodie-conf hoodie.keep.min.commits=100 \
  --hoodie-conf hoodie.keep.max.commits=130 \
  --hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_COMMITS \
  --hoodie-conf hoodie.clean.automatic=true \
  --hoodie-conf hoodie.cleaner.commits.retained=10 \
  --hoodie-conf hoodie.cleaner.hours.retained=24 \
  --hoodie-conf hoodie.metadata.enable=True \
  --hoodie-conf hoodie.datasource.write.schema.allow.auto.evolution.column.drop=false \
  --hoodie-conf hoodie.write.set.null.for.missing.columns=true \
  --hoodie-conf request.timeout.ms=90000 \
  --hoodie-conf session.timeout.ms=120000 \
  --hoodie-conf heartbeat.interval.ms=5000 \
  --hoodie-conf retry.backoff.ms=500 \
  --hoodie-conf hoodie.datasource.hive_sync.assume_date_partitioning=False \
  --hoodie-conf hoodie.partition.metafile.use.base.format=True \
  --hoodie-conf use.latest.version=true \
  --hoodie-conf auto.register.schemas=true
{code}

*Steps to Reproduce*
 # Build a Hudi 1.0.2 JAR with Spark 3.5.3 and Scala 2.12.
 # Use a Protobuf schema on an accessible schema registry, preferably an authenticated one.
 # Configure a Hudi Streamer job with the spark-submit command above.
 # Run the Spark job.

For more details, refer to the following Hudi issue: https://github.com/apache/hudi/issues/13744

*Expected behavior*
Records should be ingested properly into a partitioned Hudi table on Google 
Cloud Storage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
