[ 
https://issues.apache.org/jira/browse/FLINK-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Kidder closed FLINK-4536.
-------------------------------
    Resolution: Not A Bug

> Possible thread leak in Task Manager
> ------------------------------------
>
>                 Key: FLINK-4536
>                 URL: https://issues.apache.org/jira/browse/FLINK-4536
>             Project: Flink
>          Issue Type: Bug
>          Components: TaskManager
>    Affects Versions: 1.1.0
>            Reporter: Scott Kidder
>
> I'm running Flink release 1.1.1 (commit 61bfb36) in the following
> configuration:
> Job Manager
> 2 x Task Manager (2 CPU cores on each Task Manager)
> I've also updated the Kinesis source to use the latest AWS Java SDK, release 
> 1.11.29.
> I've got a single Flink application using all 4 slots. It consumes from a 
> Kinesis stream configured with 2 shards. I've limited the Kinesis source to a 
> parallelism of 2 as a workaround for FLINK-4341.
> Occasionally the Kinesis consumer fails because of provisioned-throughput
> limits being hit. The application automatically restarts and resumes
> processing from the checkpoint stored on the Job Manager, with no outward
> indication of problems.
> I recently enabled the StatsD metrics reporter in Flink and noticed that the
> number of threads running on each Task Manager increases by about 20 threads
> each time the application restarts. Over the course of a day the application
> might hit provisioned-throughput limits 20 times or so (this is not fully in
> production yet, so hitting these limits is acceptable for now). But the
> number of threads continues to grow without bound, with no increase in
> workload on the Task Managers.
> The following link includes charts for the overall Flink cluster performance 
> & Task Manager JVM threads over the course of 12 hours:
> http://imgur.com/a/K59hz
> Each decrease and subsequent spike in threads corresponds to the job being 
> restarted due to an AWS Kinesis source error.
> Here are the logs from one of the Task Manager instances on startup:
> {code}
> 2016-08-30 14:52:50,438 WARN  org.apache.hadoop.util.NativeCodeLoader         
>               - Unable to load native-hadoop library for your platform... 
> using builtin-java classes where applicable
> 2016-08-30 14:52:50,540 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - 
> --------------------------------------------------------------------------------
> 2016-08-30 14:52:50,540 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -  Starting 
> TaskManager (Version: 1.1.1, Rev:61bfb36, Date:09.08.2016 @ 12:09:08 UTC)
> 2016-08-30 14:52:50,540 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -  Current 
> user: root
> 2016-08-30 14:52:50,541 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -  JVM: OpenJDK 
> 64-Bit Server VM - Oracle Corporation - 1.8/25.92-b14
> 2016-08-30 14:52:50,541 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -  Maximum heap 
> size: 2048 MiBytes
> 2016-08-30 14:52:50,541 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -  JAVA_HOME: 
> /usr/lib/jvm/java-1.8-openjdk/jre
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -  Hadoop 
> version: 2.7.2
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -  JVM Options:
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -     
> -XX:+UseG1GC
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -     -Xms2048M
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -     -Xmx2048M
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -     
> -XX:MaxDirectMemorySize=8388607T
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -     
> -Dlog.file=/usr/local/flink-1.1.1/log/flink--taskmanager-1-ip-10-55-2-218.log
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -     
> -Dlog4j.configuration=file:/usr/local/flink-1.1.1/conf/log4j.properties
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -     
> -Dlogback.configurationFile=file:/usr/local/flink-1.1.1/conf/logback.xml
> 2016-08-30 14:52:50,544 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -  Program 
> Arguments:
> 2016-08-30 14:52:50,544 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -     
> --configDir
> 2016-08-30 14:52:50,544 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -     
> /usr/local/flink-1.1.1/conf
> 2016-08-30 14:52:50,544 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -  Classpath: 
> /usr/local/flink-1.1.1/lib/flink-dist_2.11-1.1.1.jar:/usr/local/flink-1.1.1/lib/flink-metrics-statsd-1.1.1.jar:/usr/local/flink-1.1.1/lib/flink-python_2.11-1.1.1.jar:/usr/local/flink-1.1.1/lib/log4j-1.2.17.jar:/usr/local/flink-1.1.1/lib/slf4j-log4j12-1.7.7.jar:::
> 2016-08-30 14:52:50,544 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - 
> --------------------------------------------------------------------------------
> 2016-08-30 14:52:50,544 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Registered 
> UNIX signal handlers for [TERM, HUP, INT]
> 2016-08-30 14:52:50,547 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Maximum 
> number of open file descriptors is 1048576
> 2016-08-30 14:52:50,565 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Loading 
> configuration from /usr/local/flink-1.1.1/conf
> 2016-08-30 14:52:50,610 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Security is 
> not enabled. Starting non-authenticated TaskManager.
> 2016-08-30 14:52:50,610 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Using 
> configured hostname/address for TaskManager: 10.55.2.218
> 2016-08-30 14:52:50,611 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Starting 
> TaskManager
> 2016-08-30 14:52:50,615 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Starting 
> TaskManager actor system at 10.55.2.218:0
> 2016-08-30 14:52:50,956 INFO  akka.event.slf4j.Slf4jLogger                    
>               - Slf4jLogger started
> 2016-08-30 14:52:51,005 INFO  Remoting                                        
>               - Starting remoting
> 2016-08-30 14:52:51,159 INFO  Remoting                                        
>               - Remoting started; listening on addresses 
> :[akka.tcp://flink@10.55.2.218:44007]
> 2016-08-30 14:52:51,163 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Starting 
> TaskManager actor
> 2016-08-30 14:52:51,177 INFO  
> org.apache.flink.runtime.io.network.netty.NettyConfig         - NettyConfig 
> [server address: /10.55.2.218, server port: 36007, memory segment size 
> (bytes): 32768, transport type: NIO, number of server threads: 2 (manual), 
> number of client threads: 2 (manual), server connect backlog: 0 (use Netty's 
> default), client connect timeout (sec): 120, send/receive buffer size 
> (bytes): 0 (use Netty's default)]
> 2016-08-30 14:52:51,179 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Messages 
> between TaskManager and JobManager have a max timeout of 10000 milliseconds
> 2016-08-30 14:52:51,183 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Temporary 
> file directory '/tmp': total 9 GB, usable 8 GB (88.89% usable)
> 2016-08-30 14:52:51,227 INFO  
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 64 
> MB for network buffer pool (number of memory segments: 2048, bytes per 
> segment: 32768).
> 2016-08-30 14:52:51,287 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Limiting 
> managed memory to 0.7 of the currently free heap space (1377 MB), memory will 
> be allocated lazily.
> 2016-08-30 14:52:51,309 INFO  
> org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager 
> uses directory /tmp/flink-io-c000b71a-d793-4f4c-90c4-0c0808154219 for spill 
> files.
> 2016-08-30 14:52:51,325 INFO  org.apache.flink.runtime.filecache.FileCache    
>               - User file cache uses directory 
> /tmp/flink-dist-cache-95fd32d3-a629-42ce-816e-ca8a63e13c7d
> 2016-08-30 14:52:51,529 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Starting 
> TaskManager actor at akka://flink/user/taskmanager#496823118.
> 2016-08-30 14:52:51,529 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager 
> data connection information: ip-10-55-2-218.ec2.internal (dataPort=36007)
> 2016-08-30 14:52:51,530 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager 
> has 2 task slot(s).
> 2016-08-30 14:52:51,531 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage 
> stats: [HEAP: 93/2048/2048 MB, NON HEAP: 31/32/-1 MB (used/committed/max)]
> 2016-08-30 14:52:51,534 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Trying to 
> register at JobManager akka.tcp://flink@10.55.2.212:6123/user/jobmanager 
> (attempt 1, timeout: 500 milliseconds)
> 2016-08-30 14:52:51,690 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Successful 
> registration at JobManager 
> (akka.tcp://flink@10.55.2.212:6123/user/jobmanager), starting network stack 
> and library cache.
> 2016-08-30 14:52:51,841 INFO  
> org.apache.flink.runtime.io.network.netty.NettyClient         - Successful 
> initialization (took 28 ms).
> 2016-08-30 14:52:51,871 INFO  
> org.apache.flink.runtime.io.network.netty.NettyServer         - Successful 
> initialization (took 30 ms). Listening on SocketAddress /10.55.2.218:36007.
> 2016-08-30 14:52:51,872 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Determined 
> BLOB server address to be /10.55.2.212:55892. Starting BLOB cache.
> 2016-08-30 14:52:51,874 INFO  org.apache.flink.runtime.blob.BlobCache         
>               - Created BLOB cache storage directory 
> /tmp/blobStore-81d7d33f-a815-4f0b-9131-d3ba5a256d1b
> 2016-08-30 14:52:51,889 INFO  org.apache.flink.metrics.statsd.StatsDReporter  
>               - Starting StatsDReporter to send metric reports to 
> localhost/127.0.0.1:8125
> 2016-08-30 14:52:51,891 INFO  org.apache.flink.runtime.metrics.MetricRegistry 
>               - Periodically reporting metrics in intervals of 10 SECONDS for 
> reporter statsd of type org.apache.flink.metrics.statsd.StatsDReporter.
> {code}
> The Kinesis reader errors look like:
> {code}
> 2016-08-30 17:23:43,353 WARN  
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy  - Got 
> ProvisionedThroughputExceededException. Backing off for 53 millis.
> 2016-08-30 17:23:43,725 WARN  
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy  - Got 
> ProvisionedThroughputExceededException. Backing off for 597 millis.
> 2016-08-30 17:23:44,805 WARN  
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy  - Got 
> ProvisionedThroughputExceededException. Backing off for 538 millis.
> 2016-08-30 17:23:45,344 INFO  
> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - 
> Shutting down the shard consumer threads of subtask 0 ...
> 2016-08-30 17:23:45,394 INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service 
> is shutting down.
> 2016-08-30 17:23:45,395 INFO  
> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - 
> Shutting down the shard consumer threads of subtask 0 ...
> 2016-08-30 17:23:45,395 ERROR org.apache.flink.runtime.taskmanager.Task       
>               - Task execution failed. 
> java.lang.RuntimeException: Rate Exceeded for getRecords operation - all 
> 3retryattempts returned ProvisionedThroughputExceededException.
>       at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:204)
>       at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:167)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 2016-08-30 17:23:45,396 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Source: Kinesis (1/2) switched to FAILED with exception.
> java.lang.RuntimeException: Rate Exceeded for getRecords operation - all 
> 3retryattempts returned ProvisionedThroughputExceededException.
>       at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:204)
>       at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:167)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 2016-08-30 17:23:45,396 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Freeing task resources for Source: Kinesis (1/2)
> 2016-08-30 17:23:45,396 INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service 
> is shutting down.
> 2016-08-30 17:23:45,396 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - 
> Un-registering task and sending final execution state FAILED to JobManager 
> for task Source: Kinesis (c1bf869b9d8506cea67b1317b21c014e)
> 2016-08-30 17:23:45,396 INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service 
> is shutting down.
> 2016-08-30 17:23:45,402 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Discarding 
> the results produced by task execution c1bf869b9d8506cea67b1317b21c014e
> 2016-08-30 17:23:45,403 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Attempting to cancel task Parse -> Timestamp and Watermark -> 
> (Map, Map, Map, Map) (1/4)
> 2016-08-30 17:23:45,403 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) 
> (1/4) switched to CANCELING
> 2016-08-30 17:23:45,403 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Triggering cancellation of task code Parse -> Timestamp and 
> Watermark -> (Map, Map, Map, Map) (1/4) (70809fea05a6f67ffd7672bbe5b9643a).
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Attempting to cancel task Parse -> Timestamp and Watermark -> 
> (Map, Map, Map, Map) (3/4)
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) 
> (3/4) switched to CANCELING
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Triggering cancellation of task code Parse -> Timestamp and 
> Watermark -> (Map, Map, Map, Map) (3/4) (a70eafedd01af499a4ee066108e92ae8).
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Attempting to cancel task Fold: property_id, video_id -> 
> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (1/4)
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Fold: property_id, video_id -> 1-minute Fixed-Window 
> Percentile Aggregation -> Sink: InfluxDB (1/4) switched to CANCELING
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Triggering cancellation of task code Fold: property_id, 
> video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB 
> (1/4) (b71730eb5cd484cde0eb8332e69d443e).
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Attempting to cancel task Fold: property_id, video_id -> 
> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Fold: property_id, video_id -> 1-minute Fixed-Window 
> Percentile Aggregation -> Sink: InfluxDB (2/4) switched to CANCELING
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Triggering cancellation of task code Fold: property_id, 
> video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB 
> (2/4) (6115f675c4d36004d4c885c9868d6b61).
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Attempting to cancel task Fold: property_id, video_id -> 
> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4)
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Fold: property_id, video_id -> 5-minute Sliding-Window 
> Percentile Aggregation -> Sink: InfluxDB (1/4) switched to CANCELING
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Triggering cancellation of task code Fold: property_id, 
> video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB 
> (1/4) (d9cb8238533083dc2396e90cae3e702e).
> 2016-08-30 17:23:45,406 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Attempting to cancel task Fold: property_id, video_id -> 
> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
> 2016-08-30 17:23:45,406 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Fold: property_id, video_id -> 5-minute Sliding-Window 
> Percentile Aggregation -> Sink: InfluxDB (2/4) switched to CANCELING
> 2016-08-30 17:23:45,406 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Triggering cancellation of task code Fold: property_id, 
> video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB 
> (2/4) (311ffa604444b33985572fe99170f405).
> 2016-08-30 17:23:45,406 INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service 
> is shutting down.
> 2016-08-30 17:23:45,406 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Attempting to cancel task Fold: property_id, video_id -> 
> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4)
> 2016-08-30 17:23:45,420 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Fold: property_id, video_id -> 10-minute Sliding-Window 
> Percentile Aggregation -> Sink: InfluxDB (1/4) switched to CANCELING
> 2016-08-30 17:23:45,420 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Triggering cancellation of task code Fold: property_id, 
> video_id -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB 
> (1/4) (556a4e6d2a4b7686539b50ad6b5f0d0d).
> 2016-08-30 17:23:45,420 INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service 
> is shutting down.
> {code}
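
The per-restart thread growth described in the report can be narrowed down by grouping live JVM threads by name and comparing the counts before and after a restart. The following is a minimal sketch of that idea, not part of the original report and not Flink code. The class name ThreadCensus and its methods groupKey and census are hypothetical names chosen for illustration. It only sees threads of the JVM it runs in, so it would have to be executed inside the process under inspection.

```java
import java.util.Map;
import java.util.TreeMap;

public class ThreadCensus {

    /**
     * Replaces every run of digits with '#', so that threads such as
     * "pool-3-thread-7" and "pool-9-thread-1" fall into the same group.
     */
    public static String groupKey(String threadName) {
        return threadName.replaceAll("\\d+", "#");
    }

    /** Counts the live threads of the current JVM, grouped by normalized name. */
    public static Map<String, Integer> census() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            counts.merge(groupKey(t.getName()), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Print one line per group: count, then normalized thread name.
        census().forEach((name, n) -> System.out.println(n + "\t" + name));
    }
}
```

A group whose count rises by a fixed amount on every restart is the likely source of the growth. For a separately running Task Manager, `jstack <pid>` prints the same thread names and can be grouped the same way.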



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)