Toroidals commented on issue #13114:
URL: https://github.com/apache/hudi/issues/13114#issuecomment-2791391623

   @danny0405 **Please note**: my Flink job runs on YARN, and the storage system used is HDFS.
   
   The most frequent issue I encounter is the following:
   
   When writing to HDFS, I get the error:
   "Interrupted while waiting for data to be acknowledged by pipeline".
   I have verified that the HDFS cluster itself is functioning normally: other write processes (not involving Hudi) complete without problems. The issue occurs only during Hudi writes.
   
   I attempted to work around it by adding the following settings in org.apache.hudi.configuration.HadoopConfigurations, specifically in the getHadoopConf() and getParquetConf() methods:
   
   ```java
   hadoopConf.set("dfs.client.socket-timeout", "1800000");
   hadoopConf.set("dfs.client.connect.timeout", "120000");
   hadoopConf.set("dfs.client.block.write.locateFollowingBlock.retries", "12");
   hadoopConf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "DEFAULT");
   hadoopConf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
   hadoopConf.setBoolean("dfs.client.write.replica-pipeline-finalize", false);

   copy.set("dfs.client.socket-timeout", "1800000");
   copy.set("dfs.client.connect.timeout", "120000");
   copy.set("dfs.client.block.write.locateFollowingBlock.retries", "12");
   copy.set("dfs.client.block.write.replace-datanode-on-failure.policy", "DEFAULT");
   copy.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
   copy.setBoolean("dfs.client.write.replica-pipeline-finalize", false);
   ```
   
   However, these changes had very limited effect, and the issue still persists.
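   If I understand Flink's Hadoop integration correctly, the same HDFS client settings can also be supplied without patching and rebuilding Hudi: Flink forwards any flink-conf.yaml key prefixed with `flink.hadoop.` into the Hadoop Configuration it builds. This is only a sketch of that alternative (the values simply mirror my attempted settings; whether the forwarding applies to the Parquet writer path as well is an assumption on my part):
   
   ```yaml
   # flink-conf.yaml -- Flink strips the "flink.hadoop." prefix and passes the
   # remainder into the Hadoop Configuration (assuming this forwarding is in
   # effect for the YARN deployment mode in use).
   flink.hadoop.dfs.client.socket-timeout: 1800000
   flink.hadoop.dfs.client.block.write.locateFollowingBlock.retries: 12
   flink.hadoop.dfs.client.block.write.replace-datanode-on-failure.policy: DEFAULT
   flink.hadoop.dfs.client.block.write.replace-datanode-on-failure.enable: true
   ```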
   
   The final failure appears to be triggered during asynchronous compaction, and the resulting exception is not properly caught.
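   For what it's worth, the root `InterruptedIOException` looks like a secondary symptom rather than an HDFS fault: when Flink cancels the task, it interrupts the compaction executor thread while DataStreamer is still blocked waiting for pipeline acks, and HDFS reports that interrupt as this exception. A minimal stdlib-only sketch of that interplay (class and method names are mine, purely illustrative; `ackLock.wait()` stands in for `DataStreamer.waitForAckedSeqno()`):
   
   ```java
   import java.io.InterruptedIOException;
   
   public class CancelDuringClose {
       // Simulates a writer thread blocked waiting for pipeline acks that is
       // interrupted by task cancellation; returns the resulting error message.
       static String simulateCancelDuringClose() throws InterruptedException {
           final String[] result = new String[1];
           final Object ackLock = new Object();
           Thread writer = new Thread(() -> {
               synchronized (ackLock) {
                   try {
                       ackLock.wait(); // stand-in for DataStreamer.waitForAckedSeqno()
                   } catch (InterruptedException e) {
                       // HDFS wraps the interrupt the same way DataStreamer does:
                       InterruptedIOException ioe = new InterruptedIOException(
                               "Interrupted while waiting for data to be acknowledged by pipeline");
                       ioe.initCause(e);
                       result[0] = ioe.getMessage();
                   }
               }
           });
           writer.start();
           Thread.sleep(100);  // let the writer block on the wait
           writer.interrupt(); // what Flink task cancellation does to the thread
           writer.join();
           return result[0];
       }
   
       public static void main(String[] args) throws InterruptedException {
           System.out.println(simulateCancelDuringClose());
       }
   }
   ```
   
   If that reading is right, the HDFS timeout tuning above cannot help, because the write is interrupted from the Flink side rather than timing out on the HDFS side.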
   
   `2025-04-09 21:18:53,163 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - 
Un-registering task and sending final execution state CANCELED to JobManager 
for task Map -> row_data_to_hoodie_record (2/3)#0 
a635b57c048c14ea24a8375d01c54e0c.
   2025-04-09 21:19:08,167 WARN  org.apache.hadoop.hdfs.DataStreamer            
              [] - DataStreamer Exception
   java.io.InterruptedIOException: Interrupted while waiting for IO on channel 
java.nio.channels.SocketChannel[connected local=/192.168.74.182:54472 
remote=/192.168.74.182:1019]. Total timeout mills is 500000, 499997 millis 
timeout left.
           at 
org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:350)
 ~[hadoop-client-api-3.3.4.jar:?]
           at 
org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:156) 
~[hadoop-client-api-3.3.4.jar:?]
           at 
org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:158) 
~[hadoop-client-api-3.3.4.jar:?]
           at 
org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:116) 
~[hadoop-client-api-3.3.4.jar:?]
           at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122) 
~[?:1.8.0_211]
           at java.io.DataOutputStream.write(DataOutputStream.java:107) 
~[?:1.8.0_211]
           at org.apache.hadoop.hdfs.DFSPacket.writeTo(DFSPacket.java:193) 
~[hadoop-client-api-3.3.4.jar:?]
           at 
org.apache.hadoop.hdfs.DataStreamer.sendPacket(DataStreamer.java:857) 
~[hadoop-client-api-3.3.4.jar:?]
           at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:762) 
[hadoop-client-api-3.3.4.jar:?]
   2025-04-09 21:19:08,181 ERROR 
org.apache.hudi.common.util.queue.SimpleExecutor             [] - Failed 
consuming records
   org.apache.hudi.exception.HoodieUpsertException: Failed to close UpdateHandle
           at 
org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:480) 
~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.finish(BaseMergeHelper.java:59)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.finish(BaseMergeHelper.java:44)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:72)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at org.apache.hudi.table.HoodieTable.runMerge(HoodieTable.java:1139) 
~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleUpdateInternal(HoodieFlinkCopyOnWriteTable.java:424)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleUpdate(HoodieFlinkCopyOnWriteTable.java:419)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.table.action.compact.CompactionExecutionHelper.writeFileAndGetWriteStats(CompactionExecutionHelper.java:63)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:269)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:179)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.sink.compact.CompactOperator.doCompaction(CompactOperator.java:142)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.sink.compact.CompactOperator.lambda$processElement$0(CompactOperator.java:124)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_211]
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_211]
           at java.lang.Thread.run(Thread.java:748) [?:1.8.0_211]
   Caused by: java.io.InterruptedIOException: Interrupted while waiting for 
data to be acknowledged by pipeline
           at 
org.apache.hadoop.hdfs.DataStreamer.waitForAckedSeqno(DataStreamer.java:937) 
~[hadoop-client-api-3.3.4.jar:?]
           at 
org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:778) 
~[hadoop-client-api-3.3.4.jar:?]
           at 
org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:888) 
~[hadoop-client-api-3.3.4.jar:?]
           at 
org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:847) 
~[hadoop-client-api-3.3.4.jar:?]
           at 
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:77)
 ~[hadoop-client-api-3.3.4.jar:?]
           at 
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) 
~[hadoop-client-api-3.3.4.jar:?]
           at 
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:77)
 ~[hadoop-client-api-3.3.4.jar:?]
           at 
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) 
~[hadoop-client-api-3.3.4.jar:?]
           at 
org.apache.hudi.hadoop.fs.SizeAwareFSDataOutputStream.close(SizeAwareFSDataOutputStream.java:79)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1106) 
~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:132)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319) 
~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.io.hadoop.HoodieBaseParquetWriter.close(HoodieBaseParquetWriter.java:164)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.io.hadoop.HoodieAvroParquetWriter.close(HoodieAvroParquetWriter.java:88)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:456) 
~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           ... 16 more
   2025-04-09 21:19:08,182 ERROR org.apache.hudi.sink.compact.CompactOperator   
              [] - Executor executes action [Execute compaction for instant 
20250409210624395 from task 1] error
   org.apache.hudi.exception.HoodieException: 
org.apache.hudi.exception.HoodieException: 
org.apache.hudi.exception.HoodieUpsertException: Failed to close UpdateHandle
           at 
org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:151)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at org.apache.hudi.table.HoodieTable.runMerge(HoodieTable.java:1139) 
~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleUpdateInternal(HoodieFlinkCopyOnWriteTable.java:424)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleUpdate(HoodieFlinkCopyOnWriteTable.java:419)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.table.action.compact.CompactionExecutionHelper.writeFileAndGetWriteStats(CompactionExecutionHelper.java:63)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:269)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:179)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.sink.compact.CompactOperator.doCompaction(CompactOperator.java:142)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.sink.compact.CompactOperator.lambda$processElement$0(CompactOperator.java:124)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_211]
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_211]
           at java.lang.Thread.run(Thread.java:748) [?:1.8.0_211]
   Caused by: org.apache.hudi.exception.HoodieException: 
org.apache.hudi.exception.HoodieUpsertException: Failed to close UpdateHandle
           at 
org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:75)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           ... 12 more
   Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to close 
UpdateHandle
           at 
org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:480) 
~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.finish(BaseMergeHelper.java:59)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.finish(BaseMergeHelper.java:44)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:72)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           ... 12 more
   Caused by: java.io.InterruptedIOException: Interrupted while waiting for 
data to be acknowledged by pipeline
           at 
org.apache.hadoop.hdfs.DataStreamer.waitForAckedSeqno(DataStreamer.java:937) 
~[hadoop-client-api-3.3.4.jar:?]
           at 
org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:778) 
~[hadoop-client-api-3.3.4.jar:?]
           at 
org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:888) 
~[hadoop-client-api-3.3.4.jar:?]
           at 
org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:847) 
~[hadoop-client-api-3.3.4.jar:?]
           at 
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:77)
 ~[hadoop-client-api-3.3.4.jar:?]
           at 
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) 
~[hadoop-client-api-3.3.4.jar:?]
           at 
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:77)
 ~[hadoop-client-api-3.3.4.jar:?]
           at 
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) 
~[hadoop-client-api-3.3.4.jar:?]
           at 
org.apache.hudi.hadoop.fs.SizeAwareFSDataOutputStream.close(SizeAwareFSDataOutputStream.java:79)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1106) 
~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:132)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319) 
~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.io.hadoop.HoodieBaseParquetWriter.close(HoodieBaseParquetWriter.java:164)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.io.hadoop.HoodieAvroParquetWriter.close(HoodieAvroParquetWriter.java:88)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:456) 
~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.finish(BaseMergeHelper.java:59)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.finish(BaseMergeHelper.java:44)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:72)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           ... 12 more
   2025-04-09 21:19:08,183 ERROR 
org.apache.flink.runtime.util.ClusterUncaughtExceptionHandler [] - WARNING: 
Thread 'pool-19-thread-1' produced an uncaught exception. If you want to fail 
on uncaught exceptions, then configure cluster.uncaught-exception-handling 
accordingly
   org.apache.flink.runtime.execution.CancelTaskException: Buffer pool has 
already been destroyed.
           at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.checkDestroyed(LocalBufferPool.java:404)
 ~[flink-dist-1.15.2.jar:1.15.2]
           at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:373)
 ~[flink-dist-1.15.2.jar:1.15.2]
           at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilder(LocalBufferPool.java:316)
 ~[flink-dist-1.15.2.jar:1.15.2]
           at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:394)
 ~[flink-dist-1.15.2.jar:1.15.2]
           at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:377)
 ~[flink-dist-1.15.2.jar:1.15.2]
           at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForNewRecord(BufferWritingResultPartition.java:281)
 ~[flink-dist-1.15.2.jar:1.15.2]
           at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:157)
 ~[flink-dist-1.15.2.jar:1.15.2]
           at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:106)
 ~[flink-dist-1.15.2.jar:1.15.2]
           at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
 ~[flink-dist-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:104)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:90)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.hudi.adapter.MaskingOutputAdapter.collect(MaskingOutputAdapter.java:60)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.adapter.MaskingOutputAdapter.collect(MaskingOutputAdapter.java:30)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
           at 
org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44)
 ~[flink-table-runtime-1.15.2.jar:1.15.2]
           at 
org.apache.hudi.sink.compact.CompactOperator.lambda$processElement$1(CompactOperator.java:125)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:142)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:133)
 ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_211]
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_211]
           at java.lang.Thread.run(Thread.java:748) [?:1.8.0_211]`

