Toroidals commented on issue #13114:
URL: https://github.com/apache/hudi/issues/13114#issuecomment-2791391623
@danny0405 **Please note**: My Flink job is running on YARN, and the storage
system used is HDFS.
The most frequent issue I encounter is the following error when writing to HDFS:
"Interrupted while waiting for data to be acknowledged by pipeline".
I have verified that the HDFS cluster itself is healthy: other write
processes (not involving Hudi) work correctly. The problem occurs only
during Hudi writes.
I attempted to address it by adding the following configurations in
org.apache.hudi.configuration.HadoopConfigurations, specifically in the
getHadoopConf() and getParquetConf() methods:
```java
hadoopConf.set("dfs.client.socket-timeout", "1800000");
hadoopConf.set("dfs.client.connect.timeout", "120000");
hadoopConf.set("dfs.client.block.write.locateFollowingBlock.retries", "12");
hadoopConf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "DEFAULT");
hadoopConf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
hadoopConf.setBoolean("dfs.client.write.replica-pipeline-finalize", false);

copy.set("dfs.client.socket-timeout", "1800000");
copy.set("dfs.client.connect.timeout", "120000");
copy.set("dfs.client.block.write.locateFollowingBlock.retries", "12");
copy.set("dfs.client.block.write.replace-datanode-on-failure.policy", "DEFAULT");
copy.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
copy.setBoolean("dfs.client.write.replica-pipeline-finalize", false);
```
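As a side note: if the Flink connector in your version forwards table options prefixed with `hadoop.` into the Hadoop `Configuration` (which, as far as I can tell, is what `HadoopConfigurations.getHadoopConf()` reads), the same overrides could be passed declaratively instead of patching the source. A sketch, with hypothetical table name, columns, and path:

```sql
-- Sketch only: assumes the 'hadoop.' option pass-through of the Hudi
-- Flink connector; table name, columns, and path are placeholders.
CREATE TABLE hudi_sink (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///tmp/hudi/hudi_sink',
  'hadoop.dfs.client.socket-timeout' = '1800000',
  'hadoop.dfs.client.connect.timeout' = '120000',
  'hadoop.dfs.client.block.write.locateFollowingBlock.retries' = '12',
  'hadoop.dfs.client.block.write.replace-datanode-on-failure.policy' = 'DEFAULT',
  'hadoop.dfs.client.block.write.replace-datanode-on-failure.enable' = 'true'
);
```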
However, these changes had very limited effect, and the issue still persists.
The final error appears to be triggered during asynchronous compaction, and
the exception was not properly caught.
```
2025-04-09 21:18:53,163 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state CANCELED to JobManager for task Map -> row_data_to_hoodie_record (2/3)#0 a635b57c048c14ea24a8375d01c54e0c.
2025-04-09 21:19:08,167 WARN  org.apache.hadoop.hdfs.DataStreamer [] - DataStreamer Exception
java.io.InterruptedIOException: Interrupted while waiting for IO on channel java.nio.channels.SocketChannel[connected local=/192.168.74.182:54472 remote=/192.168.74.182:1019]. Total timeout mills is 500000, 499997 millis timeout left.
	at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:350) ~[hadoop-client-api-3.3.4.jar:?]
	at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:156) ~[hadoop-client-api-3.3.4.jar:?]
	at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:158) ~[hadoop-client-api-3.3.4.jar:?]
	at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:116) ~[hadoop-client-api-3.3.4.jar:?]
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122) ~[?:1.8.0_211]
	at java.io.DataOutputStream.write(DataOutputStream.java:107) ~[?:1.8.0_211]
	at org.apache.hadoop.hdfs.DFSPacket.writeTo(DFSPacket.java:193) ~[hadoop-client-api-3.3.4.jar:?]
	at org.apache.hadoop.hdfs.DataStreamer.sendPacket(DataStreamer.java:857) ~[hadoop-client-api-3.3.4.jar:?]
	at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:762) [hadoop-client-api-3.3.4.jar:?]
2025-04-09 21:19:08,181 ERROR org.apache.hudi.common.util.queue.SimpleExecutor [] - Failed consuming records
org.apache.hudi.exception.HoodieUpsertException: Failed to close UpdateHandle
	at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:480) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.finish(BaseMergeHelper.java:59) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.finish(BaseMergeHelper.java:44) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:72) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.table.HoodieTable.runMerge(HoodieTable.java:1139) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleUpdateInternal(HoodieFlinkCopyOnWriteTable.java:424) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleUpdate(HoodieFlinkCopyOnWriteTable.java:419) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.table.action.compact.CompactionExecutionHelper.writeFileAndGetWriteStats(CompactionExecutionHelper.java:63) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:269) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:179) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.sink.compact.CompactOperator.doCompaction(CompactOperator.java:142) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.sink.compact.CompactOperator.lambda$processElement$0(CompactOperator.java:124) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_211]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_211]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_211]
Caused by: java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
	at org.apache.hadoop.hdfs.DataStreamer.waitForAckedSeqno(DataStreamer.java:937) ~[hadoop-client-api-3.3.4.jar:?]
	at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:778) ~[hadoop-client-api-3.3.4.jar:?]
	at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:888) ~[hadoop-client-api-3.3.4.jar:?]
	at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:847) ~[hadoop-client-api-3.3.4.jar:?]
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:77) ~[hadoop-client-api-3.3.4.jar:?]
	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) ~[hadoop-client-api-3.3.4.jar:?]
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:77) ~[hadoop-client-api-3.3.4.jar:?]
	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) ~[hadoop-client-api-3.3.4.jar:?]
	at org.apache.hudi.hadoop.fs.SizeAwareFSDataOutputStream.close(SizeAwareFSDataOutputStream.java:79) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1106) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:132) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.io.hadoop.HoodieBaseParquetWriter.close(HoodieBaseParquetWriter.java:164) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.io.hadoop.HoodieAvroParquetWriter.close(HoodieAvroParquetWriter.java:88) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:456) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	... 16 more
2025-04-09 21:19:08,182 ERROR org.apache.hudi.sink.compact.CompactOperator [] - Executor executes action [Execute compaction for instant 20250409210624395 from task 1] error
org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieUpsertException: Failed to close UpdateHandle
	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:151) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.table.HoodieTable.runMerge(HoodieTable.java:1139) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleUpdateInternal(HoodieFlinkCopyOnWriteTable.java:424) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleUpdate(HoodieFlinkCopyOnWriteTable.java:419) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.table.action.compact.CompactionExecutionHelper.writeFileAndGetWriteStats(CompactionExecutionHelper.java:63) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:269) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:179) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.sink.compact.CompactOperator.doCompaction(CompactOperator.java:142) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.sink.compact.CompactOperator.lambda$processElement$0(CompactOperator.java:124) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_211]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_211]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_211]
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieUpsertException: Failed to close UpdateHandle
	at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:75) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	... 12 more
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to close UpdateHandle
	at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:480) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.finish(BaseMergeHelper.java:59) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.finish(BaseMergeHelper.java:44) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:72) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	... 12 more
Caused by: java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
	at org.apache.hadoop.hdfs.DataStreamer.waitForAckedSeqno(DataStreamer.java:937) ~[hadoop-client-api-3.3.4.jar:?]
	at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:778) ~[hadoop-client-api-3.3.4.jar:?]
	at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:888) ~[hadoop-client-api-3.3.4.jar:?]
	at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:847) ~[hadoop-client-api-3.3.4.jar:?]
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:77) ~[hadoop-client-api-3.3.4.jar:?]
	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) ~[hadoop-client-api-3.3.4.jar:?]
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:77) ~[hadoop-client-api-3.3.4.jar:?]
	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) ~[hadoop-client-api-3.3.4.jar:?]
	at org.apache.hudi.hadoop.fs.SizeAwareFSDataOutputStream.close(SizeAwareFSDataOutputStream.java:79) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1106) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:132) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.io.hadoop.HoodieBaseParquetWriter.close(HoodieBaseParquetWriter.java:164) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.io.hadoop.HoodieAvroParquetWriter.close(HoodieAvroParquetWriter.java:88) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:456) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.finish(BaseMergeHelper.java:59) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.finish(BaseMergeHelper.java:44) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:72) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	... 12 more
2025-04-09 21:19:08,183 ERROR org.apache.flink.runtime.util.ClusterUncaughtExceptionHandler [] - WARNING: Thread 'pool-19-thread-1' produced an uncaught exception. If you want to fail on uncaught exceptions, then configure cluster.uncaught-exception-handling accordingly
org.apache.flink.runtime.execution.CancelTaskException: Buffer pool has already been destroyed.
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.checkDestroyed(LocalBufferPool.java:404) ~[flink-dist-1.15.2.jar:1.15.2]
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:373) ~[flink-dist-1.15.2.jar:1.15.2]
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilder(LocalBufferPool.java:316) ~[flink-dist-1.15.2.jar:1.15.2]
	at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:394) ~[flink-dist-1.15.2.jar:1.15.2]
	at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:377) ~[flink-dist-1.15.2.jar:1.15.2]
	at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForNewRecord(BufferWritingResultPartition.java:281) ~[flink-dist-1.15.2.jar:1.15.2]
	at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:157) ~[flink-dist-1.15.2.jar:1.15.2]
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:106) ~[flink-dist-1.15.2.jar:1.15.2]
	at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54) ~[flink-dist-1.15.2.jar:1.15.2]
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:104) ~[flink-streaming-java-1.15.2.jar:1.15.2]
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:90) ~[flink-streaming-java-1.15.2.jar:1.15.2]
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44) ~[flink-streaming-java-1.15.2.jar:1.15.2]
	at org.apache.hudi.adapter.MaskingOutputAdapter.collect(MaskingOutputAdapter.java:60) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.adapter.MaskingOutputAdapter.collect(MaskingOutputAdapter.java:30) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-streaming-java-1.15.2.jar:1.15.2]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-streaming-java-1.15.2.jar:1.15.2]
	at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44) ~[flink-table-runtime-1.15.2.jar:1.15.2]
	at org.apache.hudi.sink.compact.CompactOperator.lambda$processElement$1(CompactOperator.java:125) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:142) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:133) ~[hudi-flink1.15-bundle-1.0.0.jar:1.0.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_211]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_211]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_211]
```
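For what it is worth, the `InterruptedIOException` here usually means the writing thread was interrupted from outside rather than HDFS itself failing; the earlier `CANCELED` state and the "Buffer pool has already been destroyed" exception are consistent with Flink cancelling the task while compaction was still flushing. If the cancellation is caused by compaction overrunning its time budget, raising the compaction limits may help more than HDFS client tuning. A sketch of the relevant table options (option names are the standard Hudi Flink options; the values are illustrative only, please adjust for your workload):

```sql
-- Illustrative values only; defaults are 1200 seconds and 100 MB.
'compaction.timeout.seconds' = '3600',
'compaction.max_memory' = '1024'
```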