[
https://issues.apache.org/jira/browse/HUDI-9743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ranga Reddy updated HUDI-9743:
------------------------------
Description:
*Describe the problem you faced*
I'm trying to get Protobuf records from a Confluent Cloud topic into a target
table on Google Cloud Storage. I'm submitting Hudi Streamer jobs using Google
Dataproc. The Hudi Streamer jobs fail with the following *{{Invalid lambda
deserialization}}* error.
{code:java}
Caused by: java.lang.reflect.InvocationTargetException
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:237)
at java.base/jdk.internal.reflect.GeneratedMethodAccessor3.invoke(Unknown
Source)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1136)
... 128 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
at
org.apache.hudi.utilities.streamer.SourceFormatAdapter.$deserializeLambda$(SourceFormatAdapter.java:72)
... 137 more{code}
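For context, Java deserializes a lambda by calling a synthetic {{$deserializeLambda$}} method on the class that declared it; the {{IllegalArgumentException}} above is thrown when the class visible to the reading classloader contains no matching lambda, which can happen when the same jar is reachable through two classloaders. A minimal, self-contained sketch of the serialization round trip (hypothetical class, not Hudi code):
{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class LambdaRoundTrip {

    // Serialize a lambda and read it back in the same JVM. Deserialization
    // ends in SerializedLambda.readResolve(), which reflectively invokes
    // LambdaRoundTrip.$deserializeLambda$(...) -- the frames visible in the
    // stack trace above. If the reading side's class bytes differ from the
    // writing side's (e.g. the bundle jar loaded through two classloaders),
    // that lookup fails with "Invalid lambda deserialization".
    static int roundTrip(int input) throws Exception {
        Function<Integer, Integer> doubler =
            (Function<Integer, Integer> & Serializable) x -> x * 2;

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(doubler);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            Function<Integer, Integer> copy =
                (Function<Integer, Integer>) in.readObject();
            return copy.apply(input);
        }
    }

    public static void main(String[] args) throws Exception {
        // Same class on both sides of the round trip, so this succeeds.
        System.out.println(roundTrip(21));
    }
}
{code}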
*To Reproduce*
Environment Details
Hudi version: v1.0.2
Spark version: 3.5.3
Scala version: 2.12
Google Dataproc version: 2.3.6-debian12
Storage: Google Cloud Storage
*Spark Submit Command and Protobuf Configuration*
{code:java}
gcloud dataproc jobs submit spark --cluster <cluster-name> \
  --region us-central1 \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer \
  --project <project-id> \
  --properties spark.driver.userClassPathFirst=true,spark.executor.userClassPathFirst=true,spark.driver.extraClassPath=gs://<path>/hudi-utilities-bundle_2.12-1.0.2.jar,spark.executor.extraClassPath=gs://<path>/hudi-utilities-bundle_2.12-1.0.2.jar \
  --jars gs://<path>/jars/hudi-utilities-bundle_2.12-1.0.2.jar,gs://<path>/jars/wire-schema-3.1.0.jar,gs://<path>-storage/jars/kafka-protobuf-provider-5.5.0.jar \
  -- \
  --target-table users \
  --target-base-path gs://<path>/data/users \
  --source-ordering-field registertime \
  --min-sync-interval-seconds 60 \
  --source-limit 1000 \
  --continuous \
  --source-class org.apache.hudi.utilities.sources.ProtoKafkaSource \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --table-type COPY_ON_WRITE \
  --op UPSERT \
  --compact-scheduling-weight 3 \
  --delta-sync-scheduling-weight 4 \
  --post-write-termination-strategy-class org.apache.hudi.utilities.streamer.NoNewDataTerminationStrategy \
  --hoodie-conf hoodie.table.name=users \
  --hoodie-conf hoodie.base.path=gs://<path>/data/users \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator \
  --hoodie-conf hoodie.keygen.timebased.timestamp.type=EPOCHMILLISECONDS \
  --hoodie-conf hoodie.keygen.timebased.input.dateformat=yyyy-MM-dd \
  --hoodie-conf hoodie.keygen.timebased.output.dateformat=yyyy-MM-dd \
  --hoodie-conf max.rounds.without.new.data.to.shutdown=5 \
  --hoodie-conf sasl.mechanism=PLAIN \
  --hoodie-conf security.protocol=SASL_SSL \
  --hoodie-conf hoodie.datasource.write.reconcile.schema=true \
  --hoodie-conf bootstrap.servers=<bootstrap-server>:9092 \
  --hoodie-conf sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=<username> password=<password>;" \
  --hoodie-conf schema.registry.url=<schema-registry-url>/ \
  --hoodie-conf schema.registry.basic.auth.user.info=<username>:<password> \
  --hoodie-conf basic.auth.credentials.source=USER_INFO \
  --hoodie-conf hoodie.streamer.schemaprovider.registry.url=<username>:<password>@<schema-registry-url>/subjects/test_users_proto-value/versions/latest \
  --hoodie-conf hoodie.datasource.write.recordkey.field=userid \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=regionid \
  --hoodie-conf hoodie.datasource.write.hive_style_partitioning=True \
  --hoodie-conf hoodie.datasource.write.precombine.field=registertime \
  --hoodie-conf hoodie.datasource.write.operation=UPSERT \
  --hoodie-conf hoodie.streamer.source.kafka.topic=test_users_proto \
  --hoodie-conf hoodie.streamer.schemaprovider.registry.schemaconverter=org.apache.hudi.utilities.schema.converter.ProtoSchemaToAvroSchemaConverter \
  --hoodie-conf hoodie.streamer.source.kafka.proto.value.deserializer.class=io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer \
  --hoodie-conf group.id=hudi-deltastreamer \
  --hoodie-conf auto.offset.reset=earliest \
  --hoodie-conf hoodie.write.concurrency.mode=SINGLE_WRITER \
  --hoodie-conf hoodie.datasource.write.drop.partition.columns=True \
  --hoodie-conf hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider \
  --hoodie-conf hoodie.cleaner.policy.failed.writes=EAGER \
  --hoodie-conf hoodie.client.heartbeat.interval_in_ms=120000 \
  --hoodie-conf hoodie.client.heartbeat.tolerable.misses=10 \
  --hoodie-conf hoodie.keep.min.commits=100 \
  --hoodie-conf hoodie.keep.max.commits=130 \
  --hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_COMMITS \
  --hoodie-conf hoodie.clean.automatic=true \
  --hoodie-conf hoodie.cleaner.commits.retained=10 \
  --hoodie-conf hoodie.cleaner.hours.retained=24 \
  --hoodie-conf hoodie.metadata.enable=True \
  --hoodie-conf hoodie.datasource.write.schema.allow.auto.evolution.column.drop=false \
  --hoodie-conf hoodie.write.set.null.for.missing.columns=true \
  --hoodie-conf request.timeout.ms=90000 \
  --hoodie-conf session.timeout.ms=120000 \
  --hoodie-conf heartbeat.interval.ms=5000 \
  --hoodie-conf retry.backoff.ms=500 \
  --hoodie-conf hoodie.datasource.hive_sync.assume_date_partitioning=False \
  --hoodie-conf hoodie.partition.metafile.use.base.format=True \
  --hoodie-conf use.latest.version=true \
  --hoodie-conf auto.register.schemas=true{code}
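Note that the command above makes the bundle jar visible both through {{extraClassPath}} (with {{userClassPathFirst=true}}) and through {{--jars}}, so the same Hudi classes are plausibly reachable via more than one classloader. A small diagnostic sketch (an assumption about the failure mode, not a confirmed fix) that lists every classpath location providing a given class file:
{code:java}
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

public class ClasspathDiag {

    // Return every classpath location that provides the given resource.
    // More than one hit for a Hudi class would suggest the bundle jar is
    // visible through multiple loaders (extraClassPath plus --jars here).
    static List<URL> locationsOf(String resource) throws IOException {
        List<URL> hits = new ArrayList<>();
        Enumeration<URL> urls =
            ClasspathDiag.class.getClassLoader().getResources(resource);
        while (urls.hasMoreElements()) {
            hits.add(urls.nextElement());
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        // Check the class named in the stack trace; run this inside the
        // Spark driver (e.g. via spark-shell) to see what it resolves to.
        for (URL u : locationsOf(
                "org/apache/hudi/utilities/streamer/SourceFormatAdapter.class")) {
            System.out.println(u);
        }
    }
}
{code}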
*Steps to Reproduce*
# Build a Hudi 1.0.2 JAR with Spark 3.5.3 and Scala 2.12.
# Use a Protobuf schema on an accessible schema registry, preferably an
authenticated one.
# Configure Hudi Streamer job with the spark submit command above.
# Run the Spark job.
*Expected behavior*
Records should be ingested properly into a partitioned Hudi table on Google
Cloud Storage.
For more details, refer to the following Hudi issue:
[https://github.com/apache/hudi/issues/13744]
was:
*Describe the problem you faced*
I'm trying to get Protobuf records from a Confluent Cloud topic into a target
table on Google Cloud Storage. I'm submitting Hudi Streamer jobs using Google
Dataproc. The Hudi Streamer jobs fail with the following {{Invalid Lambda
Deserialization}} error.
Caused by: java.lang.reflect.InvocationTargetException
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:237)
at
java.base/jdk.internal.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1136)
... 128 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
at
org.apache.hudi.utilities.streamer.SourceFormatAdapter.$deserializeLambda$(SourceFormatAdapter.java:72)
... 137 more
*To Reproduce*
Environment Details
Hudi version: v1.0.2
Spark version: 3.5.3
Scala version: 2.12
Google Dataproc version: 2.3.6-debian12
Storage: Google Cloud Storage
*Spark Submit Command and Protobuf Configuration*
{code:java}
gcloud dataproc jobs submit spark --cluster <cluster-name> \
  --region us-central1 \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer \
  --project <project-id> \
  --properties spark.driver.userClassPathFirst=true,spark.executor.userClassPathFirst=true,spark.driver.extraClassPath=gs://<path>/hudi-utilities-bundle_2.12-1.0.2.jar,spark.executor.extraClassPath=gs://<path>/hudi-utilities-bundle_2.12-1.0.2.jar \
  --jars gs://<path>/jars/hudi-utilities-bundle_2.12-1.0.2.jar,gs://<path>/jars/wire-schema-3.1.0.jar,gs://<path>-storage/jars/kafka-protobuf-provider-5.5.0.jar \
  -- \
  --target-table users \
  --target-base-path gs://<path>/data/users \
  --source-ordering-field registertime \
  --min-sync-interval-seconds 60 \
  --source-limit 1000 \
  --continuous \
  --source-class org.apache.hudi.utilities.sources.ProtoKafkaSource \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --table-type COPY_ON_WRITE \
  --op UPSERT \
  --compact-scheduling-weight 3 \
  --delta-sync-scheduling-weight 4 \
  --post-write-termination-strategy-class org.apache.hudi.utilities.streamer.NoNewDataTerminationStrategy \
  --hoodie-conf hoodie.table.name=users \
  --hoodie-conf hoodie.base.path=gs://<path>/data/users \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator \
  --hoodie-conf hoodie.keygen.timebased.timestamp.type=EPOCHMILLISECONDS \
  --hoodie-conf hoodie.keygen.timebased.input.dateformat=yyyy-MM-dd \
  --hoodie-conf hoodie.keygen.timebased.output.dateformat=yyyy-MM-dd \
  --hoodie-conf max.rounds.without.new.data.to.shutdown=5 \
  --hoodie-conf sasl.mechanism=PLAIN \
  --hoodie-conf security.protocol=SASL_SSL \
  --hoodie-conf hoodie.datasource.write.reconcile.schema=true \
  --hoodie-conf bootstrap.servers=<bootstrap-server>:9092 \
  --hoodie-conf sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=<username> password=<password>;" \
  --hoodie-conf schema.registry.url=<schema-registry-url>/ \
  --hoodie-conf schema.registry.basic.auth.user.info=<username>:<password> \
  --hoodie-conf basic.auth.credentials.source=USER_INFO \
  --hoodie-conf hoodie.streamer.schemaprovider.registry.url=<username>:<password>@<schema-registry-url>/subjects/test_users_proto-value/versions/latest \
  --hoodie-conf hoodie.datasource.write.recordkey.field=userid \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=regionid \
  --hoodie-conf hoodie.datasource.write.hive_style_partitioning=True \
  --hoodie-conf hoodie.datasource.write.precombine.field=registertime \
  --hoodie-conf hoodie.datasource.write.operation=UPSERT \
  --hoodie-conf hoodie.streamer.source.kafka.topic=test_users_proto \
  --hoodie-conf hoodie.streamer.schemaprovider.registry.schemaconverter=org.apache.hudi.utilities.schema.converter.ProtoSchemaToAvroSchemaConverter \
  --hoodie-conf hoodie.streamer.source.kafka.proto.value.deserializer.class=io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer \
  --hoodie-conf group.id=hudi-deltastreamer \
  --hoodie-conf auto.offset.reset=earliest \
  --hoodie-conf hoodie.write.concurrency.mode=SINGLE_WRITER \
  --hoodie-conf hoodie.datasource.write.drop.partition.columns=True \
  --hoodie-conf hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider \
  --hoodie-conf hoodie.cleaner.policy.failed.writes=EAGER \
  --hoodie-conf hoodie.client.heartbeat.interval_in_ms=120000 \
  --hoodie-conf hoodie.client.heartbeat.tolerable.misses=10 \
  --hoodie-conf hoodie.keep.min.commits=100 \
  --hoodie-conf hoodie.keep.max.commits=130 \
  --hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_COMMITS \
  --hoodie-conf hoodie.clean.automatic=true \
  --hoodie-conf hoodie.cleaner.commits.retained=10 \
  --hoodie-conf hoodie.cleaner.hours.retained=24 \
  --hoodie-conf hoodie.metadata.enable=True \
  --hoodie-conf hoodie.datasource.write.schema.allow.auto.evolution.column.drop=false \
  --hoodie-conf hoodie.write.set.null.for.missing.columns=true \
  --hoodie-conf request.timeout.ms=90000 \
  --hoodie-conf session.timeout.ms=120000 \
  --hoodie-conf heartbeat.interval.ms=5000 \
  --hoodie-conf retry.backoff.ms=500 \
  --hoodie-conf hoodie.datasource.hive_sync.assume_date_partitioning=False \
  --hoodie-conf hoodie.partition.metafile.use.base.format=True \
  --hoodie-conf use.latest.version=true \
  --hoodie-conf auto.register.schemas=true{code}
*Steps to Reproduce*
# Build a Hudi 1.0.2 JAR with Spark 3.5.3 and Scala 2.12.
# Use a Protobuf schema on an accessible schema registry, preferably an
authenticated one.
# Configure Hudi Streamer job with the spark submit command above.
For more details, refer to the following Hudi issue:
https://github.com/apache/hudi/issues/13744
# Run the Spark job.
*Expected behavior*
Records should be ingested properly into a partitioned Hudi table on Google
Cloud Storage.
> Invalid lambda deserialization error while ingesting ProtoKafkaSource with
> Hudi Streamer
> ----------------------------------------------------------------------------------------
>
> Key: HUDI-9743
> URL: https://issues.apache.org/jira/browse/HUDI-9743
> Project: Apache Hudi
> Issue Type: Bug
> Components: deltastreamer, hudi-utilities
> Affects Versions: 1.0.2
> Reporter: Ranga Reddy
> Priority: Blocker
> Labels: 1.0.2, hudi-streamer
> Fix For: 1.1.0
>
>
> *Describe the problem you faced*
> I'm trying to get Protobuf records from a Confluent Cloud topic into a target
> table on Google Cloud Storage. I'm submitting Hudi Streamer jobs using Google
> Dataproc. The Hudi Streamer jobs fail with the following *{{Invalid Lambda
> Deserialization}}* error.
> {code:java}
> Caused by: java.lang.reflect.InvocationTargetException
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:237)
> at java.base/jdk.internal.reflect.GeneratedMethodAccessor3.invoke(Unknown
> Source)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1136)
> ... 128 more
> Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
> at
> org.apache.hudi.utilities.streamer.SourceFormatAdapter.$deserializeLambda$(SourceFormatAdapter.java:72)
> ... 137 more{code}
> *To Reproduce*
> Environment Details
> Hudi version: v1.0.2
> Spark version: 3.5.3
> Scala version: 2.12
> Google Dataproc version: 2.3.6-debian12
> Storage: Google Cloud Storage
> *Spark Submit Command and Protobuf Configuration*
> {code:java}
> gcloud dataproc jobs submit spark --cluster <cluster-name> \
>   --region us-central1 \
>   --class org.apache.hudi.utilities.streamer.HoodieStreamer \
>   --project <project-id> \
>   --properties spark.driver.userClassPathFirst=true,spark.executor.userClassPathFirst=true,spark.driver.extraClassPath=gs://<path>/hudi-utilities-bundle_2.12-1.0.2.jar,spark.executor.extraClassPath=gs://<path>/hudi-utilities-bundle_2.12-1.0.2.jar \
>   --jars gs://<path>/jars/hudi-utilities-bundle_2.12-1.0.2.jar,gs://<path>/jars/wire-schema-3.1.0.jar,gs://<path>-storage/jars/kafka-protobuf-provider-5.5.0.jar \
>   -- \
>   --target-table users \
>   --target-base-path gs://<path>/data/users \
>   --source-ordering-field registertime \
>   --min-sync-interval-seconds 60 \
>   --source-limit 1000 \
>   --continuous \
>   --source-class org.apache.hudi.utilities.sources.ProtoKafkaSource \
>   --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
>   --table-type COPY_ON_WRITE \
>   --op UPSERT \
>   --compact-scheduling-weight 3 \
>   --delta-sync-scheduling-weight 4 \
>   --post-write-termination-strategy-class org.apache.hudi.utilities.streamer.NoNewDataTerminationStrategy \
>   --hoodie-conf hoodie.table.name=users \
>   --hoodie-conf hoodie.base.path=gs://<path>/data/users \
>   --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator \
>   --hoodie-conf hoodie.keygen.timebased.timestamp.type=EPOCHMILLISECONDS \
>   --hoodie-conf hoodie.keygen.timebased.input.dateformat=yyyy-MM-dd \
>   --hoodie-conf hoodie.keygen.timebased.output.dateformat=yyyy-MM-dd \
>   --hoodie-conf max.rounds.without.new.data.to.shutdown=5 \
>   --hoodie-conf sasl.mechanism=PLAIN \
>   --hoodie-conf security.protocol=SASL_SSL \
>   --hoodie-conf hoodie.datasource.write.reconcile.schema=true \
>   --hoodie-conf bootstrap.servers=<bootstrap-server>:9092 \
>   --hoodie-conf sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=<username> password=<password>;" \
>   --hoodie-conf schema.registry.url=<schema-registry-url>/ \
>   --hoodie-conf schema.registry.basic.auth.user.info=<username>:<password> \
>   --hoodie-conf basic.auth.credentials.source=USER_INFO \
>   --hoodie-conf hoodie.streamer.schemaprovider.registry.url=<username>:<password>@<schema-registry-url>/subjects/test_users_proto-value/versions/latest \
>   --hoodie-conf hoodie.datasource.write.recordkey.field=userid \
>   --hoodie-conf hoodie.datasource.write.partitionpath.field=regionid \
>   --hoodie-conf hoodie.datasource.write.hive_style_partitioning=True \
>   --hoodie-conf hoodie.datasource.write.precombine.field=registertime \
>   --hoodie-conf hoodie.datasource.write.operation=UPSERT \
>   --hoodie-conf hoodie.streamer.source.kafka.topic=test_users_proto \
>   --hoodie-conf hoodie.streamer.schemaprovider.registry.schemaconverter=org.apache.hudi.utilities.schema.converter.ProtoSchemaToAvroSchemaConverter \
>   --hoodie-conf hoodie.streamer.source.kafka.proto.value.deserializer.class=io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer \
>   --hoodie-conf group.id=hudi-deltastreamer \
>   --hoodie-conf auto.offset.reset=earliest \
>   --hoodie-conf hoodie.write.concurrency.mode=SINGLE_WRITER \
>   --hoodie-conf hoodie.datasource.write.drop.partition.columns=True \
>   --hoodie-conf hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider \
>   --hoodie-conf hoodie.cleaner.policy.failed.writes=EAGER \
>   --hoodie-conf hoodie.client.heartbeat.interval_in_ms=120000 \
>   --hoodie-conf hoodie.client.heartbeat.tolerable.misses=10 \
>   --hoodie-conf hoodie.keep.min.commits=100 \
>   --hoodie-conf hoodie.keep.max.commits=130 \
>   --hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_COMMITS \
>   --hoodie-conf hoodie.clean.automatic=true \
>   --hoodie-conf hoodie.cleaner.commits.retained=10 \
>   --hoodie-conf hoodie.cleaner.hours.retained=24 \
>   --hoodie-conf hoodie.metadata.enable=True \
>   --hoodie-conf hoodie.datasource.write.schema.allow.auto.evolution.column.drop=false \
>   --hoodie-conf hoodie.write.set.null.for.missing.columns=true \
>   --hoodie-conf request.timeout.ms=90000 \
>   --hoodie-conf session.timeout.ms=120000 \
>   --hoodie-conf heartbeat.interval.ms=5000 \
>   --hoodie-conf retry.backoff.ms=500 \
>   --hoodie-conf hoodie.datasource.hive_sync.assume_date_partitioning=False \
>   --hoodie-conf hoodie.partition.metafile.use.base.format=True \
>   --hoodie-conf use.latest.version=true \
>   --hoodie-conf auto.register.schemas=true{code}
> *Steps to Reproduce*
> # Build a Hudi 1.0.2 JAR with Spark 3.5.3 and Scala 2.12.
> # Use a Protobuf schema on an accessible schema registry, preferably an
> authenticated one.
> # Configure Hudi Streamer job with the spark submit command above.
> # Run the Spark job.
> *Expected behavior*
> Records should be ingested properly into a partitioned Hudi table on Google
> Cloud Storage.
>
> For more details, refer to the following Hudi issue:
> [https://github.com/apache/hudi/issues/13744]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)