coolderli opened a new issue #2808:
URL: https://github.com/apache/iceberg/issues/2808
In my Flink job (a job that writes binlog to Iceberg), the sink commits twice when a
`CommitStateUnknownException` occurs. The exception is as follows:
```
2021-07-12 13:11:47.859 WARN org.apache.flink.runtime.taskmanager.Task - IcebergFilesCommitter -> Sink: IcebergSink iceberg_zjyprc_hadoop.dw_business.dwd_ord_ord_df (1/1)#21 (bd443bf5439bb8de4083b56499a8d346) switched from RUNNING to FAILED.
org.apache.iceberg.exceptions.CommitStateUnknownException: org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
Cannot determine whether the commit was successful or not, the underlying data files may or may not be needed. Manual intervention via the Remove Orphan Files Action can remove these files when a connection to the Catalog can be re-established if the commit was actually unsuccessful.
Please check to see whether or not your commit was successful before retrying this commit. Retrying an already successful operation will result in duplicate records or unintentional modifications.
At this time no files will be deleted including possibly unused manifest lists.
    at org.apache.iceberg.hive.HiveTableOperations.doCommit(HiveTableOperations.java:271)
    at org.apache.iceberg.BaseMetastoreTableOperations.commit(BaseMetastoreTableOperations.java:124)
    at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:307)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:404)
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:213)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:197)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:189)
    at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:289)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitOperation(IcebergFilesCommitter.java:308)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitDeltaTxn(IcebergFilesCommitter.java:295)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:219)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.notifyCheckpointComplete(IcebergFilesCommitter.java:189)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:99)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:319)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1089)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$11(StreamTask.java:1054)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$13(StreamTask.java:1077)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
    at org.apache.iceberg.relocated.com.google.common.base.Throwables.propagate(Throwables.java:241)
    at org.apache.iceberg.common.DynMethods$UnboundMethod.invoke(DynMethods.java:80)
    at org.apache.iceberg.hive.HiveTableOperations.lambda$persistTable$3(HiveTableOperations.java:308)
    at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
    at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:77)
    at org.apache.iceberg.hive.HiveTableOperations.persistTable(HiveTableOperations.java:304)
    at org.apache.iceberg.hive.HiveTableOperations.doCommit(HiveTableOperations.java:259)
    ... 25 more
Caused by: org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:129)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
    at org.apache.thrift.transport.TSaslTransport.readLength(TSaslTransport.java:376)
    at org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:453)
    at org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:435)
    at org.apache.thrift.transport.TSaslClientTransport.read(TSaslClientTransport.java:37)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
    at org.apache.hadoop.hive.thrift.TFilterTransport.readAll(TFilterTransport.java:62)
    at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
    at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_alter_table_with_environment_context(ThriftHiveMetastore.java:1375)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.alter_table_with_environment_context(ThriftHiveMetastore.java:1359)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.alter_table(HiveMetaStoreClient.java:370)
    at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.iceberg.common.DynMethods$UnboundMethod.invokeChecked(DynMethods.java:65)
    at org.apache.iceberg.common.DynMethods$UnboundMethod.invoke(DynMethods.java:77)
    ... 30 more
Caused by: java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
    ... 49 more
2021-07-12 13:11:47.860 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for IcebergFilesCommitter -> Sink: IcebergSink iceberg_zjyprc_hadoop.dw_business.dwd_ord_ord_df (1/1)#21 (bd443bf5439bb8de4083b56499a8d346).
```
After the job restarted, I found duplicate snapshots in the table.

I think that, before committing again in
[initializeState](https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L154),
we should check whether the last snapshot was already committed successfully.
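
A minimal sketch of the kind of check I have in mind, written against a stand-in type rather than the real Iceberg API. `SnapshotInfo`, `maxCommittedCheckpointId`, and `shouldRecommit` are hypothetical names used only for illustration. The two summary keys are what I believe the Flink sink records in each snapshot summary; treat them as assumptions and verify them against `IcebergFilesCommitter`. The idea is to reload the table after the unknown-state failure and skip the re-commit if a snapshot for this job already carries the pending checkpoint id.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CommitRecoveryCheck {
    // Assumed summary keys; verify against IcebergFilesCommitter before relying on them.
    static final String FLINK_JOB_ID = "flink.job-id";
    static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";

    /** Hypothetical stand-in for an Iceberg snapshot: only its summary map is modeled. */
    static final class SnapshotInfo {
        final Map<String, String> summary;

        SnapshotInfo(Map<String, String> summary) {
            this.summary = summary;
        }
    }

    /** Convenience builder for a snapshot committed by jobId at checkpointId. */
    static SnapshotInfo snapshot(String jobId, long checkpointId) {
        Map<String, String> summary = new HashMap<>();
        summary.put(FLINK_JOB_ID, jobId);
        summary.put(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
        return new SnapshotInfo(summary);
    }

    /** Highest checkpoint id recorded by this job in the table history, or -1 if none. */
    static long maxCommittedCheckpointId(List<SnapshotInfo> history, String jobId) {
        long max = -1L;
        for (SnapshotInfo s : history) {
            if (jobId.equals(s.summary.get(FLINK_JOB_ID))) {
                String value = s.summary.get(MAX_COMMITTED_CHECKPOINT_ID);
                if (value != null) {
                    max = Math.max(max, Long.parseLong(value));
                }
            }
        }
        return max;
    }

    /**
     * Decide whether a pending checkpoint still needs committing. freshHistory must be
     * read from the catalog AFTER the failed attempt, so that a commit which timed out
     * on the client but succeeded in the metastore is visible.
     */
    static boolean shouldRecommit(List<SnapshotInfo> freshHistory, String jobId, long pendingCheckpointId) {
        return pendingCheckpointId > maxCommittedCheckpointId(freshHistory, jobId);
    }
}
```

With this check, a restore that finds the pending checkpoint id already present in the refreshed history would drop the pending files from state instead of committing them a second time.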