[
https://issues.apache.org/jira/browse/FLINK-38621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18040140#comment-18040140
]
Zakelly Lan commented on FLINK-38621:
-------------------------------------
[~faltomare] From the logs above, I assume this is the same issue as
FLINK-38324. But in your case the attempt number is 2 instead of 0, which
means it is even rarer. It would be great if you could verify or reproduce
this on 2.2, which will be available in a few days.
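For reference, the attempt number can be read straight off the shared-state directory names in the stack traces below; a quick sketch (this helper is hypothetical, not part of Flink, and assumes the `op_<name>_<hash>__<subtask>_<parallelism>__attempt_<n>` layout seen in these logs):

```python
import re

# Shared-state directories in the logs are named like
#   op_<OperatorName>_<operator-hash>__<subtask>_<parallelism>__attempt_<n>
# This pattern pulls those fields out so the attempt number can be checked
# quickly across many log lines.
OP_DIR = re.compile(
    r"op_(?P<op>[A-Za-z]+)_(?P<hash>[0-9a-f]+)"
    r"__(?P<subtask>\d+)_(?P<parallelism>\d+)"
    r"__attempt_(?P<attempt>\d+)"
)

def parse_op_dir(path):
    """Extract operator name, subtask index, parallelism and attempt from a path."""
    m = OP_DIR.search(path)
    if m is None:
        raise ValueError("no operator directory in: " + path)
    d = m.groupdict()
    for k in ("subtask", "parallelism", "attempt"):
        d[k] = int(d[k])
    return d

# Path taken from the FileNotFoundException in the report below.
info = parse_op_dir(
    "s3a://bucket/checkpoints/68e582d9490de1ef330c9d03ff6559d9/shared/"
    "op_AsyncKeyedProcessOperator_08ba072f7d4350d148d1fdf71e53ea7c"
    "__19_20__attempt_2/db/3acf0699-40c1-48ab-a42a-689829409fbf"
)
# info["attempt"] is 2, matching the unusual attempt number discussed above.
```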
> AsyncKeyedProcessOperator fails to restore with a
> java.io.FileNotFoundException: No such file or directory exception in ForSt
> -----------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-38621
> URL: https://issues.apache.org/jira/browse/FLINK-38621
> Project: Flink
> Issue Type: Bug
> Affects Versions: 2.1.0
> Reporter: Francis
> Priority: Major
>
> I'm currently seeing task manager failures with a root exception of
> {code:java}
> java.io.FileNotFoundException: No such file or directory{code}
> This issue appears related to https://issues.apache.org/jira/browse/FLINK-38324
> and https://issues.apache.org/jira/browse/FLINK-38433; however, in my case it
> is impacting an AsyncKeyedCoProcessOperator rather than the
> AsyncStreamFlatMap function mentioned in those tickets.
> However, the symptoms are the same. My job is in a restart loop where it is
> failing to restore from a checkpoint attempt that does not exist. I've
> included the full stack trace below.
>
> From a configuration standpoint I am using Flink 2.1.0 and version 1.13 of
> the K8s operator. I am using the Hadoop S3 filesystem with a state recovery
> mode of CLAIM. There are no lifecycle rules on my S3 bucket that would result
> in files being deleted externally.
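One way to rule out external deletion is to list the `shared/` prefix (e.g. with `aws s3 ls --recursive`) and check it for the key from the stack trace; a small sketch, where the listing lines and bucket layout are illustrative, with only the object key taken from the actual exception:

```python
# The object key comes from the FileNotFoundException in the stack trace below;
# the listing lines here are made up for illustration (in practice, feed in the
# output of `aws s3 ls --recursive s3://<bucket>/checkpoints/<job-id>/shared/`).
MISSING_KEY = (
    "checkpoints/68e582d9490de1ef330c9d03ff6559d9/shared/"
    "op_AsyncKeyedProcessOperator_08ba072f7d4350d148d1fdf71e53ea7c"
    "__19_20__attempt_2/db/3acf0699-40c1-48ab-a42a-689829409fbf"
)

def key_present(listing_lines, key):
    """True if any `aws s3 ls --recursive` line ends with the given object key."""
    return any(line.rstrip().endswith(key) for line in listing_lines)

# Fake two-line listing for demonstration only.
fake_listing = [
    "2025-11-04 08:10:11    5343318 checkpoints/.../db/002991.sst",
    "2025-11-04 08:10:12       1310 " + MISSING_KEY,
]
present = key_present(fake_listing, MISSING_KEY)  # True for this fake listing
```

If the key is genuinely absent from the listing, the object was never written or was removed, rather than being an S3 consistency artifact.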
>
> Here are the potentially relevant parts of my configuration:
>
> {code:java}
> execution:
>   state-recovery.claim-mode: CLAIM
> checkpointing:
>   dir: "s3a://{{ .Values.state.bucketName }}/checkpoints"
>   interval: {{ .Values.checkpoint.interval }}
>   timeout: {{ .Values.checkpoint.timeout }}
>   incremental: true
>   mode: {{ .Values.checkpoint.mode }}
>   max-concurrent-checkpoints: 1
>   snapshot-compression: true
>   savepoint-dir: "s3a://{{ .Values.state.bucketName }}/savepoints"
>   externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
>   min-pause: {{ .Values.checkpoint.interval }}
>   num-retained: 20
>   tolerable-failed-checkpoints: 2147483647
> job:
>   savepointRedeployNonce: 8
>   state: running
>   upgradeMode: savepoint
>   jarURI: local:///opt/flink/usrlib/flink-job.jar
>   entryClass: {{ .Values.entryClass }}
>   parallelism: {{ .Values.parallelism }}
>   allowNonRestoredState: false
> state:
>   backend:
>     type: forst
>   forst:
>     executor.read-io-parallelism: 2
>     sync.enforce-local: false
>     memory.managed: false
>     memory.fixed-per-slot: 1g
>     local-dir: {{ .Values.dataMount }}/forst-local
>     cache.dir: {{ .Values.dataMount }}/forst-cache
>     cache:
>       size-based-limit: {{ .Values.state.diskCacheLimit }}
> high-availability:
>   type: kubernetes
>   storageDir: "s3a://{{ .Values.state.bucketName }}/ha"
>   kubernetes:
>     cluster-id: {{ .Values.global.serviceName }}
>     namespace: {{ .Values.global.namespace }}
> kubernetes:
>   taskmanager.memory.limit-factor: 5
>   taskmanager.cpu.limit-factor: 5
>   rest-service:
>     exposed.type: ClusterIP
> operator:
>   job.upgrade.last-state.max.allowed.checkpoint.age: 24h
>   savepoint:
>     format.type: NATIVE
>     history:
>       max.count: {{ .Values.savepoint.maxCountToKeep }}
>     periodic:
>       savepoint.interval: {{ .Values.savepoint.interval }}{code}
>
>
> And the stack trace:
>
> {code:java}
> op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_2/db/91e56e4d-31ae-4ee4-ab17-704a00a45627
> [5343318 bytes], localPath='002991.sst'}],
> metaStateHandle=ByteStreamStateHandle{handleName='s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/chk-12/da15b537-64d4-4651-800d-2ce8d8d4f5c9',
> dataBytes=1310}, stateHandleId=49cb0a39-a8a1-49fd-9e77-dd35dd3a3713}]).
> [2025-11-04 09:44:21] Exception in thread
> "processor-thread-conversation-id (19/20)#860" java.io.FileNotFoundException:
> No such file or directory:
> s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/shared/op_AsyncKeyedProcessOperator_08ba072f7d4350d148d1fdf71e53ea7c__19_20__attempt_2/db/3acf0699-40c1-48ab-a42a-689829409fbf
> [2025-11-04 09:44:21] at
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3866)
> [2025-11-04 09:44:21] at
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
> [2025-11-04 09:44:21] at
> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getFileStatus$24(S3AFileSystem.java:3556)
> [2025-11-04 09:44:21] at
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
> [2025-11-04 09:44:21] at
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
> [2025-11-04 09:44:21] at
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
> [2025-11-04 09:44:21] at
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
> [2025-11-04 09:44:21] at
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3554)
> [2025-11-04 09:44:21] at
> org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:88)
> [2025-11-04 09:44:21] at
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.getFileStatus(PluginFileSystemFactory.java:106)
> [2025-11-04 09:44:21] at
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:78)
> [2025-11-04 09:44:21] at
> org.apache.flink.state.forst.fs.ForStFlinkFileSystem.getFileStatus(ForStFlinkFileSystem.java:332)
> [2025-11-04 09:44:21] at
> org.apache.flink.state.forst.fs.ForStFlinkFileSystem.listStatus(ForStFlinkFileSystem.java:368)
> [2025-11-04 09:44:21] at
> org.apache.flink.state.forst.fs.StringifiedForStFileSystem.listStatus(StringifiedForStFileSystem.java:52)
> [2025-11-04 09:44:21] at org.forstdb.RocksDB.open(Native Method)
> [2025-11-04 09:44:21] at org.forstdb.RocksDB.open(RocksDB.java:318)
> [2025-11-04 09:44:21] at
> org.apache.flink.state.forst.ForStOperationUtils.openDB(ForStOperationUtils.java:85)
> [2025-11-04 09:44:21] at
> org.apache.flink.state.forst.restore.ForStHandle.loadDb(ForStHandle.java:140)
> [2025-11-04 09:44:21] at
> org.apache.flink.state.forst.restore.ForStHandle.openDB(ForStHandle.java:128)
> [2025-11-04 09:44:21] at
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restoreBaseDBFromMainHandle(ForStIncrementalRestoreOperation.java:379)
> [2025-11-04 09:44:21] at
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.initBaseDBFromSingleStateHandle(ForStIncrementalRestoreOperation.java:301)
> [2025-11-04 09:44:21] at
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.innerRestore(ForStIncrementalRestoreOperation.java:278)
> [2025-11-04 09:44:21] at
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.lambda$restore$1(ForStIncrementalRestoreOperation.java:237)
> [2025-11-04 09:44:21] at
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.runAndReportDuration(ForStIncrementalRestoreOperation.java:438)
> [2025-11-04 09:44:21] at
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restore(ForStIncrementalRestoreOperation.java:237)
> [2025-11-04 09:44:21] at
> org.apache.flink.state.forst.ForStKeyedStateBackendBuilder.build(ForStKeyedStateBackendBuilder.java:261)
> [2025-11-04 09:44:21] at
> org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:474)
> [2025-11-04 09:44:21] at
> org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:98)
> [2025-11-04 09:44:21] at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:475)
> [2025-11-04 09:44:21] at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
> [2025-11-04 09:44:21] at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
> [2025-11-04 09:44:21] at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:486)
> [2025-11-04 09:44:21] at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
> [2025-11-04 09:44:21] at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:280)
> [2025-11-04 09:44:21] at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
> [2025-11-04 09:44:21] at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
> [2025-11-04 09:44:21] at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
> [2025-11-04 09:44:21] at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> [2025-11-04 09:44:21] at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
> [2025-11-04 09:44:21] at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
> [2025-11-04 09:44:21] at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
> [2025-11-04 09:44:21] at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
> [2025-11-04 09:44:21] at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
> [2025-11-04 09:44:21] at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
> [2025-11-04 09:44:21] at java.base/java.lang.Thread.run(Unknown
> Source)
> [2025-11-04 09:44:21] RocksDBExceptionJni::ThrowNew/StatusJni - Error:
> unexpected exception!
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,017 INFO
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] - Legacy
> kryo serializer scala extensions are not available.
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,017 INFO
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] - Kryo
> serializer scala extensions are not available.
> [2025-11-04 09:44:22] Exception in thread
> "processor-thread-conversation-customer ->
> kafka-sink-thread-conversation-customer: Writer ->
> kafka-sink-thread-conversation-customer: Committer (19/20)#860"
> java.io.FileNotFoundException: No such file or directory:
> s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/shared/op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_2/db/ad742554-d9d8-4a0f-830d-61da66510143
> [2025-11-04 09:44:22] at
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3866)
> [2025-11-04 09:44:22] at
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
> [2025-11-04 09:44:22] at
> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getFileStatus$24(S3AFileSystem.java:3556)
> [2025-11-04 09:44:22] at
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
> [2025-11-04 09:44:22] at
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
> [2025-11-04 09:44:22] at
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
> [2025-11-04 09:44:22] at
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
> [2025-11-04 09:44:22] at
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3554)
> [2025-11-04 09:44:22] at
> org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:88)
> [2025-11-04 09:44:22] at
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.getFileStatus(PluginFileSystemFactory.java:106)
> [2025-11-04 09:44:22] at
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:78)
> [2025-11-04 09:44:22] at
> org.apache.flink.state.forst.fs.ForStFlinkFileSystem.getFileStatus(ForStFlinkFileSystem.java:332)
> [2025-11-04 09:44:22] at
> org.apache.flink.state.forst.fs.ForStFlinkFileSystem.listStatus(ForStFlinkFileSystem.java:368)
> [2025-11-04 09:44:22] at
> org.apache.flink.state.forst.fs.StringifiedForStFileSystem.listStatus(StringifiedForStFileSystem.java:52)
> [2025-11-04 09:44:22] at org.forstdb.RocksDB.open(Native Method)
> [2025-11-04 09:44:22] at org.forstdb.RocksDB.open(RocksDB.java:318)
> [2025-11-04 09:44:22] at
> org.apache.flink.state.forst.ForStOperationUtils.openDB(ForStOperationUtils.java:85)
> [2025-11-04 09:44:22] at
> org.apache.flink.state.forst.restore.ForStHandle.loadDb(ForStHandle.java:140)
> [2025-11-04 09:44:22] at
> org.apache.flink.state.forst.restore.ForStHandle.openDB(ForStHandle.java:128)
> [2025-11-04 09:44:22] at
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restoreBaseDBFromMainHandle(ForStIncrementalRestoreOperation.java:379)
> [2025-11-04 09:44:22] at
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.initBaseDBFromSingleStateHandle(ForStIncrementalRestoreOperation.java:301)
> [2025-11-04 09:44:22] at
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.innerRestore(ForStIncrementalRestoreOperation.java:278)
> [2025-11-04 09:44:22] at
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.lambda$restore$1(ForStIncrementalRestoreOperation.java:237)
> [2025-11-04 09:44:22] at
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.runAndReportDuration(ForStIncrementalRestoreOperation.java:438)
> [2025-11-04 09:44:22] at
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restore(ForStIncrementalRestoreOperation.java:237)
> [2025-11-04 09:44:22] at
> org.apache.flink.state.forst.ForStKeyedStateBackendBuilder.build(ForStKeyedStateBackendBuilder.java:261)
> [2025-11-04 09:44:22] at
> org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:474)
> [2025-11-04 09:44:22] at
> org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:98)
> [2025-11-04 09:44:22] at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:475)
> [2025-11-04 09:44:22] at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
> [2025-11-04 09:44:22] at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
> [2025-11-04 09:44:22] at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:486)
> [2025-11-04 09:44:22] at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
> [2025-11-04 09:44:22] at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:280)
> [2025-11-04 09:44:22] at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
> [2025-11-04 09:44:22] at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
> [2025-11-04 09:44:22] at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
> [2025-11-04 09:44:22] at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> [2025-11-04 09:44:22] at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
> [2025-11-04 09:44:22] at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
> [2025-11-04 09:44:22] at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
> [2025-11-04 09:44:22] at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
> [2025-11-04 09:44:22] at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
> [2025-11-04 09:44:22] at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
> [2025-11-04 09:44:22] at java.base/java.lang.Thread.run(Unknown
> Source)
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,059 INFO
> org.apache.flink.runtime.taskmanager.Task [] - Attempting
> to cancel task Source: kafka-source-thread_events_v1 (19/20)#860
> (56f77e2196fe7f3ac0662857735727d9_bd9e548cb0c865193602f71910cbd8d6_18_860).
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,059 INFO
> org.apache.flink.runtime.taskmanager.Task [] - Source:
> kafka-source-thread_events_v1 (19/20)#860
> (56f77e2196fe7f3ac0662857735727d9_bd9e548cb0c865193602f71910cbd8d6_18_860)
> switched from RUNNING to CANCELING.
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,059 INFO
> org.apache.flink.runtime.taskmanager.Task [] - Triggering
> cancellation of task code Source: kafka-source-thread_events_v1 (19/20)#860
> (56f77e2196fe7f3ac0662857735727d9_bd9e548cb0c865193602f71910cbd8d6_18_860).
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,060 INFO
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing
> Source Reader.
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,060 INFO
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] -
> Shutting down split fetcher 0
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,060 INFO
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] -
> [Consumer clientId=conversation-customer-thread-events-v1-consumer-18,
> groupId=conversation-customer-thread-events-v1-consumer] Resetting generation
> and member id due to: consumer pro-actively leaving the group
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,060 INFO
> org.apache.flink.runtime.taskmanager.Task [] - Attempting
> to cancel task Source: kafka-source-conversation_customer_v1 (19/20)#860
> (56f77e2196fe7f3ac0662857735727d9_01393662c0a19669846621424653c56f_18_860).
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,061 INFO
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] -
> [Consumer clientId=conversation-customer-thread-events-v1-consumer-18,
> groupId=conversation-customer-thread-events-v1-consumer] Request joining
> group due to: consumer pro-actively leaving the group
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,061 INFO
> org.apache.flink.runtime.taskmanager.Task [] - Source:
> kafka-source-conversation_customer_v1 (19/20)#860
> (56f77e2196fe7f3ac0662857735727d9_01393662c0a19669846621424653c56f_18_860)
> switched from RUNNING to CANCELING.
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,061 INFO
> org.apache.flink.runtime.taskmanager.Task [] - Triggering
> cancellation of task code Source: kafka-source-conversation_customer_v1
> (19/20)#860
> (56f77e2196fe7f3ac0662857735727d9_01393662c0a19669846621424653c56f_18_860).
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,062 INFO
> org.apache.flink.runtime.taskmanager.Task [] - Attempting
> to cancel task processor-thread-conversation-id (19/20)#860
> (56f77e2196fe7f3ac0662857735727d9_08ba072f7d4350d148d1fdf71e53ea7c_18_860).
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,062 INFO
> org.apache.flink.runtime.taskmanager.Task [] -
> processor-thread-conversation-id (19/20)#860
> (56f77e2196fe7f3ac0662857735727d9_08ba072f7d4350d148d1fdf71e53ea7c_18_860)
> switched from INITIALIZING to CANCELING.
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,062 INFO
> org.apache.flink.runtime.taskmanager.Task [] - Triggering
> cancellation of task code processor-thread-conversation-id (19/20)#860
> (56f77e2196fe7f3ac0662857735727d9_08ba072f7d4350d148d1fdf71e53ea7c_18_860).
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,063 INFO
> org.apache.kafka.common.metrics.Metrics [] - Metrics
> scheduler closed
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,063 INFO
> org.apache.kafka.common.metrics.Metrics [] - Closing
> reporter org.apache.kafka.common.metrics.JmxReporter
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,063 INFO
> org.apache.kafka.common.metrics.Metrics [] - Closing
> reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,063 INFO
> org.apache.kafka.common.metrics.Metrics [] - Metrics
> reporters closed
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,064 INFO
> org.apache.flink.runtime.taskmanager.Task [] - Attempting
> to cancel task processor-thread-conversation-customer ->
> kafka-sink-thread-conversation-customer: Writer ->
> kafka-sink-thread-conversation-customer: Committer (19/20)#860
> (56f77e2196fe7f3ac0662857735727d9_84984c1fa2897a7a40e728e4e87e1da9_18_860).
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,064 INFO
> org.apache.flink.runtime.taskmanager.Task [] -
> processor-thread-conversation-customer ->
> kafka-sink-thread-conversation-customer: Writer ->
> kafka-sink-thread-conversation-customer: Committer (19/20)#860
> (56f77e2196fe7f3ac0662857735727d9_84984c1fa2897a7a40e728e4e87e1da9_18_860)
> switched from INITIALIZING to CANCELING.
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,064 INFO
> org.apache.flink.runtime.taskmanager.Task [] - Triggering
> cancellation of task code processor-thread-conversation-customer ->
> kafka-sink-thread-conversation-customer: Writer ->
> kafka-sink-thread-conversation-customer: Committer (19/20)#860
> (56f77e2196fe7f3ac0662857735727d9_84984c1fa2897a7a40e728e4e87e1da9_18_860).
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,065 INFO
> org.apache.kafka.common.utils.AppInfoParser [] - App info
> kafka.consumer for conversation-customer-thread-events-v1-consumer-18
> unregistered
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,065 INFO
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split
> fetcher 0 exited.
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,065 INFO
> org.apache.flink.runtime.taskmanager.Task [] - Source:
> kafka-source-thread_events_v1 (19/20)#860
> (56f77e2196fe7f3ac0662857735727d9_bd9e548cb0c865193602f71910cbd8d6_18_860)
> switched from CANCELING to CANCELED.
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,065 INFO
> org.apache.flink.runtime.taskmanager.Task [] - Freeing
> task resources for Source: kafka-source-thread_events_v1 (19/20)#860
> (56f77e2196fe7f3ac0662857735727d9_bd9e548cb0c865193602f71910cbd8d6_18_860).
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,065 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] -
> Un-registering task and sending final execution state CANCELED to JobManager
> for task Source: kafka-source-thread_events_v1 (19/20)#860
> 56f77e2196fe7f3ac0662857735727d9_bd9e548cb0c865193602f71910cbd8d6_18_860.
> [2025-11-04 09:44:22] Exception in thread
> "processor-thread-conversation-customer ->
> kafka-sink-thread-conversation-customer: Writer ->
> kafka-sink-thread-conversation-customer: Committer (19/20)#860"
> java.io.InterruptedIOException: getFileStatus on
> s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/shared/op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_860/db: com.amazonaws.AbortedException:
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:395)
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:201)
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175)
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3861)
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$exists$34(S3AFileSystem.java:4703)
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:4701)
> [2025-11-04 09:44:22] at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.exists(HadoopFileSystem.java:168)
> [2025-11-04 09:44:22] at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:149)
> [2025-11-04 09:44:22] at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.exists(SafetyNetWrapperFileSystem.java:117)
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.fs.ForStFlinkFileSystem.exists(ForStFlinkFileSystem.java:298)
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.fs.StringifiedForStFileSystem.exists(StringifiedForStFileSystem.java:44)
> [2025-11-04 09:44:22] at org.forstdb.RocksDB.open(Native Method)
> [2025-11-04 09:44:22] at org.forstdb.RocksDB.open(RocksDB.java:318)
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.ForStOperationUtils.openDB(ForStOperationUtils.java:85)
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.restore.ForStHandle.loadDb(ForStHandle.java:140)
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.restore.ForStHandle.openDB(ForStHandle.java:128)
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restoreBaseDBFromMainHandle(ForStIncrementalRestoreOperation.java:379)
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.initBaseDBFromSingleStateHandle(ForStIncrementalRestoreOperation.java:301)
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.innerRestore(ForStIncrementalRestoreOperation.java:278)
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.lambda$restore$1(ForStIncrementalRestoreOperation.java:237)
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.runAndReportDuration(ForStIncrementalRestoreOperation.java:438)
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restore(ForStIncrementalRestoreOperation.java:237)
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.ForStKeyedStateBackendBuilder.build(ForStKeyedStateBackendBuilder.java:261)
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:474)
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:98)
> [2025-11-04 09:44:22] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:475)
> [2025-11-04 09:44:22] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
> [2025-11-04 09:44:22] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
> [2025-11-04 09:44:22] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:486)
> [2025-11-04 09:44:22] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
> [2025-11-04 09:44:22] at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:280)
> [2025-11-04 09:44:22] at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
> [2025-11-04 09:44:22] at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
> [2025-11-04 09:44:22] at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
> [2025-11-04 09:44:22] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> [2025-11-04 09:44:22] at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
> [2025-11-04 09:44:22] at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
> [2025-11-04 09:44:22] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
> [2025-11-04 09:44:22] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
> [2025-11-04 09:44:22] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
> [2025-11-04 09:44:22] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
> [2025-11-04 09:44:22] at java.base/java.lang.Thread.run(Unknown Source)
> [2025-11-04 09:44:22] Caused by: com.amazonaws.AbortedException:
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleInterruptedException(AmazonHttpClient.java:906)
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:781)
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:735)
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:717)
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:581)
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559)
> [2025-11-04 09:44:22] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5593)
> [2025-11-04 09:44:22] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5540)
> [2025-11-04 09:44:22] at com.amazonaws.services.s3.AmazonS3Client.listObjectsV2(AmazonS3Client.java:1007)
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listObjects$11(S3AFileSystem.java:2595)
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414)
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377)
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.listObjects(S3AFileSystem.java:2586)
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3832)
> [2025-11-04 09:44:22] ... 43 more
> [2025-11-04 09:44:22] Caused by: com.amazonaws.http.timers.client.SdkInterruptedException
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:961)
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:947)
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1141)
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:838)
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:805)
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:779)
> [2025-11-04 09:44:22] ... 56 more
> [2025-11-04 09:44:22] RocksDBExceptionJni::ThrowNew/StatusJni - Error: unexpected exception!
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,069 WARN  org.apache.flink.state.forst.ForStKeyedStateBackendBuilder [] - Failed to delete ForSt local base path /flink-data/forst-local/68e582d9490de1ef330c9d03ff6559d9/op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_860, remote base path s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/shared/op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_860.
> [2025-11-04 09:44:22] java.io.InterruptedIOException: getFileStatus on s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/shared/op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_860: com.amazonaws.AbortedException:
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:395) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:201) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3799) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$exists$34(S3AFileSystem.java:4703) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:4701) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.exists(HadoopFileSystem.java:168) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:149) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.exists(SafetyNetWrapperFileSystem.java:117) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.ForStResourceContainer.clearDirectories(ForStResourceContainer.java:446) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.ForStResourceContainer.forceClearRemoteDirectories(ForStResourceContainer.java:440) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.ForStKeyedStateBackendBuilder.build(ForStKeyedStateBackendBuilder.java:303) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:474) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:98) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:475) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:486) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:280) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963) [flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) [flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756) [flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) [flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at java.base/java.lang.Thread.run(Unknown Source) [?:?]
> [2025-11-04 09:44:22] Caused by: com.amazonaws.AbortedException:
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleInterruptedException(AmazonHttpClient.java:906) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:781) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:735) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:717) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:581) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5593) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5540) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1422) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$10(S3AFileSystem.java:2545) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2533) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2513) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3776) ~[?:?]
> [2025-11-04 09:44:22] ... 32 more
> [2025-11-04 09:44:22] Caused by: com.amazonaws.http.timers.client.SdkInterruptedException
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:961) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:947) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1141) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:838) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:805) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:779) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:735) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:717) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:581) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5593) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5540) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1422) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$10(S3AFileSystem.java:2545) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2533) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2513) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3776) ~[?:?]
> [2025-11-04 09:44:22] ... 32 more
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,070 ERROR org.apache.flink.state.forst.ForStKeyedStateBackendBuilder [] - Caught unexpected exception.
> [2025-11-04 09:44:22] java.io.InterruptedIOException: getFileStatus on s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/shared/op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_860/db: com.amazonaws.AbortedException:
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:395) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:201) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3861) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$exists$34(S3AFileSystem.java:4703) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:4701) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.exists(HadoopFileSystem.java:168) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:149) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.exists(SafetyNetWrapperFileSystem.java:117) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.fs.ForStFlinkFileSystem.exists(ForStFlinkFileSystem.java:298) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.fs.StringifiedForStFileSystem.exists(StringifiedForStFileSystem.java:44) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.forstdb.RocksDB.open(Native Method) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.forstdb.RocksDB.open(RocksDB.java:318) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.ForStOperationUtils.openDB(ForStOperationUtils.java:85) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.restore.ForStHandle.loadDb(ForStHandle.java:140) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.restore.ForStHandle.openDB(ForStHandle.java:128) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restoreBaseDBFromMainHandle(ForStIncrementalRestoreOperation.java:379) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.initBaseDBFromSingleStateHandle(ForStIncrementalRestoreOperation.java:301) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.innerRestore(ForStIncrementalRestoreOperation.java:278) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.lambda$restore$1(ForStIncrementalRestoreOperation.java:237) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.runAndReportDuration(ForStIncrementalRestoreOperation.java:438) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restore(ForStIncrementalRestoreOperation.java:237) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.ForStKeyedStateBackendBuilder.build(ForStKeyedStateBackendBuilder.java:261) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:474) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:98) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:475) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:486) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:280) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963) [flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) [flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756) [flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) [flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04 09:44:22] at java.base/java.lang.Thread.run(Unknown Source) [?:?]
> [2025-11-04 09:44:22] Caused by: com.amazonaws.AbortedException:
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleInterruptedException(AmazonHttpClient.java:906) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:781) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:735) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:717) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:581) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5593) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5540) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.services.s3.AmazonS3Client.listObjectsV2(AmazonS3Client.java:1007) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listObjects$11(S3AFileSystem.java:2595) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.listObjects(S3AFileSystem.java:2586) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3832) ~[?:?]
> [2025-11-04 09:44:22] ... 43 more
> [2025-11-04 09:44:22] Caused by: com.amazonaws.http.timers.client.SdkInterruptedException
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:961) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:947) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1141) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:838) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:805) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:779) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:735) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:717) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:581) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5593) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5540) ~[?:?]
> [2025-11-04 09:44:22] at com.amazonaws.services.s3.AmazonS3Client.listObjectsV2(AmazonS3Client.java:1007) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listObjects$11(S3AFileSystem.java:2595) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.listObjects(S3AFileSystem.java:2586) ~[?:?]
> [2025-11-04 09:44:22] at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3832) ~[?:?]
> [2025-11-04 09:44:22] ... 43 more
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,071 INFO  org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=conversation-customer-thread-events-v1-producer] Closing the Kafka producer with timeoutMillis = 0 ms.
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,071 INFO  org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=conversation-customer-thread-events-v1-producer] Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,072 INFO  org.apache.kafka.common.metrics.Metrics [] - Metrics scheduler closed
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,072 INFO  org.apache.kafka.common.metrics.Metrics [] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,072 INFO  org.apache.kafka.common.metrics.Metrics [] - Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,072 INFO  org.apache.kafka.common.metrics.Metrics [] - Metrics reporters closed
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,073 INFO  org.apache.kafka.common.utils.AppInfoParser [] - App info kafka.producer for conversation-customer-thread-events-v1-producer unregistered
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,073 INFO  org.apache.flink.runtime.taskmanager.Task [] - processor-thread-conversation-customer -> kafka-sink-thread-conversation-customer: Writer -> kafka-sink-thread-conversation-customer: Committer (19/20)#860 (56f77e2196fe7f3ac0662857735727d9_84984c1fa2897a7a40e728e4e87e1da9_18_860) switched from CANCELING to CANCELED.
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,073 INFO  org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for processor-thread-conversation-customer -> kafka-sink-thread-conversation-customer: Writer -> kafka-sink-thread-conversation-customer: Committer (19/20)#860 (56f77e2196fe7f3ac0662857735727d9_84984c1fa2897a7a40e728e4e87e1da9_18_860).
> [2025-11-04 09:44:22] 2025-11-04 08:44:22,073 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state CANCELED to JobManager for task processor-thread-conversation-customer -> kafka-sink-thread-conversation-customer: Writer -> kafka-sink-thread-conversation-customer: Committer (19/20)#860 {code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)