soumilshah1995 opened a new issue, #11913:
URL: https://github.com/apache/hudi/issues/11913

   
   
   ### Steps to reproduce 
   
   #### Step 1: Download the dataset and upload it to S3
   
   
![image](https://github.com/user-attachments/assets/149120f3-0648-4a49-850a-fdc864146ebf)
   
   
https://drive.google.com/drive/folders/1mQzZSVgxQGksoXkYGb8DSn2jeR48VehS?usp=share_link
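
   As a sketch of this step, assuming the dataset was downloaded to a local `./dataset` folder (a hypothetical path), the files can be pushed to the `s3://soumilshah-dev-1995/raw/parquet/` prefix that the DeltaStreamer config below reads from:

   ```
   # Hedged sketch: upload the downloaded parquet files to the raw prefix.
   # "./dataset" is a hypothetical local folder; the bucket and prefix match
   # the hoodie.deltastreamer.source.dfs.root config used in the event below.
   import os
   import boto3

   s3 = boto3.client("s3")
   bucket = "soumilshah-dev-1995"
   prefix = "raw/parquet/"

   for root, _, files in os.walk("./dataset"):
       for name in files:
           if name.endswith(".parquet"):
               local_path = os.path.join(root, name)
               s3.upload_file(local_path, bucket, prefix + name)
               print(f"uploaded {local_path} -> s3://{bucket}/{prefix}{name}")
   ```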
   
   
   
   #### Step 2: Download the jars and upload them to S3
   
![image](https://github.com/user-attachments/assets/5dc59441-7938-4e3b-b726-754c97aa98a7)
   
   Download the jars from:
   https://drive.google.com/drive/folders/1mQzZSVgxQGksoXkYGb8DSn2jeR48VehS?usp=share_link

   Then copy them to S3:
   
   ```
   aws s3 cp /Users/soumilshah/IdeaProjects/SparkProject/EMRServerlessLabsLakehouse/xtable-emr/jar s3://soumilshah-dev-1995/jars/ --recursive
   ```
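
   A quick sanity check, assuming the same bucket and `jars/` prefix as above, that all three jars referenced in the event actually landed in S3:

   ```
   # Hedged sketch: list the uploaded jars under the jars/ prefix.
   import boto3

   s3 = boto3.client("s3")
   resp = s3.list_objects_v2(Bucket="soumilshah-dev-1995", Prefix="jars/")
   for obj in resp.get("Contents", []):
       print(obj["Key"], obj["Size"])
   ```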
   
   #### Step 3: Event payload
   ```
   event = {
       "jar": [
           "/usr/lib/hudi/hudi-utilities-bundle.jar",
           "s3://soumilshah-dev-1995/jars/hudi-extensions-0.1.0-SNAPSHOT-bundled.jar",
           "s3://soumilshah-dev-1995/jars/hudi-java-client-0.14.0.jar"
       ],
       "spark_submit_parameters": [
           "--conf spark.serializer=org.apache.spark.serializer.KryoSerializer",
           "--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
           "--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog",
           "--conf spark.sql.hive.convertMetastoreParquet=false",
           "--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
           "--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer"
       ],
       "arguments": {
           "table-type": "COPY_ON_WRITE",
           "op": "UPSERT",
           "enable-sync": True,
           "sync-tool-classes": "io.onetable.hudi.sync.OneTableSyncTool",
           "source-ordering-field": "replicadmstimestamp",
           "source-class": "org.apache.hudi.utilities.sources.ParquetDFSSource",
           "target-table": "invoice",
           "target-base-path": "s3://soumilshah-dev-1995/tmp/invoice/",
           "hoodie-conf": {
               "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.SimpleKeyGenerator",
               "hoodie.datasource.write.recordkey.field": "invoiceid",
               "hoodie.datasource.write.partitionpath.field": "destinationstate",
               "hoodie.deltastreamer.source.dfs.root": "s3://soumilshah-dev-1995/raw/parquet/",
               "hoodie.datasource.write.precombine.field": "replicadmstimestamp",
               "hoodie.onetable.formats.to.sync": "DELTA,ICEBERG"
           }
       },
       "job": {
           "job_name": "delta_streamer_invoice",
           "created_by": "Soumil Shah",
           "created_at": "2024-03-20",
           "ApplicationId": os.getenv("APPLICATION_ID"),
           "ExecutionTime": 600,
           "JobActive": True,
           "schedule": "0 8 * * *",
           "JobStatusPolling": True,
           "JobDescription": "Ingest data from parquet source",
           "ExecutionArn": os.getenv("IAM_ROLE"),
       }
   }
   ```
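
   For reference, here is a minimal sketch of how the `arguments` map above gets flattened into HoodieDeltaStreamer CLI flags; it mirrors the loop in `lambda_handler` in the full code below:

   ```
   # Hedged sketch: flatten the "arguments" map into DeltaStreamer flags,
   # mirroring the loop in lambda_handler.
   arguments = event["arguments"]
   flags = []
   for key, value in arguments.items():
       if key == "hoodie-conf":
           # Each nested hoodie config becomes its own --hoodie-conf k=v pair
           for k, v in value.items():
               flags.extend(["--hoodie-conf", f"{k}={v}"])
       elif isinstance(value, bool):
           # Booleans become bare flags when True
           if value:
               flags.append(f"--{key}")
       else:
           flags.extend([f"--{key}", f"{value}"])

   print(flags)
   # e.g. ['--table-type', 'COPY_ON_WRITE', '--op', 'UPSERT', '--enable-sync', ...]
   ```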
   
   #### Step 4: Full code
   ```
   import os
   import time
   import uuid
   import json
   import boto3


   def check_job_status(client, run_id, applicationId):
       response = client.get_job_run(applicationId=applicationId, jobRunId=run_id)
       return response['jobRun']['state']


   def lambda_handler(event, context):
       try:
           # Create EMR Serverless client object
           client = boto3.client("emr-serverless")

           # Extract parameters from the event
           jar = event.get("jar", [])
           # Prepend --conf spark.jars with comma-separated values from the jar list
           spark_submit_parameters = ' '.join(event.get("spark_submit_parameters", []))  # Convert list to string
           spark_submit_parameters = f'--conf spark.jars={",".join(jar)} {spark_submit_parameters}'  # Join with existing parameters

           arguments = event.get("arguments", {})
           job = event.get("job", {})

           # Extract job details
           JobName = job.get("job_name")
           ApplicationId = job.get("ApplicationId")
           ExecutionTime = job.get("ExecutionTime")
           ExecutionArn = job.get("ExecutionArn")

           # Flatten arguments into DeltaStreamer CLI flags
           entryPointArguments = []
           for key, value in arguments.items():
               if key == "hoodie-conf":
                   # Extract hoodie-conf key-value pairs and add them to entryPointArguments
                   for hoodie_key, hoodie_value in value.items():
                       entryPointArguments.extend(["--hoodie-conf", f"{hoodie_key}={hoodie_value}"])
               elif isinstance(value, bool):
                   # Add boolean parameters as bare flags when True
                   if value:
                       entryPointArguments.append(f"--{key}")
               else:
                   entryPointArguments.extend([f"--{key}", f"{value}"])

           # Start the EMR job run
           response = client.start_job_run(
               applicationId=ApplicationId,
               clientToken=str(uuid.uuid4()),
               executionRoleArn=ExecutionArn,
               jobDriver={
                   'sparkSubmit': {
                       'entryPoint': "command-runner.jar",
                       'entryPointArguments': entryPointArguments,
                       'sparkSubmitParameters': spark_submit_parameters
                   },
               },
               executionTimeoutMinutes=ExecutionTime,
               name=JobName
           )

           if job.get("JobStatusPolling"):
               # Poll for job status
               run_id = response['jobRunId']
               print("Job run ID:", run_id)

               polling_interval = 3
               while True:
                   status = check_job_status(client=client, run_id=run_id, applicationId=ApplicationId)
                   print("Job status:", status)
                   if status in ["CANCELLED", "FAILED", "SUCCESS"]:
                       break
                   time.sleep(polling_interval)  # Poll every 3 seconds

           return {
               "statusCode": 200,
               "body": json.dumps(response)
           }

       except Exception as e:
           print(f"An error occurred: {e}")
           return {
               "statusCode": 500,
               "body": json.dumps({"error": str(e)})
           }


   # Test event
   event = {
       "jar": [
           "/usr/lib/hudi/hudi-utilities-bundle.jar",
           "s3://soumilshah-dev-1995/jars/hudi-extensions-0.1.0-SNAPSHOT-bundled.jar",
           "s3://soumilshah-dev-1995/jars/hudi-java-client-0.14.0.jar"
       ],
       "spark_submit_parameters": [
           "--conf spark.serializer=org.apache.spark.serializer.KryoSerializer",
           "--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
           "--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog",
           "--conf spark.sql.hive.convertMetastoreParquet=false",
           "--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
           "--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer"
       ],
       "arguments": {
           "table-type": "COPY_ON_WRITE",
           "op": "UPSERT",
           "enable-sync": True,
           "sync-tool-classes": "io.onetable.hudi.sync.OneTableSyncTool",
           "source-ordering-field": "replicadmstimestamp",
           "source-class": "org.apache.hudi.utilities.sources.ParquetDFSSource",
           "target-table": "invoice",
           "target-base-path": "s3://soumilshah-dev-1995/tmp/invoice/",
           "hoodie-conf": {
               "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.SimpleKeyGenerator",
               "hoodie.datasource.write.recordkey.field": "invoiceid",
               "hoodie.datasource.write.partitionpath.field": "destinationstate",
               "hoodie.deltastreamer.source.dfs.root": "s3://soumilshah-dev-1995/raw/parquet/",
               "hoodie.datasource.write.precombine.field": "replicadmstimestamp",
               "hoodie.onetable.formats.to.sync": "DELTA,ICEBERG"
           }
       },
       "job": {
           "job_name": "delta_streamer_invoice",
           "created_by": "Soumil Shah",
           "created_at": "2024-03-20",
           "ApplicationId": os.getenv("APPLICATION_ID"),
           "ExecutionTime": 600,
           "JobActive": True,
           "schedule": "0 8 * * *",
           "JobStatusPolling": True,
           "JobDescription": "Ingest data from parquet source",
           "ExecutionArn": os.getenv("IAM_ROLE"),
       }
   }

   print(json.dumps(event, indent=3))
   lambda_handler(event=event, context=None)
   ```
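
   To dig into a failed run, a minimal debugging sketch, assuming the same `APPLICATION_ID` environment variable and a job run ID taken from the `Job run ID:` log line above (`<job-run-id>` is a placeholder):

   ```
   # Hedged sketch: inspect a failed run. jobRunId comes from the
   # start_job_run response printed by lambda_handler above.
   import os
   import boto3

   client = boto3.client("emr-serverless")
   app_id = os.getenv("APPLICATION_ID")
   run_id = "<job-run-id>"  # placeholder: taken from the "Job run ID:" log line

   run = client.get_job_run(applicationId=app_id, jobRunId=run_id)
   print(run["jobRun"]["state"], run["jobRun"].get("stateDetails"))

   # Pre-signed link to the Spark UI / driver logs for this run
   dash = client.get_dashboard_for_job_run(applicationId=app_id, jobRunId=run_id)
   print(dash["url"])
   ```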
   
   #### Error
   The behavior is inconsistent: sometimes the job works, and other times it doesn't. On some runs I can see all the files in S3, yet the EMR job still fails with an error. On other runs, when I delete the S3 folder for testing and resubmit the job, I encounter different, unusual errors.
   
   ```
   
   24/09/07 18:11:16 INFO BlockManagerMaster: BlockManagerMaster stopped
   24/09/07 18:11:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
   24/09/07 18:11:16 INFO SparkContext: Successfully stopped SparkContext
   Exception in thread "main" org.apache.hudi.exception.HoodieMetaSyncException: Could not sync using the meta sync class io.onetable.hudi.sync.OneTableSyncTool
       at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:81)
       at org.apache.hudi.utilities.streamer.StreamSync.runMetaSync(StreamSync.java:938)
       at org.apache.hudi.utilities.streamer.StreamSync.writeToSink(StreamSync.java:851)
       at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:446)
       at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.ingestOnce(HoodieStreamer.java:840)
       at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:72)
       at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
       at org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:205)
       at org.apache.hudi.utilities.streamer.HoodieStreamer.main(HoodieStreamer.java:584)
       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.base/java.lang.reflect.Method.invoke(Method.java:568)
       at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
       at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1075)
       at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
       at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
       at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
       at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1167)
       at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1176)
       at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: org.apache.hudi.exception.HoodieException: Unable to sync to OneTable for formats: DELTA
       at io.onetable.hudi.sync.OneTableSyncTool.syncHoodieTable(OneTableSyncTool.java:90)
       at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:79)
       ... 20 more
   ```
   
   #### Other times I get this error
   ```
   24/09/07 18:16:34 WARN TaskSetManager: Lost task 0.0 in stage 12.0 (TID 11) ([2600:1f18:6236:7a00:5c7d:22b7:29ae:18c3] executor 3): java.io.InvalidObjectException: ReflectiveOperationException during deserialization
       at java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:280)
       at java.base/jdk.internal.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.base/java.lang.reflect.Method.invoke(Method.java:568)
       at java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1190)
       at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2266)
       at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
       at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2157)
       at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1721)
       at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
       at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
       at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
       at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
       at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2157)
       at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1721)
       at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
       at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
       at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
       at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
       at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2157)
       at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1721)
       at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
       at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
       at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
       at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
       at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
       at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
       at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
       at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
       at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
       at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
       at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
       at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
       at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
       at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
       at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
       at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:90)
       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
       at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
       at org.apache.spark.scheduler.Task.run(Task.scala:143)
       at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:629)
       at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
       at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:95)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:632)
       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
       at java.base/java.lang.Thread.run(Thread.java:840)
   Caused by: java.lang.reflect.InvocationTargetException
       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.base/java.lang.reflect.Method.invoke(Method.java:568)
       at java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:278)
       ... 48 more
   Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
       at org.apache.hudi.table.action.commit.HoodieWriteHelper.$deserializeLambda$(HoodieWriteHelper.java:37)
       ... 53 more

   24/09/07 18:16:34 INFO TaskSetManager: Starting task 0.1 in stage 12.0 (TID 12) ([2600:1f18:6236:7a00:1c98:6d15:bfa9:25eb], executor 1, partition 0, ANY, 8887 bytes)
   24/09/07 18:16:35 INFO BlockManagerInfo: Added broadcast_13_piece0 in memory on [2600:1f18:6236:7a00:1c98:6d15:bfa9:25eb]:46171 (size: 26.9 KiB, free: 7.9 GiB)
   24/09/07 18:16:35 INFO TaskSetManager: Lost task 0.1 in stage 12.0 (TID 12) on [2600:1f18:6236:7a00:1c98:6d15:bfa9:25eb], executor 1: java.io.InvalidObjectException (ReflectiveOperationException during deserialization) [duplicate 1]
   24/09/07 18:16:35 INFO TaskSetManager: Starting task 0.2 in stage 12.0 (TID 13) ([2600:1f18:6236:7a00:1c98:6d15:bfa9:25eb], executor 1, partition 0, ANY, 8887 bytes)
   24/09/07 18:16:35 INFO TaskSetManager: Lost task 0.2 in stage 12.0 (TID 13) on [2600:1f18:6236:7a00:1c98:6d15:bfa9:25eb], executor 1: java.io.InvalidObjectException (ReflectiveOperationException during deserialization) [duplicate 2]
   24/09/07 18:16:35 INFO TaskSetManager: Starting task 0.3 in stage 12.0 (TID 14) ([2600:1f18:6236:7a00:5c7d:22b7:29ae:18c3], executor 3, partition 0, ANY, 8887 bytes)
   24/09/07 18:16:35 INFO TaskSetManager: Lost task 0.3 in stage 12.0 (TID 14) on [2600:1f18:6236:7a00:5c7d:22b7:29ae:18c3], executor 3: java.io.InvalidObjectException (ReflectiveOperationException during deserialization) [duplicate 3]
   24/09/07 18:16:35 ERROR TaskSetManager: Task 0 in stage 12.0 failed 4 times; aborting job
   24/09/07 18:16:35 INFO TaskSchedulerImpl: Removed TaskSet 12.0, whose tasks have all completed, from pool
   24/09/07 18:16:35 INFO TaskSchedulerImpl: Cancelling stage 12
   24/09/07 18:16:35 INFO TaskSchedulerImpl: Killing all running tasks in stage 12: Stage cancelled: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 14) ([2600:1f18:6236:7a00:5c7d:22b7:29ae:18c3] executor 3): java.io.InvalidObjectException: ReflectiveOperationException during deserialization
   ```
   

