[ https://issues.apache.org/jira/browse/FLINK-32964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798229#comment-17798229 ]
Jan Kamieth edited comment on FLINK-32964 at 12/18/23 2:40 PM: --------------------------------------------------------------- Hi, we have the same issue in the same setup. Flink 1.17 with Kinesis connector {{org.apache.flink:flink-connector-kinesis:4.1.0-1.17 }}running in EKS with access provided by an IAM role in a k8s service account. After running the app, it is caught in a crash loop with the following stack trace after a couple of hours: {code:java} java.lang.IllegalStateException: Connection pool shut down at org.apache.flink.kinesis.shaded.org.apache.http.util.Asserts.check(Asserts.java:34) at org.apache.flink.kinesis.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$DelegatingHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:75) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$InstrumentedHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:57) at org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176) at org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) at org.apache.flink.kinesis.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) at org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) at org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:228) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:63) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:77) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:56) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:39) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:171) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.DefaultStsClient.assumeRoleWithWebIdentity(DefaultStsClient.java:760) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsAssumeRoleWithWebIdentityCredentialsProvider.getUpdatedCredentials(StsAssumeRoleWithWebIdentityCredentialsProvider.java:73) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.updateSessionCredentials(StsCredentialsProvider.java:88) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.lambda$jitteredPrefetchValueSupplier$3(CachedSupplier.java:283) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier$PrefetchStrategy.fetch(CachedSupplier.java:419) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.refreshCache(CachedSupplier.java:198) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.get(CachedSupplier.java:127) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.resolveCredentials(StsCredentialsProvider.java:99) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsAssumeRoleWithWebIdentityCredentialsProvider.resolveCredentials(StsAssumeRoleWithWebIdentityCredentialsProvider.java:44) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.internal.StsWebIdentityCredentialsProviderFactory$StsWebIdentityCredentialsProvider.resolveCredentials(StsWebIdentityCredentialsProviderFactory.java:93) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider.resolveCredentials(WebIdentityTokenFileCredentialsProvider.java:113) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain.resolveCredentials(AwsCredentialsProviderChain.java:90) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.internal.LazyAwsCredentialsProvider.resolveCredentials(LazyAwsCredentialsProvider.java:45) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.resolveCredentials(DefaultCredentialsProvider.java:123) at org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.validateAwsCredentials(AWSGeneralUtil.java:404) at org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter.buildClient(KinesisStreamsSinkWriter.java:159) at org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter.<init>(KinesisStreamsSinkWriter.java:154) at org.apache.flink.connector.kinesis.sink.KinesisStreamsSink.restoreWriter(KinesisStreamsSink.java:154) at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:115) at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.base/java.lang.Thread.run(Unknown Source) {code} I'm quite sure that the `StsAssumeRoleWithWebIdentityCredentialsProvider` or at least the AWS authentication is the root cause of this issue, as we also deployed the same app with basic aws access key and secret access key credentials afterwards with which the app was running stable for multiple days. The Iceberg project has a similar issue: [https://github.com/apache/iceberg/issues/8601] Their issue occurred roughly around the same time. They are stating that the cause is the *DefaultCredentialsProvider* which is used as a singleton. How I understand the code in *org/apache/flink/kinesis/shaded/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.class* flink uses a singleton here, too. The suggested fix is to not use a singleton but to recreate the credentials provider on every credentials refresh. I hope this helps. was (Author: JIRAUSER303551): Hi, we have the same issue in the same setup. Flink 1.17 with Kinesis connector {{org.apache.flink:flink-connector-kinesis:4.1.0-1.17 }}running in EKS with access provided by an IAM role in a k8s service account. After running the app, it is caught in a crash loop with the following stack trace after a couple of hours: {noformat} java.lang.IllegalStateException: Connection pool shut down at org.apache.flink.kinesis.shaded.org.apache.http.util.Asserts.check(Asserts.java:34) at org.apache.flink.kinesis.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$DelegatingHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:75) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$InstrumentedHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:57) at org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176) at org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) at org.apache.flink.kinesis.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) at org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) at org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:228) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:63) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:77) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:56) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:39) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:171) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.DefaultStsClient.assumeRoleWithWebIdentity(DefaultStsClient.java:760) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsAssumeRoleWithWebIdentityCredentialsProvider.getUpdatedCredentials(StsAssumeRoleWithWebIdentityCredentialsProvider.java:73) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.updateSessionCredentials(StsCredentialsProvider.java:88) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.lambda$jitteredPrefetchValueSupplier$3(CachedSupplier.java:283) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier$PrefetchStrategy.fetch(CachedSupplier.java:419) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.refreshCache(CachedSupplier.java:198) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.get(CachedSupplier.java:127) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.resolveCredentials(StsCredentialsProvider.java:99) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsAssumeRoleWithWebIdentityCredentialsProvider.resolveCredentials(StsAssumeRoleWithWebIdentityCredentialsProvider.java:44) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.internal.StsWebIdentityCredentialsProviderFactory$StsWebIdentityCredentialsProvider.resolveCredentials(StsWebIdentityCredentialsProviderFactory.java:93) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider.resolveCredentials(WebIdentityTokenFileCredentialsProvider.java:113) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain.resolveCredentials(AwsCredentialsProviderChain.java:90) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.internal.LazyAwsCredentialsProvider.resolveCredentials(LazyAwsCredentialsProvider.java:45) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.resolveCredentials(DefaultCredentialsProvider.java:123) at org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.validateAwsCredentials(AWSGeneralUtil.java:404) at org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter.buildClient(KinesisStreamsSinkWriter.java:159) at org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter.<init>(KinesisStreamsSinkWriter.java:154) at org.apache.flink.connector.kinesis.sink.KinesisStreamsSink.restoreWriter(KinesisStreamsSink.java:154) at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:115) at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.base/java.lang.Thread.run(Unknown Source) {noformat} I'm quite sure that the `StsAssumeRoleWithWebIdentityCredentialsProvider` or at least the AWS authentication is the root cause of this issue, as we also deployed the same app with basic aws access key and secret access key credentials afterwards with which the app was running stable for multiple days. The Iceberg project has a similar issue: [https://github.com/apache/iceberg/issues/8601] Their issue occurred roughly around the same time. They are stating that the cause is the `DefaultCredentialsProvider` which is used as a singleton. How I understand the code in `org/apache/flink/kinesis/shaded/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.class` flink uses a singleton here, too. The suggested fix is to not use a singleton but to recreate the credentials provider on every credentials refresh. I hope this helps. > KinesisStreamsSink cant renew credentials with > WebIdentityTokenFileCredentialsProvider > -------------------------------------------------------------------------------------- > > Key: FLINK-32964 > URL: https://issues.apache.org/jira/browse/FLINK-32964 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis > Affects Versions: 1.15.4, 1.16.2, 1.17.1 > Reporter: PhilippeB > Priority: Major > > (First time filling a ticket in Flink community, please let me know if there > are any guidelinges I need to follow) > I noticed a very strange behavior with the Kinesis Sink. I actually using > Flink in containerized and Application (reactive) mode on EKS with high > availability on S3. > Kinesis is configured with IAM role and appropried policies. > {code:java} > //Here a part of my flink-config.yaml: > parallelism.default: 2 > scheduler-mode: reactive > execution.checkpointing.interval: 10s > env.java.opts.jobmanager: -Dkubernetes.max.concurrent.requests=200 > containerized.master.env.KUBERNETES_MAX_CONCURRENT_REQUESTS: 200 > aws.credentials.provider: WEB_IDENTITY_TOKEN > aws.credentials.role.arn: role > aws.credentials.role.sessionName: session > aws.credentials.webIdentityToken.file: > /var/run/secrets/eks.amazonaws.com/serviceaccount/token {code} > When my project is deployed the application and cluster are working well but > when the project has been started for about an hour, I suppose the IAM roles > session need to be renew, then the job become to crashing continuously. > {code:java} > 2023-08-24 10:35:55 > java.lang.IllegalStateException: Connection pool shut down > at > org.apache.flink.kinesis.shaded.org.apache.http.util.Asserts.check(Asserts.java:34) > at > org.apache.flink.kinesis.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$DelegatingHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:75) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$InstrumentedHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:57) > at > org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176) > at > org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) > at > org.apache.flink.kinesis.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) > at > org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) > at > org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:228) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:63) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:77) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:56) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:39) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:171) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.DefaultStsClient.assumeRoleWithWebIdentity(DefaultStsClient.java:760) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsAssumeRoleWithWebIdentityCredentialsProvider.getUpdatedCredentials(StsAssumeRoleWithWebIdentityCredentialsProvider.java:73) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.updateSessionCredentials(StsCredentialsProvider.java:88) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.lambda$jitteredPrefetchValueSupplier$3(CachedSupplier.java:283) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier$PrefetchStrategy.fetch(CachedSupplier.java:419) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.refreshCache(CachedSupplier.java:198) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.get(CachedSupplier.java:127) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.resolveCredentials(StsCredentialsProvider.java:99) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsAssumeRoleWithWebIdentityCredentialsProvider.resolveCredentials(StsAssumeRoleWithWebIdentityCredentialsProvider.java:44) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.internal.StsWebIdentityCredentialsProviderFactory$StsWebIdentityCredentialsProvider.resolveCredentials(StsWebIdentityCredentialsProviderFactory.java:93) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider.resolveCredentials(WebIdentityTokenFileCredentialsProvider.java:113) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain.resolveCredentials(AwsCredentialsProviderChain.java:90) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.internal.LazyAwsCredentialsProvider.resolveCredentials(LazyAwsCredentialsProvider.java:45) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.resolveCredentials(DefaultCredentialsProvider.java:123) > at > org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.validateAwsCredentials(AWSGeneralUtil.java:404) > at > org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter.buildClient(KinesisStreamsSinkWriter.java:159) > at > org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter.<init>(KinesisStreamsSinkWriter.java:154) > at > org.apache.flink.connector.kinesis.sink.KinesisStreamsSink.restoreWriter(KinesisStreamsSink.java:154) > at > org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:115) > at > org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146) > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.base/java.lang.Thread.run(Unknown Source) > {code} > I tested my project in many flink version with 1.15.4, 1.16.2 and 1.17.1 the > same issues is happening. > Please let me know if this can be filled as a bug or If you can helping me to > figure out my misunderstood. -- This message was sent by Atlassian Jira (v8.20.10#820010)