This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 93aeaf4 Improve on-heap aggregator footprint estimates. (#11950)
93aeaf4 is described below
commit 93aeaf4801f65826f01cac0a3eedb4b4d3fdecef
Author: Gian Merlino <[email protected]>
AuthorDate: Sat Nov 27 23:51:24 2021 -0800
Improve on-heap aggregator footprint estimates. (#11950)
Add a "guessAggregatorHeapFootprint" method to AggregatorFactory that
mitigates #6743 by enabling heap footprint estimates based on a specific
number of rows. The idea is that at ingestion time, the number of rows
that go into an aggregator will be 1 (if rollup is off) or will likely
be a small number (if rollup is on).
It's a heuristic, because of course nothing guarantees that the rollup
ratio is a small number. But it's a common case, and I expect this logic
to go wrong much less often than the current logic. Also, when it does
go wrong, users can fix it by lowering maxRowsInMemory or
maxBytesInMemory. The current situation is unintuitive: when the
estimation goes wrong, users get an OutOfMemoryError (OOME), but they
actually need to *raise* these limits to fix it.
---
.../quantiles/DoublesSketchAggregatorFactory.java | 6 ++++++
.../theta/SketchAggregatorFactory.java | 25 ++++++++++++++++++++++
.../DoublesSketchAggregatorFactoryTest.java | 15 +++++++++++++
.../theta/SketchAggregatorFactoryTest.java | 25 ++++++++++++++++++++++
.../druid/query/aggregation/AggregatorFactory.java | 17 +++++++++++++++
.../incremental/OnheapIncrementalIndex.java | 23 +++++++++++++++-----
6 files changed, 106 insertions(+), 5 deletions(-)
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
index 4f7f4cd..145ffcf 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
@@ -298,6 +298,12 @@ public class DoublesSketchAggregatorFactory extends
AggregatorFactory
return Collections.singletonList(fieldName);
}
+ @Override
+ public int guessAggregatorHeapFootprint(long rows)
+ {
+ return DoublesSketch.getUpdatableStorageBytes(k, rows);
+ }
+
// Quantiles sketches never stop growing, but they do so very slowly.
// This size must suffice for overwhelming majority of sketches,
// but some sketches may request more memory on heap and move there
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java
index 7a10a16..a57c547 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java
@@ -21,6 +21,7 @@ package org.apache.druid.query.aggregation.datasketches.theta;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
import org.apache.datasketches.Family;
import org.apache.datasketches.Util;
import org.apache.datasketches.theta.SetOperation;
@@ -48,6 +49,13 @@ public abstract class SketchAggregatorFactory extends
AggregatorFactory
{
public static final int DEFAULT_MAX_SKETCH_SIZE = 16384;
+ // Smallest number of entries in an Aggregator. Each entry is a long. Based on the constructor of
+ // HeapQuickSelectSketch and used by guessAggregatorHeapFootprint.
+ private static final int MIN_ENTRIES_PER_AGGREGATOR = 1 <<
Util.MIN_LG_ARR_LONGS;
+
+ // Largest preamble size for the sketch stored in an Aggregator, in bytes. Based on Util.getMaxUnionBytes.
+ private static final int LONGEST_POSSIBLE_PREAMBLE_BYTES =
Family.UNION.getMaxPreLongs() << 3;
+
protected final String name;
protected final String fieldName;
protected final int size;
@@ -171,6 +179,23 @@ public abstract class SketchAggregatorFactory extends
AggregatorFactory
}
@Override
+ public int guessAggregatorHeapFootprint(long rows)
+ {
+ final int maxEntries = size * 2;
+ final int expectedEntries;
+
+ if (rows > maxEntries) {
+ expectedEntries = maxEntries;
+ } else {
+ // rows is within int range since it's <= maxEntries, so casting is OK.
+ expectedEntries = Math.max(MIN_ENTRIES_PER_AGGREGATOR,
Util.ceilingPowerOf2(Ints.checkedCast(rows)));
+ }
+
+ // 8 bytes per entry + largest possible preamble.
+ return Long.BYTES * expectedEntries + LONGEST_POSSIBLE_PREAMBLE_BYTES;
+ }
+
+ @Override
public int getMaxIntermediateSize()
{
return SetOperation.getMaxUnionBytes(size);
diff --git
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java
index cf1f4f9..d4867c7 100644
---
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java
+++
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java
@@ -84,6 +84,21 @@ public class DoublesSketchAggregatorFactoryTest
}
@Test
+ public void testGuessAggregatorHeapFootprint()
+ {
+ DoublesSketchAggregatorFactory factory = new
DoublesSketchAggregatorFactory(
+ "myFactory",
+ "myField",
+ 128,
+ null
+ );
+ Assert.assertEquals(64, factory.guessAggregatorHeapFootprint(1));
+ Assert.assertEquals(1056, factory.guessAggregatorHeapFootprint(100));
+ Assert.assertEquals(4128, factory.guessAggregatorHeapFootprint(1000));
+ Assert.assertEquals(34848,
factory.guessAggregatorHeapFootprint(1_000_000_000_000L));
+ }
+
+ @Test
public void testMaxIntermediateSize()
{
DoublesSketchAggregatorFactory factory = new
DoublesSketchAggregatorFactory(
diff --git
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java
index bd495ce..61efd5f 100644
---
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java
+++
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java
@@ -36,6 +36,31 @@ import org.junit.Test;
public class SketchAggregatorFactoryTest
{
+ private static final SketchMergeAggregatorFactory AGGREGATOR_16384 =
+ new SketchMergeAggregatorFactory("x", "x", 16384, null, false, null);
+
+ private static final SketchMergeAggregatorFactory AGGREGATOR_32768 =
+ new SketchMergeAggregatorFactory("x", "x", 32768, null, false, null);
+
+ @Test
+ public void testGuessAggregatorHeapFootprint()
+ {
+ Assert.assertEquals(288, AGGREGATOR_16384.guessAggregatorHeapFootprint(1));
+ Assert.assertEquals(1056,
AGGREGATOR_16384.guessAggregatorHeapFootprint(100));
+ Assert.assertEquals(262176,
AGGREGATOR_16384.guessAggregatorHeapFootprint(1_000_000_000_000L));
+
+ Assert.assertEquals(288, AGGREGATOR_32768.guessAggregatorHeapFootprint(1));
+ Assert.assertEquals(1056,
AGGREGATOR_32768.guessAggregatorHeapFootprint(100));
+ Assert.assertEquals(524320,
AGGREGATOR_32768.guessAggregatorHeapFootprint(1_000_000_000_000L));
+ }
+
+ @Test
+ public void testMaxIntermediateSize()
+ {
+ Assert.assertEquals(262176, AGGREGATOR_16384.getMaxIntermediateSize());
+ Assert.assertEquals(524320, AGGREGATOR_32768.getMaxIntermediateSize());
+ }
+
@Test
public void testResultArraySignature()
{
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java
b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java
index f0e3837..45f9328 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java
@@ -303,6 +303,23 @@ public abstract class AggregatorFactory implements
Cacheable
}
/**
+ * Returns a best guess as to how much memory the on-heap {@link Aggregator} returned by {@link #factorize} will
+ * require when a certain number of rows have been aggregated into it.
+ *
+ * The main user of this method is {@link org.apache.druid.segment.incremental.OnheapIncrementalIndex}, which
+ * uses it to determine when to persist the current in-memory data to disk.
+ *
+ * Important note for callers! In nearly all cases, callers that wish to constrain memory would be better off
+ * using {@link #factorizeBuffered} or {@link #factorizeVector}, which offer precise control over how much memory
+ * is being used.
+ */
+ public int guessAggregatorHeapFootprint(long rows)
+ {
+ // By default, guess that on-heap footprint is equal to off-heap footprint.
+ return getMaxIntermediateSizeWithNulls();
+ }
+
+ /**
* Return a potentially optimized form of this AggregatorFactory for per-segment queries.
*/
public AggregatorFactory
optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext)
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index 8ebbeaf..bb24ae4 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -44,7 +44,6 @@ import org.apache.druid.utils.JvmUtils;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -59,6 +58,15 @@ import java.util.concurrent.atomic.AtomicLong;
public class OnheapIncrementalIndex extends IncrementalIndex
{
private static final Logger log = new Logger(OnheapIncrementalIndex.class);
+
+ /**
+ * Constant factor provided to {@link AggregatorFactory#guessAggregatorHeapFootprint(long)} for footprint estimates.
+ * This figure is large enough to catch most common rollup ratios, but not so large that it will cause persists to
+ * happen too often. If an actual workload involves a much higher rollup ratio, then this may lead to excessive
+ * heap usage. Users would have to work around that by lowering maxRowsInMemory or maxBytesInMemory.
+ */
+ private static final long ROLLUP_RATIO_FOR_AGGREGATOR_FOOTPRINT_ESTIMATION =
100;
+
/**
* overhead per {@link ConcurrentHashMap.Node} or {@link java.util.concurrent.ConcurrentSkipListMap.Node} object
*/
@@ -113,11 +121,16 @@ public class OnheapIncrementalIndex extends
IncrementalIndex
*/
private static long getMaxBytesPerRowForAggregators(IncrementalIndexSchema
incrementalIndexSchema)
{
+ final long rowsPerAggregator =
+ incrementalIndexSchema.isRollup() ?
ROLLUP_RATIO_FOR_AGGREGATOR_FOOTPRINT_ESTIMATION : 1;
+
long maxAggregatorIntermediateSize = ((long) Integer.BYTES) *
incrementalIndexSchema.getMetrics().length;
- maxAggregatorIntermediateSize +=
Arrays.stream(incrementalIndexSchema.getMetrics())
- .mapToLong(aggregator ->
aggregator.getMaxIntermediateSizeWithNulls()
- +
Long.BYTES * 2L)
- .sum();
+
+ for (final AggregatorFactory aggregator :
incrementalIndexSchema.getMetrics()) {
+ maxAggregatorIntermediateSize +=
+ (long) aggregator.guessAggregatorHeapFootprint(rowsPerAggregator) +
2L * Long.BYTES;
+ }
+
return maxAggregatorIntermediateSize;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]