[ https://issues.apache.org/jira/browse/FLINK-16142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17040163#comment-17040163 ]

Thomas Wozniakowski commented on FLINK-16142:
---------------------------------------------

OK, I put the following in the open() method of our custom SQS sink (simply because it's a convenient place in the job to run arbitrary code from):

{code:java}
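// Requires: import java.util.Comparator;
// Print every live thread in this JVM, sorted by name, with its current state.
Thread.getAllStackTraces().keySet().stream()
        .sorted(Comparator.comparing(Thread::getName))
        .forEach(thread -> System.out.printf("THREAD: %s (%s)%n",
                thread.getName(), thread.getState()));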
{code}
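The one-liner above only prints. A minimal sketch, assuming you want to keep each run's snapshot around for later comparison, wraps the same calls in a small helper that collects thread name and state into a sorted map. The class name ThreadSnapshot is hypothetical and not part of Flink.

{code:java}
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not part of Flink): collects the same data the
// snippet above prints, so snapshots from different runs can be compared.
public class ThreadSnapshot {

    /** Returns thread name -> state for every live thread, sorted by name. */
    public static Map<String, Thread.State> capture() {
        Map<String, Thread.State> snapshot = new TreeMap<>();
        for (Thread thread : Thread.getAllStackTraces().keySet()) {
            // Thread names are not unique; if two threads share a name,
            // only the last one visited is kept in this map.
            snapshot.put(thread.getName(), thread.getState());
        }
        return snapshot;
    }

    /** Prints a snapshot in the same "THREAD: name (STATE)" format as above. */
    public static void print(Map<String, Thread.State> snapshot) {
        snapshot.forEach((name, state) ->
                System.out.printf("THREAD: %s (%s)%n", name, state));
    }

    public static void main(String[] args) {
        print(capture());
    }
}
{code}

The TreeMap gives the same name ordering as Comparator.comparing(Thread::getName), so the printed output matches the format of the runs below.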

The only things I have changed are the blocks of XXXs, which stand in for client-specific details I can't post. They're just the names of CEP operators in the job.

First run output:
{code:java}
THREAD: XXXXXX -> Sink: SQS: pattern-detector-e2e-test-signal-queue (1/1) (RUNNABLE)
THREAD: CloseableReaperThread (WAITING)
THREAD: DestroyJavaVM (RUNNABLE)
THREAD: Finalizer (WAITING)
THREAD: Flink Netty Server (0) Thread 0 (RUNNABLE)
THREAD: Flink-MetricRegistry-thread-1 (TIMED_WAITING)
THREAD: Hashed wheel timer #1 (TIMED_WAITING)
THREAD: IOManager reader thread #1 (WAITING)
THREAD: IOManager writer thread #1 (WAITING)
THREAD: New I/O boss #3 (RUNNABLE)
THREAD: New I/O boss #9 (RUNNABLE)
THREAD: New I/O server boss #12 (RUNNABLE)
THREAD: New I/O server boss #6 (RUNNABLE)
THREAD: New I/O worker #1 (RUNNABLE)
THREAD: New I/O worker #10 (RUNNABLE)
THREAD: New I/O worker #11 (RUNNABLE)
THREAD: New I/O worker #2 (RUNNABLE)
THREAD: New I/O worker #4 (RUNNABLE)
THREAD: New I/O worker #5 (RUNNABLE)
THREAD: New I/O worker #7 (RUNNABLE)
THREAD: New I/O worker #8 (RUNNABLE)
THREAD: OutputFlusher for Source: Kinesis: pattern_detector_test_stream -> Remove Unwanted Events -> (Quorum Based Timestamps [SERVER_TIME] -> Filter NULL keys [deviceId], Quorum Based Timestamps [DEVICE_TIME]) (TIMED_WAITING)
THREAD: Reference Handler (WAITING)
THREAD: Signal Dispatcher (RUNNABLE)
THREAD: Source: Kinesis: pattern_detector_test_stream -> Remove Unwanted Events -> (Quorum Based Timestamps [SERVER_TIME] -> Filter NULL keys [deviceId], Quorum Based Timestamps [DEVICE_TIME]) (1/1) (RUNNABLE)
THREAD: Timer-0 (TIMED_WAITING)
THREAD: Timer-1 (TIMED_WAITING)
THREAD: flink-akka.actor.default-dispatcher-2 (TIMED_WAITING)
THREAD: flink-akka.actor.default-dispatcher-3 (WAITING)
THREAD: flink-akka.actor.default-dispatcher-4 (WAITING)
THREAD: flink-akka.actor.default-dispatcher-5 (WAITING)
THREAD: flink-akka.remote.default-remote-dispatcher-6 (TIMED_WAITING)
THREAD: flink-akka.remote.default-remote-dispatcher-7 (WAITING)
THREAD: flink-metrics-2 (TIMED_WAITING)
THREAD: flink-metrics-akka.remote.default-remote-dispatcher-3 (WAITING)
THREAD: flink-metrics-akka.remote.default-remote-dispatcher-4 (TIMED_WAITING)
THREAD: flink-metrics-scheduler-1 (TIMED_WAITING)
THREAD: flink-scheduler-1 (TIMED_WAITING)
THREAD: pool-3-thread-1 (TIMED_WAITING)
{code}

Second run output:
{code:java}
THREAD: XXXXXX -> Sink: SQS: pattern-detector-e2e-test-signal-queue (1/1) (RUNNABLE)
THREAD: CloseableReaperThread (WAITING)
THREAD: DestroyJavaVM (RUNNABLE)
THREAD: Finalizer (WAITING)
THREAD: Flink Netty Server (0) Thread 0 (RUNNABLE)
THREAD: Flink-MetricRegistry-thread-1 (TIMED_WAITING)
THREAD: Hashed wheel timer #1 (TIMED_WAITING)
THREAD: IOManager reader thread #1 (WAITING)
THREAD: IOManager writer thread #1 (WAITING)
THREAD: New I/O boss #3 (RUNNABLE)
THREAD: New I/O boss #9 (RUNNABLE)
THREAD: New I/O server boss #12 (RUNNABLE)
THREAD: New I/O server boss #6 (RUNNABLE)
THREAD: New I/O worker #1 (RUNNABLE)
THREAD: New I/O worker #10 (RUNNABLE)
THREAD: New I/O worker #11 (RUNNABLE)
THREAD: New I/O worker #2 (RUNNABLE)
THREAD: New I/O worker #4 (RUNNABLE)
THREAD: New I/O worker #5 (RUNNABLE)
THREAD: New I/O worker #7 (RUNNABLE)
THREAD: New I/O worker #8 (RUNNABLE)
THREAD: OutputFlusher for Source: Kinesis: pattern_detector_test_stream -> Remove Unwanted Events -> (Quorum Based Timestamps [SERVER_TIME] -> Filter NULL keys [deviceId], Quorum Based Timestamps [DEVICE_TIME]) (TIMED_WAITING)
THREAD: Reference Handler (WAITING)
THREAD: Signal Dispatcher (RUNNABLE)
THREAD: Source: Kinesis: pattern_detector_test_stream -> Remove Unwanted Events -> (Quorum Based Timestamps [SERVER_TIME] -> Filter NULL keys [deviceId], Quorum Based Timestamps [DEVICE_TIME]) (1/1) (RUNNABLE)
THREAD: Timer-0 (TIMED_WAITING)
THREAD: Timer-1 (TIMED_WAITING)
THREAD: flink-akka.actor.default-dispatcher-2 (TIMED_WAITING)
THREAD: flink-akka.actor.default-dispatcher-3 (WAITING)
THREAD: flink-akka.actor.default-dispatcher-4 (WAITING)
THREAD: flink-akka.actor.default-dispatcher-5 (WAITING)
THREAD: flink-akka.remote.default-remote-dispatcher-6 (WAITING)
THREAD: flink-akka.remote.default-remote-dispatcher-7 (TIMED_WAITING)
THREAD: flink-metrics-2 (TIMED_WAITING)
THREAD: flink-metrics-akka.remote.default-remote-dispatcher-3 (TIMED_WAITING)
THREAD: flink-metrics-akka.remote.default-remote-dispatcher-4 (WAITING)
THREAD: flink-metrics-scheduler-1 (TIMED_WAITING)
THREAD: flink-scheduler-1 (TIMED_WAITING)
THREAD: pool-3-thread-1 (TIMED_WAITING)
{code}

Third run output:
{code:java}
THREAD: XXXXXX-> Sink: SQS: pattern-detector-e2e-test-signal-queue (1/1) (RUNNABLE)
THREAD: CloseableReaperThread (WAITING)
THREAD: DestroyJavaVM (RUNNABLE)
THREAD: Finalizer (WAITING)
THREAD: Flink Netty Server (0) Thread 0 (RUNNABLE)
THREAD: Flink-MetricRegistry-thread-1 (TIMED_WAITING)
THREAD: Hashed wheel timer #1 (TIMED_WAITING)
THREAD: IOManager reader thread #1 (WAITING)
THREAD: IOManager writer thread #1 (WAITING)
THREAD: New I/O boss #3 (RUNNABLE)
THREAD: New I/O boss #9 (RUNNABLE)
THREAD: New I/O server boss #12 (RUNNABLE)
THREAD: New I/O server boss #6 (RUNNABLE)
THREAD: New I/O worker #1 (RUNNABLE)
THREAD: New I/O worker #10 (RUNNABLE)
THREAD: New I/O worker #11 (RUNNABLE)
THREAD: New I/O worker #2 (RUNNABLE)
THREAD: New I/O worker #4 (RUNNABLE)
THREAD: New I/O worker #5 (RUNNABLE)
THREAD: New I/O worker #7 (RUNNABLE)
THREAD: New I/O worker #8 (RUNNABLE)
THREAD: OutputFlusher for Source: Kinesis: pattern_detector_test_stream -> Remove Unwanted Events -> (Quorum Based Timestamps [SERVER_TIME] -> Filter NULL keys [deviceId], Quorum Based Timestamps [DEVICE_TIME]) (TIMED_WAITING)
THREAD: Reference Handler (WAITING)
THREAD: Signal Dispatcher (RUNNABLE)
THREAD: Source: Kinesis: pattern_detector_test_stream -> Remove Unwanted Events -> (Quorum Based Timestamps [SERVER_TIME] -> Filter NULL keys [deviceId], Quorum Based Timestamps [DEVICE_TIME]) (1/1) (RUNNABLE)
THREAD: Timer-0 (TIMED_WAITING)
THREAD: Timer-1 (TIMED_WAITING)
THREAD: Timer-2 (WAITING)
THREAD: flink-akka.actor.default-dispatcher-15 (WAITING)
THREAD: flink-akka.actor.default-dispatcher-2 (WAITING)
THREAD: flink-akka.actor.default-dispatcher-3 (TIMED_WAITING)
THREAD: flink-akka.actor.default-dispatcher-4 (WAITING)
THREAD: flink-akka.actor.default-dispatcher-5 (WAITING)
THREAD: flink-akka.remote.default-remote-dispatcher-6 (TIMED_WAITING)
THREAD: flink-akka.remote.default-remote-dispatcher-7 (WAITING)
THREAD: flink-metrics-2 (TIMED_WAITING)
THREAD: flink-metrics-akka.remote.default-remote-dispatcher-3 (WAITING)
THREAD: flink-metrics-akka.remote.default-remote-dispatcher-4 (TIMED_WAITING)
THREAD: flink-metrics-scheduler-1 (TIMED_WAITING)
THREAD: flink-scheduler-1 (TIMED_WAITING)
THREAD: java-sdk-http-connection-reaper (TIMED_WAITING)
THREAD: pool-3-thread-1 (TIMED_WAITING)
{code}

Fourth run output:
{code:java}
THREAD: XXXXXX-> Sink: SQS: pattern-detector-e2e-test-signal-queue (1/1) (RUNNABLE)
THREAD: CloseableReaperThread (WAITING)
THREAD: DestroyJavaVM (RUNNABLE)
THREAD: Finalizer (WAITING)
THREAD: Flink Netty Server (0) Thread 0 (RUNNABLE)
THREAD: Flink-MetricRegistry-thread-1 (TIMED_WAITING)
THREAD: Hashed wheel timer #1 (TIMED_WAITING)
THREAD: IOManager reader thread #1 (WAITING)
THREAD: IOManager writer thread #1 (WAITING)
THREAD: New I/O boss #3 (RUNNABLE)
THREAD: New I/O boss #9 (RUNNABLE)
THREAD: New I/O server boss #12 (RUNNABLE)
THREAD: New I/O server boss #6 (RUNNABLE)
THREAD: New I/O worker #1 (RUNNABLE)
THREAD: New I/O worker #10 (RUNNABLE)
THREAD: New I/O worker #11 (RUNNABLE)
THREAD: New I/O worker #2 (RUNNABLE)
THREAD: New I/O worker #4 (RUNNABLE)
THREAD: New I/O worker #5 (RUNNABLE)
THREAD: New I/O worker #7 (RUNNABLE)
THREAD: New I/O worker #8 (RUNNABLE)
THREAD: OutputFlusher for Source: Kinesis: pattern_detector_test_stream -> Remove Unwanted Events -> (Quorum Based Timestamps [SERVER_TIME] -> Filter NULL keys [deviceId], Quorum Based Timestamps [DEVICE_TIME]) (TIMED_WAITING)
THREAD: Reference Handler (WAITING)
THREAD: Signal Dispatcher (RUNNABLE)
THREAD: Source: Kinesis: pattern_detector_test_stream -> Remove Unwanted Events -> (Quorum Based Timestamps [SERVER_TIME] -> Filter NULL keys [deviceId], Quorum Based Timestamps [DEVICE_TIME]) (1/1) (RUNNABLE)
THREAD: Timer-0 (TIMED_WAITING)
THREAD: Timer-1 (TIMED_WAITING)
THREAD: flink-akka.actor.default-dispatcher-15 (WAITING)
THREAD: flink-akka.actor.default-dispatcher-2 (WAITING)
THREAD: flink-akka.actor.default-dispatcher-3 (WAITING)
THREAD: flink-akka.actor.default-dispatcher-4 (TIMED_WAITING)
THREAD: flink-akka.actor.default-dispatcher-5 (WAITING)
THREAD: flink-akka.remote.default-remote-dispatcher-6 (WAITING)
THREAD: flink-akka.remote.default-remote-dispatcher-7 (TIMED_WAITING)
THREAD: flink-metrics-2 (TIMED_WAITING)
THREAD: flink-metrics-akka.remote.default-remote-dispatcher-3 (TIMED_WAITING)
THREAD: flink-metrics-akka.remote.default-remote-dispatcher-4 (WAITING)
THREAD: flink-metrics-scheduler-1 (TIMED_WAITING)
THREAD: flink-scheduler-1 (TIMED_WAITING)
THREAD: java-sdk-http-connection-reaper (TIMED_WAITING)
THREAD: pool-3-thread-1 (TIMED_WAITING)
{code}

Final run output (the Metaspace OOM error occurred moments after this):
{code:java}
THREAD: XXXXXX -> Sink: SQS: pattern-detector-e2e-test-signal-queue (1/1) (RUNNABLE)
THREAD: CloseableReaperThread (WAITING)
THREAD: DestroyJavaVM (RUNNABLE)
THREAD: Finalizer (WAITING)
THREAD: Flink Netty Server (0) Thread 0 (RUNNABLE)
THREAD: Flink-MetricRegistry-thread-1 (TIMED_WAITING)
THREAD: Hashed wheel timer #1 (TIMED_WAITING)
THREAD: IOManager reader thread #1 (WAITING)
THREAD: IOManager writer thread #1 (WAITING)
THREAD: New I/O boss #3 (RUNNABLE)
THREAD: New I/O boss #9 (RUNNABLE)
THREAD: New I/O server boss #12 (RUNNABLE)
THREAD: New I/O server boss #6 (RUNNABLE)
THREAD: New I/O worker #1 (RUNNABLE)
THREAD: New I/O worker #10 (RUNNABLE)
THREAD: New I/O worker #11 (RUNNABLE)
THREAD: New I/O worker #2 (RUNNABLE)
THREAD: New I/O worker #4 (RUNNABLE)
THREAD: New I/O worker #5 (RUNNABLE)
THREAD: New I/O worker #7 (RUNNABLE)
THREAD: New I/O worker #8 (RUNNABLE)
THREAD: OutputFlusher for Source: Kinesis: pattern_detector_test_stream -> Remove Unwanted Events -> (Quorum Based Timestamps [SERVER_TIME] -> Filter NULL keys [deviceId], Quorum Based Timestamps [DEVICE_TIME]) (TIMED_WAITING)
THREAD: Reference Handler (WAITING)
THREAD: Signal Dispatcher (RUNNABLE)
THREAD: Source: Kinesis: pattern_detector_test_stream -> Remove Unwanted Events -> (Quorum Based Timestamps [SERVER_TIME] -> Filter NULL keys [deviceId], Quorum Based Timestamps [DEVICE_TIME]) (1/1) (RUNNABLE)
THREAD: Timer-0 (TIMED_WAITING)
THREAD: Timer-1 (TIMED_WAITING)
THREAD: flink-akka.actor.default-dispatcher-15 (WAITING)
THREAD: flink-akka.actor.default-dispatcher-2 (WAITING)
THREAD: flink-akka.actor.default-dispatcher-3 (TIMED_WAITING)
THREAD: flink-akka.actor.default-dispatcher-4 (WAITING)
THREAD: flink-akka.actor.default-dispatcher-5 (WAITING)
THREAD: flink-akka.remote.default-remote-dispatcher-6 (WAITING)
THREAD: flink-akka.remote.default-remote-dispatcher-7 (TIMED_WAITING)
THREAD: flink-metrics-2 (TIMED_WAITING)
THREAD: flink-metrics-akka.remote.default-remote-dispatcher-3 (WAITING)
THREAD: flink-metrics-akka.remote.default-remote-dispatcher-4 (TIMED_WAITING)
THREAD: flink-metrics-scheduler-1 (TIMED_WAITING)
THREAD: flink-scheduler-1 (TIMED_WAITING)
THREAD: pool-3-thread-1 (TIMED_WAITING)
{code}
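Comparing these lists by eye is error-prone. A minimal sketch of a set difference on thread names makes the changes between runs explicit. It uses only a few names copied from the second and third runs above (abbreviated, not the full lists), and ThreadDiff is a hypothetical name.

{code:java}
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: set difference on thread names between two runs.
public class ThreadDiff {

    /** Names present in {@code after} but not in {@code before}, sorted. */
    public static Set<String> added(Set<String> before, Set<String> after) {
        Set<String> result = new TreeSet<>(after); // copy so inputs are untouched
        result.removeAll(before);
        return result;
    }

    /** Names present in {@code before} but not in {@code after}, sorted. */
    public static Set<String> removed(Set<String> before, Set<String> after) {
        return added(after, before);
    }

    public static void main(String[] args) {
        // A few names copied from the second and third runs above (not the full lists).
        Set<String> secondRun = new TreeSet<>(Arrays.asList(
                "Timer-0", "Timer-1",
                "flink-akka.actor.default-dispatcher-2",
                "flink-akka.actor.default-dispatcher-5",
                "pool-3-thread-1"));
        Set<String> thirdRun = new TreeSet<>(Arrays.asList(
                "Timer-0", "Timer-1", "Timer-2",
                "flink-akka.actor.default-dispatcher-15",
                "flink-akka.actor.default-dispatcher-2",
                "flink-akka.actor.default-dispatcher-5",
                "java-sdk-http-connection-reaper",
                "pool-3-thread-1"));
        System.out.println("Added:   " + added(secondRun, thirdRun));
        System.out.println("Removed: " + removed(secondRun, thirdRun));
    }
}
{code}

Run as written, this reports Timer-2, flink-akka.actor.default-dispatcher-15 and java-sdk-http-connection-reaper as added and nothing as removed. Only names are compared, so state changes such as WAITING to TIMED_WAITING are ignored.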


> Memory Leak causes Metaspace OOM error on repeated job submission
> -----------------------------------------------------------------
>
>                 Key: FLINK-16142
>                 URL: https://issues.apache.org/jira/browse/FLINK-16142
>             Project: Flink
>          Issue Type: Bug
>          Components: Client / Job Submission
>    Affects Versions: 1.10.0
>            Reporter: Thomas Wozniakowski
>            Priority: Blocker
>             Fix For: 1.10.1, 1.11.0
>
>
> Hi Guys,
> We've just tried deploying 1.10.0, as it has lots of shiny stuff that fits our 
> use case exactly (RocksDB state backend running in a containerised cluster). 
> Unfortunately, there seems to be a memory leak somewhere in the job 
> submission logic. We are getting this error:
> {code:java}
> 2020-02-18 10:22:10,020 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - OPERATOR_NAME switched from RUNNING to FAILED.
> java.lang.OutOfMemoryError: Metaspace
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
> at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:60)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
> at org.apache.flink.kinesis.shaded.com.amazonaws.jmx.SdkMBeanRegistrySupport.registerMetricAdminMBean(SdkMBeanRegistrySupport.java:27)
> at org.apache.flink.kinesis.shaded.com.amazonaws.metrics.AwsSdkMetrics.registerMetricAdminMBean(AwsSdkMetrics.java:398)
> at org.apache.flink.kinesis.shaded.com.amazonaws.metrics.AwsSdkMetrics.<clinit>(AwsSdkMetrics.java:359)
> at org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.requestMetricCollector(AmazonWebServiceClient.java:728)
> at org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.isRMCEnabledAtClientOrSdkLevel(AmazonWebServiceClient.java:660)
> at org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.isRequestMetricsEnabled(AmazonWebServiceClient.java:652)
> at org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:611)
> at org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:606)
> at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1534)
> at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1528)
> at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.listShards(KinesisProxy.java:439)
> at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:389)
> at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:279)
> at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:686)
> at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:287)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> {code}
> (The only change to the text above is OPERATOR_NAME, where I removed some of 
> the internal specifics of our system.)
> This happens reliably on a fresh cluster after submitting and cancelling 
> our job three times.
> We are using the presto-s3 plugin, the CEP library and the Kinesis connector.
> Please let me know what other diagnostics would be useful.
> Tom
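Since the description asks which other diagnostics would be useful, one option alongside the thread lists is to log Metaspace usage and class-loading counters at the same point (for example from the same open() method). A minimal sketch using the standard java.lang.management API; it assumes a HotSpot-style JVM, where the Metaspace memory pool is named "Metaspace", and MetaspaceReport is a hypothetical name.

{code:java}
import java.lang.management.ClassLoadingMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;

// Hypothetical helper: one-line summary of Metaspace and class-loading counters.
public class MetaspaceReport {

    /** Bytes used by the "Metaspace" pool, or -1 if no such pool exists. */
    public static long metaspaceUsedBytes() {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if ("Metaspace".equals(pool.getName())) {
                MemoryUsage usage = pool.getUsage(); // null only if the pool is invalid
                return usage == null ? -1L : usage.getUsed();
            }
        }
        return -1L; // e.g. a JVM that does not expose a pool with this name
    }

    public static String report() {
        ClassLoadingMXBean classes = ManagementFactory.getClassLoadingMXBean();
        return String.format(
                "METASPACE: used=%d bytes, loaded=%d, totalLoaded=%d, unloaded=%d",
                metaspaceUsedBytes(),
                classes.getLoadedClassCount(),
                classes.getTotalLoadedClassCount(),
                classes.getUnloadedClassCount());
    }

    public static void main(String[] args) {
        System.out.println(report());
    }
}
{code}

If the loaded-class count and Metaspace usage keep growing with every submit/cancel cycle while the unloaded count stays flat, that would be consistent with classes from cancelled jobs still being reachable, though it does not by itself show what is holding them.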



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
