will-lauer opened a new issue #11558:
URL: https://github.com/apache/druid/issues/11558


   ### Affected Version
   
   0.21, but probably all prior versions that support GroupBy v2
   
   ### Description
   
   We are regularly seeing "Too Many Open Files" errors when running GroupBy queries using GroupBy v2 combined with sketches on some of our larger backend historical nodes. A typical stack trace looks like:
   
   ```
   Aug 05, 2021 2:24:32 PM com.google.common.util.concurrent.Futures$CombinedFuture setExceptionAndMaybeLog
   SEVERE: input future failed.
   java.lang.RuntimeException: java.io.FileNotFoundException: /home/y/tmp/druid/druid-groupBy-cdc606dc-9ad9-4ac3-a6ba-82f412b13c5b_will_testspilling/00104259.tmp (Too many open files)
        at org.apache.druid.query.groupby.epinephelinae.SpillingGrouper.read(SpillingGrouper.java:331)
        at org.apache.druid.query.groupby.epinephelinae.SpillingGrouper.iterator(SpillingGrouper.java:256)
        at org.apache.druid.query.groupby.epinephelinae.ConcurrentGrouper$1.call(ConcurrentGrouper.java:351)
        at org.apache.druid.query.groupby.epinephelinae.ConcurrentGrouper$1.call(ConcurrentGrouper.java:347)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.druid.query.PrioritizedListenableFutureTask.run(PrioritizedExecutorService.java:247)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.io.FileNotFoundException: /home/y/tmp/druid/druid-groupBy-cdc606dc-9ad9-4ac3-a6ba-82f412b13c5b_will_testspilling/00104259.tmp (Too many open files)
        at java.io.FileInputStream.open0(Native Method)
        at java.io.FileInputStream.open(FileInputStream.java:195)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at org.apache.druid.query.groupby.epinephelinae.SpillingGrouper.read(SpillingGrouper.java:326)
        ... 8 more
   ```
   
   When this error occurs, it typically triggers a cascade of similar "Too many open files" errors from HDFS and ZooKeeper sockets, as all operations in the process become constrained by the available file descriptors.
   
   #### Configuration
    - 200 historical nodes, divided up into 3 tiers. Errors trigger on tier3, 
which has:
       - 4 hosts
       - ~40,000 segments per host
       - 768g RAM per host
       - 36 cores (72 hyperthreads) per host
       - druid.processing.numThreads=36
       - druid.processing.buffer.sizeBytes=2147483647
       - druid.processing.numMergeBuffers=16
       - druid.query.groupBy.maxOnDiskStorage=12884901888
       - druid.query.groupBy.maxMergingDictionarySize=1610612736
       - ulimit -n 65000 (max files per process)
    - typical query includes:
       - multiple (usually 8) thetaSketch aggregators, using size 16384
       - 4-5 grouping dimensions
   
   #### Debugging
   We dug into this and found several things going on that contributed to the 
final problem:
   
   1. A single GroupBy query can generate an absurd number of spill files. In our case, one basic query generated more than 100,000 spill files, with an average size around 100 KB. This appears to happen because the 2 GB processing buffer is divided into around 1,000 buckets, since the computed row size for my 8 thetaSketch aggregators is ~2 MB (about twice what I think it should be). The grouper spills to disk when the hash table fills to about 70%, so roughly every 700 distinct dimension combinations trigger a spill. On one of our historical nodes, a query hitting 515 of the 40,000 segments on the host generated 105,000 different spill files. Because the sketches are often much smaller than their possible maximum size, and the spill files get the standard compression, the 2 GB buffer was spilled to files averaging about 100 KB each.
   2. The code that _reads_ spill files appears to open ALL of them at once and build an iterator over their deserialized contents. Instead of opening the files lazily on demand, the code appears to assume that only a small number of files will exist, so it opens all of them and holds the open file handles until processing completes. The relevant code is in `SpillingGrouper.read(Iterator)` and `SpillingGrouper.iterator(boolean)`. By contrast, the code that actually writes the files in `SpillingGrouper.spill(Iterator)` uses try-with-resources to ensure each spilled file is closed immediately after writing. Rather than using a similar mechanism in `read()`, `SpillingGrouper` simply opens the files, builds a series of `MappingIterator`s, and then uses them to construct an overall iterator over the complete results.
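   The bucket and spill-count arithmetic in point 1 can be sketched as follows. All figures come from this report; this is not Druid code, just a back-of-envelope check:

   ```java
   // Back-of-envelope arithmetic for the spill behavior described above.
   public class SpillEstimate {
       public static void main(String[] args) {
           long bufferBytes = 2_147_483_647L;        // druid.processing.buffer.sizeBytes
           long rowBytes = 2_000_000L;               // ~2 MB computed row size for 8 thetaSketch aggregators
           long buckets = bufferBytes / rowBytes;    // ~1,000 hash-table buckets
           long combosPerSpill = buckets * 70 / 100; // spill at ~70% fill -> ~700 combos per spill

           // With 105,000 spill files averaging ~100 KB, total on-disk spill is
           // ~10.5 GB, which still fits under maxOnDiskStorage (12 GiB).
           long totalSpillBytes = 105_000L * 100_000L;

           System.out.println("buckets = " + buckets);                // ~1073
           System.out.println("combos per spill = " + combosPerSpill); // ~751
           System.out.println("total spill bytes = " + totalSpillBytes);
       }
   }
   ```

   The key point is that the per-spill capacity is fixed by the bucket count, so high-cardinality groupings inevitably produce huge numbers of small files.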
   
   #### Proposed solution
   `SpillingGrouper` needs to be changed to open files one at a time, only when it is ready to read from them, and to close them immediately afterwards. We can do this by changing `SpillingGrouper.read()` to return a `Provider<Iterator>` rather than a `MappingIterator`. The `Provider` would be given a lambda at creation time that constructs and opens the file when it is needed, rather than opening the file up front. This `Provider` could then be used in a new `LazyCloseableIterator` that retrieves the underlying iterator (and thus opens the associated file) only when the caller is actually ready to consume the file's contents.
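   As an illustration of the lazy-open idea (the class name and line-based reading here are hypothetical, not Druid's actual `MappingIterator`-based code), the file is only opened when the first element is requested and closed as soon as it is exhausted:

   ```java
   import java.io.BufferedReader;
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.util.Iterator;
   import java.util.NoSuchElementException;
   import java.util.function.Supplier;

   // Hypothetical sketch: an iterator that defers opening its backing file
   // until the first hasNext() call, and closes it as soon as it is drained.
   public class LazyFileLineIterator implements Iterator<String>, AutoCloseable {
       private final Supplier<BufferedReader> opener; // deferred "open the spill file"
       private BufferedReader reader;                 // null until first use
       private String next;
       private boolean done;

       public LazyFileLineIterator(Path file) {
           this.opener = () -> {
               try {
                   return Files.newBufferedReader(file);
               } catch (IOException e) {
                   throw new UncheckedIOException(e);
               }
           };
       }

       @Override
       public boolean hasNext() {
           if (done) return false;
           if (reader == null) reader = opener.get(); // open lazily, on first use
           if (next == null) {
               try {
                   next = reader.readLine();
               } catch (IOException e) {
                   throw new UncheckedIOException(e);
               }
               if (next == null) close(); // close immediately once exhausted
           }
           return next != null;
       }

       @Override
       public String next() {
           if (!hasNext()) throw new NoSuchElementException();
           String result = next;
           next = null;
           return result;
       }

       @Override
       public void close() {
           done = true;
           if (reader != null) {
               try { reader.close(); } catch (IOException ignored) {}
               reader = null;
           }
       }
   }
   ```

   With this shape, only the file currently being consumed holds a descriptor, so the descriptor count stays O(1) regardless of how many spill files exist.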
   
   I'll attach an implementation of the proposed fix shortly.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]