remeajayi2022 opened a new issue, #13744:
URL: https://github.com/apache/hudi/issues/13744

   **Describe the problem you faced**

   I'm trying to ingest Protobuf records from a Confluent Cloud topic into a target table on Google Cloud Storage, submitting Hudi Streamer jobs via Google Dataproc. The jobs fail with the `Invalid lambda deserialization` error below.
   ```
   Caused by: java.lang.reflect.InvocationTargetException
           at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.base/java.lang.reflect.Method.invoke(Method.java:566)
           at 
java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:237)
           at 
java.base/jdk.internal.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
           at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.base/java.lang.reflect.Method.invoke(Method.java:566)
           at 
java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1136)
           ... 128 more
   Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
           at 
org.apache.hudi.utilities.streamer.SourceFormatAdapter.$deserializeLambda$(SourceFormatAdapter.java:72)
            ... 137 more
   ```
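   For context on where `Invalid lambda deserialization` comes from: Java serializable lambdas are written out as `SerializedLambda`, and on read `SerializedLambda.readResolve` invokes a compiler-generated `$deserializeLambda$` method on the capturing class (here `SourceFormatAdapter`). A minimal, self-contained round trip, unrelated to Hudi's own code, looks like this:

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.ObjectInputStream;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;
   import java.util.function.Function;

   public class LambdaRoundTrip {
       // A serializable lambda type: javac emits a $deserializeLambda$ method
       // on the class that defines the lambda (the "capturing class").
       interface SerFn extends Function<Integer, Integer>, Serializable {}

       static int roundTrip() throws Exception {
           SerFn fn = x -> x + 1;

           ByteArrayOutputStream bos = new ByteArrayOutputStream();
           try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
               oos.writeObject(fn); // written as a SerializedLambda
           }

           // On read, SerializedLambda.readResolve calls
           // LambdaRoundTrip.$deserializeLambda$ to rebuild the lambda. If the
           // capturing class on the reading side was compiled from different
           // bytecode (e.g. a different jar version), that method throws
           // IllegalArgumentException: Invalid lambda deserialization.
           try (ObjectInputStream ois = new ObjectInputStream(
                   new ByteArrayInputStream(bos.toByteArray()))) {
               @SuppressWarnings("unchecked")
               Function<Integer, Integer> back =
                       (Function<Integer, Integer>) ois.readObject();
               return back.apply(41);
           }
       }

       public static void main(String[] args) throws Exception {
           System.out.println(roundTrip()); // 42
       }
   }
   ```

   In a Spark job this failure typically means the class containing the lambda (`SourceFormatAdapter`) resolved to different bytecode on the executors than on the driver; `userClassPathFirst` combined with `extraClassPath` settings like the ones below is one common way to end up in that state, though I can't say for certain that is the cause here.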
   
   **To Reproduce**
   Environment Details
   Hudi version: v1.0.2
   Spark version: 3.5.3
   Scala version: 2.12
   Google Dataproc version: 2.3.6-debian12
   Storage: Google Cloud Storage
   
   **Spark Submit Command and Protobuf Configuration**
   
   ```
   gcloud dataproc jobs submit spark --cluster <cluster-name> \
       --region us-central1 \
       --class org.apache.hudi.utilities.streamer.HoodieStreamer \
       --project <project-id> \
       --properties 
spark.driver.userClassPathFirst=true,spark.executor.userClassPathFirst=true,spark.driver.extraClassPath=gs://<path>/hudi-utilities-bundle_2.12-1.0.2.jar,spark.executor.extraClassPath=gs://<path>/hudi-utilities-bundle_2.12-1.0.2.jar
 \
       --jars 
gs://<path>/jars/hudi-utilities-bundle_2.12-1.0.2.jar,gs://<path>/jars/wire-schema-3.1.0.jar,gs://<path>-storage/jars/kafka-protobuf-provider-5.5.0.jar
  \
       -- \
       --target-table users \
       --target-base-path gs://<path>/data/users \
       --source-ordering-field registertime \
       --min-sync-interval-seconds 60 \
       --source-limit 1000 \
       --continuous \
       --source-class org.apache.hudi.utilities.sources.ProtoKafkaSource \
       --schemaprovider-class 
org.apache.hudi.utilities.schema.SchemaRegistryProvider \
       --table-type COPY_ON_WRITE \
       --op UPSERT \
       --compact-scheduling-weight 3 \
       --delta-sync-scheduling-weight 4 \
       --post-write-termination-strategy-class 
org.apache.hudi.utilities.streamer.NoNewDataTerminationStrategy \
       --hoodie-conf hoodie.table.name=users \
       --hoodie-conf hoodie.base.path=gs://<path>/data/users \
       --hoodie-conf 
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
 \
       --hoodie-conf hoodie.keygen.timebased.timestamp.type=EPOCHMILLISECONDS \
       --hoodie-conf hoodie.keygen.timebased.input.dateformat=yyyy-MM-dd \
       --hoodie-conf hoodie.keygen.timebased.output.dateformat=yyyy-MM-dd \
       --hoodie-conf max.rounds.without.new.data.to.shutdown=5 \
       --hoodie-conf sasl.mechanism=PLAIN \
       --hoodie-conf security.protocol=SASL_SSL \
       --hoodie-conf hoodie.datasource.write.reconcile.schema=true \
       --hoodie-conf bootstrap.servers=<bootstrap-server>:9092 \
       --hoodie-conf 
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule 
required username=<username> password=<password>;" \
       --hoodie-conf schema.registry.url=<schema-registry-url>/ \
       --hoodie-conf schema.registry.basic.auth.user.info=<username>:<password> 
\
       --hoodie-conf basic.auth.credentials.source=USER_INFO \
       --hoodie-conf 
hoodie.streamer.schemaprovider.registry.url=<username>:<password>@<schema-registry-url>/subjects/test_users_proto-value/versions/latest
 \
       --hoodie-conf hoodie.datasource.write.recordkey.field=userid \
       --hoodie-conf hoodie.datasource.write.partitionpath.field=regionid \
       --hoodie-conf hoodie.datasource.write.hive_style_partitioning=True \
       --hoodie-conf hoodie.datasource.write.precombine.field=registertime \
       --hoodie-conf hoodie.datasource.write.operation=UPSERT \
       --hoodie-conf hoodie.streamer.source.kafka.topic=test_users_proto \
       --hoodie-conf 
hoodie.streamer.schemaprovider.registry.schemaconverter=org.apache.hudi.utilities.schema.converter.ProtoSchemaToAvroSchemaConverter
 \
       --hoodie-conf 
hoodie.streamer.source.kafka.proto.value.deserializer.class=io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
 \
       --hoodie-conf group.id=hudi-deltastreamer \
       --hoodie-conf auto.offset.reset=earliest \
       --hoodie-conf hoodie.write.concurrency.mode=SINGLE_WRITER \
       --hoodie-conf hoodie.datasource.write.drop.partition.columns=True \
       --hoodie-conf 
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider
 \
       --hoodie-conf hoodie.cleaner.policy.failed.writes=EAGER \
       --hoodie-conf hoodie.client.heartbeat.interval_in_ms=120000 \
       --hoodie-conf hoodie.client.heartbeat.tolerable.misses=10 \
       --hoodie-conf hoodie.keep.min.commits=100 \
       --hoodie-conf hoodie.keep.max.commits=130 \
       --hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_COMMITS \
       --hoodie-conf hoodie.clean.automatic=true \
       --hoodie-conf hoodie.cleaner.commits.retained=10 \
       --hoodie-conf hoodie.cleaner.hours.retained=24 \
       --hoodie-conf hoodie.metadata.enable=True \
       --hoodie-conf 
hoodie.datasource.write.schema.allow.auto.evolution.column.drop=false \
       --hoodie-conf hoodie.write.set.null.for.missing.columns=true \
       --hoodie-conf request.timeout.ms=90000 \
       --hoodie-conf session.timeout.ms=120000 \
       --hoodie-conf heartbeat.interval.ms=5000 \
       --hoodie-conf retry.backoff.ms=500 \
       --hoodie-conf hoodie.datasource.hive_sync.assume_date_partitioning=False 
\
       --hoodie-conf hoodie.partition.metafile.use.base.format=True \
       --hoodie-conf use.latest.version=true \
       --hoodie-conf auto.register.schemas=true
   ```
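   Side note on the key-generator settings above: with `hoodie.keygen.timebased.timestamp.type=EPOCHMILLISECONDS` and `output.dateformat=yyyy-MM-dd`, the timestamp-based key generator turns an epoch-millis value into a date string for the partition path, roughly like this (a simplified sketch with a UTC timezone assumption, not Hudi's actual implementation):

   ```java
   import java.time.Instant;
   import java.time.ZoneOffset;
   import java.time.format.DateTimeFormatter;

   public class PartitionPathSketch {
       // Rough stand-in for the EPOCHMILLISECONDS -> yyyy-MM-dd conversion step.
       static String toPartitionPath(long epochMillis) {
           DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd")
                   .withZone(ZoneOffset.UTC);
           return fmt.format(Instant.ofEpochMilli(epochMillis));
       }

       public static void main(String[] args) {
           // A hypothetical registertime value in epoch milliseconds.
           System.out.println(toPartitionPath(1_700_000_000_000L)); // 2023-11-14
       }
   }
   ```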
   
   **Steps to Reproduce**
   
   1. Use the Hudi 1.0.2 utilities bundle (`hudi-utilities-bundle_2.12-1.0.2.jar`) built for Spark 3.5 and Scala 2.12.
   2. Use a Protobuf schema on an accessible schema registry, preferably an 
authenticated one.
   3. Configure Hudi Streamer job with the spark submit command above.
   4. Run the Spark job.
   
   **Expected behavior**
   Records should be ingested into a partitioned Hudi table on Google Cloud Storage.
   
   
   **Additional context**
   @deepakpanda93 suggested explicitly setting the classpath of the Spark job. I tried this by including the jar in `spark.driver.extraClassPath` and `spark.executor.extraClassPath`, as shown above; I also tried the `spark:`-prefixed form of those properties.
   I also tested with and without the `kafka-protobuf-provider` jar.
   I confirmed that there was no class duplication at runtime using `spark.executor.extraJavaOptions="-verbose:class"` and `spark.driver.extraJavaOptions="-verbose:class"`.
   I confirmed that the schema is valid and did not contain corrupted records.
   
   This is the Protobuf schema:
   ```
   syntax = "proto3";
   package ksql;
   
   message users {
     int64 registertime = 1;
     string userid = 2;
     string regionid = 3;
     string gender = 4;
   }
   ```
   
   **Stacktrace**
   
   ```
   25/08/21 13:46:19 INFO Javalin: You are running Javalin 4.6.7 (released 
October 24, 2022. Your Javalin version is 1032 days old. Consider checking for 
a newer version.).
   25/08/21 13:46:19 INFO Server: jetty-9.4.57.v20241219; built: 
2025-01-08T21:24:30.412Z; git: df524e6b29271c2e09ba9aea83c18dc9db464a31; jvm 
11.0.20.1+1
   25/08/21 13:46:19 INFO Server: Started @14067ms
   25/08/21 13:46:19 INFO Javalin: Listening on http://localhost:42205/
   25/08/21 13:46:19 INFO Javalin: Javalin started in 228ms \o/
   25/08/21 13:46:19 INFO TimelineService: Starting Timeline server on port: 
42205
   25/08/21 13:46:19 INFO EmbeddedTimelineService: Started embedded timeline 
server at dp-data-eng-gce-dev-m-0.c.non-prod-data-eng-dev.internal:42205
   25/08/21 13:46:19 INFO BaseHoodieClient: Timeline Server already running. 
Not restarting the service
   25/08/21 13:46:19 INFO BaseHoodieClient: Timeline Server already running. 
Not restarting the service
   25/08/21 13:46:19 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient 
from gs://<path>-storage/data/users
   25/08/21 13:46:19 INFO HoodieTableConfig: Loading table properties from 
gs://<path>-storage/data/users/.hoodie/hoodie.properties
   25/08/21 13:46:19 INFO HoodieTableMetaClient: Finished Loading Table of type 
COPY_ON_WRITE(version=2) from gs://<path>-storage/data/users
   25/08/21 13:46:19 INFO HoodieTableMetaClient: Loading Active commit timeline 
for gs://<path>-storage/data/users
   25/08/21 13:46:19 INFO ActiveTimelineV2: Loaded instants upto : 
Option{val=[==>20250820204407037__commit__REQUESTED]}
   25/08/21 13:46:19 WARN ConfigUtils: The configuration key 
'hoodie.cleaner.policy.failed.writes' has been deprecated and may be removed in 
the future. Please use the new key 'hoodie.clean.failed.writes.policy' instead.
   25/08/21 13:46:19 INFO CleanerUtils: Cleaned failed attempts if any
   25/08/21 13:46:19 WARN ConfigUtils: The configuration key 
'hoodie.cleaner.policy.failed.writes' has been deprecated and may be removed in 
the future. Please use the new key 'hoodie.clean.failed.writes.policy' instead.
   25/08/21 13:46:19 INFO TimeGeneratorBase: LockProvider for TimeGenerator: 
org.apache.hudi.client.transaction.lock.InProcessLockProvider
   25/08/21 13:46:20 INFO HoodieTableMetaClient: Loading Active commit timeline 
for gs://<path>-storage/data/users
   25/08/21 13:46:20 INFO ActiveTimelineV2: Loaded instants upto : 
Option{val=[==>20250820204407037__commit__REQUESTED]}
   25/08/21 13:46:20 INFO FileSystemViewManager: Creating View Manager with 
storage type REMOTE_FIRST.
   25/08/21 13:46:20 INFO BaseHoodieTableServiceClient: Scheduling Rollback at 
instant time : 20250821134619849 (exists in active timeline: true), with 
rollback plan: false for table gs://<path>-storage/data/users
   25/08/21 13:46:20 INFO ActiveTimelineV2: Loaded instants upto : 
Option{val=[==>20250821134619849__rollback__REQUESTED]}
   25/08/21 13:46:20 INFO BaseRollbackPlanActionExecutor: Requesting Rollback 
with instant time [==>20250821134619849__rollback__REQUESTED]
   25/08/21 13:46:20 INFO ActiveTimelineV2: Loaded instants upto : 
Option{val=[==>20250821134619849__rollback__REQUESTED]}
   25/08/21 13:46:20 INFO ActiveTimelineV2: Create new file for toInstant 
?gs://<path>-storage/data/users/.hoodie/timeline/20250821134619849.rollback.inflight
   25/08/21 13:46:20 WARN ConfigUtils: The configuration key 
'hoodie.cleaner.policy.failed.writes' has been deprecated and may be removed in 
the future. Please use the new key 'hoodie.clean.failed.writes.policy' instead.
   25/08/21 13:46:20 INFO CopyOnWriteRollbackActionExecutor: Time(in ms) taken 
to finish rollback 0
   25/08/21 13:46:20 INFO BaseRollbackActionExecutor: Rolled back inflight 
instant 20250820204407037
   25/08/21 13:46:20 INFO BaseRollbackActionExecutor: Index rolled back for 
commits [==>20250820204407037__commit__REQUESTED]
   25/08/21 13:46:20 INFO TransactionManager: Transaction starting for 
Option{val=[==>20250821134619849__rollback__INFLIGHT]} with latest completed 
transaction instant Optional.empty
   25/08/21 13:46:20 INFO TransactionManager: Transaction started for 
Option{val=[==>20250821134619849__rollback__INFLIGHT]} with latest completed 
transaction instant Optional.empty
   25/08/21 13:46:20 INFO BaseRollbackActionExecutor: Deleting 
instant=[==>20250820204407037__commit__REQUESTED]
   25/08/21 13:46:20 INFO ActiveTimelineV2: Deleting instant 
[==>20250820204407037__commit__REQUESTED]
   25/08/21 13:46:20 INFO ActiveTimelineV2: Removed instant 
[==>20250820204407037__commit__REQUESTED]
   25/08/21 13:46:20 INFO BaseRollbackActionExecutor: Deleted pending commit 
[==>20250820204407037__commit__REQUESTED]
   25/08/21 13:46:21 INFO ActiveTimelineV2: Created new file for toInstant 
?gs://<path>-storage/data/users/.hoodie/timeline/20250821134619849_20250821134620913.rollback
   25/08/21 13:46:21 INFO BaseRollbackActionExecutor: Rollback of Commits 
[20250820204407037] is complete
   25/08/21 13:46:21 INFO TransactionManager: Transaction ending with 
transaction owner Option{val=[==>20250821134619849__rollback__INFLIGHT]}
   25/08/21 13:46:21 INFO InProcessLockProvider: Base Path 
gs://<path>-storage/data/users, Lock Instance 
java.util.concurrent.locks.ReentrantReadWriteLock@29628e99[Write locks = 1, 
Read locks = 0], Thread pool-44-thread-1, In-process lock state RELEASING
   25/08/21 13:46:21 INFO InProcessLockProvider: Base Path 
gs://<path>-storage/data/users, Lock Instance 
java.util.concurrent.locks.ReentrantReadWriteLock@29628e99[Write locks = 0, 
Read locks = 0], Thread pool-44-thread-1, In-process lock state RELEASED
   25/08/21 13:46:21 INFO InProcessLockProvider: Base Path 
gs://<path>-storage/data/users, Lock Instance 
java.util.concurrent.locks.ReentrantReadWriteLock@29628e99[Write locks = 0, 
Read locks = 0], Thread pool-44-thread-1, In-process lock state ALREADY_RELEASED
   25/08/21 13:46:21 INFO LockManager: Released connection created for 
acquiring lock
   25/08/21 13:46:21 INFO TransactionManager: Transaction ended with 
transaction owner Option{val=[==>20250821134619849__rollback__INFLIGHT]}
   25/08/21 13:46:21 WARN ConfigUtils: The configuration key 
'hoodie.cleaner.policy.failed.writes' has been deprecated and may be removed in 
the future. Please use the new key 'hoodie.clean.failed.writes.policy' instead.
   25/08/21 13:46:21 INFO BaseHoodieWriteClient: Generate a new instant time: 
20250821134617535 action: commit
   25/08/21 13:46:21 WARN ConfigUtils: The configuration key 
'hoodie.cleaner.policy.failed.writes' has been deprecated and may be removed in 
the future. Please use the new key 'hoodie.clean.failed.writes.policy' instead.
   25/08/21 13:46:21 INFO ActiveTimelineV2: Creating a new instant 
[==>20250821134617535__commit__REQUESTED]
   25/08/21 13:46:21 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient 
from gs://<path>-storage/data/users
   25/08/21 13:46:21 INFO HoodieTableConfig: Loading table properties from 
gs://<path>-storage/data/users/.hoodie/hoodie.properties
   25/08/21 13:46:21 INFO HoodieTableMetaClient: Finished Loading Table of type 
COPY_ON_WRITE(version=2) from gs://<path>-storage/data/users
   25/08/21 13:46:21 INFO HoodieTableMetaClient: Loading Active commit timeline 
for gs://<path>-storage/data/users
   25/08/21 13:46:21 INFO ActiveTimelineV2: Loaded instants upto : 
Option{val=[20250821134619849__20250821134620913__rollback__COMPLETED]}
   25/08/21 13:46:21 INFO FileSystemViewManager: Creating View Manager with 
storage type REMOTE_FIRST.
   25/08/21 13:46:21 INFO AsyncCleanerService: The HoodieWriteClient is not 
configured to auto & async clean. Async clean service will not start.
   25/08/21 13:46:21 INFO AsyncArchiveService: The HoodieWriteClient is not 
configured to auto & async archive. Async archive service will not start.
   [dd.trace 2025-08-21 13:46:22:019 +0000] [dd-task-scheduler] INFO 
datadog.communication.monitor.DDAgentStatsDConnection - Detected 
/var/run/datadog/dsd.socket.  Using it to send StatsD data.
   25/08/21 13:46:23 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) 
(dp-data-eng-gce-dev-w-0.c.non-prod-data-eng-dev.internal executor 1): 
java.io.IOException: unexpected exception type
        at 
java.base/java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1512)
        at 
java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1142)
        at 
java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2237)
        at 
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at 
java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2134)
   
        ...
        at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:90) ~[spark-core_2.12-3.5.3.jar:3.5.3]
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54) 
~[spark-core_2.12-3.5.3.jar:3.5.3]
        at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166) 
~[spark-core_2.12-3.5.3.jar:3.5.3]
        at org.apache.spark.scheduler.Task.run(Task.scala:141) 
~[spark-core_2.12-3.5.3.jar:3.5.3]
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
 ~[spark-core_2.12-3.5.3.jar:3.5.3]
        at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
 ~[spark-common-utils_2.12-3.5.3.jar:3.5.3]
        at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
 ~[spark-common-utils_2.12-3.5.3.jar:3.5.3]
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96) 
~[spark-core_2.12-3.5.3.jar:3.5.3]
        at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623) 
~[spark-core_2.12-3.5.3.jar:3.5.3]
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 ~[?:?]
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 ~[?:?]
        at java.base/java.lang.Thread.run(Thread.java:829) ~[?:?]
   [dd.trace 2025-08-21 13:46:23:562 +0000] [spark-listener-group-shared] INFO 
datadog.trace.instrumentation.spark.AbstractDatadogSparkListener - Received 
spark application end event, finish trace on this event: false
   [dd.trace 2025-08-21 13:46:23:590 +0000] [main] INFO 
datadog.trace.instrumentation.spark.AbstractDatadogSparkListener - Finishing 
spark application trace
   25/08/21 13:46:23 INFO DataprocSparkPlugin: Shutting down driver plugin. 
metrics=[action_http_patch_request=0, files_created=5, 
gcs_api_server_timeout_count=0, op_get_list_status_result_size=17, op_open=7, 
action_http_delete_request=2, gcs_api_time=4074, gcs_backoff_count=0, 
gcs_api_client_unauthorized_response_count=0, stream_read_close_operations=7, 
stream_read_bytes_backwards_on_seek=0, gs_filesystem_create=4, 
exception_count=0, gcs_exception_count=0, gcs_api_total_request_count=102, 
op_create=5, stream_read_vectored_operations=0, gcs_metadata_request=58, 
gcs_api_client_bad_request_count=0, action_http_put_request=6, 
op_create_non_recursive=0, gcs_api_client_gone_response_count=0, 
gs_filesystem_initialize=3, stream_read_vectored_incoming_ranges=0, 
stream_write_operations=4, gcs_list_dir_request=6, stream_read_operations=8, 
gcs_api_client_request_timeout_count=0, op_rename=0, op_get_file_status=10, 
op_glob_status=0, op_exists=9, stream_write_bytes=490061, op_xattr_list=0, 
op_get_delegation_token=0, gcs_api_server_unavailable_count=0, 
directories_created=1, files_delete_rejected=0, 
stream_read_vectored_combined_ranges=0, op_xattr_get_named=0, 
gcs_list_file_request=13, op_hsync=0, action_http_get_request=0, 
stream_read_operations_incomplete=20, op_delete=1, stream_read_bytes=7375, 
gcs_api_client_non_found_response_count=45, op_list_located_status=0, 
gcs_api_client_requested_range_not_statisfiable_count=0, op_hflush=13, 
op_list_status=6, stream_read_vectored_read_bytes_discarded=0, 
op_xattr_get_named_map=0, gcs_api_client_side_error_count=46, 
op_get_file_checksum=0, gcs_api_server_internal_error_count=0, 
stream_read_seek_bytes_skipped=0, stream_write_close_operations=6, 
gcs_get_media_request=7, gcs_connector_time=3787, files_deleted=1, 
action_http_post_request=9, op_mkdirs=1, 
gcs_api_client_rate_limit_error_count=0, op_copy_from_local_file=0, 
gcs_api_server_bad_gateway_count=0, stream_readVectored_range_duration=0, 
stream_read_seek_backward_operations=0, gcs_api_server_side_error_count=0, stream_read_seek_operations=0, 
gcs_get_other_request=1, stream_read_seek_forward_operations=0, 
gcs_api_client_precondition_failed_response_count=1, op_xattr_get_map=0, 
delegation_tokens_issued=0, gcs_backoff_time=0, gcs_list_dir_request_min=22, 
gcs_metadata_request_min=13, op_delete_min=108, op_glob_status_min=0, 
op_create_non_recursive_min=0, op_hsync_min=0, op_xattr_get_named_min=0, 
op_xattr_get_named_map_min=0, op_hflush_min=0, op_xattr_list_min=0, 
action_http_put_request_min=76, op_open_min=17, gcs_list_file_request_min=17, 
stream_write_close_operations_min=0, op_create_min=46, 
action_http_delete_request_min=44, op_mkdirs_min=173, op_list_status_min=47, 
gcs_get_media_request_min=37, stream_readVectored_range_duration_min=0, 
stream_read_vectored_operations_min=0, stream_read_close_operations_min=0, 
stream_read_operations_min=0, stream_read_seek_operations_min=0, 
op_xattr_get_map_min=0, stream_write_operations_min=0, 
action_http_patch_request_min=0, 
 op_get_file_status_min=22, op_rename_min=0, delegation_tokens_issued_min=0, 
action_http_post_request_min=31, 
   25/08/21 13:46:24 INFO RequestTracker: Detected high latency for 
[url=https://storage.googleapis.com/storage/v1/b/dataproc-temp-us-central1-213773471448-5sd4nwct/o/c25fae69-38ef-46ed-9c0a-0a370d1d5910%2Fspark-job-history%2Fapplication_1753813867992_0356.inprogress/compose?ifGenerationMatch=1755783982615514;
 invocationId=gl-java/11.0.20 gdcl/2.7.0 linux/6.1.0 
gccl-invocation-id/ca28763e-c0dd-4bb1-941c-9b67dc0ccb65]. durationMs=236; 
method=POST; thread=main [CONTEXT ratelimit_period="10 SECONDS" ]
   [dd.trace 2025-08-21 13:46:24:360 +0000] [main] INFO 
datadog.trace.instrumentation.spark.AbstractDatadogSparkListener - Finishing 
spark application trace
   Exception in thread "main" org.apache.hudi.exception.HoodieException: Failed 
to run HoodieStreamer 
        at 
org.apache.hudi.utilities.streamer.HoodieStreamer.main(HoodieStreamer.java:653)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1196)
        at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:195)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:218)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:92)
        at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1288)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1297)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: org.apache.hudi.utilities.ingestion.HoodieIngestionException: 
Ingestion service was shut down with exception.
        at 
org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:67)
        at org.apache.hudi.common.util.Option.ifPresent(Option.java:101)
        at 
org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:229)
        at 
org.apache.hudi.utilities.streamer.HoodieStreamer.main(HoodieStreamer.java:650)
        ... 12 more
   Caused by: java.util.concurrent.ExecutionException: 
org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit 
time 20250821134617535
        at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
        at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
        at 
org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:102)
        at 
org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:65)
        ... 15 more
   Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert 
for commit time 20250821134617535
        at 
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:67)
        at 
org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:114)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:103)
        at 
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:132)
        at 
org.apache.hudi.utilities.streamer.StreamSync.writeToSink(StreamSync.java:980)
        at 
org.apache.hudi.utilities.streamer.StreamSync.writeToSinkAndDoMetaSync(StreamSync.java:810)
        at 
org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:477)
        at 
org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:836)
        at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: org.apache.spark.SparkException: Job aborted due to stage 
failure: Task 0 in stage 0.0 failed 10 times, most recent failure: Lost task 
0.9 in stage 0.0 (TID 9) 
(dp-data-eng-gce-dev-w-0.c.non-prod-data-eng-dev.internal executor 1): 
java.io.IOException: unexpected exception type
        at 
java.base/java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1512)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at 
java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:237)
        at 
java.base/jdk.internal.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at 
java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1136)
        ... 128 more
   Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
        at 
org.apache.hudi.utilities.streamer.SourceFormatAdapter.$deserializeLambda$(SourceFormatAdapter.java:72)
        ... 137 more
   
   Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
        ...
        at 
java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
        at 
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at 
java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
        at 
java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
        at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
        at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:90)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
        at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
        at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
        ... 3 more
   Caused by: java.lang.reflect.InvocationTargetException
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at 
java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:237)
        at 
java.base/jdk.internal.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at 
java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1136)
        ... 128 more
   Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
        at org.apache.hudi.utilities.streamer.SourceFormatAdapte.
   ```
   
   I’d appreciate any insights into resolving this issue.
   Thank you for your help!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
