This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 465c811675 Introducing MSE result holder config to minimize rehashing
for high cardinality group by (#14981)
465c811675 is described below
commit 465c811675fedd8fd0b9834e3309887cc23c8409
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Thu Feb 6 09:05:12 2025 -0800
Introducing MSE result holder config to minimize rehashing for high
cardinality group by (#14981)
---
.../common/utils/config/QueryOptionsUtils.java | 6 +++++
.../pinot/calcite/rel/hint/PinotHintOptions.java | 1 +
.../apache/pinot/query/runtime/QueryRunner.java | 17 +++++++++++++
.../operator/MultistageGroupByExecutor.java | 29 ++++++++++++++++++++--
.../operator/groupby/GroupIdGeneratorFactory.java | 18 ++++++++------
.../groupby/MultiKeysGroupIdGenerator.java | 5 ++--
.../groupby/OneDoubleKeyGroupIdGenerator.java | 4 +--
.../groupby/OneFloatKeyGroupIdGenerator.java | 4 +--
.../groupby/OneIntKeyGroupIdGenerator.java | 4 +--
.../groupby/OneLongKeyGroupIdGenerator.java | 4 +--
.../groupby/OneObjectKeyGroupIdGenerator.java | 4 +--
.../operator/groupby/TwoKeysGroupIdGenerator.java | 5 ++--
.../apache/pinot/spi/utils/CommonConstants.java | 3 +++
13 files changed, 80 insertions(+), 24 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 5e8ba86643..74501c5458 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -308,6 +308,12 @@ public class QueryOptionsUtils {
return
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.OPTIMIZE_MAX_INITIAL_RESULT_HOLDER_CAPACITY));
}
+ @Nullable
+ public static Integer getMSEMaxInitialResultHolderCapacity(Map<String,
String> queryOptions) {
+ String maxInitialCapacity =
queryOptions.get(QueryOptionKey.MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+ return
checkedParseIntPositive(QueryOptionKey.MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
maxInitialCapacity);
+ }
+
@Nullable
public static Integer getMinInitialIndexedTableCapacity(Map<String, String>
queryOptions) {
String minInitialIndexedTableCapacity =
queryOptions.get(QueryOptionKey.MIN_INITIAL_INDEXED_TABLE_CAPACITY);
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java
index 5a711ed669..f9b3f61cf1 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java
@@ -62,6 +62,7 @@ public class PinotHintOptions {
public static final String GROUP_TRIM_SIZE = "group_trim_size";
public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY =
"max_initial_result_holder_capacity";
+ public static final String MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
"mse_max_initial_result_holder_capacity";
}
public static class WindowHintOptions {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 876306352b..754f605f4c 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -115,6 +115,8 @@ public class QueryRunner {
private Integer _maxInitialResultHolderCapacity;
@Nullable
private Integer _minInitialIndexedTableCapacity;
+ @Nullable
+ private Integer _mseMaxInitialResultHolderCapacity;
// Join overflow settings
@Nullable
@@ -158,6 +160,12 @@ public class QueryRunner {
_minInitialIndexedTableCapacity =
minInitialIndexedTableCapacityStr != null ?
Integer.parseInt(minInitialIndexedTableCapacityStr) : null;
+
+ String mseMaxInitialGroupHolderCapacity =
+
config.getProperty(CommonConstants.Server.CONFIG_OF_MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+ _mseMaxInitialResultHolderCapacity =
+ mseMaxInitialGroupHolderCapacity != null ?
Integer.parseInt(mseMaxInitialGroupHolderCapacity) : null;
+
String maxRowsInJoinStr =
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_ROWS_IN_JOIN);
_maxRowsInJoin = maxRowsInJoinStr != null ?
Integer.parseInt(maxRowsInJoinStr) : null;
@@ -377,6 +385,15 @@ public class QueryRunner {
Integer.toString(minInitialIndexedTableCapacity));
}
+ Integer mseMaxInitialResultHolderCapacity =
QueryOptionsUtils.getMSEMaxInitialResultHolderCapacity(opChainMetadata);
+ if (mseMaxInitialResultHolderCapacity == null) {
+ mseMaxInitialResultHolderCapacity = _mseMaxInitialResultHolderCapacity;
+ }
+ if (mseMaxInitialResultHolderCapacity != null) {
+
opChainMetadata.put(QueryOptionKey.MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+ Integer.toString(mseMaxInitialResultHolderCapacity));
+ }
+
Integer maxRowsInJoin =
QueryOptionsUtils.getMaxRowsInJoin(opChainMetadata);
if (maxRowsInJoin == null) {
maxRowsInJoin = _maxRowsInJoin;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
index e37798df08..4fa3619c51 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
@@ -87,7 +87,9 @@ public class MultistageGroupByExecutor {
_aggType = aggType;
_leafReturnFinalResult = leafReturnFinalResult;
_resultSchema = resultSchema;
- int maxInitialResultHolderCapacity =
getMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
+
+ int maxInitialResultHolderCapacity =
getResolvedMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
+
_numGroupsLimit = getNumGroupsLimit(opChainMetadata, nodeHint);
// By default, we compute all groups for SQL compliant results. However,
we allow overriding this behavior via
@@ -109,7 +111,7 @@ public class MultistageGroupByExecutor {
_groupIdGenerator =
GroupIdGeneratorFactory.getGroupIdGenerator(_resultSchema.getStoredColumnDataTypes(),
groupKeyIds.length,
- _numGroupsLimit);
+ _numGroupsLimit, maxInitialResultHolderCapacity);
}
private int getNumGroupsLimit(Map<String, String> opChainMetadata, @Nullable
PlanNode.NodeHint nodeHint) {
@@ -126,6 +128,13 @@ public class MultistageGroupByExecutor {
return numGroupsLimit != null ? numGroupsLimit :
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT;
}
+ private int getResolvedMaxInitialResultHolderCapacity(Map<String, String>
opChainMetadata,
+ @Nullable PlanNode.NodeHint nodeHint) {
+ Integer mseMaxInitialResultHolderCapacity =
getMSEMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
+ return (mseMaxInitialResultHolderCapacity != null) ?
mseMaxInitialResultHolderCapacity
+ : getMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
+ }
+
private int getMaxInitialResultHolderCapacity(Map<String, String>
opChainMetadata,
@Nullable PlanNode.NodeHint nodeHint) {
if (nodeHint != null) {
@@ -143,6 +152,22 @@ public class MultistageGroupByExecutor {
: InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY;
}
+ private Integer getMSEMaxInitialResultHolderCapacity(Map<String, String>
opChainMetadata,
+ @Nullable PlanNode.NodeHint nodeHint) {
+ if (nodeHint != null) {
+ Map<String, String> aggregateOptions =
nodeHint.getHintOptions().get(PinotHintOptions.AGGREGATE_HINT_OPTIONS);
+ if (aggregateOptions != null) {
+ String maxInitialMSEResultHolderCapacityStr =
+
aggregateOptions.get(PinotHintOptions.AggregateOptions.MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+ if (maxInitialMSEResultHolderCapacityStr != null) {
+ return Integer.parseInt(maxInitialMSEResultHolderCapacityStr);
+ }
+ }
+ }
+ // Don't return default value since null value means we need to fallback
to MaxInitialResultHolderCapacity
+ return
QueryOptionsUtils.getMSEMaxInitialResultHolderCapacity(opChainMetadata);
+ }
+
public int getNumGroupsLimit() {
return _numGroupsLimit;
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/GroupIdGeneratorFactory.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/GroupIdGeneratorFactory.java
index 16be037f38..a254c37bf8 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/GroupIdGeneratorFactory.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/GroupIdGeneratorFactory.java
@@ -25,24 +25,26 @@ public class GroupIdGeneratorFactory {
private GroupIdGeneratorFactory() {
}
- public static GroupIdGenerator getGroupIdGenerator(ColumnDataType[]
keyTypes, int numKeyColumns, int numGroupsLimit) {
+ public static GroupIdGenerator getGroupIdGenerator(ColumnDataType[]
keyTypes, int numKeyColumns,
+ int numGroupsLimit, int maxInitialResultHolderCapacity) {
+ int initialCapacity = Math.min(maxInitialResultHolderCapacity,
numGroupsLimit);
if (numKeyColumns == 1) {
switch (keyTypes[0]) {
case INT:
- return new OneIntKeyGroupIdGenerator(numGroupsLimit);
+ return new OneIntKeyGroupIdGenerator(numGroupsLimit,
initialCapacity);
case LONG:
- return new OneLongKeyGroupIdGenerator(numGroupsLimit);
+ return new OneLongKeyGroupIdGenerator(numGroupsLimit,
initialCapacity);
case FLOAT:
- return new OneFloatKeyGroupIdGenerator(numGroupsLimit);
+ return new OneFloatKeyGroupIdGenerator(numGroupsLimit,
initialCapacity);
case DOUBLE:
- return new OneDoubleKeyGroupIdGenerator(numGroupsLimit);
+ return new OneDoubleKeyGroupIdGenerator(numGroupsLimit,
initialCapacity);
default:
- return new OneObjectKeyGroupIdGenerator(numGroupsLimit);
+ return new OneObjectKeyGroupIdGenerator(numGroupsLimit,
initialCapacity);
}
} else if (numKeyColumns == 2) {
- return new TwoKeysGroupIdGenerator(keyTypes[0], keyTypes[1],
numGroupsLimit);
+ return new TwoKeysGroupIdGenerator(keyTypes[0], keyTypes[1],
numGroupsLimit, initialCapacity);
} else {
- return new MultiKeysGroupIdGenerator(keyTypes, numKeyColumns,
numGroupsLimit);
+ return new MultiKeysGroupIdGenerator(keyTypes, numKeyColumns,
numGroupsLimit, initialCapacity);
}
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/MultiKeysGroupIdGenerator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/MultiKeysGroupIdGenerator.java
index 30019746b4..9a31aae3ef 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/MultiKeysGroupIdGenerator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/MultiKeysGroupIdGenerator.java
@@ -32,8 +32,9 @@ public class MultiKeysGroupIdGenerator implements
GroupIdGenerator {
private final ValueToIdMap[] _keyToIdMaps;
private final int _numGroupsLimit;
- public MultiKeysGroupIdGenerator(ColumnDataType[] keyTypes, int
numKeyColumns, int numGroupsLimit) {
- _groupIdMap = new Object2IntOpenHashMap<>();
+ public MultiKeysGroupIdGenerator(ColumnDataType[] keyTypes, int
numKeyColumns,
+ int numGroupsLimit, int initialCapacity) {
+ _groupIdMap = new Object2IntOpenHashMap<>(initialCapacity);
_groupIdMap.defaultReturnValue(INVALID_ID);
_keyToIdMaps = new ValueToIdMap[numKeyColumns];
for (int i = 0; i < numKeyColumns; i++) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneDoubleKeyGroupIdGenerator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneDoubleKeyGroupIdGenerator.java
index cf3f920d22..34392a2825 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneDoubleKeyGroupIdGenerator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneDoubleKeyGroupIdGenerator.java
@@ -31,8 +31,8 @@ public class OneDoubleKeyGroupIdGenerator implements
GroupIdGenerator {
private int _numGroups = 0;
private int _nullGroupId = INVALID_ID;
- public OneDoubleKeyGroupIdGenerator(int numGroupsLimit) {
- _groupIdMap = new Double2IntOpenHashMap();
+ public OneDoubleKeyGroupIdGenerator(int numGroupsLimit, int initialCapacity)
{
+ _groupIdMap = new Double2IntOpenHashMap(initialCapacity);
_groupIdMap.defaultReturnValue(INVALID_ID);
_numGroupsLimit = numGroupsLimit;
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneFloatKeyGroupIdGenerator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneFloatKeyGroupIdGenerator.java
index 5d3005dc6b..675bda6975 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneFloatKeyGroupIdGenerator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneFloatKeyGroupIdGenerator.java
@@ -30,8 +30,8 @@ public class OneFloatKeyGroupIdGenerator implements
GroupIdGenerator {
private int _numGroups = 0;
private int _nullGroupId = INVALID_ID;
- public OneFloatKeyGroupIdGenerator(int numGroupsLimit) {
- _groupIdMap = new Float2IntOpenHashMap();
+ public OneFloatKeyGroupIdGenerator(int numGroupsLimit, int initialCapacity) {
+ _groupIdMap = new Float2IntOpenHashMap(initialCapacity);
_groupIdMap.defaultReturnValue(INVALID_ID);
_numGroupsLimit = numGroupsLimit;
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneIntKeyGroupIdGenerator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneIntKeyGroupIdGenerator.java
index 77064f8b3e..703f68f0bc 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneIntKeyGroupIdGenerator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneIntKeyGroupIdGenerator.java
@@ -31,8 +31,8 @@ public class OneIntKeyGroupIdGenerator implements
GroupIdGenerator {
private int _numGroups = 0;
private int _nullGroupId = INVALID_ID;
- public OneIntKeyGroupIdGenerator(int numGroupsLimit) {
- _groupIdMap = new Int2IntOpenHashMap();
+ public OneIntKeyGroupIdGenerator(int numGroupsLimit, int initialCapacity) {
+ _groupIdMap = new Int2IntOpenHashMap(initialCapacity);
_groupIdMap.defaultReturnValue(INVALID_ID);
_numGroupsLimit = numGroupsLimit;
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneLongKeyGroupIdGenerator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneLongKeyGroupIdGenerator.java
index 5862df3def..82ae52e9a9 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneLongKeyGroupIdGenerator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneLongKeyGroupIdGenerator.java
@@ -31,8 +31,8 @@ public class OneLongKeyGroupIdGenerator implements
GroupIdGenerator {
private int _numGroups = 0;
private int _nullGroupId = INVALID_ID;
- public OneLongKeyGroupIdGenerator(int numGroupsLimit) {
- _groupIdMap = new Long2IntOpenHashMap();
+ public OneLongKeyGroupIdGenerator(int numGroupsLimit, int initialCapacity) {
+ _groupIdMap = new Long2IntOpenHashMap(initialCapacity);
_groupIdMap.defaultReturnValue(INVALID_ID);
_numGroupsLimit = numGroupsLimit;
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneObjectKeyGroupIdGenerator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneObjectKeyGroupIdGenerator.java
index e7d7bc3815..be2f82b2c3 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneObjectKeyGroupIdGenerator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneObjectKeyGroupIdGenerator.java
@@ -28,8 +28,8 @@ public class OneObjectKeyGroupIdGenerator implements
GroupIdGenerator {
private final Object2IntOpenHashMap<Object> _groupIdMap;
private final int _numGroupsLimit;
- public OneObjectKeyGroupIdGenerator(int numGroupsLimit) {
- _groupIdMap = new Object2IntOpenHashMap<>();
+ public OneObjectKeyGroupIdGenerator(int numGroupsLimit, int initialCapacity)
{
+ _groupIdMap = new Object2IntOpenHashMap<>(initialCapacity);
_groupIdMap.defaultReturnValue(INVALID_ID);
_numGroupsLimit = numGroupsLimit;
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/TwoKeysGroupIdGenerator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/TwoKeysGroupIdGenerator.java
index 21e8fcf144..0f31dcb4fb 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/TwoKeysGroupIdGenerator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/TwoKeysGroupIdGenerator.java
@@ -33,8 +33,9 @@ public class TwoKeysGroupIdGenerator implements
GroupIdGenerator {
private final ValueToIdMap _secondKeyToIdMap;
private final int _numGroupsLimit;
- public TwoKeysGroupIdGenerator(ColumnDataType firstKeyType, ColumnDataType
secondKeyType, int numGroupsLimit) {
- _groupIdMap = new Long2IntOpenHashMap();
+ public TwoKeysGroupIdGenerator(ColumnDataType firstKeyType,
+ ColumnDataType secondKeyType, int numGroupsLimit, int initialCapacity) {
+ _groupIdMap = new Long2IntOpenHashMap(initialCapacity);
_groupIdMap.defaultReturnValue(INVALID_ID);
_firstKeyToIdMap = ValueToIdMapFactory.get(firstKeyType.toDataType());
_secondKeyToIdMap = ValueToIdMapFactory.get(secondKeyType.toDataType());
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 4032042ca7..bdf1cbed0f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -509,6 +509,7 @@ public class CommonConstants {
public static final String NUM_GROUPS_LIMIT = "numGroupsLimit";
public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY =
"maxInitialResultHolderCapacity";
public static final String MIN_INITIAL_INDEXED_TABLE_CAPACITY =
"minInitialIndexedTableCapacity";
+ public static final String MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
"mseMaxInitialResultHolderCapacity";
public static final String GROUP_TRIM_THRESHOLD = "groupTrimThreshold";
public static final String STAGE_PARALLELISM = "stageParallelism";
@@ -764,6 +765,8 @@ public class CommonConstants {
"pinot.server.query.executor.group.trim.size";
public static final String
CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
"pinot.server.query.executor.max.init.group.holder.capacity";
+ public static final String
CONFIG_OF_MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
+ "pinot.server.mse.max.init.group.holder.capacity";
public static final String
CONFIG_OF_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY =
"pinot.server.query.executor.min.init.indexed.table.capacity";
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org