[ https://issues.apache.org/jira/browse/FLINK-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Scott Kidder closed FLINK-4536.
-------------------------------
    Resolution: Not A Bug

> Possible thread leak in Task Manager
> ------------------------------------
>
>                 Key: FLINK-4536
>                 URL: https://issues.apache.org/jira/browse/FLINK-4536
>             Project: Flink
>          Issue Type: Bug
>          Components: TaskManager
>    Affects Versions: 1.1.0
>            Reporter: Scott Kidder
>
> Running Flink release 1.1.1 (commit 61bfb36) in the following configuration:
> Job Manager
> 2 x Task Manager (2 CPU cores on each Task Manager)
>
> I've also updated the Kinesis source to use the latest AWS Java SDK, release
> 1.11.29.
>
> I've got a single Flink application using all 4 slots. It consumes from a
> Kinesis stream configured with 2 shards. I've limited the Kinesis source to a
> parallelism of 2 as a workaround for FLINK-4341.
>
> Occasionally the Kinesis consumer fails because it hits the stream's
> provisioned-throughput limits. The application automatically restarts and
> resumes processing from the checkpoint stored on the Job Manager, with no
> outward indication of problems.
>
> I recently enabled the StatsD metrics reporter in Flink and noticed that the
> number of threads running on each Task Manager increases by about 20 threads
> each time the application restarts. Over the course of a day the application
> might hit provisioned-throughput limits 20 times or so (this is not fully in
> production yet, so hitting these limits is acceptable for now). However, the
> number of threads keeps growing without bound, with no increase in workload
> on the Task Managers.
>
> The following link includes charts of overall Flink cluster performance and
> Task Manager JVM threads over the course of 12 hours:
> http://imgur.com/a/K59hz
> Each decrease and subsequent spike in threads corresponds to the job being
> restarted due to an AWS Kinesis source error.
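The reporter caps the Kinesis source at a parallelism of 2 because the stream has 2 shards. Whatever assignment scheme the connector actually uses, a source with more subtasks than shards must leave some subtasks without any shard to read. The minimal Java sketch below illustrates this with a hypothetical round-robin assignment (shard `i` goes to subtask `i % parallelism`); `ShardAssignmentSketch`, `assign` and `idleSubtasks` are invented names, and this is not the connector's code.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: distributes shard indices over source subtasks
 * round-robin. The real Kinesis connector may assign shards differently;
 * only the "more subtasks than shards means idle subtasks" point carries over.
 */
final class ShardAssignmentSketch {

    /** Returns, for each subtask index, the shard indices it would read. */
    static List<List<Integer>> assign(int shardCount, int parallelism) {
        List<List<Integer>> shardsPerSubtask = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            shardsPerSubtask.add(new ArrayList<>());
        }
        for (int shard = 0; shard < shardCount; shard++) {
            shardsPerSubtask.get(shard % parallelism).add(shard);
        }
        return shardsPerSubtask;
    }

    /** Number of subtasks that end up with no shard at all. */
    static long idleSubtasks(int shardCount, int parallelism) {
        return assign(shardCount, parallelism).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        // 2 shards, as in the report.
        // prints "parallelism 2: [[0], [1]], idle = 0"
        System.out.println("parallelism 2: " + assign(2, 2) + ", idle = " + idleSubtasks(2, 2));
        // prints "parallelism 4: [[0], [1], [], []], idle = 2"
        System.out.println("parallelism 4: " + assign(2, 4) + ", idle = " + idleSubtasks(2, 4));
    }
}
```

With a parallelism equal to the shard count, every source subtask has exactly one shard; with all 4 slots used for the source, two subtasks would sit idle.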
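The Kinesis reader errors quoted in this report show the consumer backing off for a randomized delay (53, 597 and 538 ms) after each ProvisionedThroughputExceededException and then throwing a RuntimeException once all 3 attempts are used up; that exception is what moves the source task to FAILED and triggers the restart. The following minimal Java sketch shows the general pattern of bounded retries with capped exponential backoff and "full jitter" (a uniformly random delay below the exponential cap). It is an illustration under assumptions, not the connector's implementation: `BoundedRetrySketch`, `ThrottledException`, `callWithRetries`, `fullJitterBackoff` and all parameter values are hypothetical.

```java
import java.util.Random;
import java.util.function.Supplier;

final class BoundedRetrySketch {

    /** Hypothetical stand-in for a "provisioned throughput exceeded" error. */
    static final class ThrottledException extends RuntimeException {
        ThrottledException(String message) { super(message); }
    }

    /** Full jitter: uniform in [0, min(maxMillis, baseMillis * power^attempt)). */
    static long fullJitterBackoff(long baseMillis, long maxMillis, double power, int attempt, Random rnd) {
        double exponential = Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
        return (long) (rnd.nextDouble() * exponential);
    }

    /** Calls op up to maxAttempts times, sleeping after each throttled attempt, then gives up. */
    static <T> T callWithRetries(Supplier<T> op, int maxAttempts,
                                 long baseMillis, long maxMillis, double power, Random rnd) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return op.get();
            } catch (ThrottledException e) {
                long sleepMillis = fullJitterBackoff(baseMillis, maxMillis, power, attempt, rnd);
                System.out.println("Throttled; backing off for " + sleepMillis + " millis.");
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("interrupted while backing off", ie);
                }
            }
        }
        // Every attempt was throttled: give up by throwing. In the reported logs,
        // the equivalent RuntimeException is what fails the source task.
        throw new RuntimeException("Rate exceeded: all " + maxAttempts + " attempts were throttled.");
    }

    public static void main(String[] args) {
        Random rnd = new Random(7);
        int[] calls = {0};
        // Succeeds on the third call, within a budget of 3 attempts.
        String result = callWithRetries(() -> {
            if (++calls[0] < 3) throw new ThrottledException("throttled");
            return "records";
        }, 3, 50, 1000, 1.5, rnd);
        System.out.println(result + " after " + calls[0] + " calls");

        // Always throttled: the retry budget runs out and the caller sees a failure.
        try {
            callWithRetries(() -> { throw new ThrottledException("throttled"); }, 3, 50, 1000, 1.5, rnd);
        } catch (RuntimeException e) {
            System.out.println("gave up: " + e.getMessage());
        }
    }
}
```

Randomizing the delay spreads retries from concurrent readers apart, but if the stream stays over its limit for longer than the whole retry budget, the task still fails and the job restarts.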
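The report does not say which component starts the extra threads (the issue was ultimately resolved as Not A Bug), but the numbers are worth spelling out: at about 20 threads per restart and roughly 20 restarts a day, each Task Manager would gain on the order of 400 threads per day. One way such step-wise growth can arise is a component that creates a thread pool every time a task attempt starts and never shuts it down when the attempt fails. The minimal Java sketch below reproduces that pattern and shows that shutting the pool down and joining its workers removes the growth. `ThreadLeakSketch` and its methods are hypothetical; this is not Flink code and makes no claim about the actual cause in this report.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch of a per-attempt thread leak: each simulated task
 * attempt starts its own worker pool and either releases it when the
 * attempt ends or forgets to.
 */
final class ThreadLeakSketch {

    /** Counts live threads whose name starts with the given prefix. */
    static long liveThreads(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.isAlive() && t.getName().startsWith(prefix))
                .count();
    }

    /** One task attempt that starts poolSize workers and then ends (e.g. fails). */
    static void runAttempt(String prefix, int poolSize, boolean releaseOnExit) {
        List<Thread> workers = new ArrayList<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                r -> {
                    Thread t = new Thread(r, prefix + workers.size());
                    t.setDaemon(true); // a leaked pool must not keep this demo JVM alive
                    workers.add(t);
                    return t;
                });
        pool.prestartAllCoreThreads(); // start every worker thread up front

        // ... the attempt's real work would run here until it fails ...

        if (releaseOnExit) {
            pool.shutdownNow(); // interrupt idle workers so they exit
            for (Thread t : workers) {
                try {
                    t.join(); // wait until each worker thread has actually terminated
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
        // Without release, the idle core threads keep waiting for work indefinitely.
    }

    /** Runs `restarts` attempts and returns how many matching threads were added. */
    static long simulateRestarts(String prefix, int restarts, int poolSize, boolean releaseOnExit) {
        long before = liveThreads(prefix);
        for (int i = 0; i < restarts; i++) {
            runAttempt(prefix + i + "-", poolSize, releaseOnExit);
        }
        return liveThreads(prefix) - before;
    }

    public static void main(String[] args) {
        // 20 workers per attempt is chosen only to match the growth in the report.
        System.out.println("leaky:    +" + simulateRestarts("leaky-", 3, 20, false) + " threads");
        System.out.println("released: +" + simulateRestarts("released-", 3, 20, true) + " threads");
    }
}
```

Running `main` prints +60 threads for the leaky variant and +0 for the released one. Joining the workers after `shutdownNow()` matters for the measurement: `awaitTermination` can return while the last worker thread is still unwinding.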
>
> Here are the logs from one of the Task Manager instances on startup:
> {code}
> 2016-08-30 14:52:50,438 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 2016-08-30 14:52:50,540 INFO org.apache.flink.runtime.taskmanager.TaskManager - --------------------------------------------------------------------------------
> 2016-08-30 14:52:50,540 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager (Version: 1.1.1, Rev:61bfb36, Date:09.08.2016 @ 12:09:08 UTC)
> 2016-08-30 14:52:50,540 INFO org.apache.flink.runtime.taskmanager.TaskManager - Current user: root
> 2016-08-30 14:52:50,541 INFO org.apache.flink.runtime.taskmanager.TaskManager - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.92-b14
> 2016-08-30 14:52:50,541 INFO org.apache.flink.runtime.taskmanager.TaskManager - Maximum heap size: 2048 MiBytes
> 2016-08-30 14:52:50,541 INFO org.apache.flink.runtime.taskmanager.TaskManager - JAVA_HOME: /usr/lib/jvm/java-1.8-openjdk/jre
> 2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - Hadoop version: 2.7.2
> 2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - JVM Options:
> 2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - -XX:+UseG1GC
> 2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xms2048M
> 2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xmx2048M
> 2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - -XX:MaxDirectMemorySize=8388607T
> 2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Dlog.file=/usr/local/flink-1.1.1/log/flink--taskmanager-1-ip-10-55-2-218.log
> 2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Dlog4j.configuration=file:/usr/local/flink-1.1.1/conf/log4j.properties
> 2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Dlogback.configurationFile=file:/usr/local/flink-1.1.1/conf/logback.xml
> 2016-08-30 14:52:50,544 INFO org.apache.flink.runtime.taskmanager.TaskManager - Program Arguments:
> 2016-08-30 14:52:50,544 INFO org.apache.flink.runtime.taskmanager.TaskManager - --configDir
> 2016-08-30 14:52:50,544 INFO org.apache.flink.runtime.taskmanager.TaskManager - /usr/local/flink-1.1.1/conf
> 2016-08-30 14:52:50,544 INFO org.apache.flink.runtime.taskmanager.TaskManager - Classpath: /usr/local/flink-1.1.1/lib/flink-dist_2.11-1.1.1.jar:/usr/local/flink-1.1.1/lib/flink-metrics-statsd-1.1.1.jar:/usr/local/flink-1.1.1/lib/flink-python_2.11-1.1.1.jar:/usr/local/flink-1.1.1/lib/log4j-1.2.17.jar:/usr/local/flink-1.1.1/lib/slf4j-log4j12-1.7.7.jar:::
> 2016-08-30 14:52:50,544 INFO org.apache.flink.runtime.taskmanager.TaskManager - --------------------------------------------------------------------------------
> 2016-08-30 14:52:50,544 INFO org.apache.flink.runtime.taskmanager.TaskManager - Registered UNIX signal handlers for [TERM, HUP, INT]
> 2016-08-30 14:52:50,547 INFO org.apache.flink.runtime.taskmanager.TaskManager - Maximum number of open file descriptors is 1048576
> 2016-08-30 14:52:50,565 INFO org.apache.flink.runtime.taskmanager.TaskManager - Loading configuration from /usr/local/flink-1.1.1/conf
> 2016-08-30 14:52:50,610 INFO org.apache.flink.runtime.taskmanager.TaskManager - Security is not enabled. Starting non-authenticated TaskManager.
> 2016-08-30 14:52:50,610 INFO org.apache.flink.runtime.taskmanager.TaskManager - Using configured hostname/address for TaskManager: 10.55.2.218
> 2016-08-30 14:52:50,611 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager
> 2016-08-30 14:52:50,615 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor system at 10.55.2.218:0
> 2016-08-30 14:52:50,956 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
> 2016-08-30 14:52:51,005 INFO Remoting - Starting remoting
> 2016-08-30 14:52:51,159 INFO Remoting - Remoting started; listening on addresses :[akka.tcp://flink@10.55.2.218:44007]
> 2016-08-30 14:52:51,163 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor
> 2016-08-30 14:52:51,177 INFO org.apache.flink.runtime.io.network.netty.NettyConfig - NettyConfig [server address: /10.55.2.218, server port: 36007, memory segment size (bytes): 32768, transport type: NIO, number of server threads: 2 (manual), number of client threads: 2 (manual), server connect backlog: 0 (use Netty's default), client connect timeout (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)]
> 2016-08-30 14:52:51,179 INFO org.apache.flink.runtime.taskmanager.TaskManager - Messages between TaskManager and JobManager have a max timeout of 10000 milliseconds
> 2016-08-30 14:52:51,183 INFO org.apache.flink.runtime.taskmanager.TaskManager - Temporary file directory '/tmp': total 9 GB, usable 8 GB (88.89% usable)
> 2016-08-30 14:52:51,227 INFO org.apache.flink.runtime.io.network.buffer.NetworkBufferPool - Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
> 2016-08-30 14:52:51,287 INFO org.apache.flink.runtime.taskmanager.TaskManager - Limiting managed memory to 0.7 of the currently free heap space (1377 MB), memory will be allocated lazily.
> 2016-08-30 14:52:51,309 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager uses directory /tmp/flink-io-c000b71a-d793-4f4c-90c4-0c0808154219 for spill files.
> 2016-08-30 14:52:51,325 INFO org.apache.flink.runtime.filecache.FileCache - User file cache uses directory /tmp/flink-dist-cache-95fd32d3-a629-42ce-816e-ca8a63e13c7d
> 2016-08-30 14:52:51,529 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor at akka://flink/user/taskmanager#496823118.
> 2016-08-30 14:52:51,529 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager data connection information: ip-10-55-2-218.ec2.internal (dataPort=36007)
> 2016-08-30 14:52:51,530 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 2 task slot(s).
> 2016-08-30 14:52:51,531 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 93/2048/2048 MB, NON HEAP: 31/32/-1 MB (used/committed/max)]
> 2016-08-30 14:52:51,534 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp://flink@10.55.2.212:6123/user/jobmanager (attempt 1, timeout: 500 milliseconds)
> 2016-08-30 14:52:51,690 INFO org.apache.flink.runtime.taskmanager.TaskManager - Successful registration at JobManager (akka.tcp://flink@10.55.2.212:6123/user/jobmanager), starting network stack and library cache.
> 2016-08-30 14:52:51,841 INFO org.apache.flink.runtime.io.network.netty.NettyClient - Successful initialization (took 28 ms).
> 2016-08-30 14:52:51,871 INFO org.apache.flink.runtime.io.network.netty.NettyServer - Successful initialization (took 30 ms). Listening on SocketAddress /10.55.2.218:36007.
> 2016-08-30 14:52:51,872 INFO org.apache.flink.runtime.taskmanager.TaskManager - Determined BLOB server address to be /10.55.2.212:55892. Starting BLOB cache.
> 2016-08-30 14:52:51,874 INFO org.apache.flink.runtime.blob.BlobCache - Created BLOB cache storage directory /tmp/blobStore-81d7d33f-a815-4f0b-9131-d3ba5a256d1b
> 2016-08-30 14:52:51,889 INFO org.apache.flink.metrics.statsd.StatsDReporter - Starting StatsDReporter to send metric reports to localhost/127.0.0.1:8125
> 2016-08-30 14:52:51,891 INFO org.apache.flink.runtime.metrics.MetricRegistry - Periodically reporting metrics in intervals of 10 SECONDS for reporter statsd of type org.apache.flink.metrics.statsd.StatsDReporter.
> {code}
>
> The Kinesis reader errors look like:
> {code}
> 2016-08-30 17:23:43,353 WARN org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy - Got ProvisionedThroughputExceededException. Backing off for 53 millis.
> 2016-08-30 17:23:43,725 WARN org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy - Got ProvisionedThroughputExceededException. Backing off for 597 millis.
> 2016-08-30 17:23:44,805 WARN org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy - Got ProvisionedThroughputExceededException. Backing off for 538 millis.
> 2016-08-30 17:23:45,344 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Shutting down the shard consumer threads of subtask 0 ...
> 2016-08-30 17:23:45,394 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
> 2016-08-30 17:23:45,395 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Shutting down the shard consumer threads of subtask 0 ...
> 2016-08-30 17:23:45,395 ERROR org.apache.flink.runtime.taskmanager.Task - Task execution failed.
> java.lang.RuntimeException: Rate Exceeded for getRecords operation - all 3retryattempts returned ProvisionedThroughputExceededException.
> 	at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:204)
> 	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:167)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 2016-08-30 17:23:45,396 INFO org.apache.flink.runtime.taskmanager.Task - Source: Kinesis (1/2) switched to FAILED with exception.
> java.lang.RuntimeException: Rate Exceeded for getRecords operation - all 3retryattempts returned ProvisionedThroughputExceededException.
> 	at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:204)
> 	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:167)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 2016-08-30 17:23:45,396 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Kinesis (1/2)
> 2016-08-30 17:23:45,396 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
> 2016-08-30 17:23:45,396 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FAILED to JobManager for task Source: Kinesis (c1bf869b9d8506cea67b1317b21c014e)
> 2016-08-30 17:23:45,396 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
> 2016-08-30 17:23:45,402 INFO org.apache.flink.runtime.taskmanager.TaskManager - Discarding the results produced by task execution c1bf869b9d8506cea67b1317b21c014e
> 2016-08-30 17:23:45,403 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (1/4)
> 2016-08-30 17:23:45,403 INFO org.apache.flink.runtime.taskmanager.Task - Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (1/4) switched to CANCELING
> 2016-08-30 17:23:45,403 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (1/4) (70809fea05a6f67ffd7672bbe5b9643a).
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (3/4)
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (3/4) switched to CANCELING
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (3/4) (a70eafedd01af499a4ee066108e92ae8).
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (1/4)
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (1/4) switched to CANCELING
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (1/4) (b71730eb5cd484cde0eb8332e69d443e).
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (2/4) switched to CANCELING
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (2/4) (6115f675c4d36004d4c885c9868d6b61).
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4)
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4) switched to CANCELING
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4) (d9cb8238533083dc2396e90cae3e702e).
> 2016-08-30 17:23:45,406 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
> 2016-08-30 17:23:45,406 INFO org.apache.flink.runtime.taskmanager.Task - Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4) switched to CANCELING
> 2016-08-30 17:23:45,406 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4) (311ffa604444b33985572fe99170f405).
> 2016-08-30 17:23:45,406 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
> 2016-08-30 17:23:45,406 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Fold: property_id, video_id -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4)
> 2016-08-30 17:23:45,420 INFO org.apache.flink.runtime.taskmanager.Task - Fold: property_id, video_id -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4) switched to CANCELING
> 2016-08-30 17:23:45,420 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Fold: property_id, video_id -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4) (556a4e6d2a4b7686539b50ad6b5f0d0d).
> 2016-08-30 17:23:45,420 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)