IamJusKyrat opened a new issue, #12186: URL: https://github.com/apache/hudi/issues/12186
**Describe the problem you faced**

Running an ETL job that upserts into a table, "Aggregate X Details", with the following properties.

**Hudi configuration**

1. hoodie.datasource.write.operation: upsert
2. hoodie.datasource.write.keygenerator.class: org.apache.hudi.keygen.TimestampBasedKeyGenerator
3. hoodie.upsert.shuffle.parallelism: 2
4. hoodie.datasource.write.partitionpath.field: timestamp (truncated to year)
5. hoodie.datasource.write.table.type: COPY_ON_WRITE
6. hoodie.datasource.write.precombine.field: timestamp

Indexing: using the default index behavior, hoodie.index.type: simple.

**Spark configuration**

```
spark.executor.instances=0
spark.executor.memory=32g
spark.driver.memory=4g
spark.driver.cores=4
spark.dynamicAllocation.initialExecutors=1
spark.dynamicAllocation.maxExecutors=5
```

**Environment Description**

* Amazon EMR Version :
* Hudi version : 0.14.0-amzn-0
* Spark version : 3.4.1-amzn-2
* Hive version : 3.1.3
* Hadoop version : 3.3.6-amzn-1
* Storage (HDFS/S3/GCS..) : S3 (but Spark uses HDFS for processing tasks on EMR Serverless)
* Running on Docker? (yes/no) : no
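For context, here is a minimal sketch of how the options above would typically be wired into a Spark datasource write. The source DataFrame, table name, record key field, S3 paths, and the TimestampBasedKeyGenerator format settings are all assumptions for illustration; the issue does not include the actual job code:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("aggregate-x-details-upsert") // hypothetical app name
  .getOrCreate()

// Hypothetical source; the real job builds this DataFrame elsewhere.
val inputDf = spark.read.parquet("s3://<bucket>/input/")

inputDf
  .coalesce(50) // the job reduces the dataset to 50 partitions before writing
  .write
  .format("hudi")
  .option("hoodie.table.name", "aggregate_x_details") // assumed table name
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
  .option("hoodie.datasource.write.recordkey.field", "record_id") // assumed key field
  .option("hoodie.datasource.write.partitionpath.field", "timestamp")
  .option("hoodie.datasource.write.precombine.field", "timestamp")
  .option("hoodie.datasource.write.keygenerator.class",
    "org.apache.hudi.keygen.TimestampBasedKeyGenerator")
  // TimestampBasedKeyGenerator requires timestamp-type/format settings; the
  // values below are assumptions chosen to match the year-truncated partition path.
  .option("hoodie.keygen.timebased.timestamp.type", "DATE_STRING")
  .option("hoodie.keygen.timebased.input.dateformat", "yyyy-MM-dd HH:mm:ss")
  .option("hoodie.keygen.timebased.output.dateformat", "yyyy")
  .option("hoodie.upsert.shuffle.parallelism", "2")
  .mode(SaveMode.Append)
  .save("s3://<bucket>/aggregate_x_details/") // assumed base path
```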
**Stacktrace**

When running this on a large dataset (~30M rows), we reduce the dataset to 50 partitions using RDD.coalesce(), but the executors fail repeatedly at:

```
org.apache.spark.api.java.JavaRDD.distinct(JavaRDD.scala:85)
org.apache.hudi.data.HoodieJavaRDD.distinct(HoodieJavaRDD.java:157)
org.apache.hudi.index.simple.HoodieSimpleIndex.fetchRecordLocationsForAffectedPartitions(HoodieSimpleIndex.java:147)
org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocationInternal(HoodieSimpleIndex.java:118)
org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocation(HoodieSimpleIndex.java:91)
org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:59)
org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:41)
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:63)
org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:114)
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:103)
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:142)
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:224)
org.apache.hudi.HoodieSparkSqlWriter$.writeInternal(HoodieSparkSqlWriter.scala:431)
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:132)
org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
```

This is accompanied by lost shuffle map output locations:

```
MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 1 to 2600:1f13:d89:d701:c1b3:f3b3:161c:a99e:41804
24/10/23 10:31:07 WARN TaskSetManager: Lost task 1.1 in stage 4.0 (TID 7) ([2600:1f13:d89:d701:c1b3:f3b3:161c:a99e] executor 2): FetchFailed(null, shuffleId=1, mapIndex=-1, mapId=-1, reduceId=1, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1 partition 1
	at org.apache.spark.MapOutputTracker$.validateStatus(MapOutputTracker.scala:1732)
	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$11(MapOutputTracker.scala:1679)
	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$11$adapted(MapOutputTracker.scala:1678)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:1678)
	at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorIdImpl(MapOutputTracker.scala:1320)
	at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:1282)
	at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:140)
	at org.apache.spark.shuffle.ShuffleManager.getReader(ShuffleManager.scala:63)
	at org.apache.spark.shuffle.ShuffleManager.getReader$(ShuffleManager.scala:57)
	at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:73)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:377)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1552)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:563)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:566)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
)
```
**Expected behavior**

It should be possible to parallelize the distinct operation so that the upsert can de-duplicate records at this data volume.
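Regarding the expected behavior: the failing distinct() sits inside HoodieSimpleIndex.fetchRecordLocationsForAffectedPartitions, and Hudi exposes parallelism knobs for both the simple index and the upsert shuffle. A hedged sketch of raising them, continuing from the earlier sketch (the values are illustrative assumptions, not a confirmed fix for this issue):

```scala
// Illustrative only: these Hudi configs exist, but the specific values are
// assumptions and this is not a verified fix for the failure above.
val indexTuning = Map(
  "hoodie.index.type" -> "SIMPLE",
  // Parallelism for the simple index's join/distinct during tagLocation;
  // raising it spreads the failing shuffle over more, smaller tasks.
  "hoodie.simple.index.parallelism" -> "200",
  // 2 (as configured in this job) is very low for ~30M rows.
  "hoodie.upsert.shuffle.parallelism" -> "200"
)

inputDf
  .write
  .format("hudi")
  .options(indexTuning)
  .option("hoodie.table.name", "aggregate_x_details") // assumed, as before
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.recordkey.field", "record_id") // assumed
  .option("hoodie.datasource.write.partitionpath.field", "timestamp")
  .option("hoodie.datasource.write.precombine.field", "timestamp")
  .mode(SaveMode.Append)
  .save("s3://<bucket>/aggregate_x_details/") // assumed base path
```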
