[ 
https://issues.apache.org/jira/browse/FLINK-25846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17497646#comment-17497646
 ] 

Danny Cranmer commented on FLINK-25846:
---------------------------------------

Merged to master 
https://github.com/apache/flink/commit/ec83c087a53cf081cb8d0a01c7d6c310ce755c2e

> [FLIP-171] Async Sink does not gracefully shutdown on Cancel
> ------------------------------------------------------------
>
>                 Key: FLINK-25846
>                 URL: https://issues.apache.org/jira/browse/FLINK-25846
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>            Reporter: Danny Cranmer
>            Assignee: Zichen Liu
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> h4. Description
> Async Sink does not react gracefully to cancellation signal
> h4. Reproduction Steps
> - Start a job using an Async Sink implementation, for example KDS
> - Navigate to Flink Dashboard 
> - Click Job > Cancel
> h4. Actual Results
> - Sink operator stuck in Cancelling, retrying 
> {code}
> 2022-01-27 08:33:40,301 WARN  
> org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkWriter [] - KDS 
> Sink failed to persist 5 entries to KDS
> java.util.concurrent.CompletionException: java.lang.IllegalStateException: 
> Interrupted waiting to refresh the value.
>       at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
>  ~[?:?]
>       at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
>  ~[?:?]
>       at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:870)
>  ~[?:?]
>       at 
> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883)
>  ~[?:?]
>       at 
> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251)
>  ~[?:?]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.DefaultKinesisAsyncClient.putRecords(DefaultKinesisAsyncClient.java:2112)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkWriter.submitRequestEntries(KinesisDataStreamsSinkWriter.java:122)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.flush(AsyncSinkWriter.java:311)
>  ~[flink-connector-files-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
>       at 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.prepareCommit(AsyncSinkWriter.java:391)
>  ~[flink-connector-files-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
>       at 
> org.apache.flink.streaming.runtime.operators.sink.SinkOperator.endInput(SinkOperator.java:192)
>  ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
>  ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
>       at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:97)
>  ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
>       at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
>  ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:517)
>  ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>  ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:802)
>  ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:751)
>  ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
>       at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>  [flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
>       at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
> [flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) 
> [flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) 
> [flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
>       at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: java.lang.IllegalStateException: Interrupted waiting to refresh 
> the value.
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.handleInterruptedException(CachedSupplier.java:146)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.refreshCache(CachedSupplier.java:140)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.get(CachedSupplier.java:89)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.resolveCredentials(StsCredentialsProvider.java:91)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider.resolveCredentials(StsAssumeRoleCredentialsProvider.java:41)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.internal.AwsExecutionContextBuilder.resolveCredentials(AwsExecutionContextBuilder.java:165)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.internal.AwsExecutionContextBuilder.invokeInterceptorsAndCreateExecutionContext(AwsExecutionContextBuilder.java:102)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.client.handler.AwsAsyncClientHandler.invokeInterceptorsAndCreateExecutionContext(AwsAsyncClientHandler.java:65)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseAsyncClientHandler.lambda$execute$1(BaseAsyncClientHandler.java:77)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseAsyncClientHandler.measureApiCallSuccess(BaseAsyncClientHandler.java:282)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseAsyncClientHandler.execute(BaseAsyncClientHandler.java:75)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.client.handler.AwsAsyncClientHandler.execute(AwsAsyncClientHandler.java:52)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.DefaultKinesisAsyncClient.putRecords(DefaultKinesisAsyncClient.java:2107)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       ... 16 more
> Caused by: java.lang.InterruptedException
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireNanos(AbstractQueuedSynchronizer.java:1286)
>  ~[?:?]
>       at 
> java.util.concurrent.locks.ReentrantLock.tryLock(ReentrantLock.java:424) 
> ~[?:?]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.refreshCache(CachedSupplier.java:126)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.get(CachedSupplier.java:89)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.resolveCredentials(StsCredentialsProvider.java:91)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider.resolveCredentials(StsAssumeRoleCredentialsProvider.java:41)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.internal.AwsExecutionContextBuilder.resolveCredentials(AwsExecutionContextBuilder.java:165)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.internal.AwsExecutionContextBuilder.invokeInterceptorsAndCreateExecutionContext(AwsExecutionContextBuilder.java:102)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.client.handler.AwsAsyncClientHandler.invokeInterceptorsAndCreateExecutionContext(AwsAsyncClientHandler.java:65)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseAsyncClientHandler.lambda$execute$1(BaseAsyncClientHandler.java:77)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseAsyncClientHandler.measureApiCallSuccess(BaseAsyncClientHandler.java:282)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseAsyncClientHandler.execute(BaseAsyncClientHandler.java:75)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.client.handler.AwsAsyncClientHandler.execute(AwsAsyncClientHandler.java:52)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.DefaultKinesisAsyncClient.putRecords(DefaultKinesisAsyncClient.java:2107)
>  
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
>       ... 16 more
> {code}
> h4. Expected Results
> - Sink operator closes
> h4. Suggested Resolution
> - Async Sink should treat `InterruptedException` as stop signal



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to