[ 
https://issues.apache.org/jira/browse/FLINK-32964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798229#comment-17798229
 ] 

Jan Kamieth edited comment on FLINK-32964 at 12/18/23 2:40 PM:
---------------------------------------------------------------

Hi,

we have the same issue in the same setup.
Flink 1.17 with Kinesis connector 
{{org.apache.flink:flink-connector-kinesis:4.1.0-1.17}} running in EKS with 
access provided by an IAM role in a k8s service account.
After the app has been running for a couple of hours, it gets caught in a 
crash loop with the following stack trace:
{code:java}
java.lang.IllegalStateException: Connection pool shut down
    at 
org.apache.flink.kinesis.shaded.org.apache.http.util.Asserts.check(Asserts.java:34)
    at 
org.apache.flink.kinesis.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$DelegatingHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:75)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$InstrumentedHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:57)
    at 
org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176)
    at 
org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
    at 
org.apache.flink.kinesis.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
    at 
org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at 
org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:228)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:63)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:77)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:56)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:39)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:171)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.DefaultStsClient.assumeRoleWithWebIdentity(DefaultStsClient.java:760)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsAssumeRoleWithWebIdentityCredentialsProvider.getUpdatedCredentials(StsAssumeRoleWithWebIdentityCredentialsProvider.java:73)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.updateSessionCredentials(StsCredentialsProvider.java:88)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.lambda$jitteredPrefetchValueSupplier$3(CachedSupplier.java:283)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier$PrefetchStrategy.fetch(CachedSupplier.java:419)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.refreshCache(CachedSupplier.java:198)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.get(CachedSupplier.java:127)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.resolveCredentials(StsCredentialsProvider.java:99)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsAssumeRoleWithWebIdentityCredentialsProvider.resolveCredentials(StsAssumeRoleWithWebIdentityCredentialsProvider.java:44)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.internal.StsWebIdentityCredentialsProviderFactory$StsWebIdentityCredentialsProvider.resolveCredentials(StsWebIdentityCredentialsProviderFactory.java:93)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider.resolveCredentials(WebIdentityTokenFileCredentialsProvider.java:113)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain.resolveCredentials(AwsCredentialsProviderChain.java:90)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.internal.LazyAwsCredentialsProvider.resolveCredentials(LazyAwsCredentialsProvider.java:45)
    at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.resolveCredentials(DefaultCredentialsProvider.java:123)
    at 
org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.validateAwsCredentials(AWSGeneralUtil.java:404)
    at 
org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter.buildClient(KinesisStreamsSinkWriter.java:159)
    at 
org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter.<init>(KinesisStreamsSinkWriter.java:154)
    at 
org.apache.flink.connector.kinesis.sink.KinesisStreamsSink.restoreWriter(KinesisStreamsSink.java:154)
    at 
org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:115)
    at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146)
    at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
    at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
    at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Unknown Source) {code}
I'm quite sure that the `StsAssumeRoleWithWebIdentityCredentialsProvider`, or at 
least the AWS authentication, is the root cause of this issue: afterwards we 
deployed the same app with basic AWS access key and secret access key 
credentials, and with those the app ran stably for multiple days.

The Iceberg project has a similar issue: 
[https://github.com/apache/iceberg/issues/8601]
Their issue occurred roughly around the same time.
They state that the cause is the *DefaultCredentialsProvider*, which is 
used as a singleton. 
As far as I understand the code in 
*org/apache/flink/kinesis/shaded/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.class*,
 Flink uses a singleton here, too.
The suggested fix is to not use a singleton but to recreate the credentials 
provider on every credentials refresh.

I hope this helps.


was (Author: JIRAUSER303551):
Hi,

we have the same issue in the same setup.
Flink 1.17 with Kinesis connector 
{{org.apache.flink:flink-connector-kinesis:4.1.0-1.17 }}running in EKS with 
access provided by an IAM role in a k8s service account.
After running the app, it is caught in a crash loop with the following stack 
trace after a couple of hours:

{noformat}
java.lang.IllegalStateException: Connection pool shut down at 
org.apache.flink.kinesis.shaded.org.apache.http.util.Asserts.check(Asserts.java:34)
 at 
org.apache.flink.kinesis.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$DelegatingHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:75)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$InstrumentedHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:57)
 at 
org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176)
 at 
org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
 at 
org.apache.flink.kinesis.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
 at 
org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
 at 
org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:228)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:63)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:77)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:56)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:39)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:171)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.DefaultStsClient.assumeRoleWithWebIdentity(DefaultStsClient.java:760)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsAssumeRoleWithWebIdentityCredentialsProvider.getUpdatedCredentials(StsAssumeRoleWithWebIdentityCredentialsProvider.java:73)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.updateSessionCredentials(StsCredentialsProvider.java:88)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.lambda$jitteredPrefetchValueSupplier$3(CachedSupplier.java:283)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier$PrefetchStrategy.fetch(CachedSupplier.java:419)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.refreshCache(CachedSupplier.java:198)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.get(CachedSupplier.java:127)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.resolveCredentials(StsCredentialsProvider.java:99)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsAssumeRoleWithWebIdentityCredentialsProvider.resolveCredentials(StsAssumeRoleWithWebIdentityCredentialsProvider.java:44)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.internal.StsWebIdentityCredentialsProviderFactory$StsWebIdentityCredentialsProvider.resolveCredentials(StsWebIdentityCredentialsProviderFactory.java:93)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider.resolveCredentials(WebIdentityTokenFileCredentialsProvider.java:113)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain.resolveCredentials(AwsCredentialsProviderChain.java:90)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.internal.LazyAwsCredentialsProvider.resolveCredentials(LazyAwsCredentialsProvider.java:45)
 at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.resolveCredentials(DefaultCredentialsProvider.java:123)
 at 
org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.validateAwsCredentials(AWSGeneralUtil.java:404)
 at 
org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter.buildClient(KinesisStreamsSinkWriter.java:159)
 at 
org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter.<init>(KinesisStreamsSinkWriter.java:154)
 at 
org.apache.flink.connector.kinesis.sink.KinesisStreamsSink.restoreWriter(KinesisStreamsSink.java:154)
 at 
org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:115)
 at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146)
 at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
 at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
 at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
 at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at 
java.base/java.lang.Thread.run(Unknown Source)
{noformat}

 

I'm quite sure that the `StsAssumeRoleWithWebIdentityCredentialsProvider` or at 
least the AWS authentication is the root cause of this issue, as we also 
deployed the same app with basic aws access key and secret access key 
credentials afterwards with which the app was running stable for multiple days.

The Iceberg project has a similar issue: 
[https://github.com/apache/iceberg/issues/8601]
Their issue occurred roughly around the same time.
They are stating that the cause is the `DefaultCredentialsProvider` which is 
used as a singleton. 
How I understand the code in 
`org/apache/flink/kinesis/shaded/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.class`
 flink uses a singleton here, too.
The suggested fix is to not use a singleton but to recreate the credentials 
provider on every credentials refresh.

I hope this helps.

> KinesisStreamsSink cant renew credentials with 
> WebIdentityTokenFileCredentialsProvider
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-32964
>                 URL: https://issues.apache.org/jira/browse/FLINK-32964
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.15.4, 1.16.2, 1.17.1
>            Reporter: PhilippeB
>            Priority: Major
>
> (First time filing a ticket in the Flink community, please let me know if there 
> are any guidelines I need to follow)
> I noticed a very strange behavior with the Kinesis Sink. I am actually using 
> Flink in containerized and Application (reactive) mode on EKS with high 
> availability on S3. 
> Kinesis is configured with an IAM role and appropriate policies. 
> {code:java}
> //Here a part of my flink-config.yaml:
> parallelism.default: 2
> scheduler-mode: reactive
> execution.checkpointing.interval: 10s
> env.java.opts.jobmanager: -Dkubernetes.max.concurrent.requests=200
> containerized.master.env.KUBERNETES_MAX_CONCURRENT_REQUESTS: 200
> aws.credentials.provider: WEB_IDENTITY_TOKEN
> aws.credentials.role.arn: role
> aws.credentials.role.sessionName: session
> aws.credentials.webIdentityToken.file: 
> /var/run/secrets/eks.amazonaws.com/serviceaccount/token {code}
> When my project is deployed, the application and cluster work well, but 
> once the project has been running for about an hour (I suppose that is when 
> the IAM role session needs to be renewed), the job starts crashing continuously.
> {code:java}
> 2023-08-24 10:35:55
> java.lang.IllegalStateException: Connection pool shut down
>     at 
> org.apache.flink.kinesis.shaded.org.apache.http.util.Asserts.check(Asserts.java:34)
>     at 
> org.apache.flink.kinesis.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$DelegatingHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:75)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$InstrumentedHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:57)
>     at 
> org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176)
>     at 
> org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
>     at 
> org.apache.flink.kinesis.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>     at 
> org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>     at 
> org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:228)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:63)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:77)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:56)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:39)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:171)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.DefaultStsClient.assumeRoleWithWebIdentity(DefaultStsClient.java:760)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsAssumeRoleWithWebIdentityCredentialsProvider.getUpdatedCredentials(StsAssumeRoleWithWebIdentityCredentialsProvider.java:73)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.updateSessionCredentials(StsCredentialsProvider.java:88)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.lambda$jitteredPrefetchValueSupplier$3(CachedSupplier.java:283)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier$PrefetchStrategy.fetch(CachedSupplier.java:419)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.refreshCache(CachedSupplier.java:198)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.get(CachedSupplier.java:127)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.resolveCredentials(StsCredentialsProvider.java:99)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsAssumeRoleWithWebIdentityCredentialsProvider.resolveCredentials(StsAssumeRoleWithWebIdentityCredentialsProvider.java:44)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.internal.StsWebIdentityCredentialsProviderFactory$StsWebIdentityCredentialsProvider.resolveCredentials(StsWebIdentityCredentialsProviderFactory.java:93)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider.resolveCredentials(WebIdentityTokenFileCredentialsProvider.java:113)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain.resolveCredentials(AwsCredentialsProviderChain.java:90)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.internal.LazyAwsCredentialsProvider.resolveCredentials(LazyAwsCredentialsProvider.java:45)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.resolveCredentials(DefaultCredentialsProvider.java:123)
>     at 
> org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.validateAwsCredentials(AWSGeneralUtil.java:404)
>     at 
> org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter.buildClient(KinesisStreamsSinkWriter.java:159)
>     at 
> org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter.<init>(KinesisStreamsSinkWriter.java:154)
>     at 
> org.apache.flink.connector.kinesis.sink.KinesisStreamsSink.restoreWriter(KinesisStreamsSink.java:154)
>     at 
> org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:115)
>     at 
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146)
>     at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.base/java.lang.Thread.run(Unknown Source)
>  {code}
> I tested my project with several Flink versions (1.15.4, 1.16.2 and 1.17.1), 
> and the same issue happens in all of them.
> Please let me know whether this can be filed as a bug, or help me figure out 
> what I have misunderstood.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to