[
https://issues.apache.org/jira/browse/FLINK-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15451227#comment-15451227
]
Tzu-Li (Gordon) Tai commented on FLINK-4536:
--------------------------------------------
Thanks for reporting this.
Just to confirm, could you also find Kinesis log lines that look like this:
{{.kinesis.internals.KinesisDataFetcher - Subtask .. will start consuming
seeded shard .. from sequence number .. with ShardConsumer ..}}?
This will help us check whether the Kinesis fetchers are unnecessarily opening
new threads to consume the shards after restoring from the failures.
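To tally these lines, here is a minimal sketch that counts them per subtask in a single Task Manager log file. It assumes that each message sits on one line and that the first `..` in the quoted message is the numeric subtask index. The class name `SeededShardLogCounter` is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SeededShardLogCounter {

    // Assumed message shape: "... Subtask <index> will start consuming seeded shard ..."
    // Only the subtask index is parsed; shard and sequence number are ignored.
    private static final Pattern SEEDED =
            Pattern.compile("Subtask (\\d+) will start consuming seeded shard");

    /** Counts seeded-shard consumer start messages per subtask index. */
    static Map<Integer, Integer> countBySubtask(List<String> lines) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            Matcher m = SEEDED.matcher(line);
            if (m.find()) {
                counts.merge(Integer.parseInt(m.group(1)), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        // args[0]: path to one Task Manager log file
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        countBySubtask(lines).forEach((subtask, n) ->
                System.out.println("subtask " + subtask + ": " + n + " seeded-shard consumer start(s)"));
    }
}
```

Comparing the per-subtask totals with the number of restarts and with the number of shards each subtask owns gives a rough check on whether more shard consumers are being started than expected.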
> Possible thread leak in Task Manager
> ------------------------------------
>
> Key: FLINK-4536
> URL: https://issues.apache.org/jira/browse/FLINK-4536
> Project: Flink
> Issue Type: Bug
> Components: TaskManager
> Affects Versions: 1.1.0
> Reporter: Scott Kidder
>
> Running Flink release 1.1.1 commit 61bfb36 in the following configuration:
> Job Manager
> 2 x Task Manager (2 CPU cores on each Task Manager)
> I've also updated the Kinesis source to use the latest AWS Java SDK, release
> 1.11.29.
> I've got a single Flink application using all 4 slots. It consumes from a
> Kinesis stream configured with 2 shards. I've limited the Kinesis source to a
> parallelism of 2 as a workaround for FLINK-4341.
> Occasionally the Kinesis consumer fails because it hits the Kinesis
> provisioned-throughput limits. The application then restarts automatically
> and resumes processing from the checkpoint stored on the Job Manager, with no
> outward indication of problems.
> I recently enabled the StatsD metrics reporter in Flink and noticed that the
> number of threads running on each Task Manager increases by about 20 each
> time the application restarts. Over the course of a day the application
> might hit provisioned-throughput limits 20 times or so (this is not fully in
> production yet, so hitting these limits is acceptable for now). However, the
> number of threads keeps growing without bound, even though the workload on
> the Task Managers does not increase.
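The StatsD thread-count metric shows that the total grows, but it does not show which threads accumulate. One way to find out is to take a thread dump of a Task Manager JVM (for example with the JDK's `jstack <pid>`) before and after a restart and compare the thread names. Below is a minimal sketch. It assumes JDK 8 `jstack` output, in which each thread's header line starts with its quoted name. Digit runs are collapsed so that pool threads such as `pool-3-thread-1` and `pool-7-thread-2` fall under one pattern. The class name `ThreadDumpDiff` is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ThreadDumpDiff {

    // A jstack thread header starts with the quoted thread name, e.g.
    // "pool-3-thread-1" #45 prio=5 os_prio=0 tid=... nid=... waiting on condition
    private static final Pattern HEADER = Pattern.compile("^\"([^\"]*)\"");

    /** Counts threads per name pattern, with digit runs collapsed to '#'. */
    static Map<String, Integer> countByPattern(List<String> dumpLines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : dumpLines) {
            Matcher m = HEADER.matcher(line);
            if (m.find()) {
                String pattern = m.group(1).replaceAll("\\d+", "#");
                counts.merge(pattern, 1, Integer::sum);
            }
        }
        return counts;
    }

    /** Returns the name patterns whose thread count grew, mapped to the increase. */
    static Map<String, Integer> growth(Map<String, Integer> before, Map<String, Integer> after) {
        Set<String> keys = new TreeSet<>(before.keySet());
        keys.addAll(after.keySet());
        Map<String, Integer> grown = new TreeMap<>();
        for (String k : keys) {
            int delta = after.getOrDefault(k, 0) - before.getOrDefault(k, 0);
            if (delta > 0) {
                grown.put(k, delta);
            }
        }
        return grown;
    }

    public static void main(String[] args) throws IOException {
        // args[0]: dump taken before a restart, args[1]: dump taken after it
        Map<String, Integer> before = countByPattern(Files.readAllLines(Paths.get(args[0])));
        Map<String, Integer> after = countByPattern(Files.readAllLines(Paths.get(args[1])));
        growth(before, after).forEach((name, delta) ->
                System.out.println("+" + delta + "\t" + name));
    }
}
```

Running `javac ThreadDumpDiff.java && java ThreadDumpDiff before.txt after.txt` prints each thread-name pattern whose count grew, together with the increase.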
> The following link includes charts for the overall Flink cluster performance
> & Task Manager JVM threads over the course of 12 hours:
> http://imgur.com/a/K59hz
> Each decrease and subsequent spike in threads corresponds to the job being
> restarted due to an AWS Kinesis source error.
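One hypothesis consistent with these charts is that some per-attempt thread pool is not fully torn down when the task fails, so every restart leaves its threads behind. The toy model below is not Flink's code; it only shows what that pattern looks like. Each simulated job attempt starts blocking "consumer" threads in a fresh `ExecutorService` and then fails, optionally shutting the pool down. The class name, thread-name prefix and thread counts are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class RestartLeakDemo {

    static final String PREFIX = "toy-shard-consumer-";
    private static final AtomicInteger THREAD_IDS = new AtomicInteger();

    /**
     * Simulates one job attempt: start one blocking "consumer" per shard in a fresh
     * pool, then fail. The pool's threads are stopped only if shutDownOnFailure is set.
     */
    static void runAttempt(int consumers, boolean shutDownOnFailure) throws InterruptedException {
        List<Thread> created = new CopyOnWriteArrayList<>();
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, PREFIX + THREAD_IDS.incrementAndGet());
            t.setDaemon(true); // lets the demo JVM exit even when threads leak
            created.add(t);
            return t;
        };
        ExecutorService pool = Executors.newFixedThreadPool(consumers, factory);
        CountDownLatch started = new CountDownLatch(consumers);
        for (int i = 0; i < consumers; i++) {
            pool.execute(() -> {
                started.countDown();
                try {
                    Thread.sleep(Long.MAX_VALUE); // stands in for a consumer loop
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // exit once interrupted
                }
            });
        }
        started.await();
        // The attempt "fails" here.
        if (shutDownOnFailure) {
            pool.shutdownNow(); // interrupts the running consumers
            for (Thread t : created) {
                t.join(5_000); // wait until each worker thread has exited
            }
        }
    }

    /** Counts live threads created by this demo. */
    static long liveToyThreads() {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.isAlive() && t.getName().startsWith(PREFIX))
                .count();
    }

    public static void main(String[] args) throws InterruptedException {
        for (int attempt = 1; attempt <= 3; attempt++) {
            runAttempt(2, false);
            System.out.println("after attempt " + attempt + ": " + liveToyThreads() + " live consumer threads");
        }
    }
}
```

Run as-is, `main` prints 2, 4 and 6 live consumer threads after the three simulated attempts, because nothing interrupts the blocked workers. Calling `runAttempt(2, true)` instead uses `shutdownNow()` and waits for the workers to exit, so it leaves no demo threads behind.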
> Here are the logs from one of the Task Manager instances on startup:
> {code}
> 2016-08-30 14:52:50,438 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 2016-08-30 14:52:50,540 INFO org.apache.flink.runtime.taskmanager.TaskManager - --------------------------------------------------------------------------------
> 2016-08-30 14:52:50,540 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager (Version: 1.1.1, Rev:61bfb36, Date:09.08.2016 @ 12:09:08 UTC)
> 2016-08-30 14:52:50,540 INFO org.apache.flink.runtime.taskmanager.TaskManager - Current user: root
> 2016-08-30 14:52:50,541 INFO org.apache.flink.runtime.taskmanager.TaskManager - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.92-b14
> 2016-08-30 14:52:50,541 INFO org.apache.flink.runtime.taskmanager.TaskManager - Maximum heap size: 2048 MiBytes
> 2016-08-30 14:52:50,541 INFO org.apache.flink.runtime.taskmanager.TaskManager - JAVA_HOME: /usr/lib/jvm/java-1.8-openjdk/jre
> 2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - Hadoop version: 2.7.2
> 2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - JVM Options:
> 2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - -XX:+UseG1GC
> 2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xms2048M
> 2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xmx2048M
> 2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - -XX:MaxDirectMemorySize=8388607T
> 2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Dlog.file=/usr/local/flink-1.1.1/log/flink--taskmanager-1-ip-10-55-2-218.log
> 2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Dlog4j.configuration=file:/usr/local/flink-1.1.1/conf/log4j.properties
> 2016-08-30 14:52:50,543 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Dlogback.configurationFile=file:/usr/local/flink-1.1.1/conf/logback.xml
> 2016-08-30 14:52:50,544 INFO org.apache.flink.runtime.taskmanager.TaskManager - Program Arguments:
> 2016-08-30 14:52:50,544 INFO org.apache.flink.runtime.taskmanager.TaskManager - --configDir
> 2016-08-30 14:52:50,544 INFO org.apache.flink.runtime.taskmanager.TaskManager - /usr/local/flink-1.1.1/conf
> 2016-08-30 14:52:50,544 INFO org.apache.flink.runtime.taskmanager.TaskManager - Classpath: /usr/local/flink-1.1.1/lib/flink-dist_2.11-1.1.1.jar:/usr/local/flink-1.1.1/lib/flink-metrics-statsd-1.1.1.jar:/usr/local/flink-1.1.1/lib/flink-python_2.11-1.1.1.jar:/usr/local/flink-1.1.1/lib/log4j-1.2.17.jar:/usr/local/flink-1.1.1/lib/slf4j-log4j12-1.7.7.jar:::
> 2016-08-30 14:52:50,544 INFO org.apache.flink.runtime.taskmanager.TaskManager - --------------------------------------------------------------------------------
> 2016-08-30 14:52:50,544 INFO org.apache.flink.runtime.taskmanager.TaskManager - Registered UNIX signal handlers for [TERM, HUP, INT]
> 2016-08-30 14:52:50,547 INFO org.apache.flink.runtime.taskmanager.TaskManager - Maximum number of open file descriptors is 1048576
> 2016-08-30 14:52:50,565 INFO org.apache.flink.runtime.taskmanager.TaskManager - Loading configuration from /usr/local/flink-1.1.1/conf
> 2016-08-30 14:52:50,610 INFO org.apache.flink.runtime.taskmanager.TaskManager - Security is not enabled. Starting non-authenticated TaskManager.
> 2016-08-30 14:52:50,610 INFO org.apache.flink.runtime.taskmanager.TaskManager - Using configured hostname/address for TaskManager: 10.55.2.218
> 2016-08-30 14:52:50,611 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager
> 2016-08-30 14:52:50,615 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor system at 10.55.2.218:0
> 2016-08-30 14:52:50,956 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
> 2016-08-30 14:52:51,005 INFO Remoting - Starting remoting
> 2016-08-30 14:52:51,159 INFO Remoting - Remoting started; listening on addresses :[akka.tcp://[email protected]:44007]
> 2016-08-30 14:52:51,163 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor
> 2016-08-30 14:52:51,177 INFO org.apache.flink.runtime.io.network.netty.NettyConfig - NettyConfig [server address: /10.55.2.218, server port: 36007, memory segment size (bytes): 32768, transport type: NIO, number of server threads: 2 (manual), number of client threads: 2 (manual), server connect backlog: 0 (use Netty's default), client connect timeout (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)]
> 2016-08-30 14:52:51,179 INFO org.apache.flink.runtime.taskmanager.TaskManager - Messages between TaskManager and JobManager have a max timeout of 10000 milliseconds
> 2016-08-30 14:52:51,183 INFO org.apache.flink.runtime.taskmanager.TaskManager - Temporary file directory '/tmp': total 9 GB, usable 8 GB (88.89% usable)
> 2016-08-30 14:52:51,227 INFO org.apache.flink.runtime.io.network.buffer.NetworkBufferPool - Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
> 2016-08-30 14:52:51,287 INFO org.apache.flink.runtime.taskmanager.TaskManager - Limiting managed memory to 0.7 of the currently free heap space (1377 MB), memory will be allocated lazily.
> 2016-08-30 14:52:51,309 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager uses directory /tmp/flink-io-c000b71a-d793-4f4c-90c4-0c0808154219 for spill files.
> 2016-08-30 14:52:51,325 INFO org.apache.flink.runtime.filecache.FileCache - User file cache uses directory /tmp/flink-dist-cache-95fd32d3-a629-42ce-816e-ca8a63e13c7d
> 2016-08-30 14:52:51,529 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor at akka://flink/user/taskmanager#496823118.
> 2016-08-30 14:52:51,529 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager data connection information: ip-10-55-2-218.ec2.internal (dataPort=36007)
> 2016-08-30 14:52:51,530 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 2 task slot(s).
> 2016-08-30 14:52:51,531 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 93/2048/2048 MB, NON HEAP: 31/32/-1 MB (used/committed/max)]
> 2016-08-30 14:52:51,534 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp://[email protected]:6123/user/jobmanager (attempt 1, timeout: 500 milliseconds)
> 2016-08-30 14:52:51,690 INFO org.apache.flink.runtime.taskmanager.TaskManager - Successful registration at JobManager (akka.tcp://[email protected]:6123/user/jobmanager), starting network stack and library cache.
> 2016-08-30 14:52:51,841 INFO org.apache.flink.runtime.io.network.netty.NettyClient - Successful initialization (took 28 ms).
> 2016-08-30 14:52:51,871 INFO org.apache.flink.runtime.io.network.netty.NettyServer - Successful initialization (took 30 ms). Listening on SocketAddress /10.55.2.218:36007.
> 2016-08-30 14:52:51,872 INFO org.apache.flink.runtime.taskmanager.TaskManager - Determined BLOB server address to be /10.55.2.212:55892. Starting BLOB cache.
> 2016-08-30 14:52:51,874 INFO org.apache.flink.runtime.blob.BlobCache - Created BLOB cache storage directory /tmp/blobStore-81d7d33f-a815-4f0b-9131-d3ba5a256d1b
> 2016-08-30 14:52:51,889 INFO org.apache.flink.metrics.statsd.StatsDReporter - Starting StatsDReporter to send metric reports to localhost/127.0.0.1:8125
> 2016-08-30 14:52:51,891 INFO org.apache.flink.runtime.metrics.MetricRegistry - Periodically reporting metrics in intervals of 10 SECONDS for reporter statsd of type org.apache.flink.metrics.statsd.StatsDReporter.
> {code}
> The Kinesis reader errors look like:
> {code}
> 2016-08-30 17:23:43,353 WARN org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy - Got ProvisionedThroughputExceededException. Backing off for 53 millis.
> 2016-08-30 17:23:43,725 WARN org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy - Got ProvisionedThroughputExceededException. Backing off for 597 millis.
> 2016-08-30 17:23:44,805 WARN org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy - Got ProvisionedThroughputExceededException. Backing off for 538 millis.
> 2016-08-30 17:23:45,344 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Shutting down the shard consumer threads of subtask 0 ...
> 2016-08-30 17:23:45,394 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
> 2016-08-30 17:23:45,395 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Shutting down the shard consumer threads of subtask 0 ...
> 2016-08-30 17:23:45,395 ERROR org.apache.flink.runtime.taskmanager.Task - Task execution failed.
> java.lang.RuntimeException: Rate Exceeded for getRecords operation - all 3retryattempts returned ProvisionedThroughputExceededException.
> at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:204)
> at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:167)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 2016-08-30 17:23:45,396 INFO org.apache.flink.runtime.taskmanager.Task - Source: Kinesis (1/2) switched to FAILED with exception.
> java.lang.RuntimeException: Rate Exceeded for getRecords operation - all 3retryattempts returned ProvisionedThroughputExceededException.
> at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:204)
> at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:167)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 2016-08-30 17:23:45,396 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Kinesis (1/2)
> 2016-08-30 17:23:45,396 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
> 2016-08-30 17:23:45,396 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FAILED to JobManager for task Source: Kinesis (c1bf869b9d8506cea67b1317b21c014e)
> 2016-08-30 17:23:45,396 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
> 2016-08-30 17:23:45,402 INFO org.apache.flink.runtime.taskmanager.TaskManager - Discarding the results produced by task execution c1bf869b9d8506cea67b1317b21c014e
> 2016-08-30 17:23:45,403 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (1/4)
> 2016-08-30 17:23:45,403 INFO org.apache.flink.runtime.taskmanager.Task - Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (1/4) switched to CANCELING
> 2016-08-30 17:23:45,403 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (1/4) (70809fea05a6f67ffd7672bbe5b9643a).
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (3/4)
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (3/4) switched to CANCELING
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (3/4) (a70eafedd01af499a4ee066108e92ae8).
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (1/4)
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (1/4) switched to CANCELING
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (1/4) (b71730eb5cd484cde0eb8332e69d443e).
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (2/4) switched to CANCELING
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (2/4) (6115f675c4d36004d4c885c9868d6b61).
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4)
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4) switched to CANCELING
> 2016-08-30 17:23:45,405 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4) (d9cb8238533083dc2396e90cae3e702e).
> 2016-08-30 17:23:45,406 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
> 2016-08-30 17:23:45,406 INFO org.apache.flink.runtime.taskmanager.Task - Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4) switched to CANCELING
> 2016-08-30 17:23:45,406 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4) (311ffa604444b33985572fe99170f405).
> 2016-08-30 17:23:45,406 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
> 2016-08-30 17:23:45,406 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Fold: property_id, video_id -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4)
> 2016-08-30 17:23:45,420 INFO org.apache.flink.runtime.taskmanager.Task - Fold: property_id, video_id -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4) switched to CANCELING
> 2016-08-30 17:23:45,420 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Fold: property_id, video_id -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4) (556a4e6d2a4b7686539b50ad6b5f0d0d).
> 2016-08-30 17:23:45,420 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
> {code}
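The warnings above show `getRecords` being retried after randomized back-off delays (53, 597 and 538 millis) until the exception reports that all 3 attempts were throttled and the source task fails. The sketch below illustrates that retry shape. It assumes a "full jitter" exponential back-off, that is, a uniformly random delay between 0 and min(max, base * power^attempt). The class, the `ThrottledException` stand-in and the base/max/power values are hypothetical; they are not the Kinesis connector's actual code or defaults.

```java
import java.util.Random;
import java.util.concurrent.Callable;

public class JitterRetry {

    /** Hypothetical stand-in for a throttling error such as ProvisionedThroughputExceededException. */
    static class ThrottledException extends Exception {}

    private final Random random;
    private final long baseMillis;
    private final long maxMillis;
    private final double power;

    JitterRetry(Random random, long baseMillis, long maxMillis, double power) {
        this.random = random;
        this.baseMillis = baseMillis;
        this.maxMillis = maxMillis;
        this.power = power;
    }

    /** Full jitter: a uniformly random delay in [0, min(max, base * power^attempt)). */
    long backoffMillis(int attempt) {
        long cap = (long) Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
        return (long) (random.nextDouble() * cap);
    }

    /** Runs op up to maxAttempts times, backing off after each throttled attempt. */
    <T> T call(Callable<T> op, int maxAttempts) throws Exception {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (ThrottledException e) {
                long delay = backoffMillis(attempt);
                System.out.println("Throttled; backing off for " + delay + " millis.");
                Thread.sleep(delay);
            }
        }
        // As in the log, exhausting the attempts surfaces as an unchecked failure.
        throw new RuntimeException("All " + maxAttempts + " attempts were throttled.");
    }

    public static void main(String[] args) throws Exception {
        JitterRetry retry = new JitterRetry(new Random(), 100, 10_000, 1.5);
        int[] calls = {0};
        // Throttled twice, then succeeds on the third attempt.
        String result = retry.call(() -> {
            if (++calls[0] < 3) {
                throw new ThrottledException();
            }
            return "records";
        }, 3);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Run as-is, `main` is throttled twice, printing two back-off lines with random delays, and then prints `records after 3 attempts`. If every attempt were throttled, `call` would throw a `RuntimeException`, which mirrors how the source task fails in the log above.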
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)