TengHuo commented on code in PR #6733:
URL: https://github.com/apache/hudi/pull/6733#discussion_r1010190971


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java:
##########
@@ -129,9 +128,6 @@ private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) th
       List<CompactionOperation> operations = compactionPlan.getOperations().stream()
           .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
       LOG.info("Execute compaction plan for instant {} as {} file groups", compactionInstantTime, operations.size());
-      WriteMarkersFactory
-          .get(table.getConfig().getMarkersType(), table, compactionInstantTime)
-          .deleteMarkerDir(table.getContext(), table.getConfig().getMarkersDeleteParallelism());

Review Comment:
   Hi @danny0405 
   
   I just reviewed the log files from our pipeline.
   
   There is another issue I forgot to mention: the executor in `CompactFunction` isn't closed properly, which causes a thread leak. The leaked thread keeps running compaction even after its task has been destroyed, and then fails when it tries to call `collector.collect(new CompactionCommitEvent(...))` in `CompactFunction#processElement`.
   
   In a very rare case, this can leave an unfinished data file in HDFS that cannot be deleted by a rollback, e.g. when the rollback is performed before the data file has been created.
   
   To fix this issue, I added a `CompactFunction#close()` method that shuts down the executor properly and closes the write client if needed.
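   
   As a rough illustration of the intended shutdown pattern (this is a hedged, self-contained sketch, not the actual Hudi/Flink code: `writeClient` and the class name are stand-ins, and the real fix works against `NonThrownExecutor` and `HoodieFlinkWriteClient`):
   
   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   
   // Sketch of the close() fix: a function that runs compaction asynchronously
   // must shut down its executor and close its write client on close(),
   // otherwise the worker thread leaks and keeps collecting after task teardown.
   class CompactFunctionSketch implements AutoCloseable {
   
     private final ExecutorService executor = Executors.newSingleThreadExecutor();
     private final AutoCloseable writeClient; // stand-in for the real write client
   
     CompactFunctionSketch(AutoCloseable writeClient) {
       this.writeClient = writeClient;
     }
   
     void processElement(Runnable compaction) {
       // Compaction runs on the executor thread; without close(), this
       // thread outlives the task that spawned it.
       executor.execute(compaction);
     }
   
     @Override
     public void close() throws Exception {
       // Stop accepting new work and wait for in-flight compaction to finish.
       executor.shutdown();
       if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
         executor.shutdownNow();
       }
       // Close the write client if one was created.
       if (writeClient != null) {
         writeClient.close();
       }
     }
   
     boolean isClosed() {
       return executor.isShutdown();
     }
   }
   ```
   
   With this in place, a destroyed task no longer leaves a background thread that later trips over the torn-down `collector`.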
   
   This is the task manager log from our MOR pipeline
   
   ```log
   2022-09-19 07:08:23,132 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task 
compact_plan_generate (1/1)#32 (0ac61651b7df52113e9e8d457b4611a7), deploy into 
slot with allocation id 0eb063453441ba3ed8b582ba71268f01.
   2022-09-19 07:08:23,133 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - compact_plan_generate (1/1)#32 
(0ac61651b7df52113e9e8d457b4611a7) switched from CREATED to DEPLOYING.
   2022-09-19 07:08:23,133 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - Loading JAR files for task compact_plan_generate (1/1)#32 
(0ac61651b7df52113e9e8d457b4611a7) [DEPLOYING].
   2022-09-19 07:08:23,134 INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
0eb063453441ba3ed8b582ba71268f01.
   2022-09-19 07:08:23,135 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - compact_plan_generate (1/1)#32 
(0ac61651b7df52113e9e8d457b4611a7) switched from DEPLOYING to INITIALIZING.
   2022-09-19 07:08:23,136 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task 
compact_task (1/32)#32 (d5c0456dc51cb2d8b93d723c40b95dfb), deploy into slot 
with allocation id 0eb063453441ba3ed8b582ba71268f01.
   2022-09-19 07:08:23,137 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - compact_task (1/32)#32 (d5c0456dc51cb2d8b93d723c40b95dfb) 
switched from CREATED to DEPLOYING.
   2022-09-19 07:08:23,137 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - Loading JAR files for task compact_task (1/32)#32 
(d5c0456dc51cb2d8b93d723c40b95dfb) [DEPLOYING].
   2022-09-19 07:08:23,137 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder [] - Operator 
state restore duration : 0 ms.
   2022-09-19 07:08:23,138 INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
c8091bbeb6786d2ef42e964d08eac2da.
   2022-09-19 07:08:23,139 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - compact_task (1/32)#32 (d5c0456dc51cb2d8b93d723c40b95dfb) 
switched from DEPLOYING to INITIALIZING.
   2022-09-19 07:08:23,140 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task 
compact_task (16/32)#32 (758c1f13d16eee52a1088f04a092176d), deploy into slot 
with allocation id c8091bbeb6786d2ef42e964d08eac2da.
   2022-09-19 07:08:23,141 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - compact_task (16/32)#32 (758c1f13d16eee52a1088f04a092176d) 
switched from CREATED to DEPLOYING.
   2022-09-19 07:08:23,141 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - Loading JAR files for task compact_task (16/32)#32 
(758c1f13d16eee52a1088f04a092176d) [DEPLOYING].
   2022-09-19 07:08:23,142 INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
0eb063453441ba3ed8b582ba71268f01.
   2022-09-19 07:08:23,142 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder [] - Operator 
state restore duration : 0 ms.
   2022-09-19 07:08:23,142 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder [] - Operator 
state restore duration : 10 ms.
   2022-09-19 07:08:23,142 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - compact_task (16/32)#32 (758c1f13d16eee52a1088f04a092176d) 
switched from DEPLOYING to INITIALIZING.
   2022-09-19 07:08:23,143 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder [] - Operator 
state restore duration : 11 ms.
   2022-09-19 07:08:23,143 INFO  org.apache.hudi.util.ViewStorageProperties     
              [] - Loading filesystem view storage properties from 
hdfs://.../.hoodie/.aux/view_storage_conf.properties
   2022-09-19 07:08:23,144 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task 
Sink: compact_commit (1/1)#32 (7157a69aa5013201f736bf71d88babed), deploy into 
slot with allocation id 0eb063453441ba3ed8b582ba71268f01.
   2022-09-19 07:08:23,144 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - Sink: compact_commit (1/1)#32 
(7157a69aa5013201f736bf71d88babed) switched from CREATED to DEPLOYING.
   2022-09-19 07:08:23,145 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - Loading JAR files for task Sink: compact_commit (1/1)#32 
(7157a69aa5013201f736bf71d88babed) [DEPLOYING].
   2022-09-19 07:08:23,146 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - Sink: compact_commit (1/1)#32 
(7157a69aa5013201f736bf71d88babed) switched from DEPLOYING to INITIALIZING.
   2022-09-19 07:08:23,146 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder [] - Operator 
state restore duration : 0 ms.
   2022-09-19 07:08:23,146 INFO  org.apache.hudi.util.ViewStorageProperties     
              [] - Loading filesystem view storage properties from 
hdfs://.../.hoodie/.aux/view_storage_conf.properties
   2022-09-19 07:08:23,147 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder [] - Operator 
state restore duration : 0 ms.
   2022-09-19 07:08:23,149 INFO  org.apache.hudi.util.ViewStorageProperties     
              [] - Loading filesystem view storage properties from 
hdfs://.../.hoodie/.aux/view_storage_conf.properties
   2022-09-19 07:08:23,150 INFO  org.apache.hudi.util.ViewStorageProperties     
              [] - Loading filesystem view storage properties from 
hdfs://.../.hoodie/.aux/view_storage_conf.properties
   2022-09-19 07:08:23,196 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - compact_task (1/32)#32 (d5c0456dc51cb2d8b93d723c40b95dfb) 
switched from INITIALIZING to RUNNING.
   2022-09-19 07:08:23,199 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - compact_task (16/32)#32 (758c1f13d16eee52a1088f04a092176d) 
switched from INITIALIZING to RUNNING.
   2022-09-19 07:08:23,200 INFO  org.apache.hudi.sink.CleanFunction             
              [] - exec sync clean with instant time 20220919070823199...
   2022-09-19 07:08:23,206 INFO  org.apache.hudi.util.CompactionUtil            
              [] - Rollback the inflight compaction instant: 
[==>20220919020324533__compaction__INFLIGHT] for failover
   2022-09-19 07:08:23,217 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - Sink: compact_commit (1/1)#32 
(7157a69aa5013201f736bf71d88babed) switched from INITIALIZING to RUNNING.
   2022-09-19 07:08:23,308 INFO  
org.apache.hudi.sink.common.AbstractStreamWriteFunction      [] - Send 
bootstrap write metadata event to coordinator, task[15].
   2022-09-19 07:08:23,309 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - bucket_write: hudi_mor (16/32)#32 
(dc15bf98eb8675018312ca317099a005) switched from INITIALIZING to RUNNING.
   2022-09-19 07:08:23,318 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase [] - 
Consumer subtask 15 will start reading 1 partitions with offsets in restored 
state: {KafkaTopicPartition{topic='...', partition=1}=6787076}
   2022-09-19 07:08:23,318 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - Source: TableSourceScan(...)#32 
(389bb286011a261bfa9e111568f3fc49) switched from INITIALIZING to RUNNING.
   2022-09-19 07:08:24,030 INFO  org.apache.hudi.util.CompactionUtil            
              [] - Rollback the inflight compaction instant: 
[==>20220919054842744__compaction__INFLIGHT] for failover
   2022-09-19 07:08:24,889 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - compact_plan_generate (1/1)#32 
(0ac61651b7df52113e9e8d457b4611a7) switched from INITIALIZING to RUNNING.
   ...deleted unrelated logs
   2022-09-19 07:08:41,700 WARN  org.apache.hadoop.hdfs.DataStreamer            
              [] - DataStreamer Exception
   java.io.FileNotFoundException: File does not exist: 
/.../00000027-451d-4b7e-a213-eb71e5aa0979_15-32-31_20220919020324533.parquet 
(inode 8691986250) Holder DFSClient_NONMAPREDUCE does not have any open files.
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2867)
        at 
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:521)
        at 
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:161)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2747)
        at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:918)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:532)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:529)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1087)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1126)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1042)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:2052)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3059)
   
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
Method) ~[?:1.8.0_252]
        at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 ~[?:1.8.0_252]
        at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 ~[?:1.8.0_252]
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
~[?:1.8.0_252]
        at 
org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
        at 
org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
        at 
org.apache.hadoop.hdfs.DFSOutputStream.addBlock(DFSOutputStream.java:1122)
        at 
org.apache.hadoop.hdfs.DataStreamer.locateFollowingBlock(DataStreamer.java:1944)
        at 
org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1750)
        at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:751)
   Caused by: org.apache.hadoop.ipc.RemoteException: File does not exist: 
/.../00000027-451d-4b7e-a213-eb71e5aa0979_15-32-31_20220919020324533.parquet 
(inode 8691986250) Holder DFSClient_NONMAPREDUCE does not have any open files.
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2867)
        at 
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:521)
        at 
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:161)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2747)
        at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:918)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:532)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:529)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1087)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1126)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1042)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:2052)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3059)
   
        at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1590)
        at org.apache.hadoop.ipc.Client.call(Client.java:1521)
        at org.apache.hadoop.ipc.Client.call(Client.java:1418)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:251)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:130)
        at com.sun.proxy.$Proxy27.addBlock(Unknown Source) ~[?:?]
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:472)
        at sun.reflect.GeneratedMethodAccessor77.invoke(Unknown Source) ~[?:?]
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_252]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_252]
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:449)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:175)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:167)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:105)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:375)
        at com.sun.proxy.$Proxy28.addBlock(Unknown Source) ~[?:?]
        at 
org.apache.hadoop.hdfs.DFSOutputStream.addBlock(DFSOutputStream.java:1119)
        ... 3 more
   2022-09-19 07:08:41,707 ERROR org.apache.hudi.sink.compact.CompactFunction   
              [] - Executor executes action [Execute compaction for instant 
20220919020324533 from task 15] error
   org.apache.hudi.exception.HoodieUpsertException: Failed to close UpdateHandle
        at 
org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:431)
        at 
org.apache.hudi.table.action.commit.FlinkMergeHelper.runMerge(FlinkMergeHelper.java:114)
        at 
org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleUpdateInternal(HoodieFlinkCopyOnWriteTable.java:379)
        at 
org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleUpdate(HoodieFlinkCopyOnWriteTable.java:370)
        at 
org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:227)
        at 
org.apache.hudi.sink.compact.CompactFunction.doCompaction(CompactFunction.java:109)
        at 
org.apache.hudi.sink.compact.CompactFunction.lambda$processElement$0(CompactFunction.java:94)
        at 
org.apache.hudi.sink.utils.NonThrownExecutor.lambda$execute$0(NonThrownExecutor.java:93)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_252]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_252]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
   Caused by: java.io.FileNotFoundException: File does not exist: 
/.../00000027-451d-4b7e-a213-eb71e5aa0979_15-32-31_20220919020324533.parquet 
(inode 8691986250) Holder DFSClient_NONMAPREDUCE does not have any open files.
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2867)
        at 
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:521)
        at 
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:161)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2747)
        at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:918)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:532)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:529)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1087)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1126)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1042)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:2052)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3059)
   
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
Method) ~[?:1.8.0_252]
        at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 ~[?:1.8.0_252]
        at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 ~[?:1.8.0_252]
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
~[?:1.8.0_252]
        at 
org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
        at 
org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
        at 
org.apache.hadoop.hdfs.DFSOutputStream.addBlock(DFSOutputStream.java:1122)
        at 
org.apache.hadoop.hdfs.DataStreamer.locateFollowingBlock(DataStreamer.java:1944)
        at 
org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1750)
        at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:751)
   Caused by: org.apache.hadoop.ipc.RemoteException: File does not exist: 
/.../00000027-451d-4b7e-a213-eb71e5aa0979_15-32-31_20220919020324533.parquet 
(inode 8691986250) Holder DFSClient_NONMAPREDUCE does not have any open files.
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2867)
        at 
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:521)
        at 
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:161)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2747)
        at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:918)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:532)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:529)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1087)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1126)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1042)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:2052)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3059)
   
        at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1590)
        at org.apache.hadoop.ipc.Client.call(Client.java:1521)
        at org.apache.hadoop.ipc.Client.call(Client.java:1418)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:251)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:130)
        at com.sun.proxy.$Proxy27.addBlock(Unknown Source) ~[?:?]
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:472)
        at sun.reflect.GeneratedMethodAccessor77.invoke(Unknown Source) ~[?:?]
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_252]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_252]
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:449)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:175)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:167)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:105)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:375)
        at com.sun.proxy.$Proxy28.addBlock(Unknown Source) ~[?:?]
        at 
org.apache.hadoop.hdfs.DFSOutputStream.addBlock(DFSOutputStream.java:1119)
        at 
org.apache.hadoop.hdfs.DataStreamer.locateFollowingBlock(DataStreamer.java:1944)
        at 
org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1750)
        at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:751)
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] - java.lang.RuntimeException: Buffer pool is destroyed.
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] -      at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:109)
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] -      at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:93)
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] -      at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44)
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] -      at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] -      at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] -      at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] -      at 
org.apache.hudi.sink.compact.CompactFunction.lambda$processElement$1(CompactFunction.java:95)
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] -      at 
org.apache.hudi.sink.utils.NonThrownExecutor.lambda$execute$0(NonThrownExecutor.java:103)
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] -      at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] -      at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] -      at java.lang.Thread.run(Thread.java:748)
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] - Caused by: java.lang.IllegalStateException: Buffer pool is 
destroyed.
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] -      at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:337)
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] -      at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilder(LocalBufferPool.java:279)
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] -      at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:348)
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] -      at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:331)
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] -      at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForNewRecord(BufferWritingResultPartition.java:263)
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] -      at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:142)
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] -      at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:106)
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] -      at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] -      at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
   2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging         
              [] -      ... 10 more
   ```
   
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
