moon-fall opened a new pull request #3647:
URL: https://github.com/apache/iceberg/pull/3647
When a Flink job restores from a checkpoint, IcebergFilesCommitter reads the
maxCommittedCheckpointId from the snapshot summary in the metadata file and
commits only the uncommitted DataFiles from maxCommittedCheckpointId + 1
onward. To find maxCommittedCheckpointId, it traverses the Iceberg table's
snapshots until it reaches a snapshot whose maxCommittedCheckpointId is not
null. However, if data files are rewritten and snapshots are then expired so
that only the latest (rewrite) snapshot is retained, the
maxCommittedCheckpointId info is lost. On restore, the Flink job will
repeatedly commit DataFiles that have already been committed, and an error
like the following is reported because the manifest file has been deleted:
> org.apache.iceberg.exceptions.NotFoundException: Failed to open input stream for file: file:/tmp/iceberg10/metadata/41b2614f36cb4fff57efbecd063827ba-00000-0-3-00003.avro
> 	at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:177)
> 	at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:101)
> 	at org.apache.iceberg.avro.AvroIterable.getMetadata(AvroIterable.java:66)
> 	at org.apache.iceberg.ManifestReader.&lt;init&gt;(ManifestReader.java:103)
> 	at org.apache.iceberg.ManifestFiles.read(ManifestFiles.java:87)
> 	at org.apache.iceberg.ManifestFiles.read(ManifestFiles.java:71)
> 	at org.apache.iceberg.flink.sink.FlinkManifestUtil.readDataFiles(FlinkManifestUtil.java:59)
> 	at org.apache.iceberg.flink.sink.FlinkManifestUtil.readCompletedFiles(FlinkManifestUtil.java:105)
> 	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:207)
> 	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.initializeState(IcebergFilesCommitter.java:153)
> 	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:432)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: File file:/tmp/iceberg10/metadata/41b2614f36cb4fff57efbecd063827ba-00000-0-3-00003.avro does not exist
> 	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
> 	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
> 	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
> 	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
> 	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.&lt;init&gt;(ChecksumFileSystem.java:142)
> 	at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:346)
> 	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
> 	at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:175)
> 	... 19 more
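The lookup described above can be sketched as follows. `FakeSnapshot`, the map-based table, and the traversal helper are simplified stand-ins for the real Iceberg snapshot API, used only to show how expiring the snapshot that carries the summary property makes the checkpoint id unrecoverable (the real Flink sink stores a similar property in the snapshot summary):

```java
import java.util.Map;

// Simplified stand-in for an Iceberg snapshot: an id, a parent id, and a summary map.
class FakeSnapshot {
    final long id;
    final Long parentId;
    final Map<String, String> summary;

    FakeSnapshot(long id, Long parentId, Map<String, String> summary) {
        this.id = id;
        this.parentId = parentId;
        this.summary = summary;
    }
}

public class CheckpointLookup {
    // Assumed summary key, mirroring the property the Flink sink writes.
    static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";

    // Walk from the current snapshot back through its parents until a snapshot
    // carries the checkpoint-id property; return -1 if the chain ends (or a
    // parent was expired) before the property is found.
    static long maxCommittedCheckpointId(Map<Long, FakeSnapshot> byId, Long currentId) {
        Long id = currentId;
        while (id != null) {
            FakeSnapshot snap = byId.get(id);
            if (snap == null) {
                break; // parent snapshot was expired, nothing more to traverse
            }
            String value = snap.summary.get(MAX_COMMITTED_CHECKPOINT_ID);
            if (value != null) {
                return Long.parseLong(value);
            }
            id = snap.parentId;
        }
        return -1L; // no snapshot in the chain recorded a checkpoint id
    }
}
```

With this model, once the snapshot holding the property is expired and only a rewrite snapshot (whose summary lacks the property) remains, the traversal returns -1 and the committer treats every pending manifest as uncommitted, which reproduces the failure above.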
I think one way to ensure that restoring the Flink job from a checkpoint
succeeds is to retain the Flink checkpoint info when rewriting data files.
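The proposed fix could be sketched as follows. `retainCheckpointInfo` and the plain summary maps are hypothetical stand-ins, not the real Iceberg API; the idea is simply that a rewrite commit copies the checkpoint property forward from its parent snapshot so that expiring older snapshots cannot lose it:

```java
import java.util.HashMap;
import java.util.Map;

public class RetainCheckpointInfo {
    // Assumed summary key, mirroring the property the Flink sink writes.
    static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";

    // Build the summary for a rewrite snapshot: start from the rewrite's own
    // summary entries and carry over the parent's checkpoint property, so the
    // info survives even if every older snapshot is expired afterwards.
    static Map<String, String> retainCheckpointInfo(Map<String, String> parentSummary,
                                                    Map<String, String> newSummary) {
        Map<String, String> retained = new HashMap<>(newSummary);
        String checkpointId = parentSummary.get(MAX_COMMITTED_CHECKPOINT_ID);
        if (checkpointId != null) {
            // only fill it in when the rewrite did not already set one
            retained.putIfAbsent(MAX_COMMITTED_CHECKPOINT_ID, checkpointId);
        }
        return retained;
    }
}
```

With the property propagated this way, the traversal on restore finds a non-null maxCommittedCheckpointId in the latest snapshot even when it is the only snapshot retained.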
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]