HEPBO3AH opened a new issue, #7062:
URL: https://github.com/apache/hudi/issues/7062

   **Describe the problem you faced**
   
   During the run of the upsert, we sometimes have executors die due to the 
memory issue. This issue usally cannot be resolved over the 4 retries and the 
entire job fail. Please look at the executor stack trace below.
   
   We tracked this issue down to the `HoodieMergeHandle` class. Please have a 
look at the `To Reproduce` section for more details.
   
   **To Reproduce**
   
   We had hard time replicating the issue in a small example which we can share 
with the Hudi team.
   The main condition for the issue to appear was **appending data to exsiting 
small files during upsert**.
   
   For us, the appending of the data cupled with an `AvgRecordSize` estimation 
[being too small due to clustering](https://github.com/apache/hudi/issues/5939) 
triggered the OOM issue quite often. After the fix of the `AvgRecordSize` 
estimate this issue is still there but it was not as prevalent.
   
   In our code conitions are following:
   ```
   PARQUET_SMALL_FILE_LIMIT = 67108864 //64MB
   PARQUET_MAX_FILE_SIZE = 83886080 //80MB
   INLINE_CLUSTERING = true
   INLINE_CLUSTERING_MAX_COMMITS = 2
   PLAN_STRATEGY_SMALL_FILE_LIMIT = 73400320 //70MB
   PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = 209715200 //200MB
   TABLE_TYPE = COW
   INDEX_TYPE = BLOOM
   ```
   
   When the `AvgRecordSize` estimate was wrong, the Hudi would it can pack many 
more records into a single file. Those executors which didn't fail produced 
files of 250MB in size, well above the target of 80MB.
   
   We tracked this issue down to the `HoodieMergeHandle` class.
   
   **Why we suspect there there is an issue with  this class?**
   The reason we think there is an issue with the class is that when we change 
settings to following:
   ```
   PARQUET_MAX_FILE_SIZE = 262144000 // 250MB
   PARQUET_SMALL_FILE_LIMIT = 0 // dont create small files
   ```
   We noticed that the class `HoodieMergeHandle` is not being used due to 
`PARQUET_SMALL_FILE_LIMIT = 0` and the job passes successfully.
   **This indicates that the problem is not in the amount of data being 
processed by single executor but rather the part which appends to files**.
   
   **Expected behavior**
   
   When appending to exsiting files the job should succeed similar to how it 
does when writing large files.
   
   **Environment Description**
   
   AWS Glue 3.0 running on G2X machines
   
   * Hudi version : 0.11
   
   * Spark version : 3.1
   
   * Hive version : NA
   
   * Hadoop version : N/A
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : No
   
   **Additional context**
   
   Add any other context about the problem here.
   
   **Stacktrace**
   
   ```
   2022-10-06 06:38:15,938 INFO [Executor task launch worker for task 0.0 in 
stage 39.1 (TID 18599)] io.HoodieMergeHandle 
(HoodieMergeHandle.java:init(249)): Number of entries in MemoryBasedMap => 
601116, Total size in bytes of MemoryBasedMap => 858994816, Number of entries 
in BitCaskDiskMap => 5477112, Size of file spilled to disk => 5073839525  
   2022-10-06 06:38:15,939 INFO [Executor task launch worker for task 0.0 in 
stage 39.1 (TID 18599)] io.HoodieMergeHandle 
(HoodieMergeHandle.java:init(161)): partitionPath:<pathtopartition>, fileId to 
be merged:20404289-5191-4634-b3bb-217d1f341940-0  
   
   2022-10-06 06:38:15,963 INFO [Executor task launch worker for task 0.0 in 
stage 39.1 (TID 18599)] io.HoodieMergeHandle 
(HoodieMergeHandle.java:init(177)): Merging new data into oldPath 
s3a://<oldparquetfile>, as newPath s3a://<newparquetfile>  
   
   2022-10-06 06:38:16,000 INFO [Executor task launch worker for task 0.0 in 
stage 39.1 (TID 18599)] marker.DirectWriteMarkers 
(DirectWriteMarkers.java:create(173)): Creating Marker 
Path=s3a://<pathtomarker>.parquet.marker.MERGE  
   
   2022-10-06 06:38:16,209 INFO [Executor task launch worker for task 0.0 in 
stage 39.1 (TID 18599)] marker.DirectWriteMarkers 
(DirectWriteMarkers.java:create(178)): [direct] Created marker file 
s3a://<pathtomarker>.parquet.marker.MERGE in 246 ms  
   
   2022-10-06 06:38:16,419 INFO [producer-thread-1] 
queue.IteratorBasedQueueProducer (IteratorBasedQueueProducer.java:produce(44)): 
starting to buffer records  
   
   2022-10-06 06:38:16,419 INFO [consumer-thread-1] 
queue.BoundedInMemoryExecutor 
(BoundedInMemoryExecutor.java:lambda$null$2(131)): starting consumer thread  
   
   2022-10-06 06:38:16,460 INFO [producer-thread-1] s3a.S3AInputStream 
(S3AInputStream.java:seekInStream(304)): Switching to Random IO seek policy  
   
   2022-10-06 06:38:34,399 INFO [producer-thread-1] 
queue.IteratorBasedQueueProducer (IteratorBasedQueueProducer.java:produce(48)): 
finished buffering records  
   
   2022-10-06 06:38:34,402 INFO [consumer-thread-1] 
queue.BoundedInMemoryExecutor 
(BoundedInMemoryExecutor.java:lambda$null$2(135)): Queue Consumption is done; 
notifying producer threads
   2022-10-06 06:39:14,505 ERROR [Executor task launch worker for task 0.0 in 
stage 39.1 (TID 18599)] executor.Executor (Logging.scala:logError(94)): 
Exception in task 0.0 in stage 39.1 (TID 18599)
   org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType 
UPDATE for partition :5
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
        at 
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
        at 
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
        at 
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
        at 
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
        at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
        at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.hudi.exception.HoodieIOException: Unable to 
readFromDisk Hoodie Record from disk
        at 
org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:214)
        at 
org.apache.hudi.common.util.collection.LazyFileIterable$LazyFileIterator.next(LazyFileIterable.java:102)
        at 
org.apache.hudi.common.util.collection.ExternalSpillableMap$IteratorWrapper.next(ExternalSpillableMap.java:332)
        at 
org.apache.hudi.io.HoodieMergeHandle.writeIncomingRecords(HoodieMergeHandle.java:378)
        at 
org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:388)
        at 
org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:154)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
        ... 28 more
   Caused by: java.io.IOException: Cannot allocate memory
        at java.io.RandomAccessFile.readBytes(Native Method)
        at java.io.RandomAccessFile.read(RandomAccessFile.java:377)
        at 
org.apache.hudi.common.util.BufferedRandomAccessFile.fillBuffer(BufferedRandomAccessFile.java:174)
        at 
org.apache.hudi.common.util.BufferedRandomAccessFile.seek(BufferedRandomAccessFile.java:220)
        at 
org.apache.hudi.common.util.BufferedRandomAccessFile.loadNewBlockToBuffer(BufferedRandomAccessFile.java:268)
        at 
org.apache.hudi.common.util.BufferedRandomAccessFile.read(BufferedRandomAccessFile.java:311)
        at java.io.RandomAccessFile.readFully(RandomAccessFile.java:436)
        at 
org.apache.hudi.common.util.SpillableMapUtils.readInternal(SpillableMapUtils.java:70)
        at 
org.apache.hudi.common.util.SpillableMapUtils.readBytesFromDisk(SpillableMapUtils.java:50)
        at 
org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:208)
        ... 36 more
   2022-10-06 06:40:20,294 ERROR [shuffle-server-6-5] 
server.ChunkFetchRequestHandler 
(ChunkFetchRequestHandler.java:lambda$respond$1(148)): Error sending result 
ChunkFetchSuccess[streamChunkId=StreamChunkId[streamId=1351148232721,chunkIndex=0],buffer=FileSegmentManagedBuffer[file=/tmp/blockmgr-505d9da3-7ed3-4a7c-9f23-3b0464c6d313/09/shuffle_11_18220_0.data,offset=487986,length=1570101]]
 to /172.34.92.1:41572; closing connection
   java.io.IOException: Cannot allocate memory
        at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
        at 
sun.nio.ch.FileChannelImpl.transferToDirectlyInternal(FileChannelImpl.java:428)
        at 
sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:493)
        at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:605)
        at 
io.netty.channel.DefaultFileRegion.transferTo(DefaultFileRegion.java:130)
        at 
org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:121)
        at 
io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:362)
        at 
io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:235)
        at 
io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:209)
        at 
io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:400)
        at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:930)
        at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
        at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:897)
        at 
io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
        at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
        at 
io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
        at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
        at 
io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
        at 
io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:808)
        at 
io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1025)
        at 
io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:294)
        at 
org.apache.spark.network.server.ChunkFetchRequestHandler.respond(ChunkFetchRequestHandler.java:142)
        at 
org.apache.spark.network.server.ChunkFetchRequestHandler.processFetchRequest(ChunkFetchRequestHandler.java:116)
        at 
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:107)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
        at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:750)
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to