This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3a3271eddc Introduce defaultOnDiskStorage config for Group By (#12833)
3a3271eddc is described below
commit 3a3271eddc946599e1c9b81154d2f37663b186c5
Author: Lucas Capistrant <[email protected]>
AuthorDate: Fri Aug 12 11:40:21 2022 -0500
Introduce defaultOnDiskStorage config for Group By (#12833)
* Introduce defaultOnDiskStorage config for groupBy
* add debug log to groupby query config
* Apply config change suggestion from review
* Remove accidental new lines
* update default value of new default disk storage config
* update debug log to have more descriptive text
* Make maxOnDiskStorage and defaultOnDiskStorage HumanReadableBytes
* improve test coverage
* Provide default implementation to new default method on advice of reviewer
---
.../benchmark/GroupByTypeInterfaceBenchmark.java | 5 +-
.../druid/benchmark/query/GroupByBenchmark.java | 5 +-
docs/configuration/index.md | 3 +-
.../materializedview/MaterializedViewQuery.java | 7 ++
.../MaterializedViewQueryTest.java | 35 ++++++++++
.../java/org/apache/druid/query/BaseQuery.java | 7 ++
.../main/java/org/apache/druid/query/Query.java | 19 ++++++
.../java/org/apache/druid/query/QueryContext.java | 7 ++
.../java/org/apache/druid/query/QueryContexts.java | 18 ++++++
.../druid/query/groupby/GroupByQueryConfig.java | 39 ++++++++++--
.../epinephelinae/GroupByMergingQueryRunnerV2.java | 2 +-
.../groupby/epinephelinae/GroupByRowProcessor.java | 2 +-
.../org/apache/druid/query/select/SelectQuery.java | 7 ++
.../org/apache/druid/query/QueryContextTest.java | 30 +++++++++
.../org/apache/druid/query/QueryContextsTest.java | 9 +++
...GroupByLimitPushDownInsufficientBufferTest.java | 5 +-
.../GroupByLimitPushDownMultiNodeMergeTest.java | 5 +-
.../query/groupby/GroupByMultiSegmentTest.java | 5 +-
.../query/groupby/GroupByQueryConfigTest.java | 74 ++++++++++++++++++++--
.../query/groupby/GroupByQueryRunnerTest.java | 11 ++--
.../query/groupby/NestedQueryPushDownTest.java | 5 +-
21 files changed, 269 insertions(+), 31 deletions(-)
diff --git
a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
index 4450689275..8e885856ba 100644
---
a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
+++
b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
@@ -32,6 +32,7 @@ import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
@@ -362,9 +363,9 @@ public class GroupByTypeInterfaceBenchmark
}
@Override
- public long getMaxOnDiskStorage()
+ public HumanReadableBytes getMaxOnDiskStorage()
{
- return 1_000_000_000L;
+ return HumanReadableBytes.valueOf(1_000_000_000L);
}
};
config.setSingleThreaded(false);
diff --git
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
index 0fdffa8b87..9dd0ea5c5e 100644
---
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
+++
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
@@ -34,6 +34,7 @@ import org.apache.druid.collections.StupidPool;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
@@ -477,9 +478,9 @@ public class GroupByBenchmark
}
@Override
- public long getMaxOnDiskStorage()
+ public HumanReadableBytes getMaxOnDiskStorage()
{
- return 1_000_000_000L;
+ return HumanReadableBytes.valueOf(1_000_000_000L);
}
};
config.setSingleThreaded(false);
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index dee540f399..65ddb3a782 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -2079,6 +2079,7 @@ Supported runtime properties:
|`druid.query.groupBy.maxSelectorDictionarySize`|Maximum amount of heap space
(approximately) to use for per-segment string dictionaries. See [groupBy memory
tuning and resource
limits](../querying/groupbyquery.md#memory-tuning-and-resource-limits) for
details.|100000000|
|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space
(approximately) to use for per-query string dictionaries. When the dictionary
exceeds this size, a spill to disk will be triggered. See [groupBy memory
tuning and resource
limits](../querying/groupbyquery.md#memory-tuning-and-resource-limits) for
details.|100000000|
|`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use,
per-query, for spilling result sets to disk when either the merging buffer or
the dictionary fills up. Queries that exceed this limit will fail. Set to zero
to disable disk spilling.|0 (disabled)|
+|`druid.query.groupBy.defaultOnDiskStorage`|Default amount of disk space to
use, per-query, for spilling the result sets to disk when either the merging
buffer or the dictionary fills up. Set to zero to disable disk spilling for
queries which don't override `maxOnDiskStorage` in their
context.|`druid.query.groupBy.maxOnDiskStorage`|
Supported query contexts:
@@ -2086,7 +2087,7 @@ Supported query contexts:
|---|-----------|
|`maxSelectorDictionarySize`|Can be used to lower the value of
`druid.query.groupBy.maxMergingDictionarySize` for this query.|
|`maxMergingDictionarySize`|Can be used to lower the value of
`druid.query.groupBy.maxMergingDictionarySize` for this query.|
-|`maxOnDiskStorage`|Can be used to lower the value of
`druid.query.groupBy.maxOnDiskStorage` for this query.|
+|`maxOnDiskStorage`|Can be used to set `maxOnDiskStorage` to a value between 0
and `druid.query.groupBy.maxOnDiskStorage` for this query. If this query
context override exceeds `druid.query.groupBy.maxOnDiskStorage`, the query will
use `druid.query.groupBy.maxOnDiskStorage`. Omitting this from the query
context will cause the query to use `druid.query.groupBy.defaultOnDiskStorage`
for `maxOnDiskStorage`.|
### Advanced configurations
diff --git
a/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/MaterializedViewQuery.java
b/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/MaterializedViewQuery.java
index cfd4bc9255..cb98d6b4b2 100644
---
a/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/MaterializedViewQuery.java
+++
b/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/MaterializedViewQuery.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
+import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DataSource;
@@ -170,6 +171,12 @@ public class MaterializedViewQuery<T> implements Query<T>
return query.getContextBoolean(key, defaultValue);
}
+ @Override
+ public HumanReadableBytes getContextHumanReadableBytes(String key,
HumanReadableBytes defaultValue)
+ {
+ return query.getContextHumanReadableBytes(key, defaultValue);
+ }
+
@Override
public boolean isDescending()
{
diff --git
a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/MaterializedViewQueryTest.java
b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/MaterializedViewQueryTest.java
index 1432f519ef..1a55cdd026 100644
---
a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/MaterializedViewQueryTest.java
+++
b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/MaterializedViewQueryTest.java
@@ -22,8 +22,10 @@ package org.apache.druid.query.materializedview;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunnerTestHelper;
@@ -89,4 +91,37 @@ public class MaterializedViewQueryTest
Assert.assertEquals(QueryRunnerTestHelper.ALL_GRAN,
query.getGranularity());
Assert.assertEquals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC.getIntervals(),
query.getIntervals());
}
+
+ @Test
+ public void testGetContextHumanReadableBytes()
+ {
+ TopNQuery topNQuery = new TopNQueryBuilder()
+ .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .granularity(QueryRunnerTestHelper.ALL_GRAN)
+ .dimension(QueryRunnerTestHelper.MARKET_DIMENSION)
+ .metric(QueryRunnerTestHelper.INDEX_METRIC)
+ .threshold(4)
+ .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
+ .aggregators(
+ Lists.newArrayList(
+ Iterables.concat(
+ QueryRunnerTestHelper.COMMON_DOUBLE_AGGREGATORS,
+ Lists.newArrayList(
+ new DoubleMaxAggregatorFactory("maxIndex", "index"),
+ new DoubleMinAggregatorFactory("minIndex", "index")
+ )
+ )
+ )
+ )
+ .context(
+ ImmutableMap.of(
+ "maxOnDiskStorage", "20M"
+ )
+ )
+ .postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
+ .build();
+ MaterializedViewQuery query = new MaterializedViewQuery(topNQuery,
optimizer);
+ Assert.assertEquals(20_000_000,
query.getContextHumanReadableBytes("maxOnDiskStorage",
HumanReadableBytes.ZERO).getBytes());
+
+ }
}
diff --git a/processing/src/main/java/org/apache/druid/query/BaseQuery.java
b/processing/src/main/java/org/apache/druid/query/BaseQuery.java
index bc45380d26..c581dfcfc3 100644
--- a/processing/src/main/java/org/apache/druid/query/BaseQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/BaseQuery.java
@@ -27,6 +27,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Ordering;
import org.apache.druid.guice.annotations.ExtensionPoint;
+import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
@@ -199,6 +200,12 @@ public abstract class BaseQuery<T> implements Query<T>
return context.getAsBoolean(key, defaultValue);
}
+ @Override
+ public HumanReadableBytes getContextHumanReadableBytes(String key,
HumanReadableBytes defaultValue)
+ {
+ return context.getAsHumanReadableBytes(key, defaultValue);
+ }
+
/**
* @deprecated use {@link #computeOverriddenContext(Map, Map)
computeOverriddenContext(getContext(), overrides))}
* instead. This method may be removed in the next minor or major version of
Druid.
diff --git a/processing/src/main/java/org/apache/druid/query/Query.java
b/processing/src/main/java/org/apache/druid/query/Query.java
index ced91a6383..9db2f0ca00 100644
--- a/processing/src/main/java/org/apache/druid/query/Query.java
+++ b/processing/src/main/java/org/apache/druid/query/Query.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Ordering;
import org.apache.druid.guice.annotations.ExtensionPoint;
+import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.datasourcemetadata.DataSourceMetadataQuery;
import org.apache.druid.query.filter.DimFilter;
@@ -129,6 +130,24 @@ public interface Query<T>
boolean getContextBoolean(String key, boolean defaultValue);
+ /**
+ * Returns {@link HumanReadableBytes} for a specified context key. If the
context is null or the key doesn't exist
+ * a caller specified default value is returned. A default implementation is
provided since Query is an extension
+ * point. Extensions can choose to rely on this default to retain
compatibility with core Druid.
+ *
+ * @param key The context key value being looked up
+ * @param defaultValue The default to return if the key value doesn't exist
or the context is null.
+ * @return {@link HumanReadableBytes}
+ */
+ default HumanReadableBytes getContextHumanReadableBytes(String key,
HumanReadableBytes defaultValue)
+ {
+ if (null != getQueryContext()) {
+ return getQueryContext().getAsHumanReadableBytes(key, defaultValue);
+ } else {
+ return defaultValue;
+ }
+ }
+
boolean isDescending();
/**
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContext.java
b/processing/src/main/java/org/apache/druid/query/QueryContext.java
index 20b607f784..af7352c8d9 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContext.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContext.java
@@ -19,6 +19,8 @@
package org.apache.druid.query;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+
import javax.annotation.Nullable;
import java.util.Collections;
@@ -176,6 +178,11 @@ public class QueryContext
return QueryContexts.getAsLong(parameter, get(parameter), defaultValue);
}
+ public HumanReadableBytes getAsHumanReadableBytes(final String parameter,
final HumanReadableBytes defaultValue)
+ {
+ return QueryContexts.getAsHumanReadableBytes(parameter, get(parameter),
defaultValue);
+ }
+
public Map<String, Object> getMergedParams()
{
if (mergedParams == null) {
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java
b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
index 67cb49be91..e531869ac5 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.guice.annotations.PublicApi;
+import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Numbers;
@@ -568,6 +569,23 @@ public class QueryContexts
}
}
+ public static HumanReadableBytes getAsHumanReadableBytes(
+ final String parameter,
+ final Object value,
+ final HumanReadableBytes defaultValue
+ )
+ {
+ if (null == value) {
+ return defaultValue;
+ } else if (value instanceof Number) {
+ return HumanReadableBytes.valueOf(Numbers.parseLong(value));
+ } else if (value instanceof String) {
+ return new HumanReadableBytes((String) value);
+ } else {
+ throw new IAE("Expected parameter [%s] to be in human readable format",
parameter);
+ }
+ }
+
public static Map<String, Object> override(
final Map<String, Object> context,
final Map<String, Object> overrides
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
index be4fb985a4..6125577ac3 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
@@ -21,6 +21,7 @@ package org.apache.druid.query.groupby;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
@@ -31,6 +32,8 @@ import org.apache.druid.utils.JvmUtils;
*/
public class GroupByQueryConfig
{
+ private static final Logger logger = new Logger(GroupByQueryConfig.class);
+
public static final long AUTOMATIC = 0;
public static final String CTX_KEY_STRATEGY = "groupByStrategy";
@@ -100,7 +103,10 @@ public class GroupByQueryConfig
@JsonProperty
// Max on-disk temporary storage, per-query; when exceeded, the query fails
- private long maxOnDiskStorage = 0L;
+ private HumanReadableBytes maxOnDiskStorage = HumanReadableBytes.valueOf(0);
+
+ @JsonProperty
+ private HumanReadableBytes defaultOnDiskStorage =
HumanReadableBytes.valueOf(-1);
@JsonProperty
private boolean forcePushDownLimit = false;
@@ -258,11 +264,24 @@ public class GroupByQueryConfig
);
}
- public long getMaxOnDiskStorage()
+ public HumanReadableBytes getMaxOnDiskStorage()
{
return maxOnDiskStorage;
}
+ /**
+ * Mirror maxOnDiskStorage if defaultOnDiskStorage's default is not
overridden by cluster operator.
+ *
+ * This mirroring is done to maintain continuity in behavior between Druid
versions. If an operator wants to use
+ * defaultOnDiskStorage, they have to explicitly override it.
+ *
+ * @return The working value for defaultOnDiskStorage
+ */
+ public HumanReadableBytes getDefaultOnDiskStorage()
+ {
+ return defaultOnDiskStorage.getBytes() < 0L ? getMaxOnDiskStorage() :
defaultOnDiskStorage;
+ }
+
public boolean isForcePushDownLimit()
{
return forcePushDownLimit;
@@ -338,9 +357,14 @@ public class GroupByQueryConfig
CTX_KEY_BUFFER_GROUPER_INITIAL_BUCKETS,
getBufferGrouperInitialBuckets()
);
- newConfig.maxOnDiskStorage = Math.min(
- ((Number) query.getContextValue(CTX_KEY_MAX_ON_DISK_STORAGE,
getMaxOnDiskStorage())).longValue(),
- getMaxOnDiskStorage()
+ // If the client overrides do not provide "maxOnDiskStorage" context key,
the server side "defaultOnDiskStorage"
+ // value is used in the calculation of the newConfig value of
maxOnDiskStorage. This allows the operator to
+ // choose a default value lower than the max allowed when the context key
is missing in the client query.
+ newConfig.maxOnDiskStorage = HumanReadableBytes.valueOf(
+ Math.min(
+ query.getContextHumanReadableBytes(CTX_KEY_MAX_ON_DISK_STORAGE,
getDefaultOnDiskStorage()).getBytes(),
+ getMaxOnDiskStorage().getBytes()
+ )
);
newConfig.maxSelectorDictionarySize = maxSelectorDictionarySize; // No
overrides
newConfig.maxMergingDictionarySize = maxMergingDictionarySize; // No
overrides
@@ -368,6 +392,8 @@ public class GroupByQueryConfig
CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING,
isMultiValueUnnestingEnabled()
);
+
+ logger.debug("Override config for GroupBy query %s - %s", query.getId(),
newConfig.toString());
return newConfig;
}
@@ -383,7 +409,8 @@ public class GroupByQueryConfig
", bufferGrouperMaxLoadFactor=" + bufferGrouperMaxLoadFactor +
", bufferGrouperInitialBuckets=" + bufferGrouperInitialBuckets +
", maxMergingDictionarySize=" + maxMergingDictionarySize +
- ", maxOnDiskStorage=" + maxOnDiskStorage +
+ ", maxOnDiskStorage=" + maxOnDiskStorage.getBytes() +
+ ", defaultOnDiskStorage=" + getDefaultOnDiskStorage().getBytes() +
// use the getter because of special behavior for mirroring maxOnDiskStorage if
defaultOnDiskStorage not explicitly set.
", forcePushDownLimit=" + forcePushDownLimit +
", forceHashAggregation=" + forceHashAggregation +
", intermediateCombineDegree=" + intermediateCombineDegree +
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
index 4f3bb6a645..59642bb919 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
@@ -175,7 +175,7 @@ public class GroupByMergingQueryRunnerV2 implements
QueryRunner<ResultRow>
try {
final LimitedTemporaryStorage temporaryStorage = new
LimitedTemporaryStorage(
temporaryStorageDirectory,
- querySpecificConfig.getMaxOnDiskStorage()
+ querySpecificConfig.getMaxOnDiskStorage().getBytes()
);
final ReferenceCountingResourceHolder<LimitedTemporaryStorage>
temporaryStorageHolder =
ReferenceCountingResourceHolder.fromCloseable(temporaryStorage);
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java
index 88c3c96544..76125c2c50 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java
@@ -106,7 +106,7 @@ public class GroupByRowProcessor
final LimitedTemporaryStorage temporaryStorage = new
LimitedTemporaryStorage(
temporaryStorageDirectory,
- querySpecificConfig.getMaxOnDiskStorage()
+ querySpecificConfig.getMaxOnDiskStorage().getBytes()
);
closeOnExit.register(temporaryStorage);
diff --git
a/processing/src/main/java/org/apache/druid/query/select/SelectQuery.java
b/processing/src/main/java/org/apache/druid/query/select/SelectQuery.java
index e46e6e9b56..f2895f0f9b 100644
--- a/processing/src/main/java/org/apache/druid/query/select/SelectQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/select/SelectQuery.java
@@ -21,6 +21,7 @@ package org.apache.druid.query.select;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.collect.Ordering;
+import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Query;
@@ -134,6 +135,12 @@ public class SelectQuery implements Query<Object>
throw new RuntimeException(REMOVED_ERROR_MESSAGE);
}
+ @Override
+ public HumanReadableBytes getContextHumanReadableBytes(String key,
HumanReadableBytes defaultValue)
+ {
+ throw new RuntimeException(REMOVED_ERROR_MESSAGE);
+ }
+
@Override
public boolean isDescending()
{
diff --git
a/processing/src/test/java/org/apache/druid/query/QueryContextTest.java
b/processing/src/test/java/org/apache/druid/query/QueryContextTest.java
index 3654f85af1..3b1eee0db0 100644
--- a/processing/src/test/java/org/apache/druid/query/QueryContextTest.java
+++ b/processing/src/test/java/org/apache/druid/query/QueryContextTest.java
@@ -23,7 +23,10 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Ordering;
import nl.jqno.equalsverifier.EqualsVerifier;
import nl.jqno.equalsverifier.Warning;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.Numbers;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -130,6 +133,17 @@ public class QueryContextTest
Assert.assertEquals(0L, context.getAsLong("non-exist", 0));
}
+ @Test
+ public void testGetHumanReadableBytes()
+ {
+ final QueryContext context = new QueryContext(
+ ImmutableMap.of(
+ "maxOnDiskStorage", "500M"
+ )
+ );
+ Assert.assertEquals(500_000_000,
context.getAsHumanReadableBytes("maxOnDiskStorage",
HumanReadableBytes.ZERO).getBytes());
+ }
+
@Test
public void testAddSystemParamOverrideUserParam()
{
@@ -346,6 +360,22 @@ public class QueryContextTest
return (boolean) context.get(key);
}
+ @Override
+ public HumanReadableBytes getContextHumanReadableBytes(String key,
HumanReadableBytes defaultValue)
+ {
+ if (null == context || !context.containsKey(key)) {
+ return defaultValue;
+ }
+ Object value = context.get(key);
+ if (value instanceof Number) {
+ return HumanReadableBytes.valueOf(Numbers.parseLong(value));
+ } else if (value instanceof String) {
+ return new HumanReadableBytes((String) value);
+ } else {
+ throw new IAE("Expected parameter [%s] to be in human readable
format", key);
+ }
+ }
+
@Override
public boolean isDescending()
{
diff --git
a/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java
b/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java
index 2ee9b9363e..3d34e2d0e4 100644
--- a/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java
+++ b/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.query;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
@@ -274,4 +275,12 @@ public class QueryContextsTest
// Expected
}
}
+
+ @Test
+ public void testGetAsHumanReadableBytes()
+ {
+ Assert.assertEquals(new HumanReadableBytes("500M").getBytes(),
QueryContexts.getAsHumanReadableBytes("maxOnDiskStorage", 500_000_000,
HumanReadableBytes.ZERO).getBytes());
+ Assert.assertEquals(new HumanReadableBytes("500M").getBytes(),
QueryContexts.getAsHumanReadableBytes("maxOnDiskStorage", "500000000",
HumanReadableBytes.ZERO).getBytes());
+ Assert.assertEquals(new HumanReadableBytes("500M").getBytes(),
QueryContexts.getAsHumanReadableBytes("maxOnDiskStorage", "500M",
HumanReadableBytes.ZERO).getBytes());
+ }
}
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
index 8e5d7011ed..0dfdac7fb4 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
@@ -34,6 +34,7 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -291,9 +292,9 @@ public class GroupByLimitPushDownInsufficientBufferTest
extends InitializedNullH
}
@Override
- public long getMaxOnDiskStorage()
+ public HumanReadableBytes getMaxOnDiskStorage()
{
- return 1_000_000_000L;
+ return HumanReadableBytes.valueOf(1_000_000_000L);
}
};
config.setSingleThreaded(false);
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
index 4c362a53c8..73685d7aa5 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
@@ -34,6 +34,7 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -562,9 +563,9 @@ public class GroupByLimitPushDownMultiNodeMergeTest
}
@Override
- public long getMaxOnDiskStorage()
+ public HumanReadableBytes getMaxOnDiskStorage()
{
- return 1_000_000_000L;
+ return HumanReadableBytes.valueOf(1_000_000_000L);
}
};
config.setSingleThreaded(false);
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
index 58c4abed17..b18b4d9252 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -227,9 +228,9 @@ public class GroupByMultiSegmentTest
}
@Override
- public long getMaxOnDiskStorage()
+ public HumanReadableBytes getMaxOnDiskStorage()
{
- return 1_000_000_000L;
+ return HumanReadableBytes.valueOf(1_000_000_000L);
}
};
config.setSingleThreaded(false);
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java
index 869bd75765..0c7a57ca03 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java
@@ -38,7 +38,8 @@ public class GroupByQueryConfigTest
.put("bufferGrouperInitialBuckets", "1")
.put("maxIntermediateRows", "2")
.put("maxResults", "3")
- .put("maxOnDiskStorage", "4")
+ .put("defaultOnDiskStorage", "1M")
+ .put("maxOnDiskStorage", "4M")
.put("maxSelectorDictionarySize", "5")
.put("maxMergingDictionarySize", "6M")
.put("bufferGrouperMaxLoadFactor", "7")
@@ -54,7 +55,8 @@ public class GroupByQueryConfigTest
Assert.assertEquals(1, config.getBufferGrouperInitialBuckets());
Assert.assertEquals(2, config.getMaxIntermediateRows());
Assert.assertEquals(3, config.getMaxResults());
- Assert.assertEquals(4, config.getMaxOnDiskStorage());
+ Assert.assertEquals(4_000_000, config.getMaxOnDiskStorage().getBytes());
+ Assert.assertEquals(1_000_000,
config.getDefaultOnDiskStorage().getBytes());
Assert.assertEquals(5, config.getConfiguredMaxSelectorDictionarySize());
Assert.assertEquals(6_000_000,
config.getConfiguredMaxMergingDictionarySize());
Assert.assertEquals(7.0, config.getBufferGrouperMaxLoadFactor(), 0.0);
@@ -78,7 +80,7 @@ public class GroupByQueryConfigTest
Assert.assertEquals(1, config2.getBufferGrouperInitialBuckets());
Assert.assertEquals(2, config2.getMaxIntermediateRows());
Assert.assertEquals(3, config2.getMaxResults());
- Assert.assertEquals(4, config2.getMaxOnDiskStorage());
+ Assert.assertEquals(1_000_000, config2.getMaxOnDiskStorage().getBytes());
Assert.assertEquals(5, config2.getConfiguredMaxSelectorDictionarySize());
Assert.assertEquals(6_000_000,
config2.getConfiguredMaxMergingDictionarySize());
Assert.assertEquals(7.0, config2.getBufferGrouperMaxLoadFactor(), 0.0);
@@ -97,7 +99,7 @@ public class GroupByQueryConfigTest
.setContext(
ImmutableMap.<String, Object>builder()
.put("groupByStrategy", "v1")
- .put("maxOnDiskStorage", 0)
+ .put("maxOnDiskStorage", "3M")
.put("maxResults", 2)
.put("maxSelectorDictionarySize", 3)
.put("maxMergingDictionarySize", 4)
@@ -112,7 +114,7 @@ public class GroupByQueryConfigTest
Assert.assertEquals(1, config2.getBufferGrouperInitialBuckets());
Assert.assertEquals(2, config2.getMaxIntermediateRows());
Assert.assertEquals(2, config2.getMaxResults());
- Assert.assertEquals(0, config2.getMaxOnDiskStorage());
+ Assert.assertEquals(3_000_000, config2.getMaxOnDiskStorage().getBytes());
Assert.assertEquals(5 /* Can't override */,
config2.getConfiguredMaxSelectorDictionarySize());
Assert.assertEquals(6_000_000 /* Can't override */,
config2.getConfiguredMaxMergingDictionarySize());
Assert.assertEquals(7.0, config2.getBufferGrouperMaxLoadFactor(), 0.0);
@@ -166,4 +168,66 @@ public class GroupByQueryConfigTest
Assert.assertEquals(100, config.getConfiguredMaxSelectorDictionarySize());
Assert.assertEquals(100,
config.getActualMaxSelectorDictionarySize(1_000_000_000, 2));
}
+
+ /**
+ * Tests that the defaultOnDiskStorage value is used when applying override context that is lacking maxOnDiskStorage.
+ */
+ @Test
+ public void testUseDefaultOnDiskStorage()
+ {
+ final GroupByQueryConfig config = MAPPER.convertValue(
+ ImmutableMap.of(
+ "maxOnDiskStorage", "10G",
+ "defaultOnDiskStorage", "5G"
+ ),
+ GroupByQueryConfig.class
+ );
+ final GroupByQueryConfig config2 = config.withOverrides(
+ GroupByQuery.builder()
+ .setDataSource("test")
+ .setInterval(Intervals.of("2000/P1D"))
+ .setGranularity(Granularities.ALL)
+ .setContext(ImmutableMap.<String, Object>builder().build())
+ .build()
+ );
+ Assert.assertEquals(5_000_000_000L,
config2.getMaxOnDiskStorage().getBytes());
+ }
+
+ @Test
+ public void testUseMaxOnDiskStorageWhenClientOverrideIsTooLarge()
+ {
+ final GroupByQueryConfig config = MAPPER.convertValue(
+ ImmutableMap.of("maxOnDiskStorage", "500M"),
+ GroupByQueryConfig.class
+ );
+ final GroupByQueryConfig config2 = config.withOverrides(
+ GroupByQuery.builder()
+ .setDataSource("test")
+ .setInterval(Intervals.of("2000/P1D"))
+ .setGranularity(Granularities.ALL)
+ .setContext(
+ ImmutableMap.<String, Object>builder()
+ .put("maxOnDiskStorage", "1G")
+ .build()
+ )
+ .build()
+ );
+ Assert.assertEquals(500_000_000, config2.getMaxOnDiskStorage().getBytes());
+ }
+
+ @Test
+ public void testGetDefaultOnDiskStorageReturnsCorrectValue()
+ {
+ final GroupByQueryConfig config = MAPPER.convertValue(
+ ImmutableMap.of("maxOnDiskStorage", "500M"),
+ GroupByQueryConfig.class
+ );
+ final GroupByQueryConfig config2 = MAPPER.convertValue(
+ ImmutableMap.of("maxOnDiskStorage", "500M",
+ "defaultOnDiskStorage", "100M"),
+ GroupByQueryConfig.class
+ );
+ Assert.assertEquals(500_000_000,
config.getDefaultOnDiskStorage().getBytes());
+ Assert.assertEquals(100_000_000,
config2.getDefaultOnDiskStorage().getBytes());
+ }
}
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index 71274c9f78..db1c689e84 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -34,6 +34,7 @@ import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.Rows;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
@@ -283,9 +284,9 @@ public class GroupByQueryRunnerTest extends
InitializedNullHandlingTest
}
@Override
- public long getMaxOnDiskStorage()
+ public HumanReadableBytes getMaxOnDiskStorage()
{
- return 10L * 1024 * 1024;
+ return HumanReadableBytes.valueOf(10L * 1024 * 1024);
}
@Override
@@ -315,9 +316,9 @@ public class GroupByQueryRunnerTest extends
InitializedNullHandlingTest
}
@Override
- public long getMaxOnDiskStorage()
+ public HumanReadableBytes getMaxOnDiskStorage()
{
- return 10L * 1024 * 1024;
+ return HumanReadableBytes.valueOf(10L * 1024 * 1024);
}
@Override
@@ -3018,7 +3019,7 @@ public class GroupByQueryRunnerTest extends
InitializedNullHandlingTest
List<ResultRow> expectedResults = null;
if
(config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
expectedException.expect(ResourceLimitExceededException.class);
- if (config.getMaxOnDiskStorage() > 0) {
+ if (config.getMaxOnDiskStorage().getBytes() > 0) {
// The error message always mentions disk if you have spilling enabled (maxOnDiskStorage > 0)
expectedException.expectMessage("Not enough disk space to execute this
query");
} else {
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
index 81554b2672..8592fca11e 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
@@ -33,6 +33,7 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -272,9 +273,9 @@ public class NestedQueryPushDownTest extends
InitializedNullHandlingTest
}
@Override
- public long getMaxOnDiskStorage()
+ public HumanReadableBytes getMaxOnDiskStorage()
{
- return 1_000_000_000L;
+ return HumanReadableBytes.valueOf(1_000_000_000L);
}
};
config.setSingleThreaded(false);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]