will-lauer opened a new issue #11558:
URL: https://github.com/apache/druid/issues/11558
### Affected Version
0.21, but probably all prior versions that support GroupBy v2
### Description
We are regularly seeing "Too Many Open Files" errors when running GroupBy
queries using GroupBy v2 combined with sketches on some of our larger backend
historical nodes. A typical stack trace looks like
```
Aug 05, 2021 2:24:32 PM com.google.common.util.concurrent.Futures$CombinedFuture setExceptionAndMaybeLog
SEVERE: input future failed.
java.lang.RuntimeException: java.io.FileNotFoundException: /home/y/tmp/druid/druid-groupBy-cdc606dc-9ad9-4ac3-a6ba-82f412b13c5b_will_testspilling/00104259.tmp (Too many open files)
	at org.apache.druid.query.groupby.epinephelinae.SpillingGrouper.read(SpillingGrouper.java:331)
	at org.apache.druid.query.groupby.epinephelinae.SpillingGrouper.iterator(SpillingGrouper.java:256)
	at org.apache.druid.query.groupby.epinephelinae.ConcurrentGrouper$1.call(ConcurrentGrouper.java:351)
	at org.apache.druid.query.groupby.epinephelinae.ConcurrentGrouper$1.call(ConcurrentGrouper.java:347)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at org.apache.druid.query.PrioritizedListenableFutureTask.run(PrioritizedExecutorService.java:247)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: /home/y/tmp/druid/druid-groupBy-cdc606dc-9ad9-4ac3-a6ba-82f412b13c5b_will_testspilling/00104259.tmp (Too many open files)
	at java.io.FileInputStream.open0(Native Method)
	at java.io.FileInputStream.open(FileInputStream.java:195)
	at java.io.FileInputStream.<init>(FileInputStream.java:138)
	at org.apache.druid.query.groupby.epinephelinae.SpillingGrouper.read(SpillingGrouper.java:326)
	... 8 more
```
When this error occurs, it typically causes a cascade of similar "Too many
open files" errors from HDFS and ZooKeeper sockets, as every operation in the
process becomes constrained by the exhausted file descriptor limit.
#### Configuration
- 200 historical nodes, divided up into 3 tiers. Errors trigger on tier3,
which has:
- 4 hosts
- ~40,000 segments per host
- 768g RAM per host
- 36 cores (72 hyperthreads) per host
- druid.processing.numThreads=36
- druid.processing.buffer.sizeBytes=2147483647
- druid.processing.numMergeBuffers=16
- druid.query.groupBy.maxOnDiskStorage=12884901888
- druid.query.groupBy.maxMergingDictionarySize=1610612736
- ulimit -n 65000 (max files per process)
- typical query includes:
- multiple (usually 8) thetaSketch aggregators, using size 16384
- 4-5 grouping dimensions
#### Debugging
We dug into this and found several things going on that contributed to the
final problem:
1. A single GroupBy query can generate an absurd number of spill files. In
our case, one basic query generated more than 100,000 spill files with an
average size of around 100 KB. This appears to happen because the 2 GB
processing buffer is divided into roughly 1,000 buckets: the computed row size
for my 8 thetaSketch aggregators is ~2 MB (about twice what I think it should
be). The grouper spills to disk when the hash table fills to about 70%, so
roughly every 700 distinct dimension combinations trigger a spill. On one of
our historical nodes, a query hitting 515 of the 40,000 segments on the host
generated 105,000 separate spill files. Because the sketches are often much
smaller than their maximum possible size, and the spill files are also
compressed, the 2 GB buffer spilled to files averaging only about 100 KB.
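The spill arithmetic above can be checked with a quick back-of-the-envelope sketch. The buffer size comes from the configuration listed earlier; the ~2 MB row size is the estimate from this report, not a value read from Druid:

```java
// Back-of-the-envelope check of the spill math above; this is not
// actual Druid code, just the arithmetic from the configuration.
public class SpillMath {
    public static void main(String[] args) {
        long bufferBytes = 2_147_483_647L;  // druid.processing.buffer.sizeBytes
        long rowBytes = 2_000_000L;         // ~2 MB computed row size for 8 sketches
        long buckets = bufferBytes / rowBytes;   // ~1,000 hash buckets per buffer
        long rowsPerSpill = buckets * 70 / 100;  // spill triggers at ~70% fill
        System.out.println("buckets per buffer: " + buckets);
        System.out.println("rows per spill: " + rowsPerSpill);
    }
}
```

At ~700 rows per spill, covering hundreds of segments each holding many distinct dimension combinations quickly multiplies into the 100,000+ files observed.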
2. The code that _reads_ spill files opens ALL of them at once and builds an
iterator over their deserialized contents. Instead of opening the files lazily
on demand, the code assumes that only a small number of files will exist,
opens them all, and keeps handles to the open files pending processing. The
relevant code is in `SpillingGrouper.read(Iterator)` and
`SpillingGrouper.iterator(boolean)`. By contrast, the code that writes the
files in `SpillingGrouper.spill(Iterator)` uses try-with-resources to ensure
each spilled file is closed immediately after writing. Rather than using a
similar mechanism in `read()`, `SpillingGrouper` simply opens the files,
builds a series of `MappingIterator`s, and uses them to construct an overall
iterator over the complete results.
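To make the failure mode concrete, here is an illustrative sketch of the eager pattern (hypothetical code, not the actual `SpillingGrouper` implementation): every reader is opened before any is consumed, so the process holds one descriptor per spill file for the entire merge.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Illustrative anti-pattern, not the actual Druid code: one open
// descriptor per spill file, all held until processing completes.
class EagerSpillReader {
    static List<BufferedReader> openAll(List<File> spillFiles) throws IOException {
        List<BufferedReader> readers = new ArrayList<>();
        for (File f : spillFiles) {
            // Each reader pins a file descriptor until it is closed.
            // With 100,000+ spill files and ulimit -n 65000, this loop
            // fails partway through with "Too many open files".
            readers.add(new BufferedReader(new FileReader(f)));
        }
        return readers;
    }
}
```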
#### Proposed solution
`SpillingGrouper` needs to be changed to open files one at a time, only when
it is ready to read from them, and to close each one immediately afterwards.
We can do this by changing `SpillingGrouper.read()` to return a
`Provider<Iterator>` rather than a `MappingIterator`. The `Provider` would be
given a lambda at creation time that constructs the iterator and opens the
file only when it is needed, rather than up front. This `Provider` could then
be used in a new `LazyCloseableIterator` that retrieves the underlying
iterator (and thus opens the associated file) only when it is actually ready
to consume the file's contents.
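A minimal sketch of the lazy pattern described above (the class and the supplier-based wiring here are illustrative, not the actual patch): the supplier opens the file only when iteration begins, and the wrapper closes it as soon as the iterator is exhausted, so at most one spill file is open at a time.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

// Hypothetical sketch of the proposed LazyCloseableIterator: defers
// opening the underlying resource until first use, and releases it as
// soon as the data is drained.
class LazyCloseableIterator<T> implements Iterator<T>, Closeable {
    private final Supplier<Iterator<T>> provider; // opens the file on first use
    private Iterator<T> delegate;

    LazyCloseableIterator(Supplier<Iterator<T>> provider) {
        this.provider = provider;
    }

    private Iterator<T> delegate() {
        if (delegate == null) {
            delegate = provider.get(); // file opened here, not at construction
        }
        return delegate;
    }

    @Override
    public boolean hasNext() {
        boolean more = delegate().hasNext();
        if (!more) {
            close(); // release the descriptor as soon as the file is drained
        }
        return more;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return delegate().next();
    }

    @Override
    public void close() {
        if (delegate instanceof Closeable) {
            try {
                ((Closeable) delegate).close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
}
```

In the real fix the supplier would open the spill file and build the `MappingIterator` over it; here any `Supplier<Iterator<T>>` stands in for that.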
I'll attach an implementation of the proposed fix shortly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]