[ 
https://issues.apache.org/jira/browse/FLINK-38621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18040140#comment-18040140
 ] 

Zakelly Lan commented on FLINK-38621:
-------------------------------------

[~faltomare] From the logs above, I assume it's the same issue with 
FLINK-38324. But in your case, the attempt number is 2 instead of 0, which 
means it's even more rare to occur. It would be nice if you could verify or 
reproduce this on 2.2, which will be available in a few days.

> AsyncKeyedProcessOperator fails to restore with a 
> java.io.FileNotFoundException: No such file or directory exception in ForSt
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38621
>                 URL: https://issues.apache.org/jira/browse/FLINK-38621
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 2.1.0
>            Reporter: Francis
>            Priority: Major
>
> I'm currently seeing task manager failures with a root exception of 
> {code:java}
> java.io.FileNotFoundException: No such file or directory{code}
> This issue seems related to https://issues.apache.org/jira/browse/FLINK-38324 
> and https://issues.apache.org/jira/browse/FLINK-38433 however in my case it 
> looks to be impacting an AsyncKeyedCoProcessOperator and not a 
> AsyncStreamFlatMap function as mentioned in the other ticket.
> However, the symptoms are the same. My job is in a restart loop where it is 
> failing to restore from a checkpoint attempt that does not exist. I've 
> included the full stack trace below.
>  
> From a configuration standpoint I am using Flink 2.1.0 and version 1.13 of 
> the K8s operator. I am using the Hadoop S3 filesystem with a state recovery 
> mode of CLAIM. There are no lifecycle rules on my S3 bucket that would result 
> in files being deleted externally. 
>  
> Here are the potentially relevant parts of my configuration:
>  
> {code:java}
> execution:
>   state-recovery.claim-mode: CLAIM
>   checkpointing:
>     dir: "s3a://{{ .Values.state.bucketName }}/checkpoints"
>     interval: {{ .Values.checkpoint.interval }}
>     timeout: {{ .Values.checkpoint.timeout }}
>     incremental: true
>     mode: {{ .Values.checkpoint.mode }}
>     max-concurrent-checkpoints: 1
>     snapshot-compression: true
>     savepoint-dir: "s3a://{{ .Values.state.bucketName }}/savepoints"
>     externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
>     min-pause: {{ .Values.checkpoint.interval }}
>     num-retained: 20
>     tolerable-failed-checkpoints: 2147483647 
> job:
>   savepointRedeployNonce: 8
>   state: running
>   upgradeMode: savepoint
>   jarURI: local:///opt/flink/usrlib/flink-job.jar
>   entryClass: {{ .Values.entryClass }}
>   parallelism: {{ .Values.parallelism }}
>   allowNonRestoredState: false
> state:
>   backend:
>     type: forst
>     forst:
>       executor.read-io-parallelism: 2
>       sync.enforce-local: false 
>       memory.managed: false 
>       memory.fixed-per-slot: 1g
>       local-dir: {{ .Values.dataMount }}/forst-local
>       cache.dir: {{ .Values.dataMount }}/forst-cache
>       cache:
>         size-based-limit: {{ .Values.state.diskCacheLimit }}
> high-availability:
>   type: kubernetes
>   storageDir: "s3a://{{ .Values.state.bucketName }}/ha"
>   kubernetes:
>     cluster-id: {{ .Values.global.serviceName }}
>     namespace: {{ .Values.global.namespace }}
> kubernetes:
>   taskmanager.memory.limit-factor: 5
>   taskmanager.cpu.limit-factor: 5
>   rest-service:
>     exposed.type: ClusterIP
>   operator:
>     job.upgrade.last-state.max.allowed.checkpoint.age: 24h
>     savepoint:
>       format.type: NATIVE
>       history:
>         max.count: {{ .Values.savepoint.maxCountToKeep }}
>     periodic:
>       savepoint.interval: {{ .Values.savepoint.interval }}{code}
>  
>  
> And the stack trace:
>  
> {code:java}
> op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_2/db/91e56e4d-31ae-4ee4-ab17-704a00a45627
>  [5343318 bytes], localPath='002991.sst'}], 
> metaStateHandle=ByteStreamStateHandle{handleName='s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/chk-12/da15b537-64d4-4651-800d-2ce8d8d4f5c9',
>  dataBytes=1310}, stateHandleId=49cb0a39-a8a1-49fd-9e77-dd35dd3a3713}]).
> [2025-11-04  09:44:21]    Exception in thread 
> "processor-thread-conversation-id (19/20)#860" java.io.FileNotFoundException: 
> No such file or directory: 
> s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/shared/op_AsyncKeyedProcessOperator_08ba072f7d4350d148d1fdf71e53ea7c__19_20__attempt_2/db/3acf0699-40c1-48ab-a42a-689829409fbf
> [2025-11-04  09:44:21]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3866)
> [2025-11-04  09:44:21]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
> [2025-11-04  09:44:21]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getFileStatus$24(S3AFileSystem.java:3556)
> [2025-11-04  09:44:21]        at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
> [2025-11-04  09:44:21]        at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
> [2025-11-04  09:44:21]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
> [2025-11-04  09:44:21]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
> [2025-11-04  09:44:21]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3554)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:88)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.getFileStatus(PluginFileSystemFactory.java:106)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:78)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.state.forst.fs.ForStFlinkFileSystem.getFileStatus(ForStFlinkFileSystem.java:332)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.state.forst.fs.ForStFlinkFileSystem.listStatus(ForStFlinkFileSystem.java:368)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.state.forst.fs.StringifiedForStFileSystem.listStatus(StringifiedForStFileSystem.java:52)
> [2025-11-04  09:44:21]        at org.forstdb.RocksDB.open(Native Method)
> [2025-11-04  09:44:21]        at org.forstdb.RocksDB.open(RocksDB.java:318)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.state.forst.ForStOperationUtils.openDB(ForStOperationUtils.java:85)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.state.forst.restore.ForStHandle.loadDb(ForStHandle.java:140)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.state.forst.restore.ForStHandle.openDB(ForStHandle.java:128)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restoreBaseDBFromMainHandle(ForStIncrementalRestoreOperation.java:379)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.initBaseDBFromSingleStateHandle(ForStIncrementalRestoreOperation.java:301)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.innerRestore(ForStIncrementalRestoreOperation.java:278)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.lambda$restore$1(ForStIncrementalRestoreOperation.java:237)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.runAndReportDuration(ForStIncrementalRestoreOperation.java:438)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restore(ForStIncrementalRestoreOperation.java:237)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.state.forst.ForStKeyedStateBackendBuilder.build(ForStKeyedStateBackendBuilder.java:261)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:474)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:98)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:475)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:486)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:280)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
> [2025-11-04  09:44:21]        at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
> [2025-11-04  09:44:21]        at java.base/java.lang.Thread.run(Unknown 
> Source)
> [2025-11-04  09:44:21]    RocksDBExceptionJni::ThrowNew/StatusJni - Error: 
> unexpected exception!
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,017 INFO  
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] - Legacy 
> kryo serializer scala extensions are not available.
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,017 INFO  
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] - Kryo 
> serializer scala extensions are not available.
> [2025-11-04  09:44:22]    Exception in thread 
> "processor-thread-conversation-customer -> 
> kafka-sink-thread-conversation-customer: Writer -> 
> kafka-sink-thread-conversation-customer: Committer (19/20)#860" 
> java.io.FileNotFoundException: No such file or directory: 
> s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/shared/op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_2/db/ad742554-d9d8-4a0f-830d-61da66510143
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3866)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getFileStatus$24(S3AFileSystem.java:3556)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3554)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:88)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.getFileStatus(PluginFileSystemFactory.java:106)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:78)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.fs.ForStFlinkFileSystem.getFileStatus(ForStFlinkFileSystem.java:332)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.fs.ForStFlinkFileSystem.listStatus(ForStFlinkFileSystem.java:368)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.fs.StringifiedForStFileSystem.listStatus(StringifiedForStFileSystem.java:52)
> [2025-11-04  09:44:22]        at org.forstdb.RocksDB.open(Native Method)
> [2025-11-04  09:44:22]        at org.forstdb.RocksDB.open(RocksDB.java:318)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.ForStOperationUtils.openDB(ForStOperationUtils.java:85)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStHandle.loadDb(ForStHandle.java:140)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStHandle.openDB(ForStHandle.java:128)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restoreBaseDBFromMainHandle(ForStIncrementalRestoreOperation.java:379)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.initBaseDBFromSingleStateHandle(ForStIncrementalRestoreOperation.java:301)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.innerRestore(ForStIncrementalRestoreOperation.java:278)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.lambda$restore$1(ForStIncrementalRestoreOperation.java:237)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.runAndReportDuration(ForStIncrementalRestoreOperation.java:438)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restore(ForStIncrementalRestoreOperation.java:237)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.ForStKeyedStateBackendBuilder.build(ForStKeyedStateBackendBuilder.java:261)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:474)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:98)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:475)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:486)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:280)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
> [2025-11-04  09:44:22]        at java.base/java.lang.Thread.run(Unknown 
> Source)
> [2025-11-04  09:44:22]    Exception in thread 
> "processor-thread-conversation-customer -> 
> kafka-sink-thread-conversation-customer: Writer -> 
> kafka-sink-thread-conversation-customer: Committer (19/20)#860" 
> java.io.FileNotFoundException: No such file or directory: 
> s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/shared/op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_2/db/ad742554-d9d8-4a0f-830d-61da66510143
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3866)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getFileStatus$24(S3AFileSystem.java:3556)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3554)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:88)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.getFileStatus(PluginFileSystemFactory.java:106)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:78)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.fs.ForStFlinkFileSystem.getFileStatus(ForStFlinkFileSystem.java:332)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.fs.ForStFlinkFileSystem.listStatus(ForStFlinkFileSystem.java:368)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.fs.StringifiedForStFileSystem.listStatus(StringifiedForStFileSystem.java:52)
> [2025-11-04  09:44:22]        at org.forstdb.RocksDB.open(Native Method)
> [2025-11-04  09:44:22]        at org.forstdb.RocksDB.open(RocksDB.java:318)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.ForStOperationUtils.openDB(ForStOperationUtils.java:85)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStHandle.loadDb(ForStHandle.java:140)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStHandle.openDB(ForStHandle.java:128)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restoreBaseDBFromMainHandle(ForStIncrementalRestoreOperation.java:379)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.initBaseDBFromSingleStateHandle(ForStIncrementalRestoreOperation.java:301)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.innerRestore(ForStIncrementalRestoreOperation.java:278)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.lambda$restore$1(ForStIncrementalRestoreOperation.java:237)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.runAndReportDuration(ForStIncrementalRestoreOperation.java:438)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restore(ForStIncrementalRestoreOperation.java:237)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.ForStKeyedStateBackendBuilder.build(ForStKeyedStateBackendBuilder.java:261)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:474)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:98)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:475)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:486)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:280)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
> [2025-11-04  09:44:22]        at java.base/java.lang.Thread.run(Unknown 
> Source)
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,059 INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - Attempting 
> to cancel task Source: kafka-source-thread_events_v1 (19/20)#860 
> (56f77e2196fe7f3ac0662857735727d9_bd9e548cb0c865193602f71910cbd8d6_18_860).
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,059 INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - Source: 
> kafka-source-thread_events_v1 (19/20)#860 
> (56f77e2196fe7f3ac0662857735727d9_bd9e548cb0c865193602f71910cbd8d6_18_860) 
> switched from RUNNING to CANCELING.
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,059 INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - Triggering 
> cancellation of task code Source: kafka-source-thread_events_v1 (19/20)#860 
> (56f77e2196fe7f3ac0662857735727d9_bd9e548cb0c865193602f71910cbd8d6_18_860).
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,060 INFO  
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing 
> Source Reader.
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,060 INFO  
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Shutting down split fetcher 0
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,060 INFO  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - 
> [Consumer clientId=conversation-customer-thread-events-v1-consumer-18, 
> groupId=conversation-customer-thread-events-v1-consumer] Resetting generation 
> and member id due to: consumer pro-actively leaving the group
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,060 INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - Attempting 
> to cancel task Source: kafka-source-conversation_customer_v1 (19/20)#860 
> (56f77e2196fe7f3ac0662857735727d9_01393662c0a19669846621424653c56f_18_860).
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,061 INFO  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - 
> [Consumer clientId=conversation-customer-thread-events-v1-consumer-18, 
> groupId=conversation-customer-thread-events-v1-consumer] Request joining 
> group due to: consumer pro-actively leaving the group
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,061 INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - Source: 
> kafka-source-conversation_customer_v1 (19/20)#860 
> (56f77e2196fe7f3ac0662857735727d9_01393662c0a19669846621424653c56f_18_860) 
> switched from RUNNING to CANCELING.
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,061 INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - Triggering 
> cancellation of task code Source: kafka-source-conversation_customer_v1 
> (19/20)#860 
> (56f77e2196fe7f3ac0662857735727d9_01393662c0a19669846621424653c56f_18_860).
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,062 INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - Attempting 
> to cancel task processor-thread-conversation-id (19/20)#860 
> (56f77e2196fe7f3ac0662857735727d9_08ba072f7d4350d148d1fdf71e53ea7c_18_860).
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,062 INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - 
> processor-thread-conversation-id (19/20)#860 
> (56f77e2196fe7f3ac0662857735727d9_08ba072f7d4350d148d1fdf71e53ea7c_18_860) 
> switched from INITIALIZING to CANCELING.
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,062 INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - Triggering 
> cancellation of task code processor-thread-conversation-id (19/20)#860 
> (56f77e2196fe7f3ac0662857735727d9_08ba072f7d4350d148d1fdf71e53ea7c_18_860).
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,063 INFO  
> org.apache.kafka.common.metrics.Metrics                      [] - Metrics 
> scheduler closed
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,063 INFO  
> org.apache.kafka.common.metrics.Metrics                      [] - Closing 
> reporter org.apache.kafka.common.metrics.JmxReporter
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,063 INFO  
> org.apache.kafka.common.metrics.Metrics                      [] - Closing 
> reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,063 INFO  
> org.apache.kafka.common.metrics.Metrics                      [] - Metrics 
> reporters closed
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,064 INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - Attempting 
> to cancel task processor-thread-conversation-customer -> 
> kafka-sink-thread-conversation-customer: Writer -> 
> kafka-sink-thread-conversation-customer: Committer (19/20)#860 
> (56f77e2196fe7f3ac0662857735727d9_84984c1fa2897a7a40e728e4e87e1da9_18_860).
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,064 INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - 
> processor-thread-conversation-customer -> 
> kafka-sink-thread-conversation-customer: Writer -> 
> kafka-sink-thread-conversation-customer: Committer (19/20)#860 
> (56f77e2196fe7f3ac0662857735727d9_84984c1fa2897a7a40e728e4e87e1da9_18_860) 
> switched from INITIALIZING to CANCELING.
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,064 INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - Triggering 
> cancellation of task code processor-thread-conversation-customer -> 
> kafka-sink-thread-conversation-customer: Writer -> 
> kafka-sink-thread-conversation-customer: Committer (19/20)#860 
> (56f77e2196fe7f3ac0662857735727d9_84984c1fa2897a7a40e728e4e87e1da9_18_860).
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,065 INFO  
> org.apache.kafka.common.utils.AppInfoParser                  [] - App info 
> kafka.consumer for conversation-customer-thread-events-v1-consumer-18 
> unregistered
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,065 INFO  
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split 
> fetcher 0 exited.
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,065 INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - Source: 
> kafka-source-thread_events_v1 (19/20)#860 
> (56f77e2196fe7f3ac0662857735727d9_bd9e548cb0c865193602f71910cbd8d6_18_860) 
> switched from CANCELING to CANCELED.
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,065 INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - Freeing 
> task resources for Source: kafka-source-thread_events_v1 (19/20)#860 
> (56f77e2196fe7f3ac0662857735727d9_bd9e548cb0c865193602f71910cbd8d6_18_860).
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,065 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - 
> Un-registering task and sending final execution state CANCELED to JobManager 
> for task Source: kafka-source-thread_events_v1 (19/20)#860 
> 56f77e2196fe7f3ac0662857735727d9_bd9e548cb0c865193602f71910cbd8d6_18_860.
> [2025-11-04  09:44:22]    Exception in thread 
> "processor-thread-conversation-customer -> 
> kafka-sink-thread-conversation-customer: Writer -> 
> kafka-sink-thread-conversation-customer: Committer (19/20)#860" 
> java.io.InterruptedIOException: getFileStatus on 
> s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/shared/op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_860/db:
>  com.amazonaws.AbortedException: 
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:395)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:201)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3861)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$exists$34(S3AFileSystem.java:4703)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:4701)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.exists(HadoopFileSystem.java:168)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:149)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.exists(SafetyNetWrapperFileSystem.java:117)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.fs.ForStFlinkFileSystem.exists(ForStFlinkFileSystem.java:298)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.fs.StringifiedForStFileSystem.exists(StringifiedForStFileSystem.java:44)
> [2025-11-04  09:44:22]        at org.forstdb.RocksDB.open(Native Method)
> [2025-11-04  09:44:22]        at org.forstdb.RocksDB.open(RocksDB.java:318)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.ForStOperationUtils.openDB(ForStOperationUtils.java:85)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStHandle.loadDb(ForStHandle.java:140)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStHandle.openDB(ForStHandle.java:128)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restoreBaseDBFromMainHandle(ForStIncrementalRestoreOperation.java:379)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.initBaseDBFromSingleStateHandle(ForStIncrementalRestoreOperation.java:301)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.innerRestore(ForStIncrementalRestoreOperation.java:278)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.lambda$restore$1(ForStIncrementalRestoreOperation.java:237)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.runAndReportDuration(ForStIncrementalRestoreOperation.java:438)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restore(ForStIncrementalRestoreOperation.java:237)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.ForStKeyedStateBackendBuilder.build(ForStKeyedStateBackendBuilder.java:261)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:474)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:98)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:475)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:486)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:280)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
> [2025-11-04  09:44:22]        at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
> [2025-11-04  09:44:22]        at java.base/java.lang.Thread.run(Unknown 
> Source)
> [2025-11-04  09:44:22]    Caused by: com.amazonaws.AbortedException: 
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleInterruptedException(AmazonHttpClient.java:906)
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:781)
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:735)
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:717)
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:581)
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559)
> [2025-11-04  09:44:22]        at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5593)
> [2025-11-04  09:44:22]        at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5540)
> [2025-11-04  09:44:22]        at 
> com.amazonaws.services.s3.AmazonS3Client.listObjectsV2(AmazonS3Client.java:1007)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listObjects$11(S3AFileSystem.java:2595)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.listObjects(S3AFileSystem.java:2586)
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3832)
> [2025-11-04  09:44:22]        ... 43 more
> [2025-11-04  09:44:22]    Caused by: 
> com.amazonaws.http.timers.client.SdkInterruptedException
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:961)
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:947)
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1141)
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:838)
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:805)
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:779)
> [2025-11-04  09:44:22]        ... 56 more
> [2025-11-04  09:44:22]    RocksDBExceptionJni::ThrowNew/StatusJni - Error: 
> unexpected exception!
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,069 WARN  
> org.apache.flink.state.forst.ForStKeyedStateBackendBuilder   [] - Failed to 
> delete ForSt local base path 
> /flink-data/forst-local/68e582d9490de1ef330c9d03ff6559d9/op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_860,
>  remote base path 
> s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/shared/op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_860.
> [2025-11-04  09:44:22]    java.io.InterruptedIOException: getFileStatus on 
> s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/shared/op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_860:
>  com.amazonaws.AbortedException: 
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:395)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:201) ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175) ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3799)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$exists$34(S3AFileSystem.java:4703)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:4701) ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.exists(HadoopFileSystem.java:168)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:149)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.exists(SafetyNetWrapperFileSystem.java:117)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.ForStResourceContainer.clearDirectories(ForStResourceContainer.java:446)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.ForStResourceContainer.forceClearRemoteDirectories(ForStResourceContainer.java:440)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.ForStKeyedStateBackendBuilder.build(ForStKeyedStateBackendBuilder.java:303)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:474)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:98)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:475)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:486)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:280)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
>  [flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) 
> [flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756) 
> [flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) 
> [flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at java.base/java.lang.Thread.run(Unknown 
> Source) [?:?]
> [2025-11-04  09:44:22]    Caused by: com.amazonaws.AbortedException: 
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleInterruptedException(AmazonHttpClient.java:906)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:781)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:735)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:717)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:581) ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559) ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5593) 
> ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5540) 
> ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1422)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$10(S3AFileSystem.java:2545)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414) ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377) ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2533)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2513)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3776)
>  ~[?:?]
> [2025-11-04  09:44:22]        ... 32 more
> [2025-11-04  09:44:22]    Caused by: 
> com.amazonaws.http.timers.client.SdkInterruptedException
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:961)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:947)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1141)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:838)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:805)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:779)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:735)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:717)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:581) ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559) ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5593) 
> ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5540) 
> ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1422)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$10(S3AFileSystem.java:2545)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414) ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377) ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2533)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2513)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3776)
>  ~[?:?]
> [2025-11-04  09:44:22]        ... 32 more
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,070 ERROR 
> org.apache.flink.state.forst.ForStKeyedStateBackendBuilder   [] - Caught 
> unexpected exception.
> [2025-11-04  09:44:22]    java.io.InterruptedIOException: getFileStatus on 
> s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/shared/op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_860/db:
>  com.amazonaws.AbortedException: 
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:395)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:201) ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175) ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3861)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$exists$34(S3AFileSystem.java:4703)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:4701) ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.exists(HadoopFileSystem.java:168)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:149)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.exists(SafetyNetWrapperFileSystem.java:117)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.fs.ForStFlinkFileSystem.exists(ForStFlinkFileSystem.java:298)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.fs.StringifiedForStFileSystem.exists(StringifiedForStFileSystem.java:44)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at org.forstdb.RocksDB.open(Native Method) 
> ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at org.forstdb.RocksDB.open(RocksDB.java:318) 
> ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.ForStOperationUtils.openDB(ForStOperationUtils.java:85)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStHandle.loadDb(ForStHandle.java:140) 
> ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStHandle.openDB(ForStHandle.java:128) 
> ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restoreBaseDBFromMainHandle(ForStIncrementalRestoreOperation.java:379)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.initBaseDBFromSingleStateHandle(ForStIncrementalRestoreOperation.java:301)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.innerRestore(ForStIncrementalRestoreOperation.java:278)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.lambda$restore$1(ForStIncrementalRestoreOperation.java:237)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.runAndReportDuration(ForStIncrementalRestoreOperation.java:438)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restore(ForStIncrementalRestoreOperation.java:237)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.ForStKeyedStateBackendBuilder.build(ForStKeyedStateBackendBuilder.java:261)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:474)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:98)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:475)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:486)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:280)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>  ~[flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
>  [flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) 
> [flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756) 
> [flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) 
> [flink-dist-2.1.0.jar:2.1.0]
> [2025-11-04  09:44:22]        at java.base/java.lang.Thread.run(Unknown 
> Source) [?:?]
> [2025-11-04  09:44:22]    Caused by: com.amazonaws.AbortedException: 
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleInterruptedException(AmazonHttpClient.java:906)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:781)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:735)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:717)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:581) ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559) ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5593) 
> ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5540) 
> ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.services.s3.AmazonS3Client.listObjectsV2(AmazonS3Client.java:1007)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listObjects$11(S3AFileSystem.java:2595)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414) ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377) ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.listObjects(S3AFileSystem.java:2586) 
> ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3832)
>  ~[?:?]
> [2025-11-04  09:44:22]        ... 43 more
> [2025-11-04  09:44:22]    Caused by: 
> com.amazonaws.http.timers.client.SdkInterruptedException
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:961)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:947)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1141)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:838)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:805)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:779)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:735)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:717)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:581) ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559) ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5593) 
> ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5540) 
> ~[?:?]
> [2025-11-04  09:44:22]        at 
> com.amazonaws.services.s3.AmazonS3Client.listObjectsV2(AmazonS3Client.java:1007)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listObjects$11(S3AFileSystem.java:2595)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
>  ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414) ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377) ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.listObjects(S3AFileSystem.java:2586) 
> ~[?:?]
> [2025-11-04  09:44:22]        at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3832)
>  ~[?:?]
> [2025-11-04  09:44:22]        ... 43 more
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,071 INFO  
> org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer 
> clientId=conversation-customer-thread-events-v1-producer] Closing the Kafka 
> producer with timeoutMillis = 0 ms.
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,071 INFO  
> org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer 
> clientId=conversation-customer-thread-events-v1-producer] Proceeding to force 
> close the producer since pending requests could not be completed within 
> timeout 0 ms.
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,072 INFO  
> org.apache.kafka.common.metrics.Metrics                      [] - Metrics 
> scheduler closed
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,072 INFO  
> org.apache.kafka.common.metrics.Metrics                      [] - Closing 
> reporter org.apache.kafka.common.metrics.JmxReporter
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,072 INFO  
> org.apache.kafka.common.metrics.Metrics                      [] - Closing 
> reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,072 INFO  
> org.apache.kafka.common.metrics.Metrics                      [] - Metrics 
> reporters closed
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,073 INFO  
> org.apache.kafka.common.utils.AppInfoParser                  [] - App info 
> kafka.producer for conversation-customer-thread-events-v1-producer 
> unregistered
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,073 INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - 
> processor-thread-conversation-customer -> 
> kafka-sink-thread-conversation-customer: Writer -> 
> kafka-sink-thread-conversation-customer: Committer (19/20)#860 
> (56f77e2196fe7f3ac0662857735727d9_84984c1fa2897a7a40e728e4e87e1da9_18_860) 
> switched from CANCELING to CANCELED.
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,073 INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - Freeing 
> task resources for processor-thread-conversation-customer -> 
> kafka-sink-thread-conversation-customer: Writer -> 
> kafka-sink-thread-conversation-customer: Committer (19/20)#860 
> (56f77e2196fe7f3ac0662857735727d9_84984c1fa2897a7a40e728e4e87e1da9_18_860).
> [2025-11-04  09:44:22]    2025-11-04 08:44:22,073 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - 
> Un-registering task and sending final execution state CANCELED to JobManager 
> for task processor-thread-conversation-customer -> 
> kafka-sink-thread-conversation-customer: Writer -> 
> kafka-sink-thread-conversation-customer: Committer (19/20)#860  {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to