imply-cheddar commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r777824055
##########
File path:
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -21,23 +21,45 @@
import org.apache.datasketches.Family;
import org.apache.datasketches.theta.SetOperation;
+import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.Union;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import javax.annotation.Nullable;
+import java.lang.reflect.Field;
import java.util.List;
public class SketchAggregator implements Aggregator
{
+ private static final Logger LOG = new Logger(SketchAggregator.class);
Review comment:
This is the nittiest of nits, but the codebase tends to keep the name of the
logger in lower case.
##########
File path:
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -21,23 +21,45 @@
import org.apache.datasketches.Family;
import org.apache.datasketches.theta.SetOperation;
+import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.Union;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import javax.annotation.Nullable;
+import java.lang.reflect.Field;
import java.util.List;
public class SketchAggregator implements Aggregator
{
+ private static final Logger LOG = new Logger(SketchAggregator.class);
+
private final BaseObjectColumnValueSelector selector;
private final int size;
@Nullable
private Union union;
+ @Nullable
+ private Sketch sketch;
+
+ @Nullable
+ private static Field sketchField;
Review comment:
This can be `final`
##########
File path:
processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
##########
@@ -225,13 +236,19 @@ protected AddToFactsResult addToFacts(
* </ul>
*
* @param key TimeAndDims key
- * @param maxBytesPerRowForAggregators max size per aggregator
- *
+ * @param maxBytesPerRowForAggregators max size per row for aggregators
+ * @param actualRowSizeForAggregators actual aggregator size for this row
* @return estimated size of row
*/
- private long estimateRowSizeInBytes(IncrementalIndexRow key, long maxBytesPerRowForAggregators)
+ private long estimateRowSizeInBytes(
+ IncrementalIndexRow key,
+ long maxBytesPerRowForAggregators,
+ long actualRowSizeForAggregators
+ )
{
- return ROUGH_OVERHEAD_PER_MAP_ENTRY + key.estimateBytesInMemory() + maxBytesPerRowForAggregators;
+ return ROUGH_OVERHEAD_PER_MAP_ENTRY
+ + key.estimateBytesInMemory()
+ + (useMaxMemoryEstimates ? maxBytesPerRowForAggregators : actualRowSizeForAggregators);
Review comment:
Reading this code, I found myself wondering why the choice of which value to
use is pushed down this low, adding an extra parameter here, instead of
changing the call site of `estimateRowSizeInBytes` to check the flag and pass
the "correct" value.
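A minimal sketch of that alternative, under stated assumptions: the class name, the `estimateAtCallSite` helper, and the constant's value are hypothetical placeholders, and the key size is passed in as a plain `long` so the example stays self-contained.

```java
// Hypothetical sketch; names mirror the diff but this is not the Druid implementation.
final class RowSizeEstimateSketch
{
  // Placeholder value for illustration only.
  static final long ROUGH_OVERHEAD_PER_MAP_ENTRY = 44L;

  // The estimator keeps a single aggregator-size parameter and knows nothing about the flag.
  static long estimateRowSizeInBytes(long keySizeBytes, long aggregatorSizeBytes)
  {
    return ROUGH_OVERHEAD_PER_MAP_ENTRY + keySizeBytes + aggregatorSizeBytes;
  }

  // The call site decides which aggregator size to pass.
  static long estimateAtCallSite(
      boolean useMaxMemoryEstimates,
      long keySizeBytes,
      long maxBytesPerRowForAggregators,
      long actualRowSizeForAggregators
  )
  {
    final long aggregatorSizeBytes = useMaxMemoryEstimates
                                     ? maxBytesPerRowForAggregators
                                     : actualRowSizeForAggregators;
    return estimateRowSizeInBytes(keySizeBytes, aggregatorSizeBytes);
  }
}
```

With this shape the flag is checked in one place, and the estimator's signature does not need to grow.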
##########
File path:
processing/src/main/java/org/apache/druid/query/aggregation/SizedAggregator.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+/**
+ * Encapsulates an {@link Aggregator} and the initial size in bytes required by
+ * the Aggregator.
+ */
+public class SizedAggregator
Review comment:
Naming: given that this isn't actually an `Aggregator` object, it
shouldn't end with `Aggregator`. How about `AggregatorAndSize` instead?
##########
File path:
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -64,6 +86,25 @@ public void aggregate()
}
}
+ @Override
+ public long aggregateWithSize()
Review comment:
As I read this code, there are two different lifecycles happening: one where
the normal `aggregate()` is called and uses `initUnion()`, and another where
`aggregateWithSize()` is called and expects something external to have called
`getInitialSizeBytes()`.
I find myself wondering if it wouldn't be nicer to have two concrete classes,
each specialized to one lifecycle, instead of mixing the code together.
There's probably still a lot of room for reuse, but with the current mixing of
the code, it seems relatively easy to accidentally use one when the other
should have been used. If we separate out the classes, the `aggregate()`
method of the sized one can throw an `UnsupportedOperationException`, which
ensures that it's never called when sizing is supposed to be used.
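One way the split could look, as a rough sketch only: every type below is a hypothetical stand-in (a simple counter replaces the theta `Union`), and the default `aggregateWithSize()` on the interface is an assumption made for this example.

```java
// Hypothetical stand-in for Druid's Aggregator interface, trimmed to two methods.
interface LifecycleAggregator
{
  void aggregate();

  // Assumed default for this sketch: aggregate and report no size change.
  default long aggregateWithSize()
  {
    aggregate();
    return 0L;
  }
}

// Shared state and update logic, reusable by both lifecycles.
abstract class CountingAggregatorBase implements LifecycleAggregator
{
  private long count;

  protected void update()
  {
    count++;
  }

  public long getCount()
  {
    return count;
  }
}

// Lifecycle 1: plain aggregation with no size tracking.
final class PlainCountingAggregator extends CountingAggregatorBase
{
  @Override
  public void aggregate()
  {
    update();
  }
}

// Lifecycle 2: size-tracking aggregation; the plain entry point is rejected.
final class SizedCountingAggregator extends CountingAggregatorBase
{
  @Override
  public void aggregate()
  {
    throw new UnsupportedOperationException("Use aggregateWithSize() when sizing is enabled");
  }

  @Override
  public long aggregateWithSize()
  {
    update();
    return Long.BYTES; // placeholder for the real incremental size in bytes
  }
}
```

Calling `aggregate()` on the sized class then fails immediately instead of silently skipping the size accounting.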
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
##########
@@ -849,6 +849,8 @@ private TaskStatus generateAndPublishSegments(
final PartitionsSpec partitionsSpec = partitionAnalysis.getPartitionsSpec();
final IndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
final long pushTimeout = tuningConfig.getPushTimeout();
+ tuningConfig.useMaxMemoryEstimates = getContextValue(
Review comment:
If it's on the tuning config, then it can be taken as another parameter
from the tuning config's JSON and doesn't need to be on the context. Setting
values on the config like this is a bit dangerous, so let's just take it in the
constructor and stop looking at the context of the Task.
##########
File path:
processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
##########
@@ -96,6 +99,12 @@ public int size()
}
}
+ public long sizeInBytes()
+ {
+ // TODO: size of map/list object itself?
Review comment:
Make sure to resolve the TODOs before merge: either implement them,
replace them with a comment that explains what's missing and why it's
potentially an issue, or just delete them.
##########
File path:
processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
##########
@@ -114,6 +123,11 @@ public int add(@Nullable T originalValue)
final int index = idToValue.size();
valueToId.put(originalValue, index);
idToValue.add(originalValue);
+
+ // Add size of the String and two of its references to the total size
+ // TODO: map entry overhead?
+ sizeInBytes.addAndGet(getObjectSize(originalValue) + Long.BYTES + Long.BYTES);
Review comment:
Instead of relying on the comment to explain what this code is doing, you
can write it as
```
long sizeOfReference = Long.BYTES;
long sizeOfString = getObjectSize(originalValue);
sizeInBytes.addAndGet(sizeOfString + (2 * sizeOfReference));
```
Now the code itself says the same thing that your comment did.
##########
File path:
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -133,4 +174,34 @@ static void updateUnion(Union union, Object update)
throw new ISE("Illegal type received while theta sketch merging [%s]", update.getClass());
}
}
+
+ /**
+ * Gets the initial size of this aggregator in bytes.
+ */
+ public long getInitialSizeBytes()
+ {
+ // SketchAggregator has 3 references and an int
+ // UnionImpl has a reference, a short, a long, a boolean
+ long sizeOfReferences = 3L * Long.BYTES + Integer.BYTES
+ + Long.BYTES + Short.BYTES + Long.BYTES + 1;
+ if (sketchField == null) {
+ return sizeOfReferences;
+ }
+
+ // Initialize the sketch if not already initialized
+ if (union == null) {
+ initUnion();
Review comment:
It feels a little bit wrong to me to initialize the union while computing
the initial size. If it starts out `null`, then its size is `0`. It seems like
we should either do this lazily or initialize the union on construction.
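The two options could be sketched roughly as follows. Both classes are hypothetical, and a `long[]` stands in for the theta `Union` so that the example is self-contained; the byte counts are illustrative, not real object sizes.

```java
// Option A (hypothetical): allocate on construction, so the size getter has no side effects.
final class EagerlyInitializedAggregator
{
  private final long[] buffer; // stand-in for the Union

  EagerlyInitializedAggregator(int nominalEntries)
  {
    this.buffer = new long[nominalEntries];
  }

  long getInitialSizeBytes()
  {
    return (long) buffer.length * Long.BYTES;
  }
}

// Option B (hypothetical): start out null with an initial size of 0 and
// report the allocation as part of the first aggregateWithSize() call.
final class LazilyInitializedAggregator
{
  private final int nominalEntries; // assumed to be at least 1
  private long[] buffer; // null until the first row arrives

  LazilyInitializedAggregator(int nominalEntries)
  {
    this.nominalEntries = nominalEntries;
  }

  long getInitialSizeBytes()
  {
    return 0L;
  }

  long aggregateWithSize(long value)
  {
    long deltaBytes = 0L;
    if (buffer == null) {
      buffer = new long[nominalEntries];
      deltaBytes = (long) nominalEntries * Long.BYTES;
    }
    buffer[0] += value;
    return deltaBytes;
  }
}
```

In both variants `getInitialSizeBytes()` is a pure read, which is the property the comment is asking for.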
##########
File path:
processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
##########
@@ -240,22 +257,46 @@ public int getLastRowIndex()
return indexIncrement.get() - 1;
}
- private void factorizeAggs(
+ /**
+ * Creates aggregators for the given aggregator factories.
+ *
+ * @return Total initial size in bytes required by all the aggregators.
+ * This value is non-zero only when {@link #useMaxMemoryEstimates} is false.
+ */
+ private long factorizeAggs(
AggregatorFactory[] metrics,
Aggregator[] aggs,
ThreadLocal<InputRow> rowContainer,
InputRow row
)
{
+ long totalInitialSizeBytes = 0L;
rowContainer.set(row);
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
- aggs[i] = agg.factorize(selectors.get(agg.getName()));
+
+ if (useMaxMemoryEstimates) {
+ aggs[i] = agg.factorize(selectors.get(agg.getName()));
+ } else {
+ SizedAggregator sizedAggregator =
+ agg.factorizeSized(selectors.get(agg.getName()));
+ aggs[i] = sizedAggregator.getAggregator();
+ totalInitialSizeBytes += sizedAggregator.getInitialSizeBytes();
+ }
}
rowContainer.set(null);
+
+ return useMaxMemoryEstimates ? 0 : totalInitialSizeBytes;
Review comment:
You've got the same check in multiple places. I don't think you need
the one here, as `totalInitialSizeBytes` will be 0 in the
`useMaxMemoryEstimates` case anyway. But if you really want to ensure that the
two code paths don't mix, move the `if (useMaxMemoryEstimates)` check to the
top level and write the loop twice: once in a block that always returns `0`
and once in a block that returns the aggregated sum.
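A rough sketch of the "write the loop twice" shape, with hypothetical stand-ins for `AggregatorFactory` and the sized holder (the selector and row-container handling from the diff are omitted for brevity):

```java
// Hypothetical sketch; the nested types only imitate the shapes used in the diff.
final class FactorizeAggsSketch
{
  // Stand-in for the holder returned by factorizeSized().
  static final class AggregatorAndSize
  {
    final Object aggregator;
    final long initialSizeBytes;

    AggregatorAndSize(Object aggregator, long initialSizeBytes)
    {
      this.aggregator = aggregator;
      this.initialSizeBytes = initialSizeBytes;
    }
  }

  // Stand-in for AggregatorFactory.
  interface Factory
  {
    Object factorize();

    AggregatorAndSize factorizeSized();
  }

  static long factorizeAggs(Factory[] metrics, Object[] aggs, boolean useMaxMemoryEstimates)
  {
    if (useMaxMemoryEstimates) {
      // Max-estimate path: plain factorize, initial size is always 0.
      for (int i = 0; i < metrics.length; i++) {
        aggs[i] = metrics[i].factorize();
      }
      return 0L;
    }

    // Actual-size path: sized factorize, sum the initial sizes.
    long totalInitialSizeBytes = 0L;
    for (int i = 0; i < metrics.length; i++) {
      final AggregatorAndSize sized = metrics[i].factorizeSized();
      aggs[i] = sized.aggregator;
      totalInitialSizeBytes += sized.initialSizeBytes;
    }
    return totalInitialSizeBytes;
  }
}
```

The flag is read once, and neither loop can reach the other path's factory method.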
##########
File path:
processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
##########
@@ -265,11 +306,12 @@ private void doAggregate(
{
rowContainer.set(row);
+ long totalIncrementalBytes = 0L;
for (int i = 0; i < aggs.length; i++) {
final Aggregator agg = aggs[i];
synchronized (agg) {
try {
- agg.aggregate();
+ totalIncrementalBytes += agg.aggregateWithSize();
Review comment:
This is going to call the `WithSize` version even when it's not supposed
to (when it's using the max size). Please adjust to only call the `WithSize`
version when the flag is set.
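Something along these lines, as a sketch only: the `Agg` interface is a hypothetical stand-in for `Aggregator`, and the flag is passed as a parameter rather than read from a field.

```java
// Hypothetical sketch of gating the sized call on the flag.
final class DoAggregateSketch
{
  // Stand-in for Druid's Aggregator, trimmed to the two methods involved.
  interface Agg
  {
    void aggregate();

    long aggregateWithSize();
  }

  static long doAggregate(Agg[] aggs, boolean useMaxMemoryEstimates)
  {
    long totalIncrementalBytes = 0L;
    for (final Agg agg : aggs) {
      synchronized (agg) {
        if (useMaxMemoryEstimates) {
          // Max-estimate mode: no per-call size tracking.
          agg.aggregate();
        } else {
          totalIncrementalBytes += agg.aggregateWithSize();
        }
      }
    }
    return totalIncrementalBytes;
  }
}
```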
##########
File path:
server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
##########
@@ -34,6 +34,13 @@
boolean isSkipBytesInMemoryOverheadCheck();
+ /**
+ * Whether maximum memory usage should be considered in estimation.
+ */
+ default boolean isUseMaxMemoryEstimates() {
Review comment:
Should this really be a default method instead of becoming a real thing
on the config object?
##########
File path:
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -21,23 +21,45 @@
import org.apache.datasketches.Family;
import org.apache.datasketches.theta.SetOperation;
+import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.Union;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import javax.annotation.Nullable;
+import java.lang.reflect.Field;
import java.util.List;
public class SketchAggregator implements Aggregator
{
+ private static final Logger LOG = new Logger(SketchAggregator.class);
+
private final BaseObjectColumnValueSelector selector;
private final int size;
@Nullable
private Union union;
+ @Nullable
+ private Sketch sketch;
+
+ @Nullable
+ private static Field sketchField;
+ static {
+ try {
+ sketchField = Class.forName("org.apache.datasketches.theta.UnionImpl")
+ .getDeclaredField("gadget_");
+ sketchField.setAccessible(true);
+ }
+ catch (NoSuchFieldException | ClassNotFoundException e) {
+ LOG.error(e, "Could not initialize 'sketchField'");
Review comment:
This will only happen if someone has loaded a newer/different version of
sketches than the one this code actually depends on. If that happens, this
error puts something into the logs that will be ignored (people don't look at
logs until something actually explodes), and the failure is then silently
ignored. Once it is silently ignored, the estimation becomes incorrect and
potentially starts causing OOMs where none existed previously, and it will be
super hard to track down why.
I would recommend that we explode loudly by throwing an error out of the
static initializer (which should effectively keep the process from starting in
the first place). If we want a way for someone to say "I know what I'm doing,
ignore this please", we can add an extra config, which the error message in
the exception points to, as a way to ignore it.
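A minimal sketch of the fail-loudly approach, assuming a hypothetical system property as the opt-out. The property name and the `SketchFieldLookup` helper are made up for illustration and are not existing Druid configs or classes.

```java
import java.lang.reflect.Field;

// Hypothetical helper; the property name below is an assumption, not a real Druid config.
final class SketchFieldLookup
{
  static final String IGNORE_PROPERTY = "example.sketch.ignoreMissingField";

  // Looks up a private field reflectively. Throws unless the opt-out property is set to "true".
  static Field findField(String className, String fieldName)
  {
    try {
      final Field field = Class.forName(className).getDeclaredField(fieldName);
      field.setAccessible(true);
      return field;
    }
    catch (NoSuchFieldException | ClassNotFoundException e) {
      if (Boolean.getBoolean(IGNORE_PROPERTY)) {
        // Explicit "I know what I'm doing" escape hatch.
        return null;
      }
      throw new IllegalStateException(
          "Could not find field [" + fieldName + "] on [" + className + "]. "
          + "Set -D" + IGNORE_PROPERTY + "=true to ignore this.",
          e
      );
    }
  }
}
```

Used as `private static final Field sketchField = SketchFieldLookup.findField(...)`, an exception here surfaces as an `ExceptionInInitializerError` when the class is first loaded, rather than as a log line.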
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.