bkosuru opened a new issue, #9859:
URL: https://github.com/apache/hudi/issues/9859

   UseCase:
   We use Hudi COW tables. Data is stored in GCS (Google cloud). Our datasets 
have 4 columns with few billions of rows. We ingest data on a weekly basis. 
Weekly data can be few GB to few TB. Most of the time it is few GB. We want to 
avoid inserting duplicates using INSERT_DROP_DUPS option. There are no issues 
if the input data is less than 100GB. But when the data size is more than 100GB 
we get HoodieUpsertException. 
   
   Steps to reproduce the behavior:
   
   Here is the hudi options used to write data-
   
   val DELETED_COL = "isDeleted"
   
   private val AVG_RECORD_SIZE =
   256 // approx bytes of our average record, contra Hudi default assumption of 
1024
   private val ONE_GIGABYTE =
   1024 * 1024 * 1024 // used for Parquet file size & block size
   private val BLOOM_MAX_ENTRIES = ONE_GIGABYTE / (2 * AVG_RECORD_SIZE)
   
   def save(
   df: DataFrame,
   operation: Operation,
   output: String,
   tableName: String,
   parallelism: Int,
   saveMode: SaveMode
   ): Unit = {
   df.write
   .format(HUDI_FORMAT)
   
     // DataSourceWriteOptions
     .option(operation.parallelismOption, parallelism)
     .options(
       if (operation == InsertDedup)
         Map(INSERT_DROP_DUPS_OPT_KEY -> true.toString)
       else Map[String, String]()
     )
     .option(HIVE_STYLE_PARTITIONING_OPT_KEY, true)
     .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SpoKeyGenerator].getName)
     .option(
       OPERATION_OPT_KEY,
       operation.hudiOp
     ) // insert, bulk_insert, upsert, or delete
     .option(PARTITIONPATH_FIELD_OPT_KEY, "g, p")
     .option(PRECOMBINE_FIELD_OPT_KEY, DELETED_COL)
     .option(RECORDKEY_FIELD_OPT_KEY, "s, o")
     .option(URL_ENCODE_PARTITIONING_OPT_KEY, true)
   
     // HoodieIndexConfig
     .option(HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES, BLOOM_MAX_ENTRIES)
     .option(BLOOM_INDEX_FILTER_TYPE, BloomFilterTypeCode.DYNAMIC_V0.name)
   
     // HoodieCompactionConfig
     // For first commit to a hudi table, to determine how many records can fit 
into a data file
     // Useful for hudi copy; can be tweaked if filecount differs from the 
source; default 1024
     .option(COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE, 64)
     // Commit history; MIN should be less than MAX; CLEANER should be less 
than MIN
     .option(CLEANER_COMMITS_RETAINED_PROP, Integer.MAX_VALUE - 2)
     .option(MIN_COMMITS_TO_KEEP_PROP, Integer.MAX_VALUE - 1)
     .option(MAX_COMMITS_TO_KEEP_PROP, Integer.MAX_VALUE)
   
     // HoodieStorageConfig
     .option(LOGFILE_SIZE_MAX_BYTES, ONE_GIGABYTE / 0.35)
     .option(PARQUET_BLOCK_SIZE_BYTES, ONE_GIGABYTE)
     .option(
       PARQUET_FILE_MAX_BYTES,
       ONE_GIGABYTE
     ) // Current hadoop cfg uses 256MB block size.
   
     // HoodieWriteConfig
     .option(EMBEDDED_TIMELINE_SERVER_ENABLED, false)
     .option(HoodieWriteConfig.TABLE_NAME, tableName)
     
     .option("hoodie.metadata.enable", false)
     .option("hoodie.index.type", "BLOOM")
     .mode(saveMode)
     .save(path)
   }
   
   class SpoKeyGenerator(props: TypedProperties)
   extends ComplexKeyGenerator(props) {
   
   def hash128(s: String): String = {
   val h: Array[Long] = MurmurHash3.hash128(s.getBytes)
   h(0).toString + h(1).toString
   }
   
   override def getRecordKey(record: GenericRecord): String = {
   val s = HoodieAvroUtils.getNestedFieldValAsString(record, "s", false, false)
   val o = HoodieAvroUtils.getNestedFieldValAsString(record, "o", false, false)
   genKey(s, o)
   }
   
   private def genKey(s: String, o: String): String = hash128(s + o)
   
   override def getRecordKey(row: Row): String = {
   val s = row.getAs(0).toString
   val o = row.getAs(1).toString
   genKey(s, o)
   }
   
   }
   
   **Environment Description**
   
   * Hudi version : 0.13.1
   
   * Spark version : 3.3.2
   
   * Storage (HDFS/S3/GCS..) : GCS
   
   * Running on Docker? (yes/no) : No
   
   **Stacktrace**
   
   FetchFailed(BlockManagerId(27, 10.12.0.218, 42247, None), shuffleId=14, 
mapIndex=405, mapId=199610, reduceId=569, message=
   org.apache.spark.shuffle.FetchFailedException
        at 
org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:312)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1330)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1034)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:87)
        at 
org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
        at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:205)
        at 
org.apache.spark.util.collection.ExternalSorter.insertAllAndUpdateMetrics(ExternalSorter.scala:681)
        at 
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:154)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrR
   
![insert_dedup](https://github.com/apache/hudi/assets/7408351/6ce85421-e504-4d9f-9fdf-5470698359e0)
   eadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: org.apache.spark.ExecutorDeadException: Executor 27 is dead.
        at 
org.apache.spark.network.netty.NettyBlockTransferService.org$apache$spark$network$netty$NettyBlockTransferService$$throwExecutorDeadException(NettyBlockTransferService.scala:262)
        at 
org.apache.spark.network.netty.NettyBlockTransferService.ensureExecutorAlive(NettyBlockTransferService.scala:239)
        at 
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:125)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:475)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.send$1(ShuffleBlockFetcherIterator.scala:1300)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:1292)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1144)
        ... 38 more
   
   Spark UI screenshots attached.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to