wzx140 opened a new issue, #6717: URL: https://github.com/apache/paimon/issues/6717
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/paimon/issues) and found nothing similar.

### Paimon version

1.0.0

### Compute Engine

Flink

### Minimal reproduce step

1. Run a Flink streaming job writing to Paimon
2. Cause HMS to become temporarily unstable (OOM, high GC, network jitter)
3. Paimon writer fails
4. After HMS recovers, Paimon jobs keep failing
5. Restart the Flink job → Paimon works again

```
java.lang.RuntimeException: Exception occurs when committing snapshot #5487 (path xxx) by user d3186671-17f5-4e9d-850f-a064106e7524 with identifier 2819 and kind APPEND. Cannot clean up because we can't determine the success.
    at org.apache.paimon.operation.FileStoreCommitImpl.commitSnapshotImpl(FileStoreCommitImpl.java:1206)
    at org.apache.paimon.operation.FileStoreCommitImpl.tryCommitOnce(FileStoreCommitImpl.java:1029)
    at org.apache.paimon.operation.FileStoreCommitImpl.tryCommit(FileStoreCommitImpl.java:743)
    at org.apache.paimon.operation.FileStoreCommitImpl.commit(FileStoreCommitImpl.java:318)
    at org.apache.paimon.table.sink.TableCommitImpl.commitMultiple(TableCommitImpl.java:209)
    at org.apache.paimon.table.sink.TableCommitImpl.filterAndCommitMultiple(TableCommitImpl.java:248)
    at org.apache.paimon.flink.sink.StoreCommitter.filterAndCommit(StoreCommitter.java:119)
    at org.apache.paimon.flink.sink.Committer.filterAndCommit(Committer.java:60)
    at org.apache.paimon.flink.sink.RestoreAndFailCommittableStateManager.recover(RestoreAndFailCommittableStateManager.java:82)
    at org.apache.paimon.flink.sink.RestoreAndFailCommittableStateManager.initializeState(RestoreAndFailCommittableStateManager.java:77)
    at org.apache.paimon.flink.sink.CommitterOperator.initializeState(CommitterOperator.java:142)
Caused by: java.net.SocketException: Broken pipe (Write failed)
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at org.apache.thrift.transport.TIOStreamTransport.write(TIOStreamTransport.java:145)
    at org.apache.thrift.protocol.TBinaryProtocol.writeI32(TBinaryProtocol.java:178)
    at org.apache.thrift.protocol.TBinaryProtocol.writeMessageBegin(TBinaryProtocol.java:106)
    at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:70)
    at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:62)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.send_lock(ThriftHiveMetastore.java:4672)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.lock(ThriftHiveMetastore.java:4664)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.lock(HiveMetaStoreClient.java:2153)
    at sun.reflect.GeneratedMethodAccessor35.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:173)
    at com.sun.proxy.$Proxy23.lock(Unknown Source)
    at sun.reflect.GeneratedMethodAccessor35.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2327)
    at com.sun.proxy.$Proxy23.lock(Unknown Source)
    at org.apache.paimon.hive.HiveCatalogLock.lambda$lock$0(HiveCatalogLock.java:85)
    at org.apache.paimon.client.ClientPool$ClientPoolImpl.run(ClientPool.java:68)
    at org.apache.paimon.hive.pool.CachedClientPool.run(CachedClientPool.java:133)
    at org.apache.paimon.hive.HiveCatalogLock.lock(HiveCatalogLock.java:85)
    at org.apache.paimon.hive.HiveCatalogLock.runWithLock(HiveCatalogLock.java:66)
    at org.apache.paimon.operation.Lock$CatalogLockImpl.runWithLock(Lock.java:127)
    at org.apache.paimon.operation.FileStoreCommitImpl.commitSnapshotImpl(FileStoreCommitImpl.java:1191)
    at org.apache.paimon.operation.FileStoreCommitImpl.tryCommitOnce(FileStoreCommitImpl.java:1029)
    at org.apache.paimon.operation.FileStoreCommitImpl.tryCommit(FileStoreCommitImpl.java:743)
    at org.apache.paimon.operation.FileStoreCommitImpl.commit(FileStoreCommitImpl.java:318)
    at org.apache.paimon.table.sink.TableCommitImpl.commitMultiple(TableCommitImpl.java:209)
    at org.apache.paimon.table.sink.TableCommitImpl.filterAndCommitMultiple(TableCommitImpl.java:248)
    at org.apache.paimon.flink.sink.StoreCommitter.filterAndCommit(StoreCommitter.java:119)
    at org.apache.paimon.flink.sink.Committer.filterAndCommit(Committer.java:60)
    at org.apache.paimon.flink.sink.RestoreAndFailCommittableStateManager.recover(RestoreAndFailCommittableStateManager.java:82)
    at org.apache.paimon.flink.sink.RestoreAndFailCommittableStateManager.initializeState(RestoreAndFailCommittableStateManager.java:77)
    at org.apache.paimon.flink.sink.CommitterOperator.initializeState(CommitterOperator.java:142)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:740)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:715)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:681)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:962)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:750)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
    at java.lang.Thread.run(Thread.java:748)
```

### What doesn't meet your expectations?

- Paimon should automatically recover from HMS connection failures
- Behavior should be at least aligned with Iceberg (no need to restart tasks manually)

### Anything else?

We reviewed recent changes and suspect the issue may be related to the following PR: https://github.com/apache/paimon/pull/4256

This PR removed the automatic reconnection logic for HMS client connections. As a result:

- When HMS becomes unstable, Paimon’s HMS connection breaks
- After HMS recovers, the client does **not** attempt to re-establish the connection
- Flink jobs remain in a failed state until manually restarted

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
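
To make the expected behavior under "Anything else?" concrete, here is a minimal sketch of reconnect-on-failure around a cached client. It is not Paimon's or Hive's actual code: `ReconnectingClient`, `Action`, and the use of `IOException` as the "connection is broken" signal are hypothetical names chosen for illustration only.

```java
import java.io.IOException;
import java.util.function.Supplier;

/**
 * Hypothetical sketch: keeps one cached client and replaces it with a fresh
 * connection whenever a call fails with an I/O error (e.g. "Broken pipe").
 * None of these names come from Paimon or Hive.
 */
final class ReconnectingClient<C extends AutoCloseable> {

    /** An action against the client that may fail with an I/O error. */
    interface Action<C, R> {
        R apply(C client) throws IOException;
    }

    private final Supplier<C> factory; // opens a fresh connection
    private final int maxRetries;      // extra attempts after the first failure
    private C client;                  // currently cached connection

    ReconnectingClient(Supplier<C> factory, int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        this.factory = factory;
        this.maxRetries = maxRetries;
        this.client = factory.get();
    }

    /** Runs the action, rebuilding the cached client after each I/O failure. */
    synchronized <R> R run(Action<C, R> action) throws IOException {
        IOException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return action.apply(client);
            } catch (IOException e) {
                last = e;
                // Drop the broken connection so the next attempt (or the next
                // caller) does not reuse it.
                reconnect();
            }
        }
        throw last; // every attempt failed
    }

    private void reconnect() {
        try {
            client.close();
        } catch (Exception ignored) {
            // The old connection is already broken; nothing more to do.
        }
        client = factory.get();
    }
}
```

With something along these lines, a call that hits a broken pipe would be retried on a new connection instead of failing on the stale one until the job restarts. A real fix would also need a backoff between attempts and handling for the case where opening the new connection itself fails while HMS is still down.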
