TengHuo commented on code in PR #6733:
URL: https://github.com/apache/hudi/pull/6733#discussion_r1010190971
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java:
##########
@@ -129,9 +128,6 @@ private void scheduleCompaction(HoodieFlinkTable<?> table,
long checkpointId) th
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
LOG.info("Execute compaction plan for instant {} as {} file groups", compactionInstantTime, operations.size());
- WriteMarkersFactory
- .get(table.getConfig().getMarkersType(), table, compactionInstantTime)
- .deleteMarkerDir(table.getContext(), table.getConfig().getMarkersDeleteParallelism());
Review Comment:
Hi @danny0405

I just reviewed the log files from our pipeline.

There is another issue I forgot to mention: the executor in `CompactFunction`
isn't closed properly, which causes a thread leak. The leaked thread keeps
running compaction after its task has been destroyed, and then fails when it
tries to call `collector.collect(new CompactionCommitEvent(...))` in
`CompactFunction#processElement`.

In a very rare case, it can leave an unfinished data file in HDFS that a
rollback can't delete, e.g. when the rollback was performed before the data
file was created.

To fix this, I added a `CompactFunction#close()` method, which closes the
executor properly and closes the write client if needed.
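
To illustrate the idea, here is a minimal, self-contained sketch of the shutdown pattern. It is not the code in this PR: `AsyncTaskSketch`, its `submit` method, the 5-second timeout, and the `clientClosed` flag (a stand-in for closing the write client) are all hypothetical names and values, and it uses a plain `ExecutorService` rather than Hudi's own executor wrapper.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a function that runs work on its own thread. */
final class AsyncTaskSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    // Stand-in for a write client that must be released on teardown (assumption).
    private final AtomicBoolean clientClosed = new AtomicBoolean(false);

    /** Hands work to the background thread, similar to an async processElement. */
    void submit(Runnable work) {
        executor.execute(work);
    }

    /** Called on task teardown so that no worker thread outlives the task. */
    void close() {
        // Interrupt the running job and discard queued jobs.
        executor.shutdownNow();
        try {
            // Wait a bounded time for the worker thread to exit.
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
        clientClosed.set(true);
    }

    boolean isTerminated() {
        return executor.isTerminated();
    }

    boolean isClientClosed() {
        return clientClosed.get();
    }
}
```

Without a `close()` like this, the worker thread would keep running after the task is torn down, which is the situation the log below shows.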
This is the task manager log from our MOR pipeline:
```log
2022-09-19 07:08:23,132 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task
compact_plan_generate (1/1)#32 (0ac61651b7df52113e9e8d457b4611a7), deploy into
slot with allocation id 0eb063453441ba3ed8b582ba71268f01.
2022-09-19 07:08:23,133 INFO org.apache.flink.runtime.taskmanager.Task
[] - compact_plan_generate (1/1)#32
(0ac61651b7df52113e9e8d457b4611a7) switched from CREATED to DEPLOYING.
2022-09-19 07:08:23,133 INFO org.apache.flink.runtime.taskmanager.Task
[] - Loading JAR files for task compact_plan_generate (1/1)#32
(0ac61651b7df52113e9e8d457b4611a7) [DEPLOYING].
2022-09-19 07:08:23,134 INFO
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot
0eb063453441ba3ed8b582ba71268f01.
2022-09-19 07:08:23,135 INFO org.apache.flink.runtime.taskmanager.Task
[] - compact_plan_generate (1/1)#32
(0ac61651b7df52113e9e8d457b4611a7) switched from DEPLOYING to INITIALIZING.
2022-09-19 07:08:23,136 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task
compact_task (1/32)#32 (d5c0456dc51cb2d8b93d723c40b95dfb), deploy into slot
with allocation id 0eb063453441ba3ed8b582ba71268f01.
2022-09-19 07:08:23,137 INFO org.apache.flink.runtime.taskmanager.Task
[] - compact_task (1/32)#32 (d5c0456dc51cb2d8b93d723c40b95dfb)
switched from CREATED to DEPLOYING.
2022-09-19 07:08:23,137 INFO org.apache.flink.runtime.taskmanager.Task
[] - Loading JAR files for task compact_task (1/32)#32
(d5c0456dc51cb2d8b93d723c40b95dfb) [DEPLOYING].
2022-09-19 07:08:23,137 INFO
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder [] - Operator
state restore duration : 0 ms.
2022-09-19 07:08:23,138 INFO
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot
c8091bbeb6786d2ef42e964d08eac2da.
2022-09-19 07:08:23,139 INFO org.apache.flink.runtime.taskmanager.Task
[] - compact_task (1/32)#32 (d5c0456dc51cb2d8b93d723c40b95dfb)
switched from DEPLOYING to INITIALIZING.
2022-09-19 07:08:23,140 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task
compact_task (16/32)#32 (758c1f13d16eee52a1088f04a092176d), deploy into slot
with allocation id c8091bbeb6786d2ef42e964d08eac2da.
2022-09-19 07:08:23,141 INFO org.apache.flink.runtime.taskmanager.Task
[] - compact_task (16/32)#32 (758c1f13d16eee52a1088f04a092176d)
switched from CREATED to DEPLOYING.
2022-09-19 07:08:23,141 INFO org.apache.flink.runtime.taskmanager.Task
[] - Loading JAR files for task compact_task (16/32)#32
(758c1f13d16eee52a1088f04a092176d) [DEPLOYING].
2022-09-19 07:08:23,142 INFO
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot
0eb063453441ba3ed8b582ba71268f01.
2022-09-19 07:08:23,142 INFO
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder [] - Operator
state restore duration : 0 ms.
2022-09-19 07:08:23,142 INFO
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder [] - Operator
state restore duration : 10 ms.
2022-09-19 07:08:23,142 INFO org.apache.flink.runtime.taskmanager.Task
[] - compact_task (16/32)#32 (758c1f13d16eee52a1088f04a092176d)
switched from DEPLOYING to INITIALIZING.
2022-09-19 07:08:23,143 INFO
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder [] - Operator
state restore duration : 11 ms.
2022-09-19 07:08:23,143 INFO org.apache.hudi.util.ViewStorageProperties
[] - Loading filesystem view storage properties from
hdfs://.../.hoodie/.aux/view_storage_conf.properties
2022-09-19 07:08:23,144 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task
Sink: compact_commit (1/1)#32 (7157a69aa5013201f736bf71d88babed), deploy into
slot with allocation id 0eb063453441ba3ed8b582ba71268f01.
2022-09-19 07:08:23,144 INFO org.apache.flink.runtime.taskmanager.Task
[] - Sink: compact_commit (1/1)#32
(7157a69aa5013201f736bf71d88babed) switched from CREATED to DEPLOYING.
2022-09-19 07:08:23,145 INFO org.apache.flink.runtime.taskmanager.Task
[] - Loading JAR files for task Sink: compact_commit (1/1)#32
(7157a69aa5013201f736bf71d88babed) [DEPLOYING].
2022-09-19 07:08:23,146 INFO org.apache.flink.runtime.taskmanager.Task
[] - Sink: compact_commit (1/1)#32
(7157a69aa5013201f736bf71d88babed) switched from DEPLOYING to INITIALIZING.
2022-09-19 07:08:23,146 INFO
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder [] - Operator
state restore duration : 0 ms.
2022-09-19 07:08:23,146 INFO org.apache.hudi.util.ViewStorageProperties
[] - Loading filesystem view storage properties from
hdfs://.../.hoodie/.aux/view_storage_conf.properties
2022-09-19 07:08:23,147 INFO
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder [] - Operator
state restore duration : 0 ms.
2022-09-19 07:08:23,149 INFO org.apache.hudi.util.ViewStorageProperties
[] - Loading filesystem view storage properties from
hdfs://.../.hoodie/.aux/view_storage_conf.properties
2022-09-19 07:08:23,150 INFO org.apache.hudi.util.ViewStorageProperties
[] - Loading filesystem view storage properties from
hdfs://.../.hoodie/.aux/view_storage_conf.properties
2022-09-19 07:08:23,196 INFO org.apache.flink.runtime.taskmanager.Task
[] - compact_task (1/32)#32 (d5c0456dc51cb2d8b93d723c40b95dfb)
switched from INITIALIZING to RUNNING.
2022-09-19 07:08:23,199 INFO org.apache.flink.runtime.taskmanager.Task
[] - compact_task (16/32)#32 (758c1f13d16eee52a1088f04a092176d)
switched from INITIALIZING to RUNNING.
2022-09-19 07:08:23,200 INFO org.apache.hudi.sink.CleanFunction
[] - exec sync clean with instant time 20220919070823199...
2022-09-19 07:08:23,206 INFO org.apache.hudi.util.CompactionUtil
[] - Rollback the inflight compaction instant:
[==>20220919020324533__compaction__INFLIGHT] for failover
2022-09-19 07:08:23,217 INFO org.apache.flink.runtime.taskmanager.Task
[] - Sink: compact_commit (1/1)#32
(7157a69aa5013201f736bf71d88babed) switched from INITIALIZING to RUNNING.
2022-09-19 07:08:23,308 INFO
org.apache.hudi.sink.common.AbstractStreamWriteFunction [] - Send
bootstrap write metadata event to coordinator, task[15].
2022-09-19 07:08:23,309 INFO org.apache.flink.runtime.taskmanager.Task
[] - bucket_write: hudi_mor (16/32)#32
(dc15bf98eb8675018312ca317099a005) switched from INITIALIZING to RUNNING.
2022-09-19 07:08:23,318 INFO
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase [] -
Consumer subtask 15 will start reading 1 partitions with offsets in restored
state: {KafkaTopicPartition{topic='...', partition=1}=6787076}
2022-09-19 07:08:23,318 INFO org.apache.flink.runtime.taskmanager.Task
[] - Source: TableSourceScan(...)#32
(389bb286011a261bfa9e111568f3fc49) switched from INITIALIZING to RUNNING.
2022-09-19 07:08:24,030 INFO org.apache.hudi.util.CompactionUtil
[] - Rollback the inflight compaction instant:
[==>20220919054842744__compaction__INFLIGHT] for failover
2022-09-19 07:08:24,889 INFO org.apache.flink.runtime.taskmanager.Task
[] - compact_plan_generate (1/1)#32
(0ac61651b7df52113e9e8d457b4611a7) switched from INITIALIZING to RUNNING.
...deleted unrelated logs
2022-09-19 07:08:41,700 WARN org.apache.hadoop.hdfs.DataStreamer
[] - DataStreamer Exception
java.io.FileNotFoundException: File does not exist:
/.../00000027-451d-4b7e-a213-eb71e5aa0979_15-32-31_20220919020324533.parquet
(inode 8691986250) Holder DFSClient_NONMAPREDUCE does not have any open files.
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2867)
at
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:521)
at
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:161)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2747)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:918)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:532)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:529)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1087)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1126)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1042)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:2052)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3059)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method) ~[?:1.8.0_252]
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
~[?:1.8.0_252]
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
~[?:1.8.0_252]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
~[?:1.8.0_252]
at
org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
at
org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
at
org.apache.hadoop.hdfs.DFSOutputStream.addBlock(DFSOutputStream.java:1122)
at
org.apache.hadoop.hdfs.DataStreamer.locateFollowingBlock(DataStreamer.java:1944)
at
org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1750)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:751)
Caused by: org.apache.hadoop.ipc.RemoteException: File does not exist:
/.../00000027-451d-4b7e-a213-eb71e5aa0979_15-32-31_20220919020324533.parquet
(inode 8691986250) Holder DFSClient_NONMAPREDUCE does not have any open files.
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2867)
at
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:521)
at
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:161)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2747)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:918)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:532)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:529)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1087)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1126)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1042)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:2052)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3059)
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1590)
at org.apache.hadoop.ipc.Client.call(Client.java:1521)
at org.apache.hadoop.ipc.Client.call(Client.java:1418)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:251)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:130)
at com.sun.proxy.$Proxy27.addBlock(Unknown Source) ~[?:?]
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:472)
at sun.reflect.GeneratedMethodAccessor77.invoke(Unknown Source) ~[?:?]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_252]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_252]
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:449)
at
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:175)
at
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:167)
at
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:105)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:375)
at com.sun.proxy.$Proxy28.addBlock(Unknown Source) ~[?:?]
at
org.apache.hadoop.hdfs.DFSOutputStream.addBlock(DFSOutputStream.java:1119)
... 3 more
2022-09-19 07:08:41,707 ERROR org.apache.hudi.sink.compact.CompactFunction
[] - Executor executes action [Execute compaction for instant
20220919020324533 from task 15] error
org.apache.hudi.exception.HoodieUpsertException: Failed to close UpdateHandle
at
org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:431)
at
org.apache.hudi.table.action.commit.FlinkMergeHelper.runMerge(FlinkMergeHelper.java:114)
at
org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleUpdateInternal(HoodieFlinkCopyOnWriteTable.java:379)
at
org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleUpdate(HoodieFlinkCopyOnWriteTable.java:370)
at
org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:227)
at
org.apache.hudi.sink.compact.CompactFunction.doCompaction(CompactFunction.java:109)
at
org.apache.hudi.sink.compact.CompactFunction.lambda$processElement$0(CompactFunction.java:94)
at
org.apache.hudi.sink.utils.NonThrownExecutor.lambda$execute$0(NonThrownExecutor.java:93)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_252]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_252]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: java.io.FileNotFoundException: File does not exist:
/.../00000027-451d-4b7e-a213-eb71e5aa0979_15-32-31_20220919020324533.parquet
(inode 8691986250) Holder DFSClient_NONMAPREDUCE does not have any open files.
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2867)
at
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:521)
at
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:161)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2747)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:918)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:532)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:529)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1087)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1126)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1042)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:2052)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3059)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method) ~[?:1.8.0_252]
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
~[?:1.8.0_252]
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
~[?:1.8.0_252]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
~[?:1.8.0_252]
at
org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
at
org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
at
org.apache.hadoop.hdfs.DFSOutputStream.addBlock(DFSOutputStream.java:1122)
at
org.apache.hadoop.hdfs.DataStreamer.locateFollowingBlock(DataStreamer.java:1944)
at
org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1750)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:751)
Caused by: org.apache.hadoop.ipc.RemoteException: File does not exist:
/.../00000027-451d-4b7e-a213-eb71e5aa0979_15-32-31_20220919020324533.parquet
(inode 8691986250) Holder DFSClient_NONMAPREDUCE does not have any open files.
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2867)
at
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:521)
at
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:161)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2747)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:918)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:532)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:529)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1087)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1126)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1042)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:2052)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3059)
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1590)
at org.apache.hadoop.ipc.Client.call(Client.java:1521)
at org.apache.hadoop.ipc.Client.call(Client.java:1418)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:251)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:130)
at com.sun.proxy.$Proxy27.addBlock(Unknown Source) ~[?:?]
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:472)
at sun.reflect.GeneratedMethodAccessor77.invoke(Unknown Source) ~[?:?]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_252]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_252]
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:449)
at
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:175)
at
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:167)
at
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:105)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:375)
at com.sun.proxy.$Proxy28.addBlock(Unknown Source) ~[?:?]
at
org.apache.hadoop.hdfs.DFSOutputStream.addBlock(DFSOutputStream.java:1119)
at
org.apache.hadoop.hdfs.DataStreamer.locateFollowingBlock(DataStreamer.java:1944)
at
org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1750)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:751)
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - java.lang.RuntimeException: Buffer pool is destroyed.
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:109)
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:93)
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44)
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - at
org.apache.hudi.sink.compact.CompactFunction.lambda$processElement$1(CompactFunction.java:95)
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - at
org.apache.hudi.sink.utils.NonThrownExecutor.lambda$execute$0(NonThrownExecutor.java:103)
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - at java.lang.Thread.run(Thread.java:748)
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - Caused by: java.lang.IllegalStateException: Buffer pool is
destroyed.
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:337)
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilder(LocalBufferPool.java:279)
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - at
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:348)
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - at
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:331)
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - at
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForNewRecord(BufferWritingResultPartition.java:263)
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - at
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:142)
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:106)
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - at
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
2022-09-19 07:08:41,710 ERROR org.apache.flink.core.io.SysoutLogging
[] - ... 10 more
```