HEPBO3AH opened a new issue, #7062: URL: https://github.com/apache/hudi/issues/7062
**Describe the problem you faced**

During upserts, executors sometimes die because of a memory issue. The failure usually persists across all 4 task retries, and the entire job fails. See the executor stack trace below. We tracked this issue down to the `HoodieMergeHandle` class; the **To Reproduce** section has more details.

**To Reproduce**

We had a hard time reproducing the issue in a small example that we could share with the Hudi team. The main condition for the issue to appear was **appending data to existing small files during upsert**. In our case, appending data combined with an `AvgRecordSize` estimate that was [too small due to clustering](https://github.com/apache/hudi/issues/5939) triggered the OOM quite often. After the `AvgRecordSize` estimate was fixed, the issue still occurs, but it is less prevalent.

In our code, the settings are as follows:

```
PARQUET_SMALL_FILE_LIMIT = 67108864 //64MB
PARQUET_MAX_FILE_SIZE = 83886080 //80MB
INLINE_CLUSTERING = true
INLINE_CLUSTERING_MAX_COMMITS = 2
PLAN_STRATEGY_SMALL_FILE_LIMIT = 73400320 //70MB
PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = 209715200 //200MB
TABLE_TYPE = COW
INDEX_TYPE = BLOOM
```

When the `AvgRecordSize` estimate was wrong, Hudi assumed it could pack many more records into a single file. The executors that didn't fail produced files of about 250MB, well above the 80MB target.

**Why do we suspect an issue with this class?**

When we change the settings to the following:

```
PARQUET_MAX_FILE_SIZE = 262144000 // 250MB
PARQUET_SMALL_FILE_LIMIT = 0 // don't create small files
```

`HoodieMergeHandle` is not used (because `PARQUET_SMALL_FILE_LIMIT = 0`), and the job completes successfully.
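To make the sizing effect concrete, here is a rough sketch of small-file packing during an upsert as we understand it: an existing base file is a candidate for appends only if it is smaller than the small-file limit, and the number of new records routed to it is roughly the free space up to the max file size divided by the *estimated* average record size. This is an approximation, not Hudi's actual code; the class and method names are hypothetical, the 64MB/80MB values come from the settings above, and the 60MB file and the record sizes are made-up illustrations, not measurements.

```java
// Hypothetical sketch (not Hudi's actual code): approximates how many new
// records get appended to an existing small file during an upsert.
final class SmallFileSizing {
    static final long MB = 1024L * 1024L;

    // A file is a candidate for appends only if it is below the small-file limit.
    // With a limit of 0, no file qualifies, so nothing is appended.
    static boolean isSmallFile(long fileSizeBytes, long smallFileLimitBytes) {
        return fileSizeBytes < smallFileLimitBytes;
    }

    // Records assigned to fill the file up to maxFileSize, based on the size estimate.
    static long recordsToAppend(long fileSizeBytes, long maxFileSizeBytes,
                                long estimatedAvgRecordSize, long pendingInserts) {
        long byEstimate = (maxFileSizeBytes - fileSizeBytes) / estimatedAvgRecordSize;
        return Math.min(byEstimate, pendingInserts);
    }

    // File size once the appended records are written with their real size.
    static long resultingFileSize(long fileSizeBytes, long appended, long actualAvgRecordSize) {
        return fileSizeBytes + appended * actualAvgRecordSize;
    }

    public static void main(String[] args) {
        long smallFileLimit = 64 * MB; // PARQUET_SMALL_FILE_LIMIT
        long maxFileSize = 80 * MB;    // PARQUET_MAX_FILE_SIZE
        long existing = 60 * MB;       // hypothetical existing base file
        long pending = 10_000_000L;    // hypothetical number of incoming inserts

        System.out.println(isSmallFile(existing, smallFileLimit)); // true
        System.out.println(isSmallFile(existing, 0));              // false

        // Accurate estimate: records really are ~200 bytes (hypothetical).
        long ok = recordsToAppend(existing, maxFileSize, 200, pending);
        // Underestimate: the estimator believes 20 bytes, the records are really 200.
        long bad = recordsToAppend(existing, maxFileSize, 20, pending);

        System.out.println(resultingFileSize(existing, ok, 200) / MB);  // 79
        System.out.println(resultingFileSize(existing, bad, 200) / MB); // 260
    }
}
```

With an accurate estimate the file stops just under the 80MB target; with a 10x underestimate, the same 20MB of headroom is filled with ten times as many records and the file overshoots the target by more than 3x. With a small-file limit of 0, no existing file qualifies for appends at all.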
**This indicates that the problem is not the amount of data processed by a single executor, but rather the code path that appends to existing files.**

**Expected behavior**

When appending to existing files, the job should succeed, just as it does when writing large files.

**Environment Description**

AWS Glue 3.0 running on G.2X workers

* Hudi version : 0.11
* Spark version : 3.1
* Hive version : N/A
* Hadoop version : N/A
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : No

**Stacktrace**

```
2022-10-06 06:38:15,938 INFO [Executor task launch worker for task 0.0 in stage 39.1 (TID 18599)] io.HoodieMergeHandle (HoodieMergeHandle.java:init(249)): Number of entries in MemoryBasedMap => 601116, Total size in bytes of MemoryBasedMap => 858994816, Number of entries in BitCaskDiskMap => 5477112, Size of file spilled to disk => 5073839525
2022-10-06 06:38:15,939 INFO [Executor task launch worker for task 0.0 in stage 39.1 (TID 18599)] io.HoodieMergeHandle (HoodieMergeHandle.java:init(161)): partitionPath:<pathtopartition>, fileId to be merged:20404289-5191-4634-b3bb-217d1f341940-0
2022-10-06 06:38:15,963 INFO [Executor task launch worker for task 0.0 in stage 39.1 (TID 18599)] io.HoodieMergeHandle (HoodieMergeHandle.java:init(177)): Merging new data into oldPath s3a://<oldparquetfile>, as newPath s3a://<newparquetfile>
2022-10-06 06:38:16,000 INFO [Executor task launch worker for task 0.0 in stage 39.1 (TID 18599)] marker.DirectWriteMarkers (DirectWriteMarkers.java:create(173)): Creating Marker Path=s3a://<pathtomarker>.parquet.marker.MERGE
2022-10-06 06:38:16,209 INFO [Executor task launch worker for task 0.0 in stage 39.1 (TID 18599)] marker.DirectWriteMarkers (DirectWriteMarkers.java:create(178)): [direct] Created marker file s3a://<pathtomarker>.parquet.marker.MERGE in 246 ms
2022-10-06 06:38:16,419 INFO [producer-thread-1] queue.IteratorBasedQueueProducer (IteratorBasedQueueProducer.java:produce(44)): starting to buffer records
2022-10-06 06:38:16,419 INFO [consumer-thread-1] queue.BoundedInMemoryExecutor (BoundedInMemoryExecutor.java:lambda$null$2(131)): starting consumer thread
2022-10-06 06:38:16,460 INFO [producer-thread-1] s3a.S3AInputStream (S3AInputStream.java:seekInStream(304)): Switching to Random IO seek policy
2022-10-06 06:38:34,399 INFO [producer-thread-1] queue.IteratorBasedQueueProducer (IteratorBasedQueueProducer.java:produce(48)): finished buffering records
2022-10-06 06:38:34,402 INFO [consumer-thread-1] queue.BoundedInMemoryExecutor (BoundedInMemoryExecutor.java:lambda$null$2(135)): Queue Consumption is done; notifying producer threads
2022-10-06 06:39:14,505 ERROR [Executor task launch worker for task 0.0 in stage 39.1 (TID 18599)] executor.Executor (Logging.scala:logError(94)): Exception in task 0.0 in stage 39.1 (TID 18599)
org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :5
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieIOException: Unable to readFromDisk Hoodie Record from disk
    at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:214)
    at org.apache.hudi.common.util.collection.LazyFileIterable$LazyFileIterator.next(LazyFileIterable.java:102)
    at org.apache.hudi.common.util.collection.ExternalSpillableMap$IteratorWrapper.next(ExternalSpillableMap.java:332)
    at org.apache.hudi.io.HoodieMergeHandle.writeIncomingRecords(HoodieMergeHandle.java:378)
    at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:388)
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:154)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
    ... 28 more
Caused by: java.io.IOException: Cannot allocate memory
    at java.io.RandomAccessFile.readBytes(Native Method)
    at java.io.RandomAccessFile.read(RandomAccessFile.java:377)
    at org.apache.hudi.common.util.BufferedRandomAccessFile.fillBuffer(BufferedRandomAccessFile.java:174)
    at org.apache.hudi.common.util.BufferedRandomAccessFile.seek(BufferedRandomAccessFile.java:220)
    at org.apache.hudi.common.util.BufferedRandomAccessFile.loadNewBlockToBuffer(BufferedRandomAccessFile.java:268)
    at org.apache.hudi.common.util.BufferedRandomAccessFile.read(BufferedRandomAccessFile.java:311)
    at java.io.RandomAccessFile.readFully(RandomAccessFile.java:436)
    at org.apache.hudi.common.util.SpillableMapUtils.readInternal(SpillableMapUtils.java:70)
    at org.apache.hudi.common.util.SpillableMapUtils.readBytesFromDisk(SpillableMapUtils.java:50)
    at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:208)
    ... 36 more
2022-10-06 06:40:20,294 ERROR [shuffle-server-6-5] server.ChunkFetchRequestHandler (ChunkFetchRequestHandler.java:lambda$respond$1(148)): Error sending result ChunkFetchSuccess[streamChunkId=StreamChunkId[streamId=1351148232721,chunkIndex=0],buffer=FileSegmentManagedBuffer[file=/tmp/blockmgr-505d9da3-7ed3-4a7c-9f23-3b0464c6d313/09/shuffle_11_18220_0.data,offset=487986,length=1570101]] to /172.34.92.1:41572; closing connection
java.io.IOException: Cannot allocate memory
    at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
    at sun.nio.ch.FileChannelImpl.transferToDirectlyInternal(FileChannelImpl.java:428)
    at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:493)
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:605)
    at io.netty.channel.DefaultFileRegion.transferTo(DefaultFileRegion.java:130)
    at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:121)
    at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:362)
    at io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:235)
    at io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:209)
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:400)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:930)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:897)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
    at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:808)
    at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1025)
    at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:294)
    at org.apache.spark.network.server.ChunkFetchRequestHandler.respond(ChunkFetchRequestHandler.java:142)
    at org.apache.spark.network.server.ChunkFetchRequestHandler.processFetchRequest(ChunkFetchRequestHandler.java:116)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:107)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:750)
```
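For scale, the first `HoodieMergeHandle` log line in the stack trace (the `MemoryBasedMap` and `BitCaskDiskMap` statistics) can be summarized with a little arithmetic. The sketch below only restates the logged numbers; `SpillStats` is a hypothetical helper, not part of Hudi, and the per-entry figures are simple averages, not a measured record size.

```java
// Hypothetical helper (not part of Hudi): derives simple averages from the
// spillable-map statistics logged by HoodieMergeHandle.init.
final class SpillStats {
    static final long MEM_ENTRIES = 601_116L;        // Number of entries in MemoryBasedMap
    static final long MEM_BYTES = 858_994_816L;      // Total size in bytes of MemoryBasedMap
    static final long DISK_ENTRIES = 5_477_112L;     // Number of entries in BitCaskDiskMap
    static final long DISK_BYTES = 5_073_839_525L;   // Size of file spilled to disk

    static long totalEntries() { return MEM_ENTRIES + DISK_ENTRIES; }

    // Integer percentage of entries that were spilled to disk.
    static long percentSpilled() { return DISK_ENTRIES * 100 / totalEntries(); }

    // Average bytes per entry (integer division), in memory and on disk.
    static long memBytesPerEntry() { return MEM_BYTES / MEM_ENTRIES; }
    static long diskBytesPerEntry() { return DISK_BYTES / DISK_ENTRIES; }

    public static void main(String[] args) {
        System.out.println(totalEntries());      // 6078228
        System.out.println(percentSpilled());    // 90
        System.out.println(memBytesPerEntry());  // 1429
        System.out.println(diskBytesPerEntry()); // 926
        System.out.printf("spill file: %.2f GiB%n", DISK_BYTES / (double) (1L << 30)); // 4.73 GiB
    }
}
```

In other words, the merge handle for this one file ID appears to have held about 6.08 million incoming records, roughly 90% of them in a local spill file of about 4.7 GiB, and it was while reading that spill file back (`BitCaskDiskMap.get` in the trace) that `Cannot allocate memory` was raised.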
