This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f9655b9c04 Allow configurable initial capacity for IndexedTable
(#14620)
f9655b9c04 is described below
commit f9655b9c0412797cf43890caa8d4296e0f1b48f3
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Dec 9 03:42:11 2024 -0800
Allow configurable initial capacity for IndexedTable (#14620)
---
.../org/apache/pinot/common/utils/HashUtil.java | 10 +-
.../common/utils/config/QueryOptionsUtils.java | 6 +
.../core/data/table/ConcurrentIndexedTable.java | 10 +-
.../apache/pinot/core/data/table/IndexedTable.java | 36 +----
.../pinot/core/data/table/SimpleIndexedTable.java | 9 +-
.../table/UnboundedConcurrentIndexedTable.java | 8 +-
.../blocks/results/GroupByResultsBlock.java | 10 ++
.../operator/combine/GroupByCombineOperator.java | 42 +-----
.../streaming/StreamingGroupByCombineOperator.java | 41 +-----
.../core/plan/maker/InstancePlanMakerImplV2.java | 64 +++++++--
.../groupby/AggregationGroupByResult.java | 4 +
.../pinot/core/query/reduce/BaseReduceService.java | 4 +
.../core/query/reduce/BrokerReduceService.java | 7 +-
.../core/query/reduce/DataTableReducerContext.java | 8 +-
.../core/query/reduce/GroupByDataTableReducer.java | 53 ++-----
.../core/query/reduce/StreamingReduceService.java | 7 +-
.../core/query/request/context/QueryContext.java | 10 ++
.../org/apache/pinot/core/util/GroupByUtils.java | 159 +++++++++++++++++++++
.../accounting/ResourceManagerAccountingTest.java | 4 +-
.../pinot/core/data/table/IndexedTableTest.java | 79 +++-------
.../apache/pinot/core/util/GroupByUtilsTest.java | 34 +++++
.../org/apache/pinot/queries/ExprMinMaxTest.java | 5 +-
...terSegmentAggregationMultiValueQueriesTest.java | 8 +-
...SegmentAggregationMultiValueRawQueriesTest.java | 8 +-
...erSegmentAggregationSingleValueQueriesTest.java | 8 +-
.../InterSegmentGroupByMultiValueQueriesTest.java | 10 +-
...nterSegmentGroupByMultiValueRawQueriesTest.java | 10 +-
.../InterSegmentGroupBySingleValueQueriesTest.java | 10 +-
.../tests/OfflineGRPCServerIntegrationTest.java | 2 +-
.../apache/pinot/perf/BenchmarkCombineGroupBy.java | 6 +-
.../apache/pinot/perf/BenchmarkIndexedTable.java | 6 +-
.../apache/pinot/query/runtime/QueryRunner.java | 15 ++
.../apache/pinot/spi/utils/CommonConstants.java | 8 +-
33 files changed, 410 insertions(+), 291 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java
index 9c8d227c9b..a8c5cc9985 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.common.utils;
+import com.google.common.primitives.Ints;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
@@ -44,9 +45,16 @@ public class HashUtil {
/**
* Returns a capacity that is sufficient to keep the map from being resized
as long as it grows no larger than
* expectedSize and the load factor is >= its default (0.75).
+ * NOTE: Borrowed from Guava's Maps library {@code int capacity(int
expectedSize)}.
*/
public static int getHashMapCapacity(int expectedSize) {
- return (int) ((float) expectedSize / 0.75f + 1f);
+ if (expectedSize < 3) {
+ return expectedSize + 1;
+ }
+ if (expectedSize < Ints.MAX_POWER_OF_TWO) {
+ return (int) Math.ceil(expectedSize / 0.75);
+ }
+ return Integer.MAX_VALUE;
}
public static long compute(IntBuffer buff) {
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 1ac9e6fab8..8dbd4bb402 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -275,6 +275,12 @@ public class QueryOptionsUtils {
return
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.OPTIMIZE_MAX_INITIAL_RESULT_HOLDER_CAPACITY));
}
+ @Nullable
+ public static Integer getMinInitialIndexedTableCapacity(Map<String, String>
queryOptions) {
+ String minInitialIndexedTableCapacity =
queryOptions.get(QueryOptionKey.MIN_INITIAL_INDEXED_TABLE_CAPACITY);
+ return
checkedParseIntPositive(QueryOptionKey.MIN_INITIAL_INDEXED_TABLE_CAPACITY,
minInitialIndexedTableCapacity);
+ }
+
public static boolean shouldDropResults(Map<String, String> queryOptions) {
return
Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.DROP_RESULTS));
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
index 119d47c79e..871eea7c26 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
@@ -32,14 +32,10 @@ public class ConcurrentIndexedTable extends IndexedTable {
private final AtomicBoolean _noMoreNewRecords = new AtomicBoolean();
private final ReentrantReadWriteLock _readWriteLock = new
ReentrantReadWriteLock();
- public ConcurrentIndexedTable(DataSchema dataSchema, QueryContext
queryContext, int resultSize, int trimSize,
- int trimThreshold) {
- this(dataSchema, false, queryContext, resultSize, trimSize, trimThreshold);
- }
-
public ConcurrentIndexedTable(DataSchema dataSchema, boolean hasFinalInput,
QueryContext queryContext, int resultSize,
- int trimSize, int trimThreshold) {
- super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize,
trimThreshold, new ConcurrentHashMap<>());
+ int trimSize, int trimThreshold, int initialCapacity) {
+ super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize,
trimThreshold,
+ new ConcurrentHashMap<>(initialCapacity));
}
/**
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
index dd961a0154..bce224eb3a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
@@ -26,13 +26,10 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.request.context.OrderByExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.request.context.QueryContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
@@ -40,8 +37,6 @@ import org.slf4j.LoggerFactory;
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public abstract class IndexedTable extends BaseTable {
- private static final Logger LOGGER =
LoggerFactory.getLogger(IndexedTable.class);
-
protected final Map<Key, Record> _lookupMap;
protected final boolean _hasFinalInput;
protected final int _resultSize;
@@ -83,31 +78,12 @@ public abstract class IndexedTable extends BaseTable {
assert groupByExpressions != null;
_numKeyColumns = groupByExpressions.size();
_aggregationFunctions = queryContext.getAggregationFunctions();
- List<OrderByExpressionContext> orderByExpressions =
queryContext.getOrderByExpressions();
- if (orderByExpressions != null) {
- // GROUP BY with ORDER BY
- _hasOrderBy = true;
- _tableResizer = new TableResizer(dataSchema, hasFinalInput,
queryContext);
- _trimSize = trimSize;
- // trimThreshold is lower bounded by (2 * trimSize) in order to avoid
excessive trimming. We don't modify trimSize
- // in order to maintain the desired accuracy
- if (trimSize > trimThreshold / 2) {
- // Handle potential overflow
- _trimThreshold = (2 * trimSize) > 0 ? 2 * trimSize : Integer.MAX_VALUE;
- LOGGER.debug("Overriding group trim threshold to {}, since the
configured value {} is less than twice the "
- + "trim size ({})", _trimThreshold, trimThreshold, trimSize);
- } else {
- _trimThreshold = trimThreshold;
- }
- } else {
- // GROUP BY without ORDER BY
- // NOTE: The indexed table stops accepting records once the map size
reaches resultSize, and there is no
- // resize/trim during upsert.
- _hasOrderBy = false;
- _tableResizer = null;
- _trimSize = Integer.MAX_VALUE;
- _trimThreshold = Integer.MAX_VALUE;
- }
+ _hasOrderBy = queryContext.getOrderByExpressions() != null;
+ _tableResizer = _hasOrderBy ? new TableResizer(dataSchema, hasFinalInput,
queryContext) : null;
+ // NOTE: Trim should be disabled when there is no ORDER BY
+ assert _hasOrderBy || (trimSize == Integer.MAX_VALUE && trimThreshold ==
Integer.MAX_VALUE);
+ _trimSize = trimSize;
+ _trimThreshold = trimThreshold;
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
index 2163620225..df89c3a8e1 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
@@ -30,14 +30,9 @@ import
org.apache.pinot.core.query.request.context.QueryContext;
@NotThreadSafe
public class SimpleIndexedTable extends IndexedTable {
- public SimpleIndexedTable(DataSchema dataSchema, QueryContext queryContext,
int resultSize, int trimSize,
- int trimThreshold) {
- this(dataSchema, false, queryContext, resultSize, trimSize, trimThreshold);
- }
-
public SimpleIndexedTable(DataSchema dataSchema, boolean hasFinalInput,
QueryContext queryContext, int resultSize,
- int trimSize, int trimThreshold) {
- super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize,
trimThreshold, new HashMap<>());
+ int trimSize, int trimThreshold, int initialCapacity) {
+ super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize,
trimThreshold, new HashMap<>(initialCapacity));
}
/**
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
index 67f82b2011..f397ac0e8c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
@@ -35,13 +35,9 @@ import
org.apache.pinot.core.query.request.context.QueryContext;
*/
public class UnboundedConcurrentIndexedTable extends ConcurrentIndexedTable {
- public UnboundedConcurrentIndexedTable(DataSchema dataSchema, QueryContext
queryContext, int resultSize) {
- this(dataSchema, false, queryContext, resultSize);
- }
-
public UnboundedConcurrentIndexedTable(DataSchema dataSchema, boolean
hasFinalInput, QueryContext queryContext,
- int resultSize) {
- super(dataSchema, hasFinalInput, queryContext, resultSize,
Integer.MAX_VALUE, Integer.MAX_VALUE);
+ int resultSize, int initialCapacity) {
+ super(dataSchema, hasFinalInput, queryContext, resultSize,
Integer.MAX_VALUE, Integer.MAX_VALUE, initialCapacity);
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
index dfc5faa289..b1bf65b7eb 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
@@ -120,6 +120,16 @@ public class GroupByResultsBlock extends BaseResultsBlock {
return _table;
}
+ public int getNumGroups() {
+ assert _aggregationGroupByResult != null || _intermediateRecords != null
+ : "Should not call getNumGroups() on instance level results";
+ if (_aggregationGroupByResult != null) {
+ return _aggregationGroupByResult.getNumGroups();
+ } else {
+ return _intermediateRecords.size();
+ }
+ }
+
public boolean isNumGroupsLimitReached() {
return _numGroupsLimitReached;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
index ecb0a56cbf..aeef763ad5 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
@@ -26,15 +26,11 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
import org.apache.pinot.core.data.table.IndexedTable;
import org.apache.pinot.core.data.table.IntermediateRecord;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.core.data.table.Record;
-import org.apache.pinot.core.data.table.SimpleIndexedTable;
-import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
@@ -61,8 +57,6 @@ public class GroupByCombineOperator extends
BaseSingleBlockCombineOperator<Group
private static final Logger LOGGER =
LoggerFactory.getLogger(GroupByCombineOperator.class);
private static final String EXPLAIN_NAME = "COMBINE_GROUP_BY";
- private final int _trimSize;
- private final int _trimThreshold;
private final int _numAggregationFunctions;
private final int _numGroupByExpressions;
private final int _numColumns;
@@ -76,25 +70,6 @@ public class GroupByCombineOperator extends
BaseSingleBlockCombineOperator<Group
public GroupByCombineOperator(List<Operator> operators, QueryContext
queryContext, ExecutorService executorService) {
super(null, operators, overrideMaxExecutionThreads(queryContext,
operators.size()), executorService);
- int minTrimSize = queryContext.getMinServerGroupTrimSize();
- if (minTrimSize > 0) {
- int limit = queryContext.getLimit();
- if ((!queryContext.isServerReturnFinalResult() &&
queryContext.getOrderByExpressions() != null)
- || queryContext.getHavingFilter() != null) {
- _trimSize = GroupByUtils.getTableCapacity(limit, minTrimSize);
- } else {
- // TODO: Keeping only 'LIMIT' groups can cause inaccurate result
because the groups are randomly selected
- // without ordering. Consider ordering on group-by columns if no
ordering is specified.
- _trimSize = limit;
- }
- int trimThreshold = queryContext.getGroupTrimThreshold();
- _trimThreshold = trimThreshold > 0 ? trimThreshold : Integer.MAX_VALUE;
- } else {
- // Server trim is disabled
- _trimSize = Integer.MAX_VALUE;
- _trimThreshold = Integer.MAX_VALUE;
- }
-
AggregationFunction[] aggregationFunctions =
_queryContext.getAggregationFunctions();
assert aggregationFunctions != null;
_numAggregationFunctions = aggregationFunctions.length;
@@ -136,22 +111,7 @@ public class GroupByCombineOperator extends
BaseSingleBlockCombineOperator<Group
if (_indexedTable == null) {
synchronized (this) {
if (_indexedTable == null) {
- DataSchema dataSchema = resultsBlock.getDataSchema();
- // NOTE: Use trimSize as resultSize on server side.
- if (_numTasks == 1) {
- _indexedTable = new SimpleIndexedTable(dataSchema,
_queryContext, _trimSize, _trimSize, _trimThreshold);
- } else {
- if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
- // special case of trim threshold where it is set to max
value.
- // there won't be any trimming during upsert in this case.
- // thus we can avoid the overhead of read-lock and write-lock
- // in the upsert method.
- _indexedTable = new
UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize);
- } else {
- _indexedTable =
- new ConcurrentIndexedTable(dataSchema, _queryContext,
_trimSize, _trimSize, _trimThreshold);
- }
- }
+ _indexedTable =
GroupByUtils.createIndexedTableForCombineOperator(resultsBlock, _queryContext,
_numTasks);
}
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java
index 1e8c88e9ce..13b06ae6f4 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java
@@ -27,15 +27,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
import org.apache.pinot.core.data.table.IndexedTable;
import org.apache.pinot.core.data.table.IntermediateRecord;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.core.data.table.Record;
-import org.apache.pinot.core.data.table.SimpleIndexedTable;
-import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
@@ -66,8 +62,6 @@ public class StreamingGroupByCombineOperator extends
BaseStreamingCombineOperato
private static final Logger LOGGER =
LoggerFactory.getLogger(StreamingGroupByCombineOperator.class);
private static final String EXPLAIN_NAME = "STREAMING_COMBINE_GROUP_BY";
- private final int _trimSize;
- private final int _trimThreshold;
private final int _numAggregationFunctions;
private final int _numGroupByExpressions;
private final int _numColumns;
@@ -83,24 +77,6 @@ public class StreamingGroupByCombineOperator extends
BaseStreamingCombineOperato
ExecutorService executorService) {
super(null, operators, overrideMaxExecutionThreads(queryContext,
operators.size()), executorService);
- int minTrimSize = queryContext.getMinServerGroupTrimSize();
- if (minTrimSize > 0) {
- int limit = queryContext.getLimit();
- if ((!queryContext.isServerReturnFinalResult() &&
queryContext.getOrderByExpressions() != null)
- || queryContext.getHavingFilter() != null) {
- _trimSize = GroupByUtils.getTableCapacity(limit, minTrimSize);
- } else {
- // TODO: Keeping only 'LIMIT' groups can cause inaccurate result
because the groups are randomly selected
- // without ordering. Consider ordering on group-by columns if no
ordering is specified.
- _trimSize = limit;
- }
- _trimThreshold = queryContext.getGroupTrimThreshold();
- } else {
- // Server trim is disabled
- _trimSize = Integer.MAX_VALUE;
- _trimThreshold = Integer.MAX_VALUE;
- }
-
AggregationFunction[] aggregationFunctions =
_queryContext.getAggregationFunctions();
assert aggregationFunctions != null;
_numAggregationFunctions = aggregationFunctions.length;
@@ -163,22 +139,7 @@ public class StreamingGroupByCombineOperator extends
BaseStreamingCombineOperato
if (_indexedTable == null) {
synchronized (this) {
if (_indexedTable == null) {
- DataSchema dataSchema = resultsBlock.getDataSchema();
- // NOTE: Use trimSize as resultSize on server side.
- if (_numTasks == 1) {
- _indexedTable = new SimpleIndexedTable(dataSchema,
_queryContext, _trimSize, _trimSize, _trimThreshold);
- } else {
- if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
- // special case of trim threshold where it is set to max
value.
- // there won't be any trimming during upsert in this case.
- // thus we can avoid the overhead of read-lock and write-lock
- // in the upsert method.
- _indexedTable = new
UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize);
- } else {
- _indexedTable =
- new ConcurrentIndexedTable(dataSchema, _queryContext,
_trimSize, _trimSize, _trimThreshold);
- }
- }
+ _indexedTable =
GroupByUtils.createIndexedTableForCombineOperator(resultsBlock, _queryContext,
_numTasks);
}
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index e76a649886..cadce4bcf6 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -71,6 +71,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY =
"max.init.group.holder.capacity";
public static final int DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000;
+ public static final String MIN_INITIAL_INDEXED_TABLE_CAPACITY_KEY =
"min.init.indexed.table.capacity";
+ public static final int DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY = 128;
public static final String NUM_GROUPS_LIMIT_KEY = "num.groups.limit";
public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000;
@@ -93,6 +95,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
private final FetchPlanner _fetchPlanner = FetchPlannerRegistry.getPlanner();
private int _maxExecutionThreads = DEFAULT_MAX_EXECUTION_THREADS;
private int _maxInitialResultHolderCapacity =
DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY;
+ private int _minInitialIndexedTableCapacity =
DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY;
// Limit on number of groups stored for each segment, beyond which no new
group will be created
private int _numGroupsLimit = DEFAULT_NUM_GROUPS_LIMIT;
// Used for SQL GROUP BY (server combine)
@@ -103,25 +106,20 @@ public class InstancePlanMakerImplV2 implements PlanMaker
{
public InstancePlanMakerImplV2() {
}
- @VisibleForTesting
- public InstancePlanMakerImplV2(int maxInitialResultHolderCapacity, int
numGroupsLimit, int minSegmentGroupTrimSize,
- int minServerGroupTrimSize, int groupByTrimThreshold) {
- _maxInitialResultHolderCapacity = maxInitialResultHolderCapacity;
- _numGroupsLimit = numGroupsLimit;
- _minSegmentGroupTrimSize = minSegmentGroupTrimSize;
- _minServerGroupTrimSize = minServerGroupTrimSize;
- _groupByTrimThreshold = groupByTrimThreshold;
- }
-
@Override
public void init(PinotConfiguration queryExecutorConfig) {
_maxExecutionThreads =
queryExecutorConfig.getProperty(MAX_EXECUTION_THREADS_KEY,
DEFAULT_MAX_EXECUTION_THREADS);
_maxInitialResultHolderCapacity =
queryExecutorConfig.getProperty(MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY,
DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+ _minInitialIndexedTableCapacity =
queryExecutorConfig.getProperty(MIN_INITIAL_INDEXED_TABLE_CAPACITY_KEY,
+ DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY);
_numGroupsLimit = queryExecutorConfig.getProperty(NUM_GROUPS_LIMIT_KEY,
DEFAULT_NUM_GROUPS_LIMIT);
Preconditions.checkState(_maxInitialResultHolderCapacity <=
_numGroupsLimit,
"Invalid configuration: maxInitialResultHolderCapacity: %d must be
smaller or equal to numGroupsLimit: %d",
_maxInitialResultHolderCapacity, _numGroupsLimit);
+ Preconditions.checkState(_minInitialIndexedTableCapacity <=
_numGroupsLimit,
+ "Invalid configuration: minInitialIndexedTableCapacity: %d must be
smaller or equal to numGroupsLimit: %d",
+ _minInitialIndexedTableCapacity, _numGroupsLimit);
_minSegmentGroupTrimSize =
queryExecutorConfig.getProperty(MIN_SEGMENT_GROUP_TRIM_SIZE_KEY,
DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE);
_minServerGroupTrimSize =
@@ -135,6 +133,36 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
_minServerGroupTrimSize, _groupByTrimThreshold);
}
+ @VisibleForTesting
+ public void setMaxInitialResultHolderCapacity(int
maxInitialResultHolderCapacity) {
+ _maxInitialResultHolderCapacity = maxInitialResultHolderCapacity;
+ }
+
+ @VisibleForTesting
+ public void setMinInitialIndexedTableCapacity(int
minInitialIndexedTableCapacity) {
+ _minInitialIndexedTableCapacity = minInitialIndexedTableCapacity;
+ }
+
+ @VisibleForTesting
+ public void setNumGroupsLimit(int numGroupsLimit) {
+ _numGroupsLimit = numGroupsLimit;
+ }
+
+ @VisibleForTesting
+ public void setMinSegmentGroupTrimSize(int minSegmentGroupTrimSize) {
+ _minSegmentGroupTrimSize = minSegmentGroupTrimSize;
+ }
+
+ @VisibleForTesting
+ public void setMinServerGroupTrimSize(int minServerGroupTrimSize) {
+ _minServerGroupTrimSize = minServerGroupTrimSize;
+ }
+
+ @VisibleForTesting
+ public void setGroupByTrimThreshold(int groupByTrimThreshold) {
+ _groupByTrimThreshold = groupByTrimThreshold;
+ }
+
public Plan makeInstancePlan(List<SegmentContext> segmentContexts,
QueryContext queryContext,
ExecutorService executorService, ServerMetrics serverMetrics) {
applyQueryOptions(queryContext);
@@ -196,12 +224,19 @@ public class InstancePlanMakerImplV2 implements PlanMaker
{
// Set group-by query options
if (QueryContextUtils.isAggregationQuery(queryContext) &&
queryContext.getGroupByExpressions() != null) {
// Set maxInitialResultHolderCapacity
- Integer initResultCap =
QueryOptionsUtils.getMaxInitialResultHolderCapacity(queryOptions);
- if (initResultCap != null) {
- queryContext.setMaxInitialResultHolderCapacity(initResultCap);
+ Integer maxInitialResultHolderCapacity =
QueryOptionsUtils.getMaxInitialResultHolderCapacity(queryOptions);
+ if (maxInitialResultHolderCapacity != null) {
+
queryContext.setMaxInitialResultHolderCapacity(maxInitialResultHolderCapacity);
} else {
queryContext.setMaxInitialResultHolderCapacity(_maxInitialResultHolderCapacity);
}
+ // Set minInitialIndexedTableCapacity
+ Integer minInitialIndexedTableCapacity =
QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions);
+ if (minInitialIndexedTableCapacity != null) {
+
queryContext.setMinInitialIndexedTableCapacity(minInitialIndexedTableCapacity);
+ } else {
+
queryContext.setMinInitialIndexedTableCapacity(_minInitialIndexedTableCapacity);
+ }
// Set numGroupsLimit
Integer numGroupsLimit =
QueryOptionsUtils.getNumGroupsLimit(queryOptions);
if (numGroupsLimit != null) {
@@ -361,7 +396,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
.contains(overrideExpression.getIdentifier())) {
return overrideExpression;
}
- expression.getFunction().getArguments()
+ expression.getFunction()
+ .getArguments()
.replaceAll(argument -> overrideWithExpressionHints(argument,
indexSegment, expressionOverrideHints));
return expression;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByResult.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByResult.java
index e2933527af..49c2361c3c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByResult.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByResult.java
@@ -40,6 +40,10 @@ public class AggregationGroupByResult {
_resultHolders = resultHolders;
}
+ public int getNumGroups() {
+ return _groupKeyGenerator.getNumKeys();
+ }
+
/**
* Returns an iterator of {@link GroupKeyGenerator.GroupKey}.
*/
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
index 9b44e0c405..05e9dae536 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
@@ -52,6 +52,7 @@ public abstract class BaseReduceService {
protected final int _maxReduceThreadsPerQuery;
protected final int _groupByTrimThreshold;
protected final int _minGroupTrimSize;
+ protected final int _minInitialIndexedTableCapacity;
public BaseReduceService(PinotConfiguration config) {
_maxReduceThreadsPerQuery =
config.getProperty(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY,
@@ -60,6 +61,9 @@ public abstract class BaseReduceService {
CommonConstants.Broker.DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD);
_minGroupTrimSize =
config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_GROUP_TRIM_SIZE,
CommonConstants.Broker.DEFAULT_BROKER_MIN_GROUP_TRIM_SIZE);
+ _minInitialIndexedTableCapacity =
+
config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_INITIAL_INDEXED_TABLE_CAPACITY,
+
CommonConstants.Broker.DEFAULT_BROKER_MIN_INITIAL_INDEXED_TABLE_CAPACITY);
int numThreadsInExecutorService =
Runtime.getRuntime().availableProcessors();
LOGGER.info("Initializing BrokerReduceService with {} threads, and {} max
reduce threads.",
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index b9b0f7bb51..d10e0811ed 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -142,18 +142,23 @@ public class BrokerReduceService extends
BaseReduceService {
Integer minGroupTrimSizeQueryOption = null;
Integer groupTrimThresholdQueryOption = null;
+ Integer minInitialIndexedTableCapacityQueryOption = null;
if (queryOptions != null) {
minGroupTrimSizeQueryOption =
QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions);
groupTrimThresholdQueryOption =
QueryOptionsUtils.getGroupTrimThreshold(queryOptions);
+ minInitialIndexedTableCapacityQueryOption =
QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions);
}
int minGroupTrimSize = minGroupTrimSizeQueryOption != null ?
minGroupTrimSizeQueryOption : _minGroupTrimSize;
int groupTrimThreshold =
groupTrimThresholdQueryOption != null ? groupTrimThresholdQueryOption
: _groupByTrimThreshold;
+ int minInitialIndexedTableCapacity =
+ minInitialIndexedTableCapacityQueryOption != null ?
minInitialIndexedTableCapacityQueryOption
+ : _minInitialIndexedTableCapacity;
try {
dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema,
dataTableMap, brokerResponseNative,
new DataTableReducerContext(_reduceExecutorService,
_maxReduceThreadsPerQuery, reduceTimeOutMs,
- groupTrimThreshold, minGroupTrimSize), brokerMetrics);
+ groupTrimThreshold, minGroupTrimSize,
minInitialIndexedTableCapacity), brokerMetrics);
} catch (EarlyTerminationException e) {
brokerResponseNative.addException(
new
QueryProcessingException(QueryException.QUERY_CANCELLATION_ERROR_CODE,
e.toString()));
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java
index d4b69e6c21..8c645a622b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java
@@ -32,6 +32,7 @@ public class DataTableReducerContext {
// used for SQL GROUP BY
private final int _groupByTrimThreshold;
private final int _minGroupTrimSize;
+ private final int _minInitialIndexedTableCapacity;
/**
* Constructor for the class.
@@ -42,12 +43,13 @@ public class DataTableReducerContext {
* @param groupByTrimThreshold trim threshold for SQL group by
*/
public DataTableReducerContext(ExecutorService executorService, int
maxReduceThreadsPerQuery, long reduceTimeOutMs,
- int groupByTrimThreshold, int minGroupTrimSize) {
+ int groupByTrimThreshold, int minGroupTrimSize, int minInitialIndexedTableCapacity) {
_executorService = executorService;
_maxReduceThreadsPerQuery = maxReduceThreadsPerQuery;
_reduceTimeOutMs = reduceTimeOutMs;
_groupByTrimThreshold = groupByTrimThreshold;
_minGroupTrimSize = minGroupTrimSize;
+ _minInitialIndexedTableCapacity = minInitialIndexedTableCapacity;
}
public ExecutorService getExecutorService() {
@@ -69,4 +71,8 @@ public class DataTableReducerContext {
public int getMinGroupTrimSize() {
return _minGroupTrimSize;
}
+
+ public int getMinInitialIndexedTableCapacity() {
+ return _minInitialIndexedTableCapacity;
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 34395febfb..d8ff92f908 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -51,12 +51,8 @@ import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
import org.apache.pinot.core.data.table.IndexedTable;
import org.apache.pinot.core.data.table.Record;
-import org.apache.pinot.core.data.table.SimpleIndexedTable;
-import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
-import org.apache.pinot.core.operator.combine.GroupByCombineOperator;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import
org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.query.request.context.QueryContext;
@@ -232,53 +228,22 @@ public class GroupByDataTableReducer implements
DataTableReducer {
DataTableReducerContext reducerContext)
throws TimeoutException {
long start = System.currentTimeMillis();
- int numDataTables = dataTablesToReduce.size();
+
+ assert !dataTablesToReduce.isEmpty();
+ ArrayList<DataTable> dataTables = new ArrayList<>(dataTablesToReduce);
+ int numDataTables = dataTables.size();
// Get the number of threads to use for reducing.
- // In case of single reduce thread, fall back to SimpleIndexedTable to
avoid redundant locking/unlocking calls.
int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables,
reducerContext.getMaxReduceThreadsPerQuery());
- boolean hasFinalInput =
- _queryContext.isServerReturnFinalResult() ||
_queryContext.isServerReturnFinalResultKeyUnpartitioned();
- int limit = _queryContext.getLimit();
- int minTrimSize = reducerContext.getMinGroupTrimSize();
- int trimSize;
- int trimThreshold;
- if (minTrimSize > 0) {
- trimSize = GroupByUtils.getTableCapacity(limit, minTrimSize);
- trimThreshold = reducerContext.getGroupByTrimThreshold();
- if (trimThreshold <= 0) {
- trimThreshold = Integer.MAX_VALUE;
- }
- } else {
- // Broker trim is disabled
- trimSize = Integer.MAX_VALUE;
- trimThreshold = Integer.MAX_VALUE;
- }
- // NOTE: For query with HAVING clause, use trimSize as resultSize to
ensure the result accuracy.
- // TODO: Resolve the HAVING clause within the IndexedTable before
returning the result
- int resultSize = _queryContext.getHavingFilter() != null ? trimSize :
limit;
- IndexedTable indexedTable;
- if (numReduceThreadsToUse == 1) {
- indexedTable =
- new SimpleIndexedTable(dataSchema, hasFinalInput, _queryContext,
resultSize, trimSize, trimThreshold);
- } else {
- if (trimThreshold >= GroupByCombineOperator.MAX_TRIM_THRESHOLD) {
- // special case of trim threshold where it is set to max value.
- // there won't be any trimming during upsert in this case.
- // thus we can avoid the overhead of read-lock and write-lock
- // in the upsert method.
- indexedTable = new UnboundedConcurrentIndexedTable(dataSchema,
hasFinalInput, _queryContext, resultSize);
- } else {
- indexedTable =
- new ConcurrentIndexedTable(dataSchema, hasFinalInput,
_queryContext, resultSize, trimSize, trimThreshold);
- }
- }
+
+ // Create an indexed table to perform the reduce.
+ IndexedTable indexedTable =
+ GroupByUtils.createIndexedTableForDataTableReducer(dataTables.get(0), _queryContext, reducerContext,
+ numReduceThreadsToUse);
// Create groups of data tables that each thread can process concurrently.
// Given that numReduceThreads is <= numDataTables, each group will have
at least one data table.
- ArrayList<DataTable> dataTables = new ArrayList<>(dataTablesToReduce);
List<List<DataTable>> reduceGroups = new
ArrayList<>(numReduceThreadsToUse);
-
for (int i = 0; i < numReduceThreadsToUse; i++) {
reduceGroups.add(new ArrayList<>());
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
index 653498aade..8b61a97d55 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
@@ -80,18 +80,23 @@ public class StreamingReduceService extends
BaseReduceService {
Integer minGroupTrimSizeQueryOption = null;
Integer groupTrimThresholdQueryOption = null;
+ Integer minInitialIndexedTableCapacityQueryOption = null;
if (queryOptions != null) {
minGroupTrimSizeQueryOption =
QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions);
groupTrimThresholdQueryOption =
QueryOptionsUtils.getGroupTrimThreshold(queryOptions);
+ minInitialIndexedTableCapacityQueryOption = QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions);
}
int minGroupTrimSize = minGroupTrimSizeQueryOption != null ?
minGroupTrimSizeQueryOption : _minGroupTrimSize;
int groupTrimThreshold =
groupTrimThresholdQueryOption != null ? groupTrimThresholdQueryOption
: _groupByTrimThreshold;
+ int minInitialIndexedTableCapacity =
+ minInitialIndexedTableCapacityQueryOption != null ? minInitialIndexedTableCapacityQueryOption
+ : _minInitialIndexedTableCapacity;
// Process server response.
DataTableReducerContext dataTableReducerContext =
new DataTableReducerContext(_reduceExecutorService,
_maxReduceThreadsPerQuery, reduceTimeOutMs,
- groupTrimThreshold, minGroupTrimSize);
+ groupTrimThreshold, minGroupTrimSize, minInitialIndexedTableCapacity);
StreamingReducer streamingReducer =
ResultReducerFactory.getStreamingReducer(queryContext);
streamingReducer.init(dataTableReducerContext);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
index 0aa233b43e..e1e3c37a8d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
@@ -114,6 +114,8 @@ public class QueryContext {
// The following properties apply to group-by queries
// Maximum initial capacity of the group-by result holder
private int _maxInitialResultHolderCapacity =
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY;
+ // Initial capacity of the indexed table
+ private int _minInitialIndexedTableCapacity = InstancePlanMakerImplV2.DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY;
// Limit of number of groups stored in each segment
private int _numGroupsLimit =
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT;
// Minimum number of groups to keep per segment when trimming groups for SQL
GROUP BY
@@ -368,6 +370,14 @@ public class QueryContext {
_maxInitialResultHolderCapacity = maxInitialResultHolderCapacity;
}
+ public int getMinInitialIndexedTableCapacity() {
+ return _minInitialIndexedTableCapacity;
+ }
+
+ public void setMinInitialIndexedTableCapacity(int minInitialIndexedTableCapacity) {
+ _minInitialIndexedTableCapacity = minInitialIndexedTableCapacity;
+ }
+
public int getNumGroupsLimit() {
return _numGroupsLimit;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
index e8551dab2c..313786cecf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
@@ -18,11 +18,25 @@
*/
package org.apache.pinot.core.util;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.query.reduce.DataTableReducerContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+
+
public final class GroupByUtils {
private GroupByUtils() {
}
public static final int DEFAULT_MIN_NUM_GROUPS = 5000;
+ public static final int MAX_TRIM_THRESHOLD = 1_000_000_000;
/**
* Returns the capacity of the table required by the given query.
@@ -41,4 +55,149 @@ public final class GroupByUtils {
long capacityByLimit = limit * 5L;
return capacityByLimit > Integer.MAX_VALUE ? Integer.MAX_VALUE :
Math.max((int) capacityByLimit, minNumGroups);
}
+
+ /**
+ * Returns the actual trim threshold used for the indexed table. Trim threshold should be at least (2 * trimSize) to
+ * avoid excessive trimming. When trim threshold is non-positive or higher than 10^9, trim is considered disabled,
+ * where {@code Integer.MAX_VALUE} is returned.
+ */
+ @VisibleForTesting
+ static int getIndexedTableTrimThreshold(int trimSize, int trimThreshold) {
+ if (trimThreshold <= 0 || trimThreshold > MAX_TRIM_THRESHOLD || trimSize > MAX_TRIM_THRESHOLD / 2) {
+ return Integer.MAX_VALUE;
+ }
+ return Math.max(trimThreshold, 2 * trimSize);
+ }
+
+ /**
+ * Returns the initial capacity of the indexed table required by the given query.
+ */
+ @VisibleForTesting
+ static int getIndexedTableInitialCapacity(int maxRowsToKeep, int minNumGroups, int minCapacity) {
+ // The upper bound of the initial capacity is the capacity required to hold all the required rows. The indexed table
+ // should never grow over this capacity.
+ int upperBound = HashUtil.getHashMapCapacity(maxRowsToKeep);
+ if (minCapacity > upperBound) {
+ return upperBound;
+ }
+ // The lower bound of the initial capacity is the capacity required by the min number of groups to be added to the
+ // table.
+ int lowerBound = HashUtil.getHashMapCapacity(minNumGroups);
+ if (lowerBound > upperBound) {
+ return upperBound;
+ }
+ return Math.max(minCapacity, lowerBound);
+ }
+
+ /**
+ * Creates an indexed table for the combine operator given a sample results block.
+ */
+ public static IndexedTable createIndexedTableForCombineOperator(GroupByResultsBlock resultsBlock,
+ QueryContext queryContext, int numThreads) {
+ DataSchema dataSchema = resultsBlock.getDataSchema();
+ int numGroups = resultsBlock.getNumGroups();
+ int limit = queryContext.getLimit();
+ boolean hasOrderBy = queryContext.getOrderByExpressions() != null;
+ boolean hasHaving = queryContext.getHavingFilter() != null;
+ int minTrimSize = queryContext.getMinServerGroupTrimSize();
+ int minInitialIndexedTableCapacity = queryContext.getMinInitialIndexedTableCapacity();
+
+ // Disable trim when min trim size is non-positive
+ int trimSize = minTrimSize > 0 ? getTableCapacity(limit, minTrimSize) : Integer.MAX_VALUE;
+
+ // When there is no ORDER BY, trim is not required because the indexed table stops accepting new groups once the
+ // result size is reached
+ if (!hasOrderBy) {
+ int resultSize;
+ if (hasHaving) {
+ // Keep more groups when there is HAVING clause
+ resultSize = trimSize;
+ } else {
+ // TODO: Keeping only 'LIMIT' groups can cause inaccurate result because the groups are randomly selected
+ // without ordering. Consider ordering on group-by columns if no ordering is specified.
+ resultSize = limit;
+ }
+ int initialCapacity = getIndexedTableInitialCapacity(resultSize, numGroups, minInitialIndexedTableCapacity);
+ return getTrimDisabledIndexedTable(dataSchema, false, queryContext, resultSize, initialCapacity, numThreads);
+ }
+
+ int resultSize;
+ if (queryContext.isServerReturnFinalResult() && !hasHaving) {
+ // When server is asked to return final result and there is no HAVING clause, return only LIMIT groups
+ resultSize = limit;
+ } else {
+ resultSize = trimSize;
+ }
+ int trimThreshold = getIndexedTableTrimThreshold(trimSize, queryContext.getGroupTrimThreshold());
+ int initialCapacity = getIndexedTableInitialCapacity(trimThreshold, numGroups, minInitialIndexedTableCapacity);
+ if (trimThreshold == Integer.MAX_VALUE) {
+ return getTrimDisabledIndexedTable(dataSchema, false, queryContext, resultSize, initialCapacity, numThreads);
+ } else {
+ return getTrimEnabledIndexedTable(dataSchema, false, queryContext, resultSize, trimSize, trimThreshold,
+ initialCapacity, numThreads);
+ }
+ }
+
+ /**
+ * Creates an indexed table for the data table reducer given a sample data table.
+ */
+ public static IndexedTable createIndexedTableForDataTableReducer(DataTable dataTable, QueryContext queryContext,
+ DataTableReducerContext reducerContext, int numThreads) {
+ DataSchema dataSchema = dataTable.getDataSchema();
+ int numGroups = dataTable.getNumberOfRows();
+ int limit = queryContext.getLimit();
+ boolean hasOrderBy = queryContext.getOrderByExpressions() != null;
+ boolean hasHaving = queryContext.getHavingFilter() != null;
+ boolean hasFinalInput =
+ queryContext.isServerReturnFinalResult() || queryContext.isServerReturnFinalResultKeyUnpartitioned();
+ int minTrimSize = reducerContext.getMinGroupTrimSize();
+ int minInitialIndexedTableCapacity = reducerContext.getMinInitialIndexedTableCapacity();
+
+ // Disable trim when min trim size is non-positive
+ int trimSize = minTrimSize > 0 ? getTableCapacity(limit, minTrimSize) : Integer.MAX_VALUE;
+
+ // Keep more groups when there is HAVING clause
+ // TODO: Resolve the HAVING clause within the IndexedTable before returning the result
+ int resultSize = hasHaving ? trimSize : limit;
+
+ // When there is no ORDER BY, trim is not required because the indexed table stops accepting new groups once the
+ // result size is reached
+ if (!hasOrderBy) {
+ int initialCapacity = getIndexedTableInitialCapacity(resultSize, numGroups, minInitialIndexedTableCapacity);
+ return getTrimDisabledIndexedTable(dataSchema, hasFinalInput, queryContext, resultSize, initialCapacity,
+ numThreads);
+ }
+
+ int trimThreshold = getIndexedTableTrimThreshold(trimSize, reducerContext.getGroupByTrimThreshold());
+ int initialCapacity = getIndexedTableInitialCapacity(trimThreshold, numGroups, minInitialIndexedTableCapacity);
+ if (trimThreshold == Integer.MAX_VALUE) {
+ return getTrimDisabledIndexedTable(dataSchema, hasFinalInput, queryContext, resultSize, initialCapacity,
+ numThreads);
+ } else {
+ return getTrimEnabledIndexedTable(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold,
+ initialCapacity, numThreads);
+ }
+ }
+
+ private static IndexedTable getTrimDisabledIndexedTable(DataSchema dataSchema, boolean hasFinalInput,
+ QueryContext queryContext, int resultSize, int initialCapacity, int numThreads) {
+ if (numThreads == 1) {
+ return new SimpleIndexedTable(dataSchema, hasFinalInput, queryContext, resultSize, Integer.MAX_VALUE,
+ Integer.MAX_VALUE, initialCapacity);
+ } else {
+ return new UnboundedConcurrentIndexedTable(dataSchema, hasFinalInput, queryContext, resultSize, initialCapacity);
+ }
+ }
+
+ private static IndexedTable getTrimEnabledIndexedTable(DataSchema dataSchema, boolean hasFinalInput,
+ QueryContext queryContext, int resultSize, int trimSize, int trimThreshold, int initialCapacity, int numThreads) {
+ assert trimThreshold != Integer.MAX_VALUE;
+ if (numThreads == 1) {
+ return new SimpleIndexedTable(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold,
+ initialCapacity);
+ } else {
+ return new ConcurrentIndexedTable(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold,
+ initialCapacity);
+ }
+ }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
index 852e847d5a..e77e644fc3 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
@@ -44,6 +44,7 @@ import org.apache.pinot.core.data.table.IndexedTable;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.data.table.SimpleIndexedTable;
import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.request.context.QueryContext;
import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
@@ -329,7 +330,8 @@ public class ResourceManagerAccountingTest {
});
List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema,
NUM_ROWS, 0);
IndexedTable indexedTable =
- new SimpleIndexedTable(dataSchema, queryContext, NUM_ROWS,
Integer.MAX_VALUE, Integer.MAX_VALUE);
+ new SimpleIndexedTable(dataSchema, false, queryContext, NUM_ROWS,
Integer.MAX_VALUE, Integer.MAX_VALUE,
+
InstancePlanMakerImplV2.DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY);
for (Object[] row : rows) {
indexedTable.upsert(new Record(row));
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
index efbcbe0225..af8d8cf2ff 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
import org.apache.pinot.core.query.request.context.QueryContext;
import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.testng.Assert;
@@ -45,6 +46,7 @@ import org.testng.annotations.Test;
public class IndexedTableTest {
private static final int TRIM_SIZE = 10;
private static final int TRIM_THRESHOLD = 20;
+ private static final int INITIAL_CAPACITY =
InstancePlanMakerImplV2.DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY;
@Test
public void testConcurrentIndexedTable()
@@ -54,7 +56,8 @@ public class IndexedTableTest {
DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3",
"sum(m1)", "max(m2)"}, new ColumnDataType[]{
ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE,
ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
});
- IndexedTable indexedTable = new ConcurrentIndexedTable(dataSchema,
queryContext, 5, TRIM_SIZE, TRIM_THRESHOLD);
+ IndexedTable indexedTable =
+ new ConcurrentIndexedTable(dataSchema, false, queryContext, 5,
TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY);
// 3 threads upsert together
// a inserted 6 times (60), b inserted 5 times (50), d inserted 2 times
(20)
@@ -127,15 +130,19 @@ public class IndexedTableTest {
});
// Test SimpleIndexedTable
- IndexedTable indexedTable = new SimpleIndexedTable(dataSchema,
queryContext, 5, TRIM_SIZE, TRIM_THRESHOLD);
- IndexedTable mergeTable = new SimpleIndexedTable(dataSchema, queryContext,
10, TRIM_SIZE, TRIM_THRESHOLD);
+ IndexedTable indexedTable =
+ new SimpleIndexedTable(dataSchema, false, queryContext, 5, TRIM_SIZE,
TRIM_THRESHOLD, INITIAL_CAPACITY);
+ IndexedTable mergeTable =
+ new SimpleIndexedTable(dataSchema, false, queryContext, 10, TRIM_SIZE,
TRIM_THRESHOLD, INITIAL_CAPACITY);
testNonConcurrent(indexedTable, mergeTable);
indexedTable.finish(true);
checkSurvivors(indexedTable, survivors);
// Test ConcurrentIndexedTable
- indexedTable = new ConcurrentIndexedTable(dataSchema, queryContext, 5,
TRIM_SIZE, TRIM_THRESHOLD);
- mergeTable = new SimpleIndexedTable(dataSchema, queryContext, 10,
TRIM_SIZE, TRIM_THRESHOLD);
+ indexedTable =
+ new ConcurrentIndexedTable(dataSchema, false, queryContext, 5,
TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY);
+ mergeTable =
+ new SimpleIndexedTable(dataSchema, false, queryContext, 10, TRIM_SIZE,
TRIM_THRESHOLD, INITIAL_CAPACITY);
testNonConcurrent(indexedTable, mergeTable);
indexedTable.finish(true);
checkSurvivors(indexedTable, survivors);
@@ -251,10 +258,13 @@ public class IndexedTableTest {
ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE,
ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
});
- IndexedTable indexedTable = new SimpleIndexedTable(dataSchema,
queryContext, 5, TRIM_SIZE, TRIM_THRESHOLD);
+ IndexedTable indexedTable =
+ new SimpleIndexedTable(dataSchema, false, queryContext, 5,
Integer.MAX_VALUE, Integer.MAX_VALUE,
+ INITIAL_CAPACITY);
testNoMoreNewRecordsInTable(indexedTable);
- indexedTable = new ConcurrentIndexedTable(dataSchema, queryContext, 5,
TRIM_SIZE, TRIM_THRESHOLD);
+ indexedTable = new ConcurrentIndexedTable(dataSchema, false, queryContext,
5, Integer.MAX_VALUE, Integer.MAX_VALUE,
+ INITIAL_CAPACITY);
testNoMoreNewRecordsInTable(indexedTable);
}
@@ -284,59 +294,4 @@ public class IndexedTableTest {
checkEvicted(indexedTable, "f", "g");
}
-
- @Test
- public void testAdaptiveTrimThreshold() {
- QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
- "SELECT SUM(m1), MAX(m2) FROM testTable GROUP BY d1, d2, d3 ORDER BY
SUM(m1)");
- DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3",
"sum(m1)", "max(m2)"}, new ColumnDataType[]{
- ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE,
ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
- });
- IndexedTable indexedTable = new SimpleIndexedTable(dataSchema,
queryContext, 5, 5, 6);
-
- // Insert 7 records. Ensure that no trimming has been done since the trim
threshold should adapt to be at least
- // twice the trim size to avoid excessive trimming
- indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d, 10d, 100d}));
- indexedTable.upsert(getRecord(new Object[]{"b", 2, 20d, 10d, 200d}));
- indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d, 10d, 100d}));
- indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d, 10d, 100d}));
- Assert.assertEquals(indexedTable.size(), 2);
-
- indexedTable.upsert(getRecord(new Object[]{"c", 3, 30d, 10d, 300d}));
- indexedTable.upsert(getRecord(new Object[]{"d", 4, 40d, 10d, 400d}));
- indexedTable.upsert(getRecord(new Object[]{"e", 5, 50d, 10d, 500d}));
- Assert.assertEquals(indexedTable.size(), 5);
-
- indexedTable.upsert(getRecord(new Object[]{"c", 3, 30d, 10d, 300d}));
- indexedTable.upsert(getRecord(new Object[]{"d", 4, 40d, 10d, 400d}));
- indexedTable.upsert(getRecord(new Object[]{"e", 5, 50d, 10d, 500d}));
- Assert.assertEquals(indexedTable.size(), 5);
-
- // No resizing / trimming should be done yet
- indexedTable.upsert(getRecord(new Object[]{"f", 6, 60d, 10d, 600d}));
- indexedTable.upsert(getRecord(new Object[]{"g", 7, 70d, 10d, 700d}));
- Assert.assertEquals(indexedTable.size(), 7);
-
- // Insert 3 more records - this should reach the trim threshold and
trigger trimming
- indexedTable.upsert(getRecord(new Object[]{"h", 8, 80d, 10d, 800d}));
- indexedTable.upsert(getRecord(new Object[]{"i", 9, 90d, 10d, 900d}));
- indexedTable.upsert(getRecord(new Object[]{"j", 10, 100d, 20d, 1000d}));
- Assert.assertEquals(indexedTable.size(), 5);
-
- indexedTable.finish(false);
- // The 5 keys with the largest aggregated values for SUM(m1) should be
evicted
- checkEvicted(indexedTable, "a", "c", "d", "e", "j");
- }
-
- @Test
- public void testAdaptiveTrimThresholdMaxValue() {
- QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
- "SELECT SUM(m1), MAX(m2) FROM testTable GROUP BY d1, d2, d3 ORDER BY
SUM(m1)");
- DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3",
"sum(m1)", "max(m2)"}, new ColumnDataType[]{
- ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE,
ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
- });
- IndexedTable indexedTable = new SimpleIndexedTable(dataSchema,
queryContext, 1234567890, 1234567890, 1234567890);
- // If 2 * trimSize exceeds the max integer value, the trim threshold
should be bounded to the max integer value
- Assert.assertEquals(indexedTable._trimThreshold, Integer.MAX_VALUE);
- }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/util/GroupByUtilsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/util/GroupByUtilsTest.java
index 4370c43744..99d5cdf82f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/util/GroupByUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/util/GroupByUtilsTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.util;
+import org.apache.pinot.common.utils.HashUtil;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -37,4 +38,37 @@ public class GroupByUtilsTest {
assertEquals(GroupByUtils.getTableCapacity(100000000), 500000000);
assertEquals(GroupByUtils.getTableCapacity(1000000000), Integer.MAX_VALUE);
}
+
+ @Test
+ public void getIndexedTableTrimThreshold() {
+ assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, -1), Integer.MAX_VALUE);
+ assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, 0), Integer.MAX_VALUE);
+ assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, 10), 10000);
+ assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, 100), 10000);
+ assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, 1000), 10000);
+ assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, 10000), 10000);
+ assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, 100000), 100000);
+ assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, 1000000), 1000000);
+ assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, 10000000), 10000000);
+ assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, 100000000), 100000000);
+ assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, 1000000000), 1000000000);
+ assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, 1000000001), Integer.MAX_VALUE);
+ assertEquals(GroupByUtils.getIndexedTableTrimThreshold(Integer.MAX_VALUE, 10), Integer.MAX_VALUE);
+ assertEquals(GroupByUtils.getIndexedTableTrimThreshold(500000000, 10), 1000000000);
+ assertEquals(GroupByUtils.getIndexedTableTrimThreshold(500000001, 10), Integer.MAX_VALUE);
+ }
+
+ @Test
+ public void testGetIndexedTableInitialCapacity() {
+ assertEquals(GroupByUtils.getIndexedTableInitialCapacity(Integer.MAX_VALUE, 10, 128), 128);
+ assertEquals(GroupByUtils.getIndexedTableInitialCapacity(Integer.MAX_VALUE, 100, 128),
+ HashUtil.getHashMapCapacity(100));
+ assertEquals(GroupByUtils.getIndexedTableInitialCapacity(Integer.MAX_VALUE, 100, 256), 256);
+ assertEquals(GroupByUtils.getIndexedTableInitialCapacity(Integer.MAX_VALUE, 1000, 256),
+ HashUtil.getHashMapCapacity(1000));
+ assertEquals(GroupByUtils.getIndexedTableInitialCapacity(100, 10, 128), 128);
+ assertEquals(GroupByUtils.getIndexedTableInitialCapacity(100, 10, 256), HashUtil.getHashMapCapacity(100));
+ assertEquals(GroupByUtils.getIndexedTableInitialCapacity(100, 100, 256), HashUtil.getHashMapCapacity(100));
+ assertEquals(GroupByUtils.getIndexedTableInitialCapacity(100, 1000, 256), HashUtil.getHashMapCapacity(100));
+ }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/ExprMinMaxTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/ExprMinMaxTest.java
index 20d9fa80ab..1a22234ee3 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/ExprMinMaxTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/ExprMinMaxTest.java
@@ -524,7 +524,8 @@ public class ExprMinMaxTest extends BaseQueriesTest {
String query =
"SELECT stringColumn,
expr_min(VALUE_IN(mvIntColumn,16,17,18,19,20,21,22,23,24,25,26,27), intColumn),
"
+
"expr_max(VALUE_IN(mvIntColumn,16,17,18,19,20,21,22,23,24,25,26,27), intColumn)
"
- + "FROM testTable WHERE mvIntColumn in
(16,17,18,19,20,21,22,23,24,25,26,27) GROUP BY stringColumn";
+ + "FROM testTable WHERE mvIntColumn in
(16,17,18,19,20,21,22,23,24,25,26,27) "
+ + "GROUP BY stringColumn ORDER BY stringColumn";
BrokerResponse brokerResponse = getBrokerResponse(query);
ResultTable resultTable = brokerResponse.getResultTable();
@@ -540,7 +541,7 @@ public class ExprMinMaxTest extends BaseQueriesTest {
query =
"SELECT stringColumn,
expr_min(VALUE_IN(mvIntColumn,16,17,18,19,20,21,22,23,24,25,26,27), intColumn),
"
+
"expr_max(VALUE_IN(mvIntColumn,16,17,18,19,20,21,22,23,24,25,26,27), intColumn)
"
- + "FROM testTable GROUP BY stringColumn";
+ + "FROM testTable GROUP BY stringColumn ORDER BY stringColumn";
brokerResponse = getBrokerResponse(query);
resultTable = brokerResponse.getResultTable();
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
index f974aa8542..f5d0be1a06 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
@@ -679,10 +679,10 @@ public class InterSegmentAggregationMultiValueQueriesTest
extends BaseMultiValue
BrokerResponseNative brokerResponse = getBrokerResponse(query);
assertFalse(brokerResponse.isNumGroupsLimitReached());
- brokerResponse = getBrokerResponse(query,
- new InstancePlanMakerImplV2(1000, 1000,
InstancePlanMakerImplV2.DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE,
- InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE,
- InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD));
+ InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2();
+ planMaker.setMaxInitialResultHolderCapacity(1000);
+ planMaker.setNumGroupsLimit(1000);
+ brokerResponse = getBrokerResponse(query, planMaker);
assertTrue(brokerResponse.isNumGroupsLimitReached());
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
index 591b5ffffa..62cbb0a3ef 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
@@ -661,10 +661,10 @@ public class
InterSegmentAggregationMultiValueRawQueriesTest extends BaseMultiVa
BrokerResponseNative brokerResponse = getBrokerResponse(query);
assertFalse(brokerResponse.isNumGroupsLimitReached());
- brokerResponse = getBrokerResponse(query,
- new InstancePlanMakerImplV2(1000, 1000,
InstancePlanMakerImplV2.DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE,
- InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE,
- InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD));
+ InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2();
+ planMaker.setMaxInitialResultHolderCapacity(1000);
+ planMaker.setNumGroupsLimit(1000);
+ brokerResponse = getBrokerResponse(query, planMaker);
assertTrue(brokerResponse.isNumGroupsLimitReached());
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
index a2c74071c9..6fcb909374 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
@@ -767,10 +767,10 @@ public class
InterSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
BrokerResponseNative brokerResponse = getBrokerResponse(query);
assertFalse(brokerResponse.isNumGroupsLimitReached());
- brokerResponse = getBrokerResponse(query,
- new InstancePlanMakerImplV2(1000, 1000,
InstancePlanMakerImplV2.DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE,
- InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE,
- InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD));
+ InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2();
+ planMaker.setMaxInitialResultHolderCapacity(1000);
+ planMaker.setNumGroupsLimit(1000);
+ brokerResponse = getBrokerResponse(query, planMaker);
assertTrue(brokerResponse.isNumGroupsLimitReached());
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueQueriesTest.java
index 6b876de3ae..070ff531cb 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueQueriesTest.java
@@ -33,11 +33,11 @@ import org.testng.annotations.Test;
* Tests order by queries
*/
public class InterSegmentGroupByMultiValueQueriesTest extends
BaseMultiValueQueriesTest {
- private static final InstancePlanMakerImplV2 TRIM_ENABLED_PLAN_MAKER =
- new
InstancePlanMakerImplV2(InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
- InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, 1,
- InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE,
- InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD);
+ private static final InstancePlanMakerImplV2 TRIM_ENABLED_PLAN_MAKER = new
InstancePlanMakerImplV2();
+
+ static {
+ TRIM_ENABLED_PLAN_MAKER.setMinSegmentGroupTrimSize(1);
+ }
@Test(dataProvider = "groupByOrderByDataProvider")
public void testGroupByOrderBy(String query, long
expectedNumEntriesScannedPostFilter,
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueRawQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueRawQueriesTest.java
index ec9e0d951f..a75bce325d 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueRawQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueRawQueriesTest.java
@@ -32,11 +32,11 @@ import org.testng.annotations.Test;
* Tests order by queries with MV RAW index
*/
public class InterSegmentGroupByMultiValueRawQueriesTest extends
BaseMultiValueRawQueriesTest {
- private static final InstancePlanMakerImplV2 TRIM_ENABLED_PLAN_MAKER =
- new
InstancePlanMakerImplV2(InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
- InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, 1,
- InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE,
- InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD);
+ private static final InstancePlanMakerImplV2 TRIM_ENABLED_PLAN_MAKER = new
InstancePlanMakerImplV2();
+
+ static {
+ TRIM_ENABLED_PLAN_MAKER.setMinSegmentGroupTrimSize(1);
+ }
@Test(dataProvider = "groupByOrderByDataProvider")
public void testGroupByOrderBy(String query, long
expectedNumEntriesScannedPostFilter,
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupBySingleValueQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupBySingleValueQueriesTest.java
index 846fbdbb8c..8cda3e1609 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupBySingleValueQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupBySingleValueQueriesTest.java
@@ -34,11 +34,11 @@ import org.testng.annotations.Test;
* Tests order by queries
*/
public class InterSegmentGroupBySingleValueQueriesTest extends
BaseSingleValueQueriesTest {
- private static final InstancePlanMakerImplV2 TRIM_ENABLED_PLAN_MAKER =
- new
InstancePlanMakerImplV2(InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
- InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, 1,
- InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE,
- InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD);
+ private static final InstancePlanMakerImplV2 TRIM_ENABLED_PLAN_MAKER = new
InstancePlanMakerImplV2();
+
+ static {
+ TRIM_ENABLED_PLAN_MAKER.setMinSegmentGroupTrimSize(1);
+ }
@Test(dataProvider = "groupByOrderByDataProvider")
public void testGroupByOrderBy(String query, long
expectedNumEntriesScannedPostFilter,
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
index 6408fd8f31..66d1437460 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
@@ -60,7 +60,7 @@ import static org.testng.Assert.*;
public class OfflineGRPCServerIntegrationTest extends
BaseClusterIntegrationTest {
private static final ExecutorService EXECUTOR_SERVICE =
Executors.newFixedThreadPool(2);
private static final DataTableReducerContext DATATABLE_REDUCER_CONTEXT =
- new DataTableReducerContext(EXECUTOR_SERVICE, 2, 10000, 10000, 5000);
+ new DataTableReducerContext(EXECUTOR_SERVICE, 2, 10000, 10000, 5000,
128);
@BeforeClass
public void setUp()
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
index 79a0d9e6ff..579bd5b227 100644
---
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
+++
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
@@ -118,8 +118,10 @@ public class BenchmarkCombineGroupBy {
int trimSize = GroupByUtils.getTableCapacity(_queryContext.getLimit());
// make 1 concurrent table
- IndexedTable concurrentIndexedTable = new
ConcurrentIndexedTable(_dataSchema, _queryContext, trimSize, trimSize,
- InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD);
+ IndexedTable concurrentIndexedTable =
+ new ConcurrentIndexedTable(_dataSchema, false, _queryContext,
trimSize, trimSize,
+ InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD,
+
InstancePlanMakerImplV2.DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY);
List<Callable<Void>> innerSegmentCallables = new ArrayList<>(NUM_SEGMENTS);
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
index 8ba8d2756e..6c9667533b 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
@@ -118,7 +118,8 @@ public class BenchmarkIndexedTable {
// make 1 concurrent table
IndexedTable concurrentIndexedTable =
- new ConcurrentIndexedTable(_dataSchema, _queryContext, TRIM_SIZE,
TRIM_SIZE, TRIM_THRESHOLD);
+ new ConcurrentIndexedTable(_dataSchema, false, _queryContext,
TRIM_SIZE, TRIM_SIZE, TRIM_THRESHOLD,
+ TRIM_THRESHOLD);
// 10 parallel threads putting 10k records into the table
@@ -167,7 +168,8 @@ public class BenchmarkIndexedTable {
// make 10 indexed tables
IndexedTable simpleIndexedTable =
- new SimpleIndexedTable(_dataSchema, _queryContext, TRIM_SIZE,
TRIM_SIZE, TRIM_THRESHOLD);
+ new SimpleIndexedTable(_dataSchema, false, _queryContext, TRIM_SIZE,
TRIM_SIZE, TRIM_THRESHOLD,
+ TRIM_THRESHOLD);
simpleIndexedTables.add(simpleIndexedTable);
// put 10k records in each indexed table, in parallel
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index d62887b7be..0ca99b06cc 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -109,6 +109,8 @@ public class QueryRunner {
private Integer _numGroupsLimit;
@Nullable
private Integer _maxInitialResultHolderCapacity;
+ @Nullable
+ private Integer _minInitialIndexedTableCapacity;
// Join overflow settings
@Nullable
@@ -142,6 +144,10 @@ public class QueryRunner {
config.getProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
_maxInitialResultHolderCapacity =
maxInitialGroupHolderCapacity != null ?
Integer.parseInt(maxInitialGroupHolderCapacity) : null;
+ String minInitialIndexedTableCapacityStr =
+
config.getProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY);
+ _minInitialIndexedTableCapacity =
+ minInitialIndexedTableCapacityStr != null ?
Integer.parseInt(minInitialIndexedTableCapacityStr) : null;
String maxRowsInJoinStr =
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_ROWS_IN_JOIN);
_maxRowsInJoin = maxRowsInJoinStr != null ?
Integer.parseInt(maxRowsInJoinStr) : null;
String joinOverflowModeStr =
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_JOIN_OVERFLOW_MODE);
@@ -319,6 +325,15 @@ public class QueryRunner {
Integer.toString(maxInitialResultHolderCapacity));
}
+ Integer minInitialIndexedTableCapacity =
QueryOptionsUtils.getMinInitialIndexedTableCapacity(opChainMetadata);
+ if (minInitialIndexedTableCapacity == null) {
+ minInitialIndexedTableCapacity = _minInitialIndexedTableCapacity;
+ }
+ if (minInitialIndexedTableCapacity != null) {
+ opChainMetadata.put(QueryOptionKey.MIN_INITIAL_INDEXED_TABLE_CAPACITY,
+ Integer.toString(minInitialIndexedTableCapacity));
+ }
+
Integer maxRowsInJoin =
QueryOptionsUtils.getMaxRowsInJoin(opChainMetadata);
if (maxRowsInJoin == null) {
maxRowsInJoin = _maxRowsInJoin;
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 06c7184f4e..1a6985084b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -51,7 +51,7 @@ public class CommonConstants {
public static final String DEFAULT_METRICS_FACTORY_CLASS_NAME =
//"org.apache.pinot.plugin.metrics.compound.CompoundPinotMetricsFactory";
"org.apache.pinot.plugin.metrics.yammer.YammerMetricsFactory";
- //"org.apache.pinot.plugin.metrics.dropwizard.DropwizardMetricsFactory";
+ //"org.apache.pinot.plugin.metrics.dropwizard.DropwizardMetricsFactory";
public static final String DEFAULT_BROKER_EVENT_LISTENER_CLASS_NAME =
"org.apache.pinot.spi.eventlistener.query.NoOpBrokerQueryEventListener";
@@ -311,6 +311,9 @@ public class CommonConstants {
public static final int DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD = 1_000_000;
public static final String CONFIG_OF_BROKER_MIN_GROUP_TRIM_SIZE =
"pinot.broker.min.group.trim.size";
public static final int DEFAULT_BROKER_MIN_GROUP_TRIM_SIZE = 5000;
+ public static final String
CONFIG_OF_BROKER_MIN_INITIAL_INDEXED_TABLE_CAPACITY =
+ "pinot.broker.min.init.indexed.table.capacity";
+ public static final int DEFAULT_BROKER_MIN_INITIAL_INDEXED_TABLE_CAPACITY
= 128;
// Configure the request handler type used by broker to handler inbound query request.
// NOTE: the request handler type refers to the communication between Broker and Server.
@@ -439,6 +442,7 @@ public class CommonConstants {
public static final String MULTI_STAGE_LEAF_LIMIT =
"multiStageLeafLimit";
public static final String NUM_GROUPS_LIMIT = "numGroupsLimit";
public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY =
"maxInitialResultHolderCapacity";
+ public static final String MIN_INITIAL_INDEXED_TABLE_CAPACITY =
"minInitialIndexedTableCapacity";
public static final String GROUP_TRIM_THRESHOLD = "groupTrimThreshold";
public static final String STAGE_PARALLELISM = "stageParallelism";
@@ -685,6 +689,8 @@ public class CommonConstants {
"pinot.server.query.executor.num.groups.limit";
public static final String
CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
"pinot.server.query.executor.max.init.group.holder.capacity";
+ public static final String
CONFIG_OF_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY =
+ "pinot.server.query.executor.min.init.indexed.table.capacity";
public static final String CONFIG_OF_QUERY_EXECUTOR_OPCHAIN_EXECUTOR =
"pinot.server.query.executor.multistage.executor";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]