Alexander Gardner commented on FLINK-8707:

Hi Stephan, Piotr & Nico

Just some notes on the data I sent / referenced in the comments section 
yesterday. That data was captured when the cluster had merely been started; 
it is not the data discussed in the JIRA description:

1) Even though the cluster didn't have any Flink jobs running per se, we were 
polling the Flink REST API for the status of all jobs. There weren't any jobs, 
but the REST API would still have been called by a standalone Java process.

2) Looking at the files I uploaded without any flink jobs running I noticed:

For the Box-1 TM there were 116 REG FDs in all for every JAR (e.g. was 
logback-classic really accessed 116 times?), split as 58 REG mem & 58 REG blank. 
For SOs there were just 58 REG mem FDs.

Why so many?

3) "Problem when using LSOF": It appears that on Linux, 'lsof' lists the same 
file once for every thread that accesses it, which will potentially skew the 
"lsof" figures somewhat. Do you concur on this?
Using "lsof -p" is allegedly the truer figure, HOWEVER, even so, why would 
Flink be using so many threads to the same file / socket?
Surely you wouldn't have a lot of readers and writers on a single socket if 
using NIO or Netty?

In the case above where logback-classic has 116 FDs, even if multiple threads 
were responsible, would you envisage Flink having that many threads accessing 
that jar - or even requiring that many FDs?

Bear in mind these figures are when the cluster is not running any jobs.
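
To make the per-thread skew in point 3 concrete, here is a minimal sketch that compares the number of rows a plain "lsof" prints for one PID with the number of distinct FDs behind them. It assumes every row carries a TID column (COMMAND PID TID USER FD TYPE ...), as in the sample rows in the JIRA description. The function name and the second thread (TID 6634) are hypothetical, added only for illustration.

```python
def count_rows_and_distinct(lsof_lines, pid):
    """Return (rows for pid, distinct FDs for pid) from lsof-style lines.

    Assumed layout: COMMAND PID TID USER FD TYPE DEVICE SIZE/OFF NODE NAME.
    Rows that differ only in their TID are treated as the same FD.
    """
    rows = 0
    distinct = set()
    for line in lsof_lines:
        fields = line.split()
        # Match on the PID column only; a plain `grep PID` would also match
        # the same digits appearing in any other column.
        if len(fields) < 6 or fields[1] != str(pid):
            continue
        rows += 1
        # Everything after COMMAND, PID, TID and USER identifies the FD.
        distinct.add(tuple(fields[4:]))
    return rows, len(distinct)


sample = [
    # The three rows quoted in the JIRA description (thread 6633).
    "java 12154 6633 dfdev 387u a_inode 0,9 0 5869 [eventpoll]",
    "java 12154 6633 dfdev 388r FIFO 0,8 0t0 459758080 pipe",
    "java 12154 6633 dfdev 389w FIFO 0,8 0t0 459758080 pipe",
    # Hypothetical second thread of the same process, same FDs.
    "java 12154 6634 dfdev 387u a_inode 0,9 0 5869 [eventpoll]",
    "java 12154 6634 dfdev 388r FIFO 0,8 0t0 459758080 pipe",
    "java 12154 6634 dfdev 389w FIFO 0,8 0t0 459758080 pipe",
]

rows, distinct = count_rows_and_distinct(sample, 12154)
print(rows, distinct)
```

If the row count is roughly the distinct count multiplied by the number of threads, the large "lsof" totals are more likely a listing artefact than real descriptor usage.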

4) Last night in the same DEV environment, I kicked off 3 Flink jobs, each 
consuming from a Kafka topic with a different consumer group id via a 
FlinkKafkaConsumer, and a dummy sink which just read the msgs (didn't do 
anything with the msgs).

Reminder: 2 boxes, each box with a JM & TM.
I'd killed the JM on Box 1 and the job mgr leader swapped to the Box 2 JM.
The 3 jobs were still running ok.

For Box-2, our monitoring system highlighted that we'd surpassed the BOX-2 200k 
FD ulimit, hitting 326k FDs, for just 3 jobs. We weren't hitting any "too many 
files" exceptions and the Flink jobs were pretty dormant after launching.
However, the alert used "lsof", not "lsof -p TM_PID".

BOX-1, TM FDs stood at 196567 using lsof, 1087 using lsof -p TM_PID

Regardless, looking at the BOX-1:
Usage of logback-classic jar for lsof, there were now 362 REG FDs. For just 3 
Flink jobs.

Usage of a_inode using lsof: 54119; FIFO: 108238 (no coincidence that this is 
double, as there seems to be a read and a write FIFO for each a_inode?)
If there were 195 threads, then the average number of sockets accessed per 
thread would be 54119 / 195 threads = 277 per thread ??? Really?

BOX-2, TM FDs stood at 296073 using lsof, 1391 using lsof -p TM_PID
LSOF -p TM_PID stats:
a_inode = 395
FIFO = 790
.jar = 116
.so = 32

LSOF | grep TM_PID stats
a_inode = 84135
FIFO = 168270
.jar = 24708
.so = 6818
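
As a quick check on the two sets of BOX-2 figures above, the sketch below divides each "lsof | grep TM_PID" count by the matching "lsof -p TM_PID" count. The numbers are taken from this message; the variable names are just for illustration.

```python
# Counts from "lsof -p TM_PID" on BOX-2.
per_process = {"a_inode": 395, "FIFO": 790, ".jar": 116, ".so": 32}

# Counts from "lsof | grep TM_PID" on BOX-2.
system_wide = {"a_inode": 84135, "FIFO": 168270, ".jar": 24708, ".so": 6818}

# How many times each per-process entry shows up in the plain lsof listing.
ratios = {kind: system_wide[kind] / per_process[kind] for kind in per_process}

for kind, ratio in ratios.items():
    print(f"{kind:8s} {ratio:.2f}")
```

Every category comes out at about 213. That would be consistent with plain "lsof" repeating each FD once per thread of a process with roughly 213 threads, rather than with 213 times as many real descriptors. The BOX-2 thread count is not stated in this message, so that reading is an inference, not a measurement.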

5) How does FLINK use classloaders?
When just starting the cluster, it's just going to load the classes per JM & TM 

What about when we upload & run a flink job? Classloader per flink job?

6) I've attached the Box1 & Box2 Jconsole image, the FDs monitoring tool image 
and the "lsof" entries & "lsof -p" files for the remaining JM and the two TMs.


> Excessive amount of files opened by flink task manager
> ------------------------------------------------------
>                 Key: FLINK-8707
>                 URL: https://issues.apache.org/jira/browse/FLINK-8707
>             Project: Flink
>          Issue Type: Bug
>          Components: TaskManager
>    Affects Versions: 1.3.2
>         Environment: NAME="Red Hat Enterprise Linux Server"
> VERSION="7.3 (Maipo)"
> Two boxes, each with a Job Manager & Task Manager, using Zookeeper for HA.
> flink.yaml below with some settings (removed exact box names) etc:
> env.log.dir: ...some dir...residing on the same box
> env.pid.dir: some dir...residing on the same box
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporters: jmx
> state.backend: filesystem
> state.backend.fs.checkpointdir: file:///some_nfs_mount
> state.checkpoints.dir: file:///some_nfs_mount
> state.checkpoints.num-retained: 3
> high-availability.cluster-id: /tst
> high-availability.storageDir: file:///some_nfs_mount/ha
> high-availability: zookeeper
> high-availability.zookeeper.path.root: /flink
> high-availability.zookeeper.quorum: ...list of zookeeper boxes
> env.java.opts.jobmanager: ...some extra jar args
> jobmanager.archive.fs.dir: some dir...residing on the same box
> jobmanager.web.submit.enable: true
> jobmanager.web.tmpdir:  some dir...residing on the same box
> env.java.opts.taskmanager: some extra jar args
> taskmanager.tmp.dirs:  some dir...residing on the same box/var/tmp
> taskmanager.network.memory.min: 1024MB
> taskmanager.network.memory.max: 2048MB
> blob.storage.directory:  some dir...residing on the same box
>            Reporter: Alexander Gardner
>            Priority: Blocker
>             Fix For: 1.5.0
>         Attachments: box1-jobmgr-lsof, box1-taskmgr-lsof, box2-jobmgr-lsof, 
> box2-taskmgr-lsof
> The job manager has fewer FDs than the task manager.
> Hi
> A support alert indicated that there were a lot of open files for the boxes 
> running Flink.
> There were 4 flink jobs that were dormant but had consumed a number of msgs 
> from Kafka using the FlinkKafkaConsumer010.
> A simple general lsof:
> $ lsof | wc -l       ->  returned 153114 open file descriptors.
> Focusing on the TaskManager process (process ID = 12154):
> $ lsof | grep 12154 | wc -l   -> returned 129322 open FDs
> $ lsof -p 12154 | wc -l   -> returned 531 FDs
> There were 228 threads running for the task manager.
> Drilling down a bit further, looking at a_inode and FIFO entries: 
> $ lsof -p 12154 | grep a_inode | wc -l = 100 FDs
> $ lsof -p 12154 | grep FIFO | wc -l  = 200 FDs
> $ /proc/12154/maps = 920 entries.
> Apart from lsof identifying lots of JARs and SOs being referenced there were 
> also 244 child processes for the task manager process.
> We have noticed a creep of file descriptors in each environment. Are the above 
> figures deemed excessive for the number of FDs in use? I know Flink uses Netty - 
> is it using a separate Selector for reads & writes? 
> Additionally, does Flink use memory-mapped files or direct ByteBuffers, and are 
> these skewing the numbers of FDs shown?
> Example of one child process ID 6633:
> java 12154 6633 dfdev 387u a_inode 0,9 0 5869 [eventpoll]
>  java 12154 6633 dfdev 388r FIFO 0,8 0t0 459758080 pipe
>  java 12154 6633 dfdev 389w FIFO 0,8 0t0 459758080 pipe
> Lastly, cannot identify yet the reason for the creep in FDs even if Flink is 
> pretty dormant or has dormant jobs. Production nodes are not experiencing 
> excessive amounts of throughput yet either.

This message was sent by Atlassian JIRA
