gtwuser opened a new issue, #8532:
URL: https://github.com/apache/hudi/issues/8532

   **Describe the problem you faced**
   
   A clear and concise description of the problem.
   We are trying to load huge datasets into Hudi table, of size 7.9 GB via 
`insert` operation. With following hudi configs
   ```bash
    common_config = {
               "className": ORG_APACHE_HUDI,
               "hoodie.datasource.hive_sync.use_jdbc": "false",
               "hoodie.datasource.write.precombine.field": "lastUpdateDate",
               "hoodie.datasource.write.recordkey.field": 
local_var["record_key"],
               "hoodie.table.name": "ldf",
               "hoodie.datasource.hive_sync.mode": "hms",
               "hoodie.consistency.check.enabled": "false",
               "hoodie.datasource.hive_sync.database": database,
               "hoodie.datasource.write.reconcile.schema": "true",
               "hoodie.datasource.hive_sync.table": table_name,
               "hoodie.datasource.hive_sync.enable": "true",
               "hoodie.datasource.write.payload.class": 
"org.apache.hudi.common.model.DefaultHoodieRecordPayload",
               "path": "s3://" + args["raw_bucket"] + "/ldf-data/merge-test/" + 
local_var["conf_prefix"],
               # 1,024 * 1,024 * 128 = 134,217,728 (128 MB)
               "hoodie.parquet.small.file.limit": "134217728",
           }
   
     unpartition_data_config = {
         "hoodie.datasource.hive_sync.partition_extractor_class": 
"org.apache.hudi.hive.NonPartitionedExtractor",
         "hoodie.datasource.write.keygenerator.class": 
"org.apache.hudi.keygen.NonpartitionedKeyGenerator",
     }
   
     init_load_config = {WRITE_OPERATION: BULK_INSERT, 
"hoodie.bulkinsert.shuffle.parallelism": 200}
     incremental_config = {
         WRITE_OPERATION: UPSERT,
         "hoodie.upsert.shuffle.parallelism": 200,
         "hoodie.cleaner.policy": "KEEP_LATEST_COMMITS",
         "hoodie.cleaner.commits.retained": 10,
     }
   ```
   We are also merging the list of dataframes into a single dataframe before 
sending it to hudi table, as mentioned in the example below:
   ```bash
   def create_df(key, topic_schema_dict):
       local_var = {"topic": get_topic(key)}
       json_schema = topic_schema_dict[local_var["topic"]]
       data_frame = spark.read.option("mergeSchema", "true").json(key, 
schema=json_schema)
   
       return data_frame
   
   def get_aggregated_df(keys):
       # create dictionary of topic against its schema
       topic_schema_dict = get_topic_schema_map()
       # merge dataframes from all keys to a single dataframe
       all_df = [create_df(key, topic_schema_dict) for key in keys]
       aggregated_df = reduce(lambda left, right: left.unionByName(right, 
allowMissingColumns=True), all_df)
       return aggregated_df
   
   ```
   The hudi script is written into AWS glue scriptm running with bleow 
configuration:
   1. Glue 4.0
   2. 20 DPU (Data processing unit - The number of AWS Glue data processing 
units (DPUs) allocated to runs of this job. You can allocate a minimum of 2 
DPUs; the default is 10. A DPU is a relative measure of processing power that 
consists of 4 vCPUs of compute capacity and 16 GB of memory.)
   
   
   **Expected behavior**
   
   A clear and concise description of what you expected to happen.
   It should ingest the complete data of 7.9 GB into hudi table without errors.
   **Environment Description**
   
   * Hudi version : 0.12.1/ Glue 4.0
   
   * Spark version : 3.3
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   Add any other context about the problem here.
   
   **Stacktrace**
   
   ```Add the stacktrace of the error.```
   ```bash
   2023-04-20 00:13:48,272,272 ERROR    [cxdl4-ldf-kkj-sbx.py:319] error 
ingesting data:An error occurred while calling o4774.save.
   : org.apache.spark.SparkException: Job aborted due to stage failure: 
ResultStage 34 (count at HoodieSparkSqlWriter.scala:706) has failed the maximum 
allowable number of times: 4. Most recent failure reason:
   org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output 
location for shuffle 11 partition 1
        at 
org.apache.spark.MapOutputTracker$.validateStatus(MapOutputTracker.scala:1701)
        at 
org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$10(MapOutputTracker.scala:1648)
        at 
org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$10$adapted(MapOutputTracker.scala:1647)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at 
org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:1647)
        at 
org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorIdImpl(MapOutputTracker.scala:1290)
        at 
org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:1252)
        at 
org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:140)
        at 
org.apache.spark.shuffle.ShuffleManager.getReader(ShuffleManager.scala:63)
        at 
org.apache.spark.shuffle.ShuffleManager.getReader$(ShuffleManager.scala:57)
        at 
org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:73)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:378)
        at 
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1525)
        at 
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1435)
        at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1499)
        at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1322)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:138)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   
        at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2863)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2799)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2798)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2798)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1995)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2993)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2229)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2250)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2294)
        at org.apache.spark.rdd.RDD.count(RDD.scala:1274)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:706)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:340)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:144)
        at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
        at 
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
        at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
        at 
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
        at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
        at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
        at 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:750)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to