This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a3ebd0c Use thread local for groupby raw key holders (#5419)
a3ebd0c is described below
commit a3ebd0c5981d692185d7c79a2ff1e8d682b5f069
Author: Xiang Fu <[email protected]>
AuthorDate: Wed May 20 20:50:58 2020 -0700
Use thread local for groupby raw key holders (#5419)
* Use thread local for groupby raw key holders
* Adding more optimization for map initial size and discard size
---
.../groupby/DefaultGroupByExecutor.java | 6 +-
.../groupby/DictionaryBasedGroupKeyGenerator.java | 66 ++++++++++++++++++++--
.../DictionaryBasedGroupKeyGeneratorTest.java | 28 ++++++---
3 files changed, 84 insertions(+), 16 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
index a2d561a..f73704d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.query.aggregation.groupby;
+import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.request.transform.TransformExpressionTree;
import org.apache.pinot.core.common.BlockValSet;
@@ -45,6 +46,9 @@ public class DefaultGroupByExecutor implements
GroupByExecutor {
private static final ThreadLocal<int[][]> THREAD_LOCAL_MV_GROUP_KEYS =
ThreadLocal.withInitial(() -> new
int[DocIdSetPlanNode.MAX_DOC_PER_CALL][]);
+ // Thread local (reusable) hashMap as holder for group keys
+ private static final ThreadLocal<Map>
THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS =
+ ThreadLocal.withInitial(() -> new HashMap());
protected final AggregationFunction[] _aggregationFunctions;
protected final GroupKeyGenerator _groupKeyGenerator;
protected final GroupByResultHolder[] _groupByResultHolders;
@@ -86,7 +90,7 @@ public class DefaultGroupByExecutor implements
GroupByExecutor {
}
} else {
_groupKeyGenerator = new
DictionaryBasedGroupKeyGenerator(transformOperator, groupByExpressions,
numGroupsLimit,
- maxInitialResultHolderCapacity);
+ maxInitialResultHolderCapacity,
THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get());
}
// Initialize result holders
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
index c163ffd..4ce030a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
@@ -26,6 +26,7 @@ import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import java.util.Iterator;
+import java.util.Map;
import java.util.NoSuchElementException;
import javax.annotation.Nonnull;
import org.apache.pinot.common.request.transform.TransformExpressionTree;
@@ -61,7 +62,8 @@ import org.apache.pinot.core.segment.index.readers.Dictionary;
* bounded by the number of groups limit (globalGroupIdUpperBound is always
smaller or equal to numGroupsLimit).
*/
public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
- private final static int DEFAULT_HASH_MAP_INITIAL_SIZE = 16;
+ private final static int INITIAL_MAP_SIZE = 256;
+ private final static int MAX_CACHING_MAP_SIZE = 1048576;
private final TransformExpressionTree[] _groupByExpressions;
private final int _numGroupByExpressions;
private final int[] _cardinalities;
@@ -78,7 +80,8 @@ public class DictionaryBasedGroupKeyGenerator implements
GroupKeyGenerator {
private final RawKeyHolder _rawKeyHolder;
public DictionaryBasedGroupKeyGenerator(TransformOperator transformOperator,
- TransformExpressionTree[] groupByExpressions, int numGroupsLimit, int
arrayBasedThreshold) {
+ TransformExpressionTree[] groupByExpressions, int numGroupsLimit, int
arrayBasedThreshold,
+ Map mapBasedRawKeyHolders) {
assert numGroupsLimit >= arrayBasedThreshold;
_groupByExpressions = groupByExpressions;
@@ -107,18 +110,32 @@ public class DictionaryBasedGroupKeyGenerator implements
GroupKeyGenerator {
_isSingleValueColumn[i] =
transformOperator.getResultMetadata(groupByExpression).isSingleValue();
}
-
if (longOverflow) {
_globalGroupIdUpperBound = numGroupsLimit;
- _rawKeyHolder = new ArrayMapBasedHolder(DEFAULT_HASH_MAP_INITIAL_SIZE);
+ Object mapInternal = mapBasedRawKeyHolders.get(ArrayMapBasedHolder.class.getName()); // map cached by an earlier generator, if any
+ if (mapInternal == null || ((Object2IntOpenHashMap<?>) mapInternal).size() > MAX_CACHING_MAP_SIZE) { // test BEFORE the holder constructor clear()s the map
+ mapInternal = new ArrayMapBasedHolder(INITIAL_MAP_SIZE).getInternal(); // first use, or previous use grew past the cap: start from a small map
+ mapBasedRawKeyHolders.put(ArrayMapBasedHolder.class.getName(), mapInternal); // cache it for the next generator
+ }
+ _rawKeyHolder = new ArrayMapBasedHolder(mapInternal); // adopting constructor clears any entries left by the previous user
} else {
if (cardinalityProduct > Integer.MAX_VALUE) {
_globalGroupIdUpperBound = numGroupsLimit;
- _rawKeyHolder = new LongMapBasedHolder(DEFAULT_HASH_MAP_INITIAL_SIZE);
+ Object mapInternal = mapBasedRawKeyHolders.get(LongMapBasedHolder.class.getName()); // map cached by an earlier generator, if any
+ if (mapInternal == null || ((Long2IntOpenHashMap) mapInternal).size() > MAX_CACHING_MAP_SIZE) { // test BEFORE the holder constructor clear()s the map
+ mapInternal = new LongMapBasedHolder(INITIAL_MAP_SIZE).getInternal(); // first use, or previous use grew past the cap: start from a small map
+ mapBasedRawKeyHolders.put(LongMapBasedHolder.class.getName(), mapInternal); // cache under the Long holder's own key, not the Array holder's
+ }
+ _rawKeyHolder = new LongMapBasedHolder(mapInternal); // adopting constructor clears any entries left by the previous user
} else {
_globalGroupIdUpperBound = Math.min((int) cardinalityProduct,
numGroupsLimit);
if (cardinalityProduct > arrayBasedThreshold) {
- _rawKeyHolder = new IntMapBasedHolder(DEFAULT_HASH_MAP_INITIAL_SIZE);
+ Object mapInternal = mapBasedRawKeyHolders.get(IntMapBasedHolder.class.getName()); // map cached by an earlier generator, if any
+ if (mapInternal == null || ((Int2IntOpenHashMap) mapInternal).size() > MAX_CACHING_MAP_SIZE) { // test BEFORE the holder constructor clear()s the map
+ mapInternal = new IntMapBasedHolder(INITIAL_MAP_SIZE).getInternal(); // first use, or previous use grew past the cap: start from a small map
+ mapBasedRawKeyHolders.put(IntMapBasedHolder.class.getName(), mapInternal); // cache under the Int holder's own key, not the Array holder's
+ }
+ _rawKeyHolder = new IntMapBasedHolder(mapInternal); // adopting constructor clears any entries left by the previous user
} else {
_rawKeyHolder = new ArrayBasedHolder();
}
@@ -191,6 +208,8 @@ public class DictionaryBasedGroupKeyGenerator implements
GroupKeyGenerator {
* @return Upper bound of group id inside the holder
*/
int getGroupIdUpperBound();
+
+ Object getInternal();
}
private class ArrayBasedHolder implements RawKeyHolder {
@@ -225,6 +244,11 @@ public class DictionaryBasedGroupKeyGenerator implements
GroupKeyGenerator {
return _globalGroupIdUpperBound;
}
+ @Override
+ public Object getInternal() {
+ return _flags;
+ }
+
@Nonnull
@Override
public Iterator<GroupKey> iterator() {
@@ -269,6 +293,11 @@ public class DictionaryBasedGroupKeyGenerator implements
GroupKeyGenerator {
_rawKeyToGroupIdMap.defaultReturnValue(INVALID_ID);
}
+ public IntMapBasedHolder(Object hashMap) {
+ _rawKeyToGroupIdMap = (Int2IntOpenHashMap) hashMap;
+ _rawKeyToGroupIdMap.clear();
+ }
+
@Override
public void processSingleValue(int numDocs, @Nonnull int[] outGroupIds) {
for (int i = 0; i < numDocs; i++) {
@@ -308,6 +337,11 @@ public class DictionaryBasedGroupKeyGenerator implements
GroupKeyGenerator {
return _numGroups;
}
+ @Override
+ public Object getInternal() {
+ return _rawKeyToGroupIdMap;
+ }
+
@Nonnull
@Override
public Iterator<GroupKey> iterator() {
@@ -448,6 +482,11 @@ public class DictionaryBasedGroupKeyGenerator implements
GroupKeyGenerator {
_rawKeyToGroupIdMap.defaultReturnValue(INVALID_ID);
}
+ public LongMapBasedHolder(Object rawKeyToGroupIdMap) {
+ _rawKeyToGroupIdMap = (Long2IntOpenHashMap) rawKeyToGroupIdMap;
+ _rawKeyToGroupIdMap.clear();
+ }
+
@Override
public void processSingleValue(int numDocs, @Nonnull int[] outGroupIds) {
for (int i = 0; i < numDocs; i++) {
@@ -488,6 +527,11 @@ public class DictionaryBasedGroupKeyGenerator implements
GroupKeyGenerator {
return _numGroups;
}
+ @Override
+ public Object getInternal() {
+ return _rawKeyToGroupIdMap;
+ }
+
@Nonnull
@Override
public Iterator<GroupKey> iterator() {
@@ -619,6 +663,11 @@ public class DictionaryBasedGroupKeyGenerator implements
GroupKeyGenerator {
_rawKeyToGroupIdMap.defaultReturnValue(INVALID_ID);
}
+ public ArrayMapBasedHolder(Object rawKeyToGroupIdMap) {
+ _rawKeyToGroupIdMap = (Object2IntOpenHashMap<IntArray>)
rawKeyToGroupIdMap;
+ _rawKeyToGroupIdMap.clear();
+ }
+
@Override
public void processSingleValue(int numDocs, @Nonnull int[] outGroupIds) {
for (int i = 0; i < numDocs; i++) {
@@ -659,6 +708,11 @@ public class DictionaryBasedGroupKeyGenerator implements
GroupKeyGenerator {
return _numGroups;
}
+ @Override
+ public Object getInternal() {
+ return _rawKeyToGroupIdMap;
+ }
+
@Nonnull
@Override
public Iterator<GroupKey> iterator() {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/DictionaryBasedGroupKeyGeneratorTest.java
b/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/DictionaryBasedGroupKeyGeneratorTest.java
index 2223c32..4fffcef 100644
---
a/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/DictionaryBasedGroupKeyGeneratorTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/DictionaryBasedGroupKeyGeneratorTest.java
@@ -74,6 +74,8 @@ public class DictionaryBasedGroupKeyGeneratorTest {
private static final String FILTER_COLUMN = "docId";
private static final String[] SV_COLUMNS = {"s1", "s2", "s3", "s4", "s5",
"s6", "s7", "s8", "s9", "s10"};
private static final String[] MV_COLUMNS = {"m1", "m2"};
+ private static final ThreadLocal<Map>
THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS =
+ ThreadLocal.withInitial(() -> new HashMap());
private final long _randomSeed = System.currentTimeMillis();
private final Random _random = new Random(_randomSeed);
@@ -168,7 +170,8 @@ public class DictionaryBasedGroupKeyGeneratorTest {
DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
new DictionaryBasedGroupKeyGenerator(_transformOperator,
getExpressions(groupByColumns),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
-
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+ InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+ THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get());
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(),
UNIQUE_ROWS, _errorMessage);
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(),
UNIQUE_ROWS, _errorMessage);
@@ -188,7 +191,8 @@ public class DictionaryBasedGroupKeyGeneratorTest {
DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
new DictionaryBasedGroupKeyGenerator(_transformOperator,
getExpressions(groupByColumns),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
-
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+ InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+ THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get());
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage);
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(),
0, _errorMessage);
@@ -209,7 +213,8 @@ public class DictionaryBasedGroupKeyGeneratorTest {
DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
new DictionaryBasedGroupKeyGenerator(_transformOperator,
getExpressions(groupByColumns),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
-
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+ InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+ THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get());
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage);
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(),
0, _errorMessage);
@@ -230,7 +235,8 @@ public class DictionaryBasedGroupKeyGeneratorTest {
DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
new DictionaryBasedGroupKeyGenerator(_transformOperator,
getExpressions(groupByColumns),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
-
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+ InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+ THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get());
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage);
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(),
0, _errorMessage);
@@ -265,7 +271,8 @@ public class DictionaryBasedGroupKeyGeneratorTest {
DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
new DictionaryBasedGroupKeyGenerator(_transformOperator,
getExpressions(groupByColumns),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
-
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+ InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+ THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get());
int groupKeyUpperBound =
dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound();
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(),
groupKeyUpperBound, _errorMessage);
@@ -286,7 +293,8 @@ public class DictionaryBasedGroupKeyGeneratorTest {
DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
new DictionaryBasedGroupKeyGenerator(_transformOperator,
getExpressions(groupByColumns),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
-
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+ InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+ THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get());
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage);
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(),
0, _errorMessage);
@@ -308,7 +316,8 @@ public class DictionaryBasedGroupKeyGeneratorTest {
DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
new DictionaryBasedGroupKeyGenerator(_transformOperator,
getExpressions(groupByColumns),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
-
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+ InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+ THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get());
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage);
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(),
0, _errorMessage);
@@ -330,7 +339,8 @@ public class DictionaryBasedGroupKeyGeneratorTest {
DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
new DictionaryBasedGroupKeyGenerator(_transformOperator,
getExpressions(groupByColumns),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
-
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+ InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+ THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get());
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage);
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(),
0, _errorMessage);
@@ -350,7 +360,7 @@ public class DictionaryBasedGroupKeyGeneratorTest {
// NOTE: arrayBasedThreshold must be smaller or equal to numGroupsLimit
DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
new DictionaryBasedGroupKeyGenerator(_transformOperator,
getExpressions(groupByColumns), numGroupsLimit,
- numGroupsLimit);
+ numGroupsLimit,
THREAD_LOCAL_DICTIONARY_BASED_GROUP_KEY_HOLDERS.get());
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(),
numGroupsLimit, _errorMessage);
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(),
0, _errorMessage);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]