liming30 opened a new issue, #1790:
URL: https://github.com/apache/incubator-paimon/issues/1790

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/incubator-paimon/issues) and found nothing similar.
   
   
   ### Motivation
   
   We are running a high-traffic Flink job that writes to Paimon. The table has 400 buckets, and the checkpoint interval is 3 minutes. After each successful checkpoint, the `Committer` operator checks for expired snapshots and deletes the data files that are no longer valid.
   
   Because a single thread deletes a large number of files, one at a time, expiration takes a long time. This causes backpressure on the `Writer` operator, and the next checkpoint fails. The committer thread dump below shows it blocked on an HDFS `delete` RPC:
   
   ```
   "Global Committer -> xxx -> Sink: end (1/1)#1" Id=573 TIMED_WAITING on org.apache.hadoop.ipc.Client$Call@4bd116c4
        at [email protected]/java.lang.Object.wait(Native Method)
        -  waiting on org.apache.hadoop.ipc.Client$Call@4bd116c4
        at app//org.apache.hadoop.ipc.Client.call(Client.java:1661)
        at app//org.apache.hadoop.ipc.Client.call(Client.java:1583)
        at app//org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:352)
        at app//com.sun.proxy.$Proxy28.delete(Unknown Source)
        at app//org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:631)
        at jdk.internal.reflect.GeneratedMethodAccessor115.invoke(Unknown Source)
        at [email protected]/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at [email protected]/java.lang.reflect.Method.invoke(Method.java:566)
        at app//org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:328)
        at app//org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:121)
        at app//com.sun.proxy.$Proxy29.delete(Unknown Source)
        at app//org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:2522)
        at app//org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:877)
        at app//org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:873)
        at app//org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at app//org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:884)
        at org.apache.paimon.fs.hadoop.HadoopFileIO.delete(HadoopFileIO.java:103)
        at org.apache.paimon.fs.FileIO.deleteQuietly(FileIO.java:144)
        at org.apache.paimon.operation.FileStoreExpireImpl.lambda$expireMergeTreeFiles$3(FileStoreExpireImpl.java:290)
        at org.apache.paimon.operation.FileStoreExpireImpl$$Lambda$2738/0x0000000841215440.accept(Unknown Source)
        at [email protected]/java.util.HashMap.forEach(HashMap.java:1337)
        at org.apache.paimon.operation.FileStoreExpireImpl.expireMergeTreeFiles(FileStoreExpireImpl.java:287)
        at org.apache.paimon.operation.FileStoreExpireImpl.expireMergeTreeFiles(FileStoreExpireImpl.java:256)
        at org.apache.paimon.operation.FileStoreExpireImpl.expireUntil(FileStoreExpireImpl.java:185)
        at org.apache.paimon.operation.FileStoreExpireImpl.expire(FileStoreExpireImpl.java:139)
        at org.apache.paimon.table.sink.TableCommitImpl.expire(TableCommitImpl.java:141)
        at org.apache.paimon.table.sink.TableCommitImpl.commitMultiple(TableCommitImpl.java:118)
        at org.apache.paimon.flink.sink.StoreCommitter.commit(StoreCommitter.java:76)
        at org.apache.paimon.flink.sink.CommitterOperator.commitUpToCheckpoint(CommitterOperator.java:160)
        at org.apache.paimon.flink.sink.CommitterOperator.notifyCheckpointComplete(CommitterOperator.java:154)
   ```
   
   ### Solution
   
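   Deleting these expired data files does not affect reads, so the deletion does not need to block the commit. We can delete the files on an asynchronous thread pool, and provide an Action to clean up zombie files, i.e. files that are no longer referenced but were never deleted (for example, because the job stopped before an asynchronous deletion finished).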
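   A minimal sketch of the proposed approach, assuming local files accessed through `java.nio.file` as a stand-in for Paimon's `FileIO`. The class `AsyncFileDeleter` and its methods `deleteAsync` and `cleanZombieFiles` are hypothetical names for illustration, not existing Paimon APIs. Deletions are submitted to a fixed thread pool so the calling thread (the committer, in the scenario above) does not wait on each `delete` call; a separate pass removes unreferenced files older than a cutoff.

   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.NoSuchFileException;
   import java.nio.file.Path;
   import java.time.Instant;
   import java.util.ArrayList;
   import java.util.Collection;
   import java.util.List;
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   import java.util.stream.Collectors;
   import java.util.stream.Stream;

   /** Hypothetical helper: deletes expired files off the caller's thread and cleans up leftovers. */
   class AsyncFileDeleter implements AutoCloseable {
       private final ExecutorService pool;

       AsyncFileDeleter(int threads) {
           this.pool = Executors.newFixedThreadPool(threads);
       }

       /** Submits one task per file and returns at once; the future completes when all tasks finish. */
       CompletableFuture<Void> deleteAsync(Collection<Path> files) {
           List<CompletableFuture<Void>> futures = new ArrayList<>();
           for (Path file : files) {
               futures.add(CompletableFuture.runAsync(() -> deleteQuietly(file), pool));
           }
           return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
       }

       private static void deleteQuietly(Path file) {
           try {
               Files.deleteIfExists(file);
           } catch (IOException e) {
               // Ignore the failure: the file becomes a zombie that cleanZombieFiles can remove later.
           }
       }

       /** Deletes regular files in dir whose names are not referenced and that were last modified before cutoff. */
       static int cleanZombieFiles(Path dir, Set<String> referencedNames, Instant cutoff) throws IOException {
           List<Path> candidates;
           try (Stream<Path> entries = Files.list(dir)) {
               candidates = entries.filter(Files::isRegularFile)
                       .filter(p -> !referencedNames.contains(p.getFileName().toString()))
                       .collect(Collectors.toList());
           }
           int deleted = 0;
           for (Path p : candidates) {
               try {
                   // Skip recent files: they may belong to a commit that is still in flight.
                   if (Files.getLastModifiedTime(p).toInstant().isBefore(cutoff) && Files.deleteIfExists(p)) {
                       deleted++;
                   }
               } catch (NoSuchFileException e) {
                   // Removed concurrently (e.g. by an async deletion); nothing left to do.
               }
           }
           return deleted;
       }

       @Override
       public void close() throws InterruptedException {
           pool.shutdown();
           pool.awaitTermination(1, TimeUnit.MINUTES);
       }
   }
   ```

   In a committer, the returned future would typically not be joined, so the commit completes immediately. The trade-off is that deletions still pending when the job fails are lost, which is why the cleanup pass (the proposed Action) is needed.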
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
