skdfeitian commented on issue #4958:
URL: https://github.com/apache/paimon/issues/4958#issuecomment-2602409498
To add more information: after adding the parameter
fs.hdfs.impl.disable.cache, the error "Caused by: java.io.IOException:
Filesystem closed" no longer appears, so this parameter seems to be
effective.
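One plausible explanation, not confirmed in this issue: Hadoop's FileSystem.get() normally returns a cached instance shared by every caller with the same scheme, authority, and user, so one caller closing it can break the others. The sketch below is a toy model of that behavior in plain Java, not Hadoop code; ToyFs, get, the disableCache flag, and the hdfs://namenode:8020 URI are hypothetical names used only for illustration.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Toy model of a per-URI client cache. This is NOT Hadoop code. */
public class FsCacheSketch {

    /** Hypothetical stand-in for a file system client. */
    static final class ToyFs {
        private boolean closed = false;

        void write(String data) throws IOException {
            if (closed) {
                // Same message as the error reported in this issue.
                throw new IOException("Filesystem closed");
            }
        }

        void close() {
            closed = true;
        }
    }

    private static final Map<String, ToyFs> CACHE = new HashMap<>();

    /** Returns one shared instance per URI unless caching is disabled. */
    static ToyFs get(String uri, boolean disableCache) {
        if (disableCache) {
            return new ToyFs();
        }
        return CACHE.computeIfAbsent(uri, u -> new ToyFs());
    }

    /**
     * Two callers obtain a client for the same URI, the first closes its
     * client, then the second writes. Returns true if that write succeeds.
     */
    static boolean secondWriterSurvives(boolean disableCache) {
        ToyFs first = get("hdfs://namenode:8020", disableCache);
        ToyFs second = get("hdfs://namenode:8020", disableCache);
        first.close();
        try {
            second.write("row");
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("cache enabled:  " + secondWriterSurvives(false));
        System.out.println("cache disabled: " + secondWriterSurvives(true));
    }
}
```

With the cache enabled, both callers hold the same object, so the second write fails after the first caller closes it. With the cache disabled, each caller gets a private instance, at the cost of creating more client objects.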
However, the occasional java.nio.channels.ClosedChannelException error has
not been resolved. I have nearly 200 Flink tasks that use Paimon, and 4
different types of tasks report this error. In some tasks it appears only
after dozens of days of running, and it is not frequent. It looks like it
could be an HDFS configuration issue, but we also have many real-time Flink
tasks writing to HDFS, and those have not encountered similar problems.
The four task types are:
(1) Data lake ingestion tasks from MySQL CDC to Paimon via
    KafkaSyncDatabaseAction.
(2) Flink SQL tasks calculating PV and UV metrics and writing them to Paimon
    tables.
(3) Tasks that use tableEnv.executeSql to execute insert into statements,
    collecting Kafka data into Paimon tables.
(4) Tasks executed through paimon-flink-action.jar to perform compaction.
An example stack trace:
java.io.IOException: Could not perform checkpoint 14784 for operator Writer : ods_analyser_realtime (1/20)#3.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1256)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Encountered an error while do compaction
    at org.apache.paimon.flink.sink.AppendOnlyTableCompactionWorkerOperator.prepareCommit(AppendOnlyTableCompactionWorkerOperator.java:101)
    at org.apache.paimon.flink.sink.PrepareCommitOperator.emitCommittables(PrepareCommitOperator.java:100)
    at org.apache.paimon.flink.sink.PrepareCommitOperator.prepareSnapshotPreBarrier(PrepareCommitOperator.java:80)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:321)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1299)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1287)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1244)
    ... 22 more
Caused by: java.util.concurrent.ExecutionException: java.nio.channels.ClosedChannelException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.paimon.flink.sink.AppendOnlyTableCompactionWorkerOperator.prepareCommit(AppendOnlyTableCompactionWorkerOperator.java:93)
    ... 30 more
Caused by: java.nio.channels.ClosedChannelException
    at org.apache.hadoop.hdfs.ExceptionLastSeen.throwException4Close(ExceptionLastSeen.java:73)
    at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:158)
    at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:106)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:62)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.paimon.fs.hadoop.HadoopFileIO$HadoopPositionOutputStream.write(HadoopFileIO.java:300)
    at org.apache.paimon.format.parquet.writer.PositionOutputStreamAdapter.write(PositionOutputStreamAdapter.java:54)
    at java.io.OutputStream.write(OutputStream.java:75)
    at org.apache.paimon.shade.org.apache.parquet.bytes.ConcatenatingByteArrayCollector.writeAllTo(ConcatenatingByteArrayCollector.java:46)
    at org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetFileWriter.writeColumnChunk(ParquetFileWriter.java:903)
    at org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetFileWriter.writeColumnChunk(ParquetFileWriter.java:848)
    at org.apache.paimon.shade.org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:310)
    at org.apache.paimon.shade.org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:458)
    at org.apache.paimon.shade.org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:186)
    at org.apache.paimon.shade.org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:124)
    at org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319)
    at org.apache.paimon.format.parquet.writer.ParquetBulkWriter.finish(ParquetBulkWriter.java:57)
    at org.apache.paimon.io.SingleFileWriter.close(SingleFileWriter.java:144)
    at org.apache.paimon.io.RowDataFileWriter.close(RowDataFileWriter.java:95)
    at org.apache.paimon.io.RollingFileWriter.closeCurrentWriter(RollingFileWriter.java:107)
    at org.apache.paimon.io.RollingFileWriter.close(RollingFileWriter.java:144)
    at org.apache.paimon.operation.AppendOnlyFileStoreWrite.lambda$compactRewriter$0(AppendOnlyFileStoreWrite.java:184)
    at org.apache.paimon.append.AppendOnlyCompactionTask.doCompact(AppendOnlyCompactionTask.java:64)
    at org.apache.paimon.flink.sink.AppendOnlyTableCompactionWorkerOperator.lambda$processElement$1(AppendOnlyTableCompactionWorkerOperator.java:108)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
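
For readers following the nested causes above: the write fails on a compaction worker thread, and the checkpoint thread only sees it when it calls Future.get(), which wraps the original exception in an ExecutionException. The sketch below reproduces just that wrapping with the standard java.util.concurrent API. It is a minimal illustration, not Paimon's code; the class and method names are hypothetical, and the task simply throws ClosedChannelException as a stand-in for a write to an already-closed stream.

```java
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Shows how a worker-thread failure surfaces on the thread calling get(). */
public class CompactionFailureSketch {

    /** Runs a failing task on a worker thread and describes the exception chain. */
    static String describeFailure() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Stand-in for a compaction task writing to a closed output stream.
            Callable<Void> task = () -> {
                throw new ClosedChannelException();
            };
            Future<Void> result = pool.submit(task);
            try {
                result.get();
                return "no failure";
            } catch (ExecutionException e) {
                // get() wraps the worker's exception; the original is the cause.
                return e.getClass().getSimpleName()
                        + " <- " + e.getCause().getClass().getSimpleName();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFailure());
    }
}
```

This only explains the shape of the trace. It does not show what closed the stream in the first place, which is the open question in this comment.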