skdfeitian commented on issue #4958:
URL: https://github.com/apache/paimon/issues/4958#issuecomment-2602409498

   To add more information: after I added the parameter 
fs.hdfs.impl.disable.cache, the error Caused by: java.io.IOException: 
Filesystem closed no longer appears, so this parameter seems to be 
effective.
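
   A minimal sketch of why disabling the cache can make "Filesystem closed" go away. It is a plain-Java model, not Hadoop code: Handle, get and secondCallerFails are hypothetical names made up for illustration. It assumes the usual Hadoop behaviour where FileSystem.get hands every caller the same cached instance for a given URI, so a close() by one caller also closes it for the others, while a disabled cache gives each caller its own instance.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class FsCacheSketch {
    /** Hypothetical stand-in for a file system client; not a Hadoop class. */
    static final class Handle {
        private boolean open = true;

        void close() {
            open = false;
        }

        void write() throws IOException {
            if (!open) {
                throw new IOException("Filesystem closed");
            }
        }
    }

    // Model of a process-wide cache keyed by URI (assumption: one entry per URI).
    private static final Map<String, Handle> CACHE = new HashMap<>();

    /** Returns a shared handle per URI, or a fresh one when caching is disabled. */
    static Handle get(String uri, boolean disableCache) {
        if (disableCache) {
            return new Handle();
        }
        return CACHE.computeIfAbsent(uri, k -> new Handle());
    }

    /** Two callers obtain a handle; the first closes its own, then the second writes. */
    static boolean secondCallerFails(boolean disableCache) {
        Handle a = get("hdfs://nn", disableCache);
        Handle b = get("hdfs://nn", disableCache);
        a.close();
        try {
            b.write();
            return false;
        } catch (IOException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("cache enabled, second caller fails: " + secondCallerFails(false));
        System.out.println("cache disabled, second caller fails: " + secondCallerFails(true));
    }
}
```

   With the cache enabled the second caller fails because both callers hold the same object. With it disabled each caller has to close its own instance, which is the trade-off of this setting.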
   
   However, the occasional java.nio.channels.ClosedChannelException error has 
not been resolved. I have nearly 200 Flink-related Paimon tasks, and 4 
different types of tasks report this error. It is not very frequent; some 
tasks hit it only after running for dozens of days. It feels like there might 
be an issue with the HDFS configuration, but we also have many real-time Flink 
tasks writing to HDFS, and those have not encountered similar issues.
   
   The 4 affected task types are:
   (1) Data lake ingestion tasks from MySQL CDC to Paimon via 
KafkaSyncDatabaseAction.
   (2) Flink SQL tasks that calculate PV and UV metrics and write them to 
Paimon tables.
   (3) Tasks that use tableEnv.executeSql to execute insert into statements, 
collecting Kafka data into Paimon tables.
   (4) Tasks executed through paimon-flink-action.jar to perform compaction.
   
   A sample stack trace:
   
   java.io.IOException: Could not perform checkpoint 14784 for operator Writer : ods_analyser_realtime (1/20)#3.
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1256)
        at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
        at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
        at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
        at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
        at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
        at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
        at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
        at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
        at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
        at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
        at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
        at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.RuntimeException: Encountered an error while do compaction
        at org.apache.paimon.flink.sink.AppendOnlyTableCompactionWorkerOperator.prepareCommit(AppendOnlyTableCompactionWorkerOperator.java:101)
        at org.apache.paimon.flink.sink.PrepareCommitOperator.emitCommittables(PrepareCommitOperator.java:100)
        at org.apache.paimon.flink.sink.PrepareCommitOperator.prepareSnapshotPreBarrier(PrepareCommitOperator.java:80)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:321)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1299)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1287)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1244)
        ... 22 more
   Caused by: java.util.concurrent.ExecutionException: java.nio.channels.ClosedChannelException
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.paimon.flink.sink.AppendOnlyTableCompactionWorkerOperator.prepareCommit(AppendOnlyTableCompactionWorkerOperator.java:93)
        ... 30 more
   Caused by: java.nio.channels.ClosedChannelException
        at org.apache.hadoop.hdfs.ExceptionLastSeen.throwException4Close(ExceptionLastSeen.java:73)
        at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:158)
        at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:106)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:62)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at org.apache.paimon.fs.hadoop.HadoopFileIO$HadoopPositionOutputStream.write(HadoopFileIO.java:300)
        at org.apache.paimon.format.parquet.writer.PositionOutputStreamAdapter.write(PositionOutputStreamAdapter.java:54)
        at java.io.OutputStream.write(OutputStream.java:75)
        at org.apache.paimon.shade.org.apache.parquet.bytes.ConcatenatingByteArrayCollector.writeAllTo(ConcatenatingByteArrayCollector.java:46)
        at org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetFileWriter.writeColumnChunk(ParquetFileWriter.java:903)
        at org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetFileWriter.writeColumnChunk(ParquetFileWriter.java:848)
        at org.apache.paimon.shade.org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:310)
        at org.apache.paimon.shade.org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:458)
        at org.apache.paimon.shade.org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:186)
        at org.apache.paimon.shade.org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:124)
        at org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319)
        at org.apache.paimon.format.parquet.writer.ParquetBulkWriter.finish(ParquetBulkWriter.java:57)
        at org.apache.paimon.io.SingleFileWriter.close(SingleFileWriter.java:144)
        at org.apache.paimon.io.RowDataFileWriter.close(RowDataFileWriter.java:95)
        at org.apache.paimon.io.RollingFileWriter.closeCurrentWriter(RollingFileWriter.java:107)
        at org.apache.paimon.io.RollingFileWriter.close(RollingFileWriter.java:144)
        at org.apache.paimon.operation.AppendOnlyFileStoreWrite.lambda$compactRewriter$0(AppendOnlyFileStoreWrite.java:184)
        at org.apache.paimon.append.AppendOnlyCompactionTask.doCompact(AppendOnlyCompactionTask.java:64)
        at org.apache.paimon.flink.sink.AppendOnlyTableCompactionWorkerOperator.lambda$processElement$1(AppendOnlyTableCompactionWorkerOperator.java:108)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
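
   To help read the "Caused by" chain above, here is a small sketch of the same shape using only the JDK. It does not use HDFS or Paimon and does not identify why the stream was closed in my jobs: the class and method names (RootCauseSketch, rootCause, failingWrite) are hypothetical, and a local FileChannel stands in for the output stream. It only shows that a write on an already-closed channel throws ClosedChannelException, and that Future.get() wraps a worker-thread failure in ExecutionException, which is how the compaction thread's error surfaces in prepareCommit in the trace.

```java
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RootCauseSketch {
    /** Walks the "Caused by" chain to its last element. */
    static Throwable rootCause(Throwable t) {
        Throwable cur = t;
        while (cur.getCause() != null && cur.getCause() != cur) {
            cur = cur.getCause();
        }
        return cur;
    }

    /**
     * Writes to an already-closed channel on a worker thread and returns
     * whatever Future.get() throws (null if the write unexpectedly succeeds).
     */
    static Throwable failingWrite() throws Exception {
        Path tmp = Files.createTempFile("sketch", ".bin");
        FileChannel ch = FileChannel.open(tmp, StandardOpenOption.WRITE);
        ch.close(); // simulate a stream that was closed before the write
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> ch.write(ByteBuffer.wrap(new byte[] {1}));
            Future<Integer> future = pool.submit(task);
            future.get();
            return null;
        } catch (ExecutionException e) {
            return e;
        } finally {
            pool.shutdownNow();
            Files.deleteIfExists(tmp);
        }
    }

    public static void main(String[] args) throws Exception {
        Throwable thrown = failingWrite();
        System.out.println("thrown by get(): " + thrown.getClass().getName());
        System.out.println("root cause:      " + rootCause(thrown).getClass().getName());
    }
}
```

   In the trace above the root cause is raised from DFSOutputStream.checkClosed during ParquetWriter.close, so the question that remains open is what closed the HDFS output stream before the compaction writer finished.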


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.