coolderli opened a new issue #4137:
URL: https://github.com/apache/iceberg/issues/4137


   My Flink job failed to restore from a checkpoint and threw the exception below:
   ```
   2022-02-15 23:38:13.944 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       - IcebergFilesCommitter -> Sink: IcebergSink iceberg_zjyprc_hadoop.tmp.dws_diagnostice_flink_spark_test_copy (1/1) (521e1e169598446f0353ceafaec499fa) switched from RUNNING to FAILED on container_e36_1638340655551_39490_01_000002 @ zjy-hadoop-prc-streaming165.bj (dataPort=46101).
   org.apache.iceberg.exceptions.NotFoundException: Failed to open input stream for file: hdfs://zjyprc-hadoop/user/h_data_platform/datalake/tmp.db/dws_diagnostice_flink_spark_test/metadata/bac976d7f048dd39440ab9155dd5fd67-fbb4ef531e002f8fb3a2052db255adf5-00000-0-12-00011.avro
       at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:177)
       at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:101)
       at org.apache.iceberg.avro.AvroIterable.getMetadata(AvroIterable.java:66)
       at org.apache.iceberg.ManifestReader.<init>(ManifestReader.java:103)
       at org.apache.iceberg.ManifestFiles.read(ManifestFiles.java:93)
       at org.apache.iceberg.ManifestFiles.read(ManifestFiles.java:77)
       at org.apache.iceberg.flink.sink.FlinkManifestUtil.readDataFiles(FlinkManifestUtil.java:59)
       at org.apache.iceberg.flink.sink.FlinkManifestUtil.readCompletedFiles(FlinkManifestUtil.java:106)
       at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:243)
       at org.apache.iceberg.flink.sink.IcebergFilesCommitter.initializeState(IcebergFilesCommitter.java:179)
       at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:325)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427)
       at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545)
       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
       at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535)
       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575)
       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
       at java.lang.Thread.run(Thread.java:748)
   Caused by: java.io.FileNotFoundException: File does not exist: /user/h_data_platform/datalake/tmp.db/dws_diagnostice_flink_spark_test/metadata/bac976d7f048dd39440ab9155dd5fd67-fbb4ef531e002f8fb3a2052db255adf5-00000-0-12-00011.avro
       at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
       at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
       at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:155)
       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2140)
       at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:845)
       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:489)
       at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:532)
       at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
       at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1125)
       at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1053)
       at javax.security.auth.Subject.doAs(Subject.java:422)
       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1805)
       at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3052)
   ```
   
   After verification, I found that the downstream table had changed. In the current implementation, the committer scans the snapshot history to find the max committed checkpoint id: https://github.com/apache/iceberg/blob/master/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L369 (paraphrased below).
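   For context, this is roughly what that lookup does; it is a condensed paraphrase of the linked code, not a verbatim copy:
   ```java
   import java.util.Map;

   import org.apache.iceberg.Snapshot;
   import org.apache.iceberg.Table;

   // Condensed paraphrase of IcebergFilesCommitter#getMaxCommittedCheckpointId:
   // walk the snapshot history backwards and return the
   // "flink.max-committed-checkpoint-id" summary value written by the last
   // snapshot that this Flink job committed.
   static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
     long lastCommittedCheckpointId = -1L; // INITIAL_CHECKPOINT_ID
     Snapshot snapshot = table.currentSnapshot();
     while (snapshot != null) {
       Map<String, String> summary = snapshot.summary();
       if (flinkJobId.equals(summary.get("flink.job-id"))) {
         String value = summary.get("flink.max-committed-checkpoint-id");
         if (value != null) {
           lastCommittedCheckpointId = Long.parseLong(value);
           break;
         }
       }
       Long parentId = snapshot.parentId();
       snapshot = parentId != null ? table.snapshot(parentId) : null;
     }
     return lastCommittedCheckpointId;
   }
   ```
   If the table was dropped and recreated, the new history contains no snapshot carrying this property for the job, so the committer falls back to the initial checkpoint id and tries to re-commit the manifests held in the restored state. Those manifest files were deleted along with the old table, which is exactly the `NotFoundException` above.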
   If the downstream table has changed (for example, dropped and recreated), the value returned by `getMaxCommittedCheckpointId` is unpredictable. I propose storing the table UUID in the checkpoint state. When restoring, we can compare the restored UUID against the current table's UUID and throw an exception if they are inconsistent, as sketched below.
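   A minimal sketch of the check I have in mind, assuming it runs from `IcebergFilesCommitter#initializeState`; the class and state names, and reading the UUID via `BaseTable#operations().current().uuid()`, are illustrative assumptions, not existing code:
   ```java
   import org.apache.flink.api.common.state.ListState;
   import org.apache.flink.api.common.state.ListStateDescriptor;
   import org.apache.flink.runtime.state.StateInitializationContext;
   import org.apache.iceberg.BaseTable;
   import org.apache.iceberg.Table;

   // Hypothetical addition to IcebergFilesCommitter (names are illustrative).
   class TableUuidValidator {
     private static final ListStateDescriptor<String> TABLE_UUID_DESCRIPTOR =
         new ListStateDescriptor<>("iceberg-table-uuid-state", String.class);

     private final Table table; // the committer's target table
     private transient ListState<String> tableUuidState;

     TableUuidValidator(Table table) {
       this.table = table;
     }

     // Called from IcebergFilesCommitter#initializeState, before any re-commit.
     void validateOnRestore(StateInitializationContext context) throws Exception {
       tableUuidState = context.getOperatorStateStore().getListState(TABLE_UUID_DESCRIPTOR);
       // Read the current table UUID from its metadata; assumes a BaseTable.
       String currentUuid = ((BaseTable) table).operations().current().uuid();

       if (context.isRestored()) {
         for (String restoredUuid : tableUuidState.get()) {
           if (!restoredUuid.equals(currentUuid)) {
             throw new IllegalStateException(String.format(
                 "Checkpoint was taken against table UUID %s but the current table is %s; "
                     + "the table was likely dropped and recreated, so restoring would "
                     + "re-commit manifests that no longer exist.",
                 restoredUuid, currentUuid));
           }
         }
       }

       // Record the UUID so the next checkpoint carries it.
       tableUuidState.clear();
       tableUuidState.add(currentUuid);
     }
   }
   ```
   Failing fast with a clear message seems better than the current behavior, where the committer silently replays against the wrong table history and the failure only surfaces as a `NotFoundException` on a metadata file.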
   What do you think about this? @stevenzwu @rdblue


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


