coolderli opened a new issue #4137:
URL: https://github.com/apache/iceberg/issues/4137
My Flink job failed to restore from a checkpoint and threw the exception below:
```
2022-02-15 23:38:13.944 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - IcebergFilesCommitter -> Sink: IcebergSink iceberg_zjyprc_hadoop.tmp.dws_diagnostice_flink_spark_test_copy (1/1) (521e1e169598446f0353ceafaec499fa) switched from RUNNING to FAILED on container_e36_1638340655551_39490_01_000002 @ zjy-hadoop-prc-streaming165.bj (dataPort=46101).
org.apache.iceberg.exceptions.NotFoundException: Failed to open input stream for file: hdfs://zjyprc-hadoop/user/h_data_platform/datalake/tmp.db/dws_diagnostice_flink_spark_test/metadata/bac976d7f048dd39440ab9155dd5fd67-fbb4ef531e002f8fb3a2052db255adf5-00000-0-12-00011.avro
	at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:177)
	at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:101)
	at org.apache.iceberg.avro.AvroIterable.getMetadata(AvroIterable.java:66)
	at org.apache.iceberg.ManifestReader.<init>(ManifestReader.java:103)
	at org.apache.iceberg.ManifestFiles.read(ManifestFiles.java:93)
	at org.apache.iceberg.ManifestFiles.read(ManifestFiles.java:77)
	at org.apache.iceberg.flink.sink.FlinkManifestUtil.readDataFiles(FlinkManifestUtil.java:59)
	at org.apache.iceberg.flink.sink.FlinkManifestUtil.readCompletedFiles(FlinkManifestUtil.java:106)
	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:243)
	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.initializeState(IcebergFilesCommitter.java:179)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:325)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File does not exist: /user/h_data_platform/datalake/tmp.db/dws_diagnostice_flink_spark_test/metadata/bac976d7f048dd39440ab9155dd5fd67-fbb4ef531e002f8fb3a2052db255adf5-00000-0-12-00011.avro
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
	at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:155)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2140)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:845)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:489)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:532)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1125)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1053)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1805)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3052)
```
After verifying, I found that the downstream table had changed. In the current implementation, we query the snapshot history to find the max committed checkpoint id:
https://github.com/apache/iceberg/blob/master/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L369
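For context, that method walks backwards through the table's snapshot history and reads the committed checkpoint id out of each snapshot's summary. Simplified, from my reading of the linked source (summary keys inlined):

```java
import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

// Simplified from the linked IcebergFilesCommitter code: walk the snapshot
// chain backwards and return the checkpoint id recorded by this Flink job.
static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
  Snapshot snapshot = table.currentSnapshot();
  long lastCommittedCheckpointId = -1L; // "no committed checkpoint found"

  while (snapshot != null) {
    Map<String, String> summary = snapshot.summary();
    if (flinkJobId.equals(summary.get("flink.job-id"))) {
      String value = summary.get("flink.max-committed-checkpoint-id");
      if (value != null) {
        lastCommittedCheckpointId = Long.parseLong(value);
        break;
      }
    }
    Long parentId = snapshot.parentId();
    snapshot = parentId != null ? table.snapshot(parentId) : null;
  }

  return lastCommittedCheckpointId;
}
```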
If the downstream table has changed, the value returned by `getMaxCommittedCheckpointId` is unpredictable. I think we could store the table UUID in the checkpoint state; when restoring, we would validate it against the downstream table's UUID and throw an exception if the UUIDs are inconsistent.
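A rough sketch of the validation I have in mind; the helper name and the way the restored UUID would be carried in operator state are hypothetical, not existing APIs:

```java
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Table;

// Hypothetical helper (name and wiring are illustrative): compare the table
// UUID stored in the checkpoint against the table we are restoring into.
static void validateTableUuid(Table table, String restoredTableUuid) {
  // The UUID lives in the table metadata, reachable via HasTableOperations.
  String currentUuid = ((HasTableOperations) table).operations().current().uuid();
  if (!currentUuid.equals(restoredTableUuid)) {
    throw new IllegalStateException(String.format(
        "Checkpoint was taken against table %s but the current table is %s; "
            + "refusing to restore",
        restoredTableUuid, currentUuid));
  }
}
```

`IcebergFilesCommitter.initializeState` could run this check before `commitUpToCheckpoint`, so a recreated table fails fast with a clear message instead of a `NotFoundException` on a missing manifest file.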
What do you think about this? @stevenzwu @rdblue