soumilshah1995 opened a new issue, #11913: URL: https://github.com/apache/hudi/issues/11913
### Steps to reproduce

#### Step 1: Download the dataset and upload it to S3

https://drive.google.com/drive/folders/1mQzZSVgxQGksoXkYGb8DSn2jeR48VehS?usp=share_link

# Download the jars and upload them to S3

Download the jars from https://drive.google.com/drive/folders/1mQzZSVgxQGksoXkYGb8DSn2jeR48VehS?usp=share_link

```
aws s3 cp /Users/soumilshah/IdeaProjects/SparkProject/EMRServerlessLabsLakehouse/xtable-emr/jar s3://soumilshah-dev-1995/jars/ --recursive
```

# Event

```
event = {
    "jar": [
        "/usr/lib/hudi/hudi-utilities-bundle.jar",
        "s3://soumilshah-dev-1995/jars/hudi-extensions-0.1.0-SNAPSHOT-bundled.jar",
        "s3://soumilshah-dev-1995/jars/hudi-java-client-0.14.0.jar"
    ],
    "spark_submit_parameters": [
        "--conf spark.serializer=org.apache.spark.serializer.KryoSerializer",
        "--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
        "--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog",
        "--conf spark.sql.hive.convertMetastoreParquet=false",
        "--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
        "--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer"
    ],
    "arguments": {
        "table-type": "COPY_ON_WRITE",
        "op": "UPSERT",
        "enable-sync": True,
        "sync-tool-classes": "io.onetable.hudi.sync.OneTableSyncTool",
        "source-ordering-field": "replicadmstimestamp",
        "source-class": "org.apache.hudi.utilities.sources.ParquetDFSSource",
        "target-table": "invoice",
        "target-base-path": "s3://soumilshah-dev-1995/tmp/invoice/",
        "hoodie-conf": {
            "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.SimpleKeyGenerator",
            "hoodie.datasource.write.recordkey.field": "invoiceid",
            "hoodie.datasource.write.partitionpath.field": "destinationstate",
            "hoodie.deltastreamer.source.dfs.root": "s3://soumilshah-dev-1995/raw/parquet/",
            "hoodie.datasource.write.precombine.field": "replicadmstimestamp",
            "hoodie.onetable.formats.to.sync": "DELTA,ICEBERG"
        }
    },
    "job": {
        "job_name": "delta_streamer_invoice",
        "created_by": "Soumil Shah",
        "created_at": "2024-03-20",
        "ApplicationId": os.getenv("APPLICATION_ID"),
        "ExecutionTime": 600,
        "JobActive": True,
        "schedule": "0 8 * * *",
        "JobStatusPolling": True,
        "JobDescription": "Ingest data from parquet source",
        "ExecutionArn": os.getenv("IAM_ROLE"),
    }
}
```

# Full code

```
import os
import time
import uuid
import json
import boto3


def check_job_status(client, run_id, applicationId):
    response = client.get_job_run(applicationId=applicationId, jobRunId=run_id)
    return response['jobRun']['state']


def lambda_handler(event, context):
    try:
        # Create EMR Serverless client object
        client = boto3.client("emr-serverless")

        # Extract parameters from the event
        jar = event.get("jar", [])

        # Add --conf spark.jars with comma-separated values from the jar list
        spark_submit_parameters = ' '.join(event.get("spark_submit_parameters", []))  # Convert list to string
        spark_submit_parameters = f'--conf spark.jars={",".join(jar)} {spark_submit_parameters}'  # Join with existing parameters

        arguments = event.get("arguments", {})
        job = event.get("job", {})

        # Extract job details
        JobName = job.get("job_name")
        ApplicationId = job.get("ApplicationId")
        ExecutionTime = job.get("ExecutionTime")
        ExecutionArn = job.get("ExecutionArn")

        # Process arguments
        entryPointArguments = []
        for key, value in arguments.items():
            if key == "hoodie-conf":
                # Extract hoodie-conf key-value pairs and add them to entryPointArguments
                for hoodie_key, hoodie_value in value.items():
                    entryPointArguments.extend(["--hoodie-conf", f"{hoodie_key}={hoodie_value}"])
            elif isinstance(value, bool):
                # Add boolean parameters as bare flags when True
                if value:
                    entryPointArguments.append(f"--{key}")
            else:
                entryPointArguments.extend([f"--{key}", f"{value}"])

        # Start the EMR job run
        response = client.start_job_run(
            applicationId=ApplicationId,
            clientToken=str(uuid.uuid4()),
            executionRoleArn=ExecutionArn,
            jobDriver={
                'sparkSubmit': {
                    'entryPoint': "command-runner.jar",
                    'entryPointArguments': entryPointArguments,
                    'sparkSubmitParameters': spark_submit_parameters
                },
            },
            executionTimeoutMinutes=ExecutionTime,
            name=JobName
        )

        if job.get("JobStatusPolling") == True:
            # Poll for job status
            run_id = response['jobRunId']
            print("Job run ID:", run_id)
            polling_interval = 3

            while True:
                status = check_job_status(client=client, run_id=run_id, applicationId=ApplicationId)
                print("Job status:", status)
                if status in ["CANCELLED", "FAILED", "SUCCESS"]:
                    break
                time.sleep(polling_interval)  # Poll every 3 seconds

        return {
            "statusCode": 200,
            "body": json.dumps(response)
        }
    except Exception as e:
        print(f"An error occurred: {e}")
        return {
            "statusCode": 500,
            "body": json.dumps({"error": str(e)})
        }


# Test event
event = {
    "jar": [
        "/usr/lib/hudi/hudi-utilities-bundle.jar",
        "s3://soumilshah-dev-1995/jars/hudi-extensions-0.1.0-SNAPSHOT-bundled.jar",
        "s3://soumilshah-dev-1995/jars/hudi-java-client-0.14.0.jar"
    ],
    "spark_submit_parameters": [
        "--conf spark.serializer=org.apache.spark.serializer.KryoSerializer",
        "--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
        "--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog",
        "--conf spark.sql.hive.convertMetastoreParquet=false",
        "--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
        "--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer"
    ],
    "arguments": {
        "table-type": "COPY_ON_WRITE",
        "op": "UPSERT",
        "enable-sync": True,
        "sync-tool-classes": "io.onetable.hudi.sync.OneTableSyncTool",
        "source-ordering-field": "replicadmstimestamp",
        "source-class": "org.apache.hudi.utilities.sources.ParquetDFSSource",
        "target-table": "invoice",
        "target-base-path": "s3://soumilshah-dev-1995/tmp/invoice/",
        "hoodie-conf": {
            "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.SimpleKeyGenerator",
            "hoodie.datasource.write.recordkey.field": "invoiceid",
            "hoodie.datasource.write.partitionpath.field": "destinationstate",
            "hoodie.deltastreamer.source.dfs.root": "s3://soumilshah-dev-1995/raw/parquet/",
            "hoodie.datasource.write.precombine.field": "replicadmstimestamp",
            "hoodie.onetable.formats.to.sync": "DELTA,ICEBERG"
        }
    },
    "job": {
        "job_name": "delta_streamer_invoice",
        "created_by": "Soumil Shah",
        "created_at": "2024-03-20",
        "ApplicationId": os.getenv("APPLICATION_ID"),
        "ExecutionTime": 600,
        "JobActive": True,
        "schedule": "0 8 * * *",
        "JobStatusPolling": True,
        "JobDescription": "Ingest data from parquet source",
        "ExecutionArn": os.getenv("IAM_ROLE"),
    }
}

print(json.dumps(event, indent=3))
lambda_handler(event=event, context=None)
```
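For clarity, here is a standalone sketch of how the handler above assembles `sparkSubmitParameters` and flattens the nested event into HoodieDeltaStreamer CLI arguments. It makes no AWS calls, the inputs are trimmed from the event shown earlier, and the printed output in the comments is simply what I expect this logic to produce, shown for illustration:

```
# Standalone sketch of the argument handling in lambda_handler above
# (no AWS calls; inputs trimmed from the event shown earlier).
jar = [
    "/usr/lib/hudi/hudi-utilities-bundle.jar",
    "s3://soumilshah-dev-1995/jars/hudi-extensions-0.1.0-SNAPSHOT-bundled.jar",
]
spark_submit_parameters = " ".join([
    "--conf spark.serializer=org.apache.spark.serializer.KryoSerializer",
])
# The jar list is joined with commas into a single --conf spark.jars entry
spark_submit_parameters = f'--conf spark.jars={",".join(jar)} {spark_submit_parameters}'
print(spark_submit_parameters)
# --conf spark.jars=/usr/lib/hudi/hudi-utilities-bundle.jar,s3://soumilshah-dev-1995/jars/hudi-extensions-0.1.0-SNAPSHOT-bundled.jar --conf spark.serializer=org.apache.spark.serializer.KryoSerializer

arguments = {
    "table-type": "COPY_ON_WRITE",
    "enable-sync": True,
    "hoodie-conf": {
        "hoodie.datasource.write.recordkey.field": "invoiceid",
    },
}
entryPointArguments = []
for key, value in arguments.items():
    if key == "hoodie-conf":
        # Each nested entry becomes a repeated --hoodie-conf key=value pair
        for hoodie_key, hoodie_value in value.items():
            entryPointArguments.extend(["--hoodie-conf", f"{hoodie_key}={hoodie_value}"])
    elif isinstance(value, bool):
        # True booleans become bare flags; False ones are dropped entirely
        if value:
            entryPointArguments.append(f"--{key}")
    else:
        entryPointArguments.extend([f"--{key}", f"{value}"])
print(entryPointArguments)
# ['--table-type', 'COPY_ON_WRITE', '--enable-sync',
#  '--hoodie-conf', 'hoodie.datasource.write.recordkey.field=invoiceid']
```

This is how `--enable-sync` ends up as a bare flag while every `hoodie-conf` entry becomes a repeated `--hoodie-conf key=value` pair on the DeltaStreamer command line.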
# Error

The behavior is inconsistent: sometimes the job works, and other times it doesn't. For instance, there are occasions when I can see all the files in S3, yet the EMR job still fails with the error below. Other times, when I delete the S3 folder for testing and resubmit the job, I run into unusual errors.
```
24/09/07 18:11:16 INFO BlockManagerMaster: BlockManagerMaster stopped
24/09/07 18:11:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/09/07 18:11:16 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" org.apache.hudi.exception.HoodieMetaSyncException: Could not sync using the meta sync class io.onetable.hudi.sync.OneTableSyncTool
	at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:81)
	at org.apache.hudi.utilities.streamer.StreamSync.runMetaSync(StreamSync.java:938)
	at org.apache.hudi.utilities.streamer.StreamSync.writeToSink(StreamSync.java:851)
	at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:446)
	at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.ingestOnce(HoodieStreamer.java:840)
	at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:72)
	at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
	at org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:205)
	at org.apache.hudi.utilities.streamer.HoodieStreamer.main(HoodieStreamer.java:584)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1075)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1167)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1176)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hudi.exception.HoodieException: Unable to sync to OneTable for formats: DELTA
	at io.onetable.hudi.sync.OneTableSyncTool.syncHoodieTable(OneTableSyncTool.java:90)
	at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:79)
	... 20 more
```

# Other times I get this error

```
24/09/07 18:16:34 WARN TaskSetManager: Lost task 0.0 in stage 12.0 (TID 11) ([2600:1f18:6236:7a00:5c7d:22b7:29ae:18c3] executor 3): java.io.InvalidObjectException: ReflectiveOperationException during deserialization
	at java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:280)
	at java.base/jdk.internal.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1190)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2266)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2157)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1721)
	at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2157)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1721)
	at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2157)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1721)
	at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
	at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
	at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:90)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:143)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:629)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:95)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:632)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.reflect.InvocationTargetException
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:278)
	... 48 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
	at org.apache.hudi.table.action.commit.HoodieWriteHelper.$deserializeLambda$(HoodieWriteHelper.java:37)
	... 53 more
24/09/07 18:16:34 INFO TaskSetManager: Starting task 0.1 in stage 12.0 (TID 12) ([2600:1f18:6236:7a00:1c98:6d15:bfa9:25eb], executor 1, partition 0, ANY, 8887 bytes)
24/09/07 18:16:35 INFO BlockManagerInfo: Added broadcast_13_piece0 in memory on [2600:1f18:6236:7a00:1c98:6d15:bfa9:25eb]:46171 (size: 26.9 KiB, free: 7.9 GiB)
24/09/07 18:16:35 INFO TaskSetManager: Lost task 0.1 in stage 12.0 (TID 12) on [2600:1f18:6236:7a00:1c98:6d15:bfa9:25eb], executor 1: java.io.InvalidObjectException (ReflectiveOperationException during deserialization) [duplicate 1]
24/09/07 18:16:35 INFO TaskSetManager: Starting task 0.2 in stage 12.0 (TID 13) ([2600:1f18:6236:7a00:1c98:6d15:bfa9:25eb], executor 1, partition 0, ANY, 8887 bytes)
24/09/07 18:16:35 INFO TaskSetManager: Lost task 0.2 in stage 12.0 (TID 13) on [2600:1f18:6236:7a00:1c98:6d15:bfa9:25eb], executor 1: java.io.InvalidObjectException (ReflectiveOperationException during deserialization) [duplicate 2]
24/09/07 18:16:35 INFO TaskSetManager: Starting task 0.3 in stage 12.0 (TID 14) ([2600:1f18:6236:7a00:5c7d:22b7:29ae:18c3], executor 3, partition 0, ANY, 8887 bytes)
24/09/07 18:16:35 INFO TaskSetManager: Lost task 0.3 in stage 12.0 (TID 14) on [2600:1f18:6236:7a00:5c7d:22b7:29ae:18c3], executor 3: java.io.InvalidObjectException (ReflectiveOperationException during deserialization) [duplicate 3]
24/09/07 18:16:35 ERROR TaskSetManager: Task 0 in stage 12.0 failed 4 times; aborting job
24/09/07 18:16:35 INFO TaskSchedulerImpl: Removed TaskSet 12.0, whose tasks have all completed, from pool
24/09/07 18:16:35 INFO TaskSchedulerImpl: Cancelling stage 12
24/09/07 18:16:35 INFO TaskSchedulerImpl: Killing all running tasks in stage 12: Stage cancelled: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 14) ([2600:1f18:6236:7a00:5c7d:22b7:29ae:18c3] executor 3): java.io.InvalidObjectException: ReflectiveOperationException during deserialization
```
