IamJusKyrat opened a new issue, #12186:
URL: https://github.com/apache/hudi/issues/12186

   **Describe the problem you faced**
   
   **Hudi Configuration**
   
   We are running an ETL on a table, "Aggregate X Details", which uses the following Hudi properties:
   1. hoodie.datasource.write.operation: upsert
   2. hoodie.datasource.write.keygenerator.class: 
org.apache.hudi.keygen.TimestampBasedKeyGenerator
   3. hoodie.upsert.shuffle.parallelism: 2
   4. hoodie.datasource.write.partitionpath.field: timestamp (truncated to year)
   5. hoodie.datasource.write.table.type: COPY_ON_WRITE
   6. hoodie.datasource.write.precombine.field: timestamp
   
   Indexing: using the default index behavior, hoodie.index.type: simple.
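   
   For context, a minimal sketch of how such a write is issued (the session setup, source DataFrame, record key field, table name, and target path are placeholders, not our exact job; the hoodie.keygen.timebased.* keys are our assumption for the year truncation and may vary by Hudi version):
   
   ```scala
   import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
   
   val spark = SparkSession.builder().appName("aggregate-x-details-etl").getOrCreate()
   
   // Placeholder source; the real job reads its own upstream data.
   val inputDf: DataFrame = spark.table("staging.aggregate_x_details")
   
   inputDf.write
     .format("hudi")
     .option("hoodie.table.name", "aggregate_x_details")               // placeholder name
     .option("hoodie.datasource.write.operation", "upsert")
     .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
     .option("hoodie.datasource.write.recordkey.field", "id")          // placeholder key field
     .option("hoodie.datasource.write.precombine.field", "timestamp")
     .option("hoodie.datasource.write.partitionpath.field", "timestamp")
     .option("hoodie.datasource.write.keygenerator.class",
       "org.apache.hudi.keygen.TimestampBasedKeyGenerator")
     // Assumed key-generator settings to truncate the partition path to the year:
     .option("hoodie.keygen.timebased.timestamp.type", "DATE_STRING")
     .option("hoodie.keygen.timebased.input.dateformat", "yyyy-MM-dd HH:mm:ss")
     .option("hoodie.keygen.timebased.output.dateformat", "yyyy")
     .option("hoodie.upsert.shuffle.parallelism", "2")
     .mode(SaveMode.Append)
     .save("s3://bucket/aggregate_x_details")                          // placeholder path
   ```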
   
   **Spark Config**
   
   * spark.executor.instances=0
   * spark.executor.memory=32g
   * spark.driver.memory=4g
   * spark.driver.cores=4
   * spark.dynamicAllocation.initialExecutors=1
   * spark.dynamicAllocation.maxExecutors=5
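   
   A session-level equivalent of the above might look like the sketch below (illustrative only; on EMR Serverless these are actually passed as job submission parameters, and the app name is a placeholder):
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   // Illustrative session setup mirroring the reported configuration.
   val spark = SparkSession.builder()
     .appName("aggregate-x-details-etl")                      // placeholder app name
     .config("spark.executor.instances", "0")
     .config("spark.executor.memory", "32g")
     .config("spark.driver.memory", "4g")
     .config("spark.driver.cores", "4")
     .config("spark.dynamicAllocation.enabled", "true")       // implied by the two settings below
     .config("spark.dynamicAllocation.initialExecutors", "1")
     .config("spark.dynamicAllocation.maxExecutors", "5")
     .getOrCreate()
   ```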
   
   **Environment Description**
   * Amazon EMR Version: 
   
   * Hudi version : 0.14.0-amzn-0
   
   * Spark version : 3.4.1-amzn-2
   
   * Hive version : 3.1.3
   
   * Hadoop version : 3.3.6-amzn-1
   
   * Storage (HDFS/S3/GCS..) : S3 (but Spark uses HDFS for processing tasks on EMR Serverless)
   
   * Running on Docker? (yes/no) : No
   
   **Stacktrace**
   
   When running this on a large dataset (~30M rows), we partition the dataset into 50 partitions using RDD.coalesce(). The executors fail repeatedly at:
   ```
   org.apache.spark.api.java.JavaRDD.distinct(JavaRDD.scala:85)
   org.apache.hudi.data.HoodieJavaRDD.distinct(HoodieJavaRDD.java:157)
   org.apache.hudi.index.simple.HoodieSimpleIndex.fetchRecordLocationsForAffectedPartitions(HoodieSimpleIndex.java:147)
   org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocationInternal(HoodieSimpleIndex.java:118)
   org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocation(HoodieSimpleIndex.java:91)
   org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:59)
   org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:41)
   org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:63)
   org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
   org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:114)
   org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:103)
   org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:142)
   org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:224)
   org.apache.hudi.HoodieSparkSqlWriter$.writeInternal(HoodieSparkSqlWriter.scala:431)
   org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:132)
   org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
   org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
   org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
   org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
   org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
   ```
   
   
![image](https://github.com/user-attachments/assets/b9555577-4c7f-4618-91b4-5578a259b290)
   
   Losing shuffle map output locations:
   
   ```
   MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 1 to 2600:1f13:d89:d701:c1b3:f3b3:161c:a99e:41804
   24/10/23 10:31:07 WARN TaskSetManager: Lost task 1.1 in stage 4.0 (TID 7) ([2600:1f13:d89:d701:c1b3:f3b3:161c:a99e] executor 2): FetchFailed(null, shuffleId=1, mapIndex=-1, mapId=-1, reduceId=1, message=
   org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1 partition 1
       at org.apache.spark.MapOutputTracker$.validateStatus(MapOutputTracker.scala:1732)
       at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$11(MapOutputTracker.scala:1679)
       at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$11$adapted(MapOutputTracker.scala:1678)
       at scala.collection.Iterator.foreach(Iterator.scala:943)
       at scala.collection.Iterator.foreach$(Iterator.scala:943)
       at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
       at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:1678)
       at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorIdImpl(MapOutputTracker.scala:1320)
       at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:1282)
       at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:140)
       at org.apache.spark.shuffle.ShuffleManager.getReader(ShuffleManager.scala:63)
       at org.apache.spark.shuffle.ShuffleManager.getReader$(ShuffleManager.scala:57)
       at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:73)
       at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
       at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:377)
       at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1552)
       at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462)
       at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526)
       at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
       at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
       at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
       at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
       at org.apache.spark.scheduler.Task.run(Task.scala:141)
       at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:563)
       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:566)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
       at java.lang.Thread.run(Thread.java:750)
   )
   ```
   
   
   **Expected behavior**
   
   The distinct operation should be parallelized so that the upsert can de-duplicate these records without executor failures.
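   
   For what it's worth, a sketch of the workarounds we are considering, assuming the shuffle parallelism of 2 on the upsert/index lookup is the bottleneck (the config keys are from the Hudi docs; the values of 200 are illustrative guesses, and the placeholders match the write sketch above):
   
   ```scala
   import org.apache.spark.sql.SaveMode
   
   // 1) repartition() instead of coalesce(), so the 50 partitions are evenly sized,
   // 2) raise the shuffle parallelism for the upsert and the SIMPLE index lookup.
   val rebalancedDf = inputDf.repartition(50)             // inputDf as in the sketch above
   
   rebalancedDf.write
     .format("hudi")
     .option("hoodie.upsert.shuffle.parallelism", "200")  // instead of 2
     .option("hoodie.simple.index.parallelism", "200")    // parallelism for the index lookup shuffle
     // ...remaining options as in the write sketch above...
     .mode(SaveMode.Append)
     .save("s3://bucket/aggregate_x_details")             // placeholder path
   ```
   
   Raising spark.executor.memoryOverhead may also be worth trying, since the lost shuffle map outputs suggest executors are being killed mid-stage.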
   
   

