remeajayi2022 opened a new issue, #13744:
URL: https://github.com/apache/hudi/issues/13744
**Describe the problem you faced**
I'm trying to get Protobuf records from a Confluent Cloud topic into a
target table on Google Cloud Storage. I'm submitting Hudi Streamer jobs via
Google Dataproc. The jobs fail with the following `Invalid lambda
deserialization` error:
```
Caused by: java.lang.reflect.InvocationTargetException
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:237)
    at java.base/jdk.internal.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1136)
    ... 128 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
    at org.apache.hudi.utilities.streamer.SourceFormatAdapter.$deserializeLambda$(SourceFormatAdapter.java:72)
    ... 137 more
```
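For context: `SourceFormatAdapter.$deserializeLambda$` is the synthetic method the JVM generates to re-create serializable lambdas; `SerializedLambda.readResolve` invokes it on the executor, and it throws `Invalid lambda deserialization` when the serialized lambda does not match any lambda in the copy of the class actually loaded there — typically because the driver and executors resolved different builds of the same jar. The mechanism can be sketched in Python, where deserialization likewise stores only a reference to the defining code (this sketch uses `math.sqrt` purely for illustration):

```python
import math
import pickle

# pickle stores only a reference ("math", "sqrt"), not the code itself --
# much as Java serializes a lambda as a pointer into its defining class.
blob = pickle.dumps(math.sqrt)
assert pickle.loads(blob)(9.0) == 3.0  # definition present: round-trip works

# Simulate the defining code being absent at deserialization time
# (analogous to the executor loading a different build of the class).
saved = math.sqrt
del math.sqrt
try:
    pickle.loads(blob)
    failed = False
except AttributeError:
    failed = True  # Python's analogue of "Invalid lambda deserialization"
finally:
    math.sqrt = saved  # restore the definition
assert failed
```

The point of the sketch: the payload is only valid against the exact definition it was written against, which is why classpath mismatches between driver and executors surface as deserialization errors rather than compile-time ones.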
**To Reproduce**
**Environment Details**
* Hudi version: v1.0.2
* Spark version: 3.5.3
* Scala version: 2.12
* Google Dataproc version: 2.3.6-debian12
* Storage: Google Cloud Storage
**Spark Submit Command and Protobuf Configuration**
```
gcloud dataproc jobs submit spark --cluster <cluster-name> \
  --region us-central1 \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer \
  --project <project-id> \
  --properties spark.driver.userClassPathFirst=true,spark.executor.userClassPathFirst=true,spark.driver.extraClassPath=gs://<path>/hudi-utilities-bundle_2.12-1.0.2.jar,spark.executor.extraClassPath=gs://<path>/hudi-utilities-bundle_2.12-1.0.2.jar \
  --jars gs://<path>/jars/hudi-utilities-bundle_2.12-1.0.2.jar,gs://<path>/jars/wire-schema-3.1.0.jar,gs://<path>-storage/jars/kafka-protobuf-provider-5.5.0.jar \
  -- \
  --target-table users \
  --target-base-path gs://<path>/data/users \
  --source-ordering-field registertime \
  --min-sync-interval-seconds 60 \
  --source-limit 1000 \
  --continuous \
  --source-class org.apache.hudi.utilities.sources.ProtoKafkaSource \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --table-type COPY_ON_WRITE \
  --op UPSERT \
  --compact-scheduling-weight 3 \
  --delta-sync-scheduling-weight 4 \
  --post-write-termination-strategy-class org.apache.hudi.utilities.streamer.NoNewDataTerminationStrategy \
  --hoodie-conf hoodie.table.name=users \
  --hoodie-conf hoodie.base.path=gs://<path>/data/users \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator \
  --hoodie-conf hoodie.keygen.timebased.timestamp.type=EPOCHMILLISECONDS \
  --hoodie-conf hoodie.keygen.timebased.input.dateformat=yyyy-MM-dd \
  --hoodie-conf hoodie.keygen.timebased.output.dateformat=yyyy-MM-dd \
  --hoodie-conf max.rounds.without.new.data.to.shutdown=5 \
  --hoodie-conf sasl.mechanism=PLAIN \
  --hoodie-conf security.protocol=SASL_SSL \
  --hoodie-conf hoodie.datasource.write.reconcile.schema=true \
  --hoodie-conf bootstrap.servers=<bootstrap-server>:9092 \
  --hoodie-conf sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=<username> password=<password>;" \
  --hoodie-conf schema.registry.url=<schema-registry-url>/ \
  --hoodie-conf schema.registry.basic.auth.user.info=<username>:<password> \
  --hoodie-conf basic.auth.credentials.source=USER_INFO \
  --hoodie-conf hoodie.streamer.schemaprovider.registry.url=<username>:<password>@<schema-registry-url>/subjects/test_users_proto-value/versions/latest \
  --hoodie-conf hoodie.datasource.write.recordkey.field=userid \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=regionid \
  --hoodie-conf hoodie.datasource.write.hive_style_partitioning=True \
  --hoodie-conf hoodie.datasource.write.precombine.field=registertime \
  --hoodie-conf hoodie.datasource.write.operation=UPSERT \
  --hoodie-conf hoodie.streamer.source.kafka.topic=test_users_proto \
  --hoodie-conf hoodie.streamer.schemaprovider.registry.schemaconverter=org.apache.hudi.utilities.schema.converter.ProtoSchemaToAvroSchemaConverter \
  --hoodie-conf hoodie.streamer.source.kafka.proto.value.deserializer.class=io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer \
  --hoodie-conf group.id=hudi-deltastreamer \
  --hoodie-conf auto.offset.reset=earliest \
  --hoodie-conf hoodie.write.concurrency.mode=SINGLE_WRITER \
  --hoodie-conf hoodie.datasource.write.drop.partition.columns=True \
  --hoodie-conf hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider \
  --hoodie-conf hoodie.cleaner.policy.failed.writes=EAGER \
  --hoodie-conf hoodie.client.heartbeat.interval_in_ms=120000 \
  --hoodie-conf hoodie.client.heartbeat.tolerable.misses=10 \
  --hoodie-conf hoodie.keep.min.commits=100 \
  --hoodie-conf hoodie.keep.max.commits=130 \
  --hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_COMMITS \
  --hoodie-conf hoodie.clean.automatic=true \
  --hoodie-conf hoodie.cleaner.commits.retained=10 \
  --hoodie-conf hoodie.cleaner.hours.retained=24 \
  --hoodie-conf hoodie.metadata.enable=True \
  --hoodie-conf hoodie.datasource.write.schema.allow.auto.evolution.column.drop=false \
  --hoodie-conf hoodie.write.set.null.for.missing.columns=true \
  --hoodie-conf request.timeout.ms=90000 \
  --hoodie-conf session.timeout.ms=120000 \
  --hoodie-conf heartbeat.interval.ms=5000 \
  --hoodie-conf retry.backoff.ms=500 \
  --hoodie-conf hoodie.datasource.hive_sync.assume_date_partitioning=False \
  --hoodie-conf hoodie.partition.metafile.use.base.format=True \
  --hoodie-conf use.latest.version=true \
  --hoodie-conf auto.register.schemas=true
```
**Steps to Reproduce**
1. Build or download the Hudi 1.0.2 utilities bundle for Spark 3.5 and Scala 2.12 (matching the environment above).
2. Use a Protobuf schema on an accessible schema registry, preferably an
authenticated one.
3. Configure Hudi Streamer job with the spark submit command above.
4. Run the Spark job.
**Expected behavior**
Records should be ingested properly into a partitioned Hudi table on Google
Cloud Storage.
**Additional context**
@deepakpanda93 suggested explicitly setting the classpath of the Spark job.
I tried this by including the jar in `spark.driver.extraClassPath` and
`spark.executor.extraClassPath` above, and also with the `spark:spark` prefix
syntax.
I also tested this with and without the `kafka-protobuf-provider` jar.
I confirmed that there was no class duplication at runtime using
`spark.executor.extraJavaOptions="-verbose:class"` and
`spark.driver.extraJavaOptions="-verbose:class"`.
I confirmed that the schema is valid and does not contain corrupted records.
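The `-verbose:class` check can be automated. Below is a minimal sketch that reports classes loaded from more than one jar; the log lines in it are illustrative, not from the actual driver output, and the exact `-verbose:class` format depends on the JDK (unified-logging style on JDK 11, `[Loaded ... from ...]` on JDK 8):

```python
import re
from collections import defaultdict

# JDK 11's -verbose:class prints lines like:
#   [0.123s][info][class,load] org.example.Foo source: file:/path/foo.jar
# JDK 8 used: [Loaded org.example.Foo from file:/path/foo.jar]
LOAD_RE = re.compile(
    r"\[class,load\]\s+(?P<cls>\S+)\s+source:\s+(?P<src>\S+)"
    r"|\[Loaded\s+(?P<cls8>\S+)\s+from\s+(?P<src8>\S+)\]"
)

def duplicate_loads(log_lines):
    """Return classes that were loaded from more than one source jar/path."""
    sources = defaultdict(set)
    for line in log_lines:
        m = LOAD_RE.search(line)
        if m:
            cls = m.group("cls") or m.group("cls8")
            src = m.group("src") or m.group("src8")
            sources[cls].add(src)
    return {c: s for c, s in sources.items() if len(s) > 1}

# Illustrative log lines showing what a duplicate would look like:
log = [
    "[0.5s][info][class,load] org.apache.hudi.utilities.streamer.SourceFormatAdapter source: file:/a/bundle.jar",
    "[0.6s][info][class,load] org.apache.hudi.utilities.streamer.SourceFormatAdapter source: file:/b/bundle.jar",
    "[0.7s][info][class,load] java.lang.String source: jrt:/java.base",
]
assert "org.apache.hudi.utilities.streamer.SourceFormatAdapter" in duplicate_loads(log)
```

Running the same scan over the driver log and an executor log, and diffing the *source* each side used for `SourceFormatAdapter`, would also catch the case where each JVM loads a single but different copy of the bundle.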
This is the Protobuf schema:
```
syntax = "proto3";

package ksql;

message users {
  int64 registertime = 1;
  string userid = 2;
  string regionid = 3;
  string gender = 4;
}
```
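For reference, a straightforward proto3-to-Avro mapping of this message (proto3 `int64` → Avro `long`, `string` → `string`) would look roughly like the sketch below. This is an illustrative expectation only, not the verified output of `ProtoSchemaToAvroSchemaConverter`, which may differ in namespaces, defaults, or nullability:

```python
import json

# Hand-written Avro equivalent of the ksql.users proto3 message above
# (illustrative; the actual converter output may differ in detail).
expected_avro = {
    "type": "record",
    "name": "users",
    "namespace": "ksql",
    "fields": [
        {"name": "registertime", "type": "long"},
        {"name": "userid", "type": "string"},
        {"name": "regionid", "type": "string"},
        {"name": "gender", "type": "string"},
    ],
}

# Round-trip through JSON, the form a schema registry stores.
schema_json = json.dumps(expected_avro)
assert json.loads(schema_json)["fields"][0]["name"] == "registertime"
```

Comparing such an expectation against what the registry subject actually returns is one way to rule the schema conversion out as the cause.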
**Stacktrace**
```
25/08/21 13:46:19 INFO Javalin: You are running Javalin 4.6.7 (released
October 24, 2022. Your Javalin version is 1032 days old. Consider checking for
a newer version.).
25/08/21 13:46:19 INFO Server: jetty-9.4.57.v20241219; built:
2025-01-08T21:24:30.412Z; git: df524e6b29271c2e09ba9aea83c18dc9db464a31; jvm
11.0.20.1+1
25/08/21 13:46:19 INFO Server: Started @14067ms
25/08/21 13:46:19 INFO Javalin: Listening on http://localhost:42205/
25/08/21 13:46:19 INFO Javalin: Javalin started in 228ms \o/
25/08/21 13:46:19 INFO TimelineService: Starting Timeline server on port:
42205
25/08/21 13:46:19 INFO EmbeddedTimelineService: Started embedded timeline
server at dp-data-eng-gce-dev-m-0.c.non-prod-data-eng-dev.internal:42205
25/08/21 13:46:19 INFO BaseHoodieClient: Timeline Server already running.
Not restarting the service
25/08/21 13:46:19 INFO BaseHoodieClient: Timeline Server already running.
Not restarting the service
25/08/21 13:46:19 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient
from gs://<path>-storage/data/users
25/08/21 13:46:19 INFO HoodieTableConfig: Loading table properties from
gs://<path>-storage/data/users/.hoodie/hoodie.properties
25/08/21 13:46:19 INFO HoodieTableMetaClient: Finished Loading Table of type
COPY_ON_WRITE(version=2) from gs://<path>-storage/data/users
25/08/21 13:46:19 INFO HoodieTableMetaClient: Loading Active commit timeline
for gs://<path>-storage/data/users
25/08/21 13:46:19 INFO ActiveTimelineV2: Loaded instants upto :
Option{val=[==>20250820204407037__commit__REQUESTED]}
25/08/21 13:46:19 WARN ConfigUtils: The configuration key
'hoodie.cleaner.policy.failed.writes' has been deprecated and may be removed in
the future. Please use the new key 'hoodie.clean.failed.writes.policy' instead.
25/08/21 13:46:19 INFO CleanerUtils: Cleaned failed attempts if any
25/08/21 13:46:19 WARN ConfigUtils: The configuration key
'hoodie.cleaner.policy.failed.writes' has been deprecated and may be removed in
the future. Please use the new key 'hoodie.clean.failed.writes.policy' instead.
25/08/21 13:46:19 INFO TimeGeneratorBase: LockProvider for TimeGenerator:
org.apache.hudi.client.transaction.lock.InProcessLockProvider
25/08/21 13:46:20 INFO HoodieTableMetaClient: Loading Active commit timeline
for gs://<path>-storage/data/users
25/08/21 13:46:20 INFO ActiveTimelineV2: Loaded instants upto :
Option{val=[==>20250820204407037__commit__REQUESTED]}
25/08/21 13:46:20 INFO FileSystemViewManager: Creating View Manager with
storage type REMOTE_FIRST.
25/08/21 13:46:20 INFO BaseHoodieTableServiceClient: Scheduling Rollback at
instant time : 20250821134619849 (exists in active timeline: true), with
rollback plan: false for table gs://<path>-storage/data/users
25/08/21 13:46:20 INFO ActiveTimelineV2: Loaded instants upto :
Option{val=[==>20250821134619849__rollback__REQUESTED]}
25/08/21 13:46:20 INFO BaseRollbackPlanActionExecutor: Requesting Rollback
with instant time [==>20250821134619849__rollback__REQUESTED]
25/08/21 13:46:20 INFO ActiveTimelineV2: Loaded instants upto :
Option{val=[==>20250821134619849__rollback__REQUESTED]}
25/08/21 13:46:20 INFO ActiveTimelineV2: Create new file for toInstant
?gs://<path>-storage/data/users/.hoodie/timeline/20250821134619849.rollback.inflight
25/08/21 13:46:20 WARN ConfigUtils: The configuration key
'hoodie.cleaner.policy.failed.writes' has been deprecated and may be removed in
the future. Please use the new key 'hoodie.clean.failed.writes.policy' instead.
25/08/21 13:46:20 INFO CopyOnWriteRollbackActionExecutor: Time(in ms) taken
to finish rollback 0
25/08/21 13:46:20 INFO BaseRollbackActionExecutor: Rolled back inflight
instant 20250820204407037
25/08/21 13:46:20 INFO BaseRollbackActionExecutor: Index rolled back for
commits [==>20250820204407037__commit__REQUESTED]
25/08/21 13:46:20 INFO TransactionManager: Transaction starting for
Option{val=[==>20250821134619849__rollback__INFLIGHT]} with latest completed
transaction instant Optional.empty
25/08/21 13:46:20 INFO TransactionManager: Transaction started for
Option{val=[==>20250821134619849__rollback__INFLIGHT]} with latest completed
transaction instant Optional.empty
25/08/21 13:46:20 INFO BaseRollbackActionExecutor: Deleting
instant=[==>20250820204407037__commit__REQUESTED]
25/08/21 13:46:20 INFO ActiveTimelineV2: Deleting instant
[==>20250820204407037__commit__REQUESTED]
25/08/21 13:46:20 INFO ActiveTimelineV2: Removed instant
[==>20250820204407037__commit__REQUESTED]
25/08/21 13:46:20 INFO BaseRollbackActionExecutor: Deleted pending commit
[==>20250820204407037__commit__REQUESTED]
25/08/21 13:46:21 INFO ActiveTimelineV2: Created new file for toInstant
?gs://<path>-storage/data/users/.hoodie/timeline/20250821134619849_20250821134620913.rollback
25/08/21 13:46:21 INFO BaseRollbackActionExecutor: Rollback of Commits
[20250820204407037] is complete
25/08/21 13:46:21 INFO TransactionManager: Transaction ending with
transaction owner Option{val=[==>20250821134619849__rollback__INFLIGHT]}
25/08/21 13:46:21 INFO InProcessLockProvider: Base Path
gs://<path>-storage/data/users, Lock Instance
java.util.concurrent.locks.ReentrantReadWriteLock@29628e99[Write locks = 1,
Read locks = 0], Thread pool-44-thread-1, In-process lock state RELEASING
25/08/21 13:46:21 INFO InProcessLockProvider: Base Path
gs://<path>-storage/data/users, Lock Instance
java.util.concurrent.locks.ReentrantReadWriteLock@29628e99[Write locks = 0,
Read locks = 0], Thread pool-44-thread-1, In-process lock state RELEASED
25/08/21 13:46:21 INFO InProcessLockProvider: Base Path
gs://<path>-storage/data/users, Lock Instance
java.util.concurrent.locks.ReentrantReadWriteLock@29628e99[Write locks = 0,
Read locks = 0], Thread pool-44-thread-1, In-process lock state ALREADY_RELEASED
25/08/21 13:46:21 INFO LockManager: Released connection created for
acquiring lock
25/08/21 13:46:21 INFO TransactionManager: Transaction ended with
transaction owner Option{val=[==>20250821134619849__rollback__INFLIGHT]}
25/08/21 13:46:21 WARN ConfigUtils: The configuration key
'hoodie.cleaner.policy.failed.writes' has been deprecated and may be removed in
the future. Please use the new key 'hoodie.clean.failed.writes.policy' instead.
25/08/21 13:46:21 INFO BaseHoodieWriteClient: Generate a new instant time:
20250821134617535 action: commit
25/08/21 13:46:21 WARN ConfigUtils: The configuration key
'hoodie.cleaner.policy.failed.writes' has been deprecated and may be removed in
the future. Please use the new key 'hoodie.clean.failed.writes.policy' instead.
25/08/21 13:46:21 INFO ActiveTimelineV2: Creating a new instant
[==>20250821134617535__commit__REQUESTED]
25/08/21 13:46:21 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient
from gs://<path>-storage/data/users
25/08/21 13:46:21 INFO HoodieTableConfig: Loading table properties from
gs://<path>-storage/data/users/.hoodie/hoodie.properties
25/08/21 13:46:21 INFO HoodieTableMetaClient: Finished Loading Table of type
COPY_ON_WRITE(version=2) from gs://<path>-storage/data/users
25/08/21 13:46:21 INFO HoodieTableMetaClient: Loading Active commit timeline
for gs://<path>-storage/data/users
25/08/21 13:46:21 INFO ActiveTimelineV2: Loaded instants upto :
Option{val=[20250821134619849__20250821134620913__rollback__COMPLETED]}
25/08/21 13:46:21 INFO FileSystemViewManager: Creating View Manager with
storage type REMOTE_FIRST.
25/08/21 13:46:21 INFO AsyncCleanerService: The HoodieWriteClient is not
configured to auto & async clean. Async clean service will not start.
25/08/21 13:46:21 INFO AsyncArchiveService: The HoodieWriteClient is not
configured to auto & async archive. Async archive service will not start.
[dd.trace 2025-08-21 13:46:22:019 +0000] [dd-task-scheduler] INFO
datadog.communication.monitor.DDAgentStatsDConnection - Detected
/var/run/datadog/dsd.socket. Using it to send StatsD data.
25/08/21 13:46:23 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0)
(dp-data-eng-gce-dev-w-0.c.non-prod-data-eng-dev.internal executor 1):
java.io.IOException: unexpected exception type
    at java.base/java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1512)
    at java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1142)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2237)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2134)
    ...
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    ...
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:90) ~[spark-core_2.12-3.5.3.jar:3.5.3]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54) ~[spark-core_2.12-3.5.3.jar:3.5.3]
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166) ~[spark-core_2.12-3.5.3.jar:3.5.3]
    at org.apache.spark.scheduler.Task.run(Task.scala:141) ~[spark-core_2.12-3.5.3.jar:3.5.3]
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620) ~[spark-core_2.12-3.5.3.jar:3.5.3]
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) ~[spark-common-utils_2.12-3.5.3.jar:3.5.3]
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) ~[spark-common-utils_2.12-3.5.3.jar:3.5.3]
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96) ~[spark-core_2.12-3.5.3.jar:3.5.3]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623) ~[spark-core_2.12-3.5.3.jar:3.5.3]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[?:?]
[dd.trace 2025-08-21 13:46:23:562 +0000] [spark-listener-group-shared] INFO
datadog.trace.instrumentation.spark.AbstractDatadogSparkListener - Received
spark application end event, finish trace on this event: false
[dd.trace 2025-08-21 13:46:23:590 +0000] [main] INFO
datadog.trace.instrumentation.spark.AbstractDatadogSparkListener - Finishing
spark application trace
25/08/21 13:46:23 INFO DataprocSparkPlugin: Shutting down driver plugin.
metrics=[action_http_patch_request=0, files_created=5,
gcs_api_server_timeout_count=0, op_get_list_status_result_size=17, op_open=7,
action_http_delete_request=2, gcs_api_time=4074, gcs_backoff_count=0,
gcs_api_client_unauthorized_response_count=0, stream_read_close_operations=7,
stream_read_bytes_backwards_on_seek=0, gs_filesystem_create=4,
exception_count=0, gcs_exception_count=0, gcs_api_total_request_count=102,
op_create=5, stream_read_vectored_operations=0, gcs_metadata_request=58,
gcs_api_client_bad_request_count=0, action_http_put_request=6,
op_create_non_recursive=0, gcs_api_client_gone_response_count=0,
gs_filesystem_initialize=3, stream_read_vectored_incoming_ranges=0,
stream_write_operations=4, gcs_list_dir_request=6, stream_read_operations=8,
gcs_api_client_request_timeout_count=0, op_rename=0, op_get_file_status=10,
op_glob_status=0, op_exists=9, stream_write_bytes=490061, op_xattr_list=0,
op_get_delegation_token=0, gcs_api_server_unavailable_count=0,
directories_created=1, files_delete_rejected=0,
stream_read_vectored_combined_ranges=0, op_xattr_get_named=0,
gcs_list_file_request=13, op_hsync=0, action_http_get_request=0,
stream_read_operations_incomplete=20, op_delete=1, stream_read_bytes=7375,
gcs_api_client_non_found_response_count=45, op_list_located_status=0,
gcs_api_client_requested_range_not_statisfiable_count=0, op_hflush=13,
op_list_status=6, stream_read_vectored_read_bytes_discarded=0,
op_xattr_get_named_map=0, gcs_api_client_side_error_count=46,
op_get_file_checksum=0, gcs_api_server_internal_error_count=0,
stream_read_seek_bytes_skipped=0, stream_write_close_operations=6,
gcs_get_media_request=7, gcs_connector_time=3787, files_deleted=1,
action_http_post_request=9, op_mkdirs=1,
gcs_api_client_rate_limit_error_count=0, op_copy_from_local_file=0,
gcs_api_server_bad_gateway_count=0, stream_readVectored_range_duration=0,
stream_read_seek_backward_operations=0,
gcs_api_server_side_error_count=0, stream_read_seek_operations=0,
gcs_get_other_request=1, stream_read_seek_forward_operations=0,
gcs_api_client_precondition_failed_response_count=1, op_xattr_get_map=0,
delegation_tokens_issued=0, gcs_backoff_time=0, gcs_list_dir_request_min=22,
gcs_metadata_request_min=13, op_delete_min=108, op_glob_status_min=0,
op_create_non_recursive_min=0, op_hsync_min=0, op_xattr_get_named_min=0,
op_xattr_get_named_map_min=0, op_hflush_min=0, op_xattr_list_min=0,
action_http_put_request_min=76, op_open_min=17, gcs_list_file_request_min=17,
stream_write_close_operations_min=0, op_create_min=46,
action_http_delete_request_min=44, op_mkdirs_min=173, op_list_status_min=47,
gcs_get_media_request_min=37, stream_readVectored_range_duration_min=0,
stream_read_vectored_operations_min=0, stream_read_close_operations_min=0,
stream_read_operations_min=0, stream_read_seek_operations_min=0,
op_xattr_get_map_min=0, stream_write_operations_min=0,
action_http_patch_request_min=0,
op_get_file_status_min=22, op_rename_min=0, delegation_tokens_issued_min=0,
action_http_post_request_min=31,
25/08/21 13:46:24 INFO RequestTracker: Detected high latency for
[url=https://storage.googleapis.com/storage/v1/b/dataproc-temp-us-central1-213773471448-5sd4nwct/o/c25fae69-38ef-46ed-9c0a-0a370d1d5910%2Fspark-job-history%2Fapplication_1753813867992_0356.inprogress/compose?ifGenerationMatch=1755783982615514;
invocationId=gl-java/11.0.20 gdcl/2.7.0 linux/6.1.0
gccl-invocation-id/ca28763e-c0dd-4bb1-941c-9b67dc0ccb65]. durationMs=236;
method=POST; thread=main [CONTEXT ratelimit_period="10 SECONDS" ]
[dd.trace 2025-08-21 13:46:24:360 +0000] [main] INFO
datadog.trace.instrumentation.spark.AbstractDatadogSparkListener - Finishing
spark application trace
Exception in thread "main" org.apache.hudi.exception.HoodieException: Failed to run HoodieStreamer
    at org.apache.hudi.utilities.streamer.HoodieStreamer.main(HoodieStreamer.java:653)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1196)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:195)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:218)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:92)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1288)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1297)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hudi.utilities.ingestion.HoodieIngestionException: Ingestion service was shut down with exception.
    at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:67)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:101)
    at org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:229)
    at org.apache.hudi.utilities.streamer.HoodieStreamer.main(HoodieStreamer.java:650)
    ... 12 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20250821134617535
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:102)
    at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:65)
    ... 15 more
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20250821134617535
    at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:67)
    at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:114)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:103)
    at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:132)
    at org.apache.hudi.utilities.streamer.StreamSync.writeToSink(StreamSync.java:980)
    at org.apache.hudi.utilities.streamer.StreamSync.writeToSinkAndDoMetaSync(StreamSync.java:810)
    at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:477)
    at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:836)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 10 times, most recent failure: Lost task 0.9 in stage 0.0 (TID 9) (dp-data-eng-gce-dev-w-0.c.non-prod-data-eng-dev.internal executor 1): java.io.IOException: unexpected exception type
    at java.base/java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1512)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:237)
    at java.base/jdk.internal.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1136)
    ... 128 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
    at org.apache.hudi.utilities.streamer.SourceFormatAdapter.$deserializeLambda$(SourceFormatAdapter.java:72)
    ... 137 more
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
    ...
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:90)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    ... 3 more
Caused by: java.lang.reflect.InvocationTargetException
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:237)
    at java.base/jdk.internal.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1136)
    ... 128 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
    at org.apache.hudi.utilities.streamer.SourceFormatAdapte.
```
I’d appreciate any insights into resolving this issue.
Thank you for your help!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]