[ https://issues.apache.org/jira/browse/FLINK-28007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zichen Liu updated FLINK-28007:
-------------------------------
Labels: (was: pull-request-available)
> Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
> ----------------------------------------------------------------
>
> Key: FLINK-28007
> URL: https://issues.apache.org/jira/browse/FLINK-28007
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common, Connectors / Kinesis
> Affects Versions: 1.15.0, 1.15.1, 1.15.2
> Reporter: Zichen Liu
> Assignee: Zichen Liu
> Priority: Minor
> Fix For: 1.16.0
>
>
> h3. Background
> The AWS SDK v2 async clients use a Netty async client for the Kinesis Data
> Streams/Firehose sinks and the Kinesis Data Streams EFO consumer. When no
> thread pool is configured, the SDK creates a shared one for Netty to use for
> network operations. The thread pool is managed by a shared ELG (event loop
> group), which is stored in a static field. We do not configure this for the
> AWS connectors in the Flink codebase.
> When threads are spawned within the ELG, they inherit the context classloader
> from the current thread. If the ELG is created from a shared classloader, for
> instance the Flink parent classloader or the MiniCluster parent classloader,
> multiple Flink jobs can share the same ELG. When an ELG thread is spawned
> from a Flink job, it inherits the Flink user classloader. When this job
> completes or fails, the classloader is destroyed, but the Netty thread still
> references it, which leads to the exception below.
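The inheritance described above can be reproduced with plain JDK threads. The following is a minimal sketch, not code from the Flink or AWS SDK codebases: the class name `ContextClassLoaderInheritance` and the empty `URLClassLoader` standing in for a Flink user classloader are assumptions made for illustration only.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.atomic.AtomicReference;

public class ContextClassLoaderInheritance {

    /** Starts a thread from the calling thread and returns the context classloader it observed. */
    static ClassLoader loaderSeenByNewThread() {
        AtomicReference<ClassLoader> seen = new AtomicReference<>();
        Thread worker = new Thread(
                () -> seen.set(Thread.currentThread().getContextClassLoader()));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) throws Exception {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        // Hypothetical stand-in for a per-job Flink user classloader.
        try (URLClassLoader userLoader = new URLClassLoader(new URL[0], original)) {
            Thread.currentThread().setContextClassLoader(userLoader);
            // The new thread picks up the loader of the thread that created it.
            System.out.println(loaderSeenByNewThread() == userLoader); // prints true
        } finally {
            Thread.currentThread().setContextClassLoader(original);
        }
    }
}
```

If the worker thread outlived the `try` block, as a pooled ELG thread would, it would keep referencing `userLoader` after it has been closed.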
> h3. Impact
> This issue *does not* impact jobs deployed to a TM when AWS SDK v2 is loaded
> via the Flink User Classloader. This is expected to be the standard
> deployment configuration.
> This issue is known to impact:
> - Flink mini cluster, for example in integration tests (FLINK-26064)
> - Flink cluster loading AWS SDK v2 via parent classloader
> h3. Suggested solution
> There are a few possible solutions, as discussed in
> https://github.com/apache/flink/pull/18733:
> 1. Create a separate ELG per Flink job
> 2. Create a separate ELG per subtask
> 3. Attach the correct classloader to ELG spawned threads
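Option 3 could look roughly like the sketch below: a `ThreadFactory` wrapper that overrides the inherited context classloader with a long-lived one. This is an illustration under assumptions, not the fix that was merged. The class name `PinnedClassLoaderThreadFactory` is hypothetical, a plain JDK executor stands in for the ELG, and how such a factory would be wired into the SDK's Netty client is not shown and would need to be checked against the SDK's API.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class PinnedClassLoaderThreadFactory implements ThreadFactory {
    private final ThreadFactory delegate;
    private final ClassLoader pinned;

    public PinnedClassLoaderThreadFactory(ThreadFactory delegate, ClassLoader pinned) {
        this.delegate = delegate;
        this.pinned = pinned;
    }

    @Override
    public Thread newThread(Runnable task) {
        Thread thread = delegate.newThread(task);
        // Replace whatever loader the thread inherited from its creator.
        thread.setContextClassLoader(pinned);
        return thread;
    }

    /** Runs one task on a pool built with this factory and returns the loader the task observed. */
    static ClassLoader loaderSeenInPool(ClassLoader pinned) {
        ExecutorService pool = Executors.newSingleThreadExecutor(
                new PinnedClassLoaderThreadFactory(Executors.defaultThreadFactory(), pinned));
        try {
            Callable<ClassLoader> probe = () -> Thread.currentThread().getContextClassLoader();
            return pool.submit(probe).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader longLived = ClassLoader.getSystemClassLoader();
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        // Hypothetical stand-in for a per-job Flink user classloader.
        try (URLClassLoader jobLoader = new URLClassLoader(new URL[0], longLived)) {
            Thread.currentThread().setContextClassLoader(jobLoader);
            // The pool thread is created while jobLoader is current, yet sees longLived.
            System.out.println(loaderSeenInPool(longLived) == longLived); // prints true
        } finally {
            Thread.currentThread().setContextClassLoader(original);
        }
    }
}
```

Pinning keeps pooled threads from holding a reference to a job's classloader, at the cost that code running on those threads can no longer resolve classes through the job's loader via the context classloader.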
> h3. Error Stack
> (shortened stack trace, as the full trace is too large)
> {noformat}
> Feb 09 20:05:04 java.util.concurrent.ExecutionException:
> software.amazon.awssdk.core.exception.SdkClientException: Unable to execute
> HTTP request: Trying to access closed classloader. Please check if you store
> classloaders directly or indirectly in static fields. If the stacktrace
> suggests that the leak occurs in a third party library and cannot be fixed
> immediately, you can disable this check with the configuration
> 'classloader.check-leaked-classloader'.
> Feb 09 20:05:04 at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> Feb 09 20:05:04 at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> (...)
> Feb 09 20:05:04 Caused by:
> software.amazon.awssdk.core.exception.SdkClientException: Unable to execute
> HTTP request: Trying to access closed classloader. Please check if you store
> classloaders directly or indirectly in static fields. If the stacktrace
> suggests that the leak occurs in a third party library and cannot be fixed
> immediately, you can disable this check with the configuration
> 'classloader.check-leaked-classloader'.
> Feb 09 20:05:04 at
> software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)
> Feb 09 20:05:04 at
> software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43)
> Feb 09 20:05:04 at
> software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:204)
> Feb 09 20:05:04 at
> software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:200)
> Feb 09 20:05:04 at
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:179)
> Feb 09 20:05:04 at
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159)
> (...)
> Feb 09 20:05:04 Caused by: java.lang.IllegalStateException: Trying to access
> closed classloader. Please check if you store classloaders directly or
> indirectly in static fields. If the stacktrace suggests that the leak occurs
> in a third party library and cannot be fixed immediately, you can disable
> this check with the configuration 'classloader.check-leaked-classloader'.
> Feb 09 20:05:04 at
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
> Feb 09 20:05:04 at
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResources(FlinkUserCodeClassLoaders.java:188)
> Feb 09 20:05:04 at
> java.util.ServiceLoader$LazyIterator.hasNextService(ServiceLoader.java:348)
> Feb 09 20:05:04 at
> java.util.ServiceLoader$LazyIterator.hasNext(ServiceLoader.java:393)
> Feb 09 20:05:04 at
> java.util.ServiceLoader$1.hasNext(ServiceLoader.java:474)
> Feb 09 20:05:04 at
> javax.xml.stream.FactoryFinder$1.run(FactoryFinder.java:352)
> Feb 09 20:05:04 at java.security.AccessController.doPrivileged(Native
> Method)
> Feb 09 20:05:04 at
> javax.xml.stream.FactoryFinder.findServiceProvider(FactoryFinder.java:341)
> Feb 09 20:05:04 at
> javax.xml.stream.FactoryFinder.find(FactoryFinder.java:313)
> Feb 09 20:05:04 at
> javax.xml.stream.FactoryFinder.find(FactoryFinder.java:227)
> Feb 09 20:05:04 at
> javax.xml.stream.XMLInputFactory.newInstance(XMLInputFactory.java:154)
> Feb 09 20:05:04 at
> software.amazon.awssdk.protocols.query.unmarshall.XmlDomParser.createXmlInputFactory(XmlDomParser.java:124)
> Feb 09 20:05:04 at
> java.lang.ThreadLocal$SuppliedThreadLocal.initialValue(ThreadLocal.java:284)
> Feb 09 20:05:04 at
> java.lang.ThreadLocal.setInitialValue(ThreadLocal.java:180)
> Feb 09 20:05:04 at java.lang.ThreadLocal.get(ThreadLocal.java:170)
> (...)
> {noformat}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)