bkosuru opened a new issue, #9859:
URL: https://github.com/apache/hudi/issues/9859
UseCase:
We use Hudi COW tables. Data is stored in GCS (Google Cloud). Our datasets
have 4 columns and a few billion rows. We ingest data on a weekly basis.
Weekly data can range from a few GB to a few TB; most of the time it is a few GB.
We want to avoid inserting duplicates, using the INSERT_DROP_DUPS option.
There are no issues if the input data is less than 100GB, but when the data
size exceeds 100GB we get a HoodieUpsertException.
Steps to reproduce the behavior:
Here are the Hudi options used to write data:
// Column used as the Hudi precombine field (tombstone marker).
val DELETED_COL = "isDeleted"

// Approx bytes of our average record, contra Hudi's default assumption of 1024.
// (Original text was hard-wrapped mid-comment, leaving a stray `1024` token.)
private val AVG_RECORD_SIZE = 256

// Used for Parquet file size & block size.
private val ONE_GIGABYTE = 1024 * 1024 * 1024

// Dynamic bloom filter capacity: records per ~1GB file at half the average
// record size (a deliberate overestimate to keep the false-positive rate low).
private val BLOOM_MAX_ENTRIES = ONE_GIGABYTE / (2 * AVG_RECORD_SIZE)
/**
 * Writes `df` to a Hudi COW table rooted at `output`.
 *
 * @param df          data to write
 * @param operation   Hudi write operation (insert, bulk_insert, upsert, or delete)
 * @param output      target base path of the Hudi table
 * @param tableName   Hudi table name
 * @param parallelism shuffle parallelism passed to the operation's parallelism key
 * @param saveMode    Spark save mode (e.g. Append / Overwrite)
 */
def save(
    df: DataFrame,
    operation: Operation,
    output: String,
    tableName: String,
    parallelism: Int,
    saveMode: SaveMode
): Unit = {
  df.write
    .format(HUDI_FORMAT)
    // DataSourceWriteOptions
    .option(operation.parallelismOption, parallelism)
    .options(
      // Drop duplicate records only for the dedup insert operation.
      if (operation == InsertDedup)
        Map(INSERT_DROP_DUPS_OPT_KEY -> true.toString)
      else Map.empty[String, String]
    )
    .option(HIVE_STYLE_PARTITIONING_OPT_KEY, true)
    .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SpoKeyGenerator].getName)
    // insert, bulk_insert, upsert, or delete
    .option(OPERATION_OPT_KEY, operation.hudiOp)
    .option(PARTITIONPATH_FIELD_OPT_KEY, "g, p")
    .option(PRECOMBINE_FIELD_OPT_KEY, DELETED_COL)
    .option(RECORDKEY_FIELD_OPT_KEY, "s, o")
    .option(URL_ENCODE_PARTITIONING_OPT_KEY, true)
    // HoodieIndexConfig
    .option(HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES, BLOOM_MAX_ENTRIES)
    .option(BLOOM_INDEX_FILTER_TYPE, BloomFilterTypeCode.DYNAMIC_V0.name)
    // HoodieCompactionConfig
    // For the first commit to a Hudi table, determines how many records can fit
    // into a data file. Useful for Hudi copy; can be tweaked if the file count
    // differs from the source; default 1024.
    .option(COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE, 64)
    // Commit history; MIN should be less than MAX; CLEANER should be less than MIN.
    .option(CLEANER_COMMITS_RETAINED_PROP, Integer.MAX_VALUE - 2)
    .option(MIN_COMMITS_TO_KEEP_PROP, Integer.MAX_VALUE - 1)
    .option(MAX_COMMITS_TO_KEEP_PROP, Integer.MAX_VALUE)
    // HoodieStorageConfig
    .option(LOGFILE_SIZE_MAX_BYTES, ONE_GIGABYTE / 0.35)
    .option(PARQUET_BLOCK_SIZE_BYTES, ONE_GIGABYTE)
    // Current hadoop cfg uses 256MB block size.
    .option(PARQUET_FILE_MAX_BYTES, ONE_GIGABYTE)
    // HoodieWriteConfig
    .option(EMBEDDED_TIMELINE_SERVER_ENABLED, false)
    .option(HoodieWriteConfig.TABLE_NAME, tableName)
    .option("hoodie.metadata.enable", false)
    .option("hoodie.index.type", "BLOOM")
    .mode(saveMode)
    // FIX: original called .save(path), but `path` is not defined in this
    // scope — the intended write target is the `output` parameter.
    .save(output)
}
/**
 * Key generator that derives the Hudi record key from the "s" and "o"
 * fields by hashing their concatenation with 128-bit MurmurHash.
 */
class SpoKeyGenerator(props: TypedProperties)
    extends ComplexKeyGenerator(props) {

  /** Decimal rendering of both 64-bit halves of a 128-bit MurmurHash of `s`. */
  def hash128(s: String): String = {
    val halves: Array[Long] = MurmurHash3.hash128(s.getBytes)
    s"${halves(0)}${halves(1)}"
  }

  /** Record key for the Avro write path: hash of fields "s" and "o". */
  override def getRecordKey(record: GenericRecord): String = {
    val subject = HoodieAvroUtils.getNestedFieldValAsString(record, "s", false, false)
    val obj     = HoodieAvroUtils.getNestedFieldValAsString(record, "o", false, false)
    genKey(subject, obj)
  }

  /** Record key for the Row write path: hash of the first two columns. */
  override def getRecordKey(row: Row): String =
    genKey(row.getAs(0).toString, row.getAs(1).toString)

  // NOTE(review): hashing the bare concatenation s + o makes distinct pairs
  // such as ("ab","c") and ("a","bc") produce the same key; a separator would
  // avoid this, but changing it now would alter keys of already-written data.
  private def genKey(s: String, o: String): String = hash128(s + o)
}
**Environment Description**
* Hudi version : 0.13.1
* Spark version : 3.3.2
* Storage (HDFS/S3/GCS..) : GCS
* Running on Docker? (yes/no) : No
**Stacktrace**
FetchFailed(BlockManagerId(27, 10.12.0.218, 42247, None), shuffleId=14,
mapIndex=405, mapId=199610, reduceId=569, message=
org.apache.spark.shuffle.FetchFailedException
at
org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:312)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1330)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1034)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:87)
at
org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:205)
at
org.apache.spark.util.collection.ExternalSorter.insertAllAndUpdateMetrics(ExternalSorter.scala:681)
at
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:154)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.ExecutorDeadException: Executor 27 is dead.
at
org.apache.spark.network.netty.NettyBlockTransferService.org$apache$spark$network$netty$NettyBlockTransferService$$throwExecutorDeadException(NettyBlockTransferService.scala:262)
at
org.apache.spark.network.netty.NettyBlockTransferService.ensureExecutorAlive(NettyBlockTransferService.scala:239)
at
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:125)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:475)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.send$1(ShuffleBlockFetcherIterator.scala:1300)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:1292)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1144)
... 38 more
Spark UI screenshots attached.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]