[
https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424006#comment-16424006
]
Alon Galant commented on FLINK-8707:
------------------------------------
Hey,
I have a similar problem; this is my thread on the Flink mailing list:
[https://lists.apache.org/thread.html/ce89ed53533ec19ec0a7b74de0f96ac22506d5b2564c2949fd54d6ce@%3Cuser.flink.apache.org%3E]
I've created a whitelist of 16 Customer-Ids that I filter from the Kafka topic
into Flink, so I need to have around the same number of files that the data is
being written to at once.
I attached 3 files:
lsof.txt - the result for $lsof > lsof.txt
lsofp.txt - the result for $lsof -p 5514 > lsofp.txt (5514 is the Flink
process)
ll.txt - the result for $ls -la /tmp/hadoop-flink/s3a/ > ll.txt
/tmp/hadoop-flink/s3a/ is the directory Flink uses to store the tmp files it
works on, before uploading them to s3.
I ran these commands on my task manager (I run 2 task managers with a total of 8
task slots, 4 on each TM).
Here are some lsof | wc -l results:
{code:java}
less lsof.txt | wc -l --> 44228
less lsofp.txt | wc -l --> 403
less ll.txt | wc -l --> 64
less lsof.txt | grep output-1620244093829221334.tmp | wc -l --> 108 (this is an
example of an output file, I think all of them give the same result)
{code}
From ll.txt we can see that there are 4 files for each customerId (for each
partition), so I guess that every task slot opens its own file.
For each 'output' file there are 108 FDs.
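A possible explanation for the gap between the counts above (44228 lines from plain lsof versus 403 from lsof -p), offered as an assumption rather than something verified on this machine: on Linux, some lsof versions list the same descriptor once per thread, so the line count can greatly overstate the number of descriptors the process actually holds. A minimal sketch of a cross-check that counts the entries in /proc/<pid>/fd instead; the function name count_fds is made up for illustration:

```shell
# Count the descriptors the kernel reports for one process.
# /proc/<pid>/fd contains one symlink per open descriptor, so this
# number is not multiplied by the thread count. Linux only.
count_fds() {
  ls "/proc/$1/fd" | wc -l | tr -d ' '
}

# Example: descriptors held by the current shell.
count_fds "$$"
```

Running count_fds 5514 on the task manager box would give a figure to compare against the lsof line counts.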
My problem is that I want to be able to handle around 500 customers, and I want
to still be able to use high concurrency.
When I tried to use 32 task slots on 2 TMs, lsof reported more than a million
FDs.
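As a rough back-of-the-envelope sketch, assuming each sink subtask on a task manager keeps one in-progress file per customer (which is roughly what the 4 files per customerId in ll.txt suggest, but is not confirmed by the Flink code here), the number of simultaneously open output files per TM grows as customers times slots per TM. The function name estimate_open_files is hypothetical:

```shell
# estimate_open_files CUSTOMERS SLOTS_PER_TM
# Assumption: every slot on the TM receives records for every customer
# and holds one open output file for each of them.
estimate_open_files() {
  echo $(( $1 * $2 ))
}

estimate_open_files 16 4     # current setup: 16 customers, 4 slots per TM
estimate_open_files 500 16   # target: 500 customers, 32 slots over 2 TMs
```

Under this assumption the current setup gives 64 files per TM and the target setup gives 8000, before counting any other descriptors the process needs.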
This is the exception I got when I enabled all of the IDs:
{code:java}
java.lang.RuntimeException: Error parsing YAML configuration.
at org.apache.flink.configuration.GlobalConfiguration.loadYAMLResource(GlobalConfiguration.java:178)
at org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:112)
at org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:79)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopConfiguration(HadoopFileSystem.java:178)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:408)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:351)
at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:106)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:232)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: /srv/flink-1.2.1/conf/flink-conf.yaml (Too many open files)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.<init>(FileInputStream.java:146)
at org.apache.flink.configuration.GlobalConfiguration.loadYAMLResource(GlobalConfiguration.java:144)
... 14 more
{code}
I increased the ulimit to 500,000 but it's still not enough, and I guess this
is too much anyhow.
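Because a limit raised with ulimit in a shell generally applies only to processes started from that shell afterwards, it may be worth checking which limit the running task manager actually has. A small sketch, assuming Linux and its /proc/<pid>/limits file; the function name soft_nofile_limit is made up for illustration:

```shell
# Print the soft "Max open files" limit of a running process.
# Line format in /proc/<pid>/limits:
#   Max open files    <soft>    <hard>    files
soft_nofile_limit() {
  awk '/^Max open files/ { print $4 }' "/proc/$1/limits"
}

# Example: limit of the current shell.
soft_nofile_limit "$$"
```

Calling soft_nofile_limit 5514 would show whether the 500,000 setting reached the Flink process.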
I'd love to get some help!
Thanks,
Alon
[^ll.txt]
[^lsof.txt]
[^lsofp.txt]
> Excessive amount of files opened by flink task manager
> ------------------------------------------------------
>
> Key: FLINK-8707
> URL: https://issues.apache.org/jira/browse/FLINK-8707
> Project: Flink
> Issue Type: Bug
> Components: TaskManager
> Affects Versions: 1.3.2
> Environment: NAME="Red Hat Enterprise Linux Server"
> VERSION="7.3 (Maipo)"
> Two boxes, each with a Job Manager & Task Manager, using Zookeeper for HA.
> flink.yaml below with some settings (removed exact box names) etc:
> env.log.dir: ...some dir...residing on the same box
> env.pid.dir: some dir...residing on the same box
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporters: jmx
> state.backend: filesystem
> state.backend.fs.checkpointdir: file:///some_nfs_mount
> state.checkpoints.dir: file:///some_nfs_mount
> state.checkpoints.num-retained: 3
> high-availability.cluster-id: /tst
> high-availability.storageDir: file:///some_nfs_mount/ha
> high-availability: zookeeper
> high-availability.zookeeper.path.root: /flink
> high-availability.zookeeper.quorum: ...list of zookeeper boxes
> env.java.opts.jobmanager: ...some extra jar args
> jobmanager.archive.fs.dir: some dir...residing on the same box
> jobmanager.web.submit.enable: true
> jobmanager.web.tmpdir: some dir...residing on the same box
> env.java.opts.taskmanager: some extra jar args
> taskmanager.tmp.dirs: some dir...residing on the same box/var/tmp
> taskmanager.network.memory.min: 1024MB
> taskmanager.network.memory.max: 2048MB
> blob.storage.directory: some dir...residing on the same box
> Reporter: Alexander Gardner
> Assignee: Piotr Nowojski
> Priority: Critical
> Fix For: 1.5.0
>
> Attachments: AfterRunning-3-jobs-Box2-TM-JCONSOLE.png,
> AfterRunning-3-jobs-TM-FDs-BOX2.jpg, AfterRunning-3-jobs-lsof-p.box2-TM,
> AfterRunning-3-jobs-lsof.box2-TM, AterRunning-3-jobs-Box1-TM-JCONSOLE.png,
> box1-jobmgr-lsof, box1-taskmgr-lsof, box2-jobmgr-lsof, box2-taskmgr-lsof,
> ll.txt, ll.txt, lsof.txt, lsof.txt, lsofp.txt, lsofp.txt
>
>
> The job manager has fewer FDs than the task manager.
>
> Hi
> A support alert indicated that there were a lot of open files for the boxes
> running Flink.
> There were 4 flink jobs that were dormant but had consumed a number of msgs
> from Kafka using the FlinkKafkaConsumer010.
> A simple general lsof:
> $ lsof | wc -l -> returned 153114 open file descriptors.
> Focusing on the TaskManager process (process ID = 12154):
> $ lsof | grep 12154 | wc -l -> returned 129322 open FDs
> $ lsof -p 12154 | wc -l -> returned 531 FDs
> There were 228 threads running for the task manager.
>
> Drilling down a bit further, looking at a_inode and FIFO entries:
> $ lsof -p 12154 | grep a_inode | wc -l = 100 FDs
> $ lsof -p 12154 | grep FIFO | wc -l = 200 FDs
> $ /proc/12154/maps = 920 entries.
> Apart from lsof identifying lots of JARs and SOs being referenced there were
> also 244 child processes for the task manager process.
> I noticed a creep of file descriptors in each environment. Are the above
> figures deemed excessive for the number of FDs in use? I know Flink uses
> Netty - is it using a separate Selector for reads & writes?
> Additionally, does Flink use memory-mapped files or direct ByteBuffers, and
> are these skewing the number of FDs shown?
> Example of one child process ID 6633:
> java 12154 6633 dfdev 387u a_inode 0,9 0 5869 [eventpoll]
> java 12154 6633 dfdev 388r FIFO 0,8 0t0 459758080 pipe
> java 12154 6633 dfdev 389w FIFO 0,8 0t0 459758080 pipe
> Lastly, I cannot yet identify the reason for the creep in FDs, even when
> Flink is pretty dormant or has dormant jobs. Production nodes are not
> experiencing excessive amounts of throughput yet either.
>
>
>