Aleksandr Pilipenko created FLINK-34071:
-------------------------------------------

             Summary: Deadlock in AWS Kinesis Data Streams AsyncSink connector
                 Key: FLINK-34071
                 URL: https://issues.apache.org/jira/browse/FLINK-34071
             Project: Flink
          Issue Type: Bug
          Components: Connectors / AWS, Connectors / Kinesis
    Affects Versions: aws-connector-4.2.0, 1.15.4, aws-connector-3.0.0
            Reporter: Aleksandr Pilipenko


The sink operator hangs while flushing records, similarly to FLINK-32230. The 
error is observed even when using an AWS SDK version that contains the fix for 
async client error handling: https://github.com/aws/aws-sdk-java-v2/pull/4402

Thread dump of stuck thread:

 
{code:java}
"sdk-async-response-1-6236" Id=11213 RUNNABLE
    at app//org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.lambda$flush$5(AsyncSinkWriter.java:385)
    at app//org.apache.flink.connector.base.sink.writer.AsyncSinkWriter$$Lambda$1778/0x0000000801141040.accept(Unknown Source)
    at org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter.handleFullyFailedRequest(KinesisStreamsSinkWriter.java:210)
    at org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter.lambda$submitRequestEntries$1(KinesisStreamsSinkWriter.java:184)
    at org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter$$Lambda$1965/0x00000008011a0c40.accept(Unknown Source)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.CompletableFutureUtils$$Lambda$1925/0x0000000801181840.accept(Unknown Source)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage$$Lambda$1961/0x0000000801191c40.accept(Unknown Source)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java:67)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage$$Lambda$1960/0x0000000801191840.accept(Unknown Source)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.CompletableFutureUtils$$Lambda$1925/0x0000000801181840.accept(Unknown Source)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeAttemptExecute(AsyncRetryableStage.java:103)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:184)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:170)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor$$Lambda$1956/0x0000000801192840.accept(Unknown Source)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:105)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$$Lambda$1954/0x0000000801193040.accept(Unknown Source)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base@11.0.18/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.completeResponseFuture(MakeAsyncHttpRequestStage.java:238)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:163)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$$Lambda$1952/0x0000000801193840.apply(Unknown Source)
    ...
{code}
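
When reading the thread dump above, note that each repeated completeExceptionally -> postComplete -> uniWhenComplete sequence is a callback registered with whenComplete running synchronously on the thread that completes the future. This is standard JDK CompletableFuture behaviour and explains why the sink writer's failure handler shows up on an "sdk-async-response" thread. The following is only a minimal standalone sketch of that behaviour; the class and method names are hypothetical and are not part of Flink or the AWS SDK, and it does not reproduce the hang itself:

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not Flink or AWS SDK code.
class CallbackThreadDemo {

    /**
     * Completes a future exceptionally on a thread with the given name and
     * returns the name of the thread that executed the whenComplete callback.
     */
    static String callbackThreadName(String completerName) {
        CompletableFuture<String> response = new CompletableFuture<>();
        AtomicReference<String> observed = new AtomicReference<>();

        // Registered while the future is still incomplete, so the callback
        // runs on whichever thread later completes the future.
        response.whenComplete(
                (result, error) -> observed.set(Thread.currentThread().getName()));

        Thread completer = new Thread(
                () -> response.completeExceptionally(new IOException("connection closed")),
                completerName);
        completer.start();
        try {
            // join() guarantees the callback has finished before we read the result.
            completer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for completer", e);
        }
        return observed.get();
    }

    public static void main(String[] args) {
        // Prints the name of the completing thread, not "main".
        System.out.println(callbackThreadName("sdk-async-response-demo"));
    }
}
```

Because the callback runs on the completing thread, any work done inside it (including blocking or long-running work) occupies that thread for its whole duration.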
 

Alongside this issue, the following exception can be observed:
{code:java}
java.io.IOException: An error occurred on the connection: java.nio.channels.ClosedChannelException, [channel: 159aa20c]. All streams will be closed
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.decorateConnectionException(MultiplexedChannelRecord.java:213)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.lambda$closeChildChannels$10(MultiplexedChannelRecord.java:205)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.lambda$closeAndExecuteOnChildChannels$11(MultiplexedChannelRecord.java:229)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.doInEventLoop(NettyUtils.java:248)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.closeAndExecuteOnChildChannels(MultiplexedChannelRecord.java:220)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.closeChildChannels(MultiplexedChannelRecord.java:205)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool.closeAndReleaseParent(Http2MultiplexedChannelPool.java:353)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool.closeAndReleaseParent(Http2MultiplexedChannelPool.java:333)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool.access$200(Http2MultiplexedChannelPool.java:76)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool$ReleaseOnExceptionHandler.closeAndReleaseParent(Http2MultiplexedChannelPool.java:509)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool$ReleaseOnExceptionHandler.channelInactive(Http2MultiplexedChannelPool.java:486)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at org.apache.flink.kinesis.shaded.io.netty.handler.logging.LoggingHandler.channelInactive(LoggingHandler.java:206)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at org.apache.flink.kinesis.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at org.apache.flink.kinesis.shaded.io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2PingHandler.channelInactive(Http2PingHandler.java:77)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:411)
    at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:376)
    at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.channelInactive(Http2ConnectionHandler.java:430)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:411)
    at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:376)
    at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:1085)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.nio.channels.ClosedChannelException
    ... 41 more
{code}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
