siddharthteotia commented on a change in pull request #6559:
URL: https://github.com/apache/incubator-pinot/pull/6559#discussion_r573305843
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
##########
@@ -61,8 +61,16 @@
* bounded by the number of groups limit (globalGroupIdUpperBound is always
smaller or equal to numGroupsLimit).
*/
public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
- private final static int INITIAL_MAP_SIZE = 256;
- private final static int MAX_CACHING_MAP_SIZE = 1048576;
+ // NOTE: map size = map capacity (power of 2) * load factor
+ private static final int INITIAL_MAP_SIZE = (int) ((1 << 10) * 0.75f);
+ private static final int MAX_CACHING_MAP_SIZE = (int) ((1 << 20) * 0.75f);
+
+ private static final ThreadLocal<IntGroupIdMap> THREAD_LOCAL_INT_MAP =
ThreadLocal.withInitial(IntGroupIdMap::new);
+ private static final ThreadLocal<Long2IntOpenHashMap> THREAD_LOCAL_LONG_MAP =
Review comment:
Could Long2IntOpenHashMap also be reimplemented in a different layout
for better locality and cache friendliness, similar to how this PR
implements IntGroupIdMap?
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
##########
@@ -108,35 +116,35 @@ public DictionaryBasedGroupKeyGenerator(TransformOperator
transformOperator, Exp
_isSingleValueColumn[i] =
transformOperator.getResultMetadata(groupByExpression).isSingleValue();
}
+ // TODO: Clear the holder after processing the query instead of before
if (longOverflow) {
+ // ArrayMapBasedHolder
_globalGroupIdUpperBound = numGroupsLimit;
- Object mapInternal =
mapBasedRawKeyHolders.computeIfAbsent(ArrayMapBasedHolder.class.getName(),
- o -> new ArrayMapBasedHolder(INITIAL_MAP_SIZE).getInternal());
- _rawKeyHolder = new ArrayMapBasedHolder(mapInternal);
- if (((Object2IntOpenHashMap) mapInternal).size() > MAX_CACHING_MAP_SIZE)
{
- mapBasedRawKeyHolders
- .put(ArrayMapBasedHolder.class.getName(), new
ArrayMapBasedHolder(INITIAL_MAP_SIZE).getInternal());
+ Object2IntOpenHashMap<IntArray> groupIdMap =
THREAD_LOCAL_INT_ARRAY_MAP.get();
+ int size = groupIdMap.size();
+ groupIdMap.clear();
+ if (size > MAX_CACHING_MAP_SIZE) {
+ groupIdMap.trim();
}
+ _rawKeyHolder = new ArrayMapBasedHolder(groupIdMap);
} else {
if (cardinalityProduct > Integer.MAX_VALUE) {
+ // LongMapBasedHolder
_globalGroupIdUpperBound = numGroupsLimit;
- Object mapInternal =
mapBasedRawKeyHolders.computeIfAbsent(LongMapBasedHolder.class.getName(),
- o -> new LongMapBasedHolder(INITIAL_MAP_SIZE).getInternal());
- _rawKeyHolder = new LongMapBasedHolder(mapInternal);
- if (((Long2IntOpenHashMap) mapInternal).size() > MAX_CACHING_MAP_SIZE)
{
- mapBasedRawKeyHolders
- .put(ArrayMapBasedHolder.class.getName(), new
ArrayMapBasedHolder(INITIAL_MAP_SIZE).getInternal());
+ Long2IntOpenHashMap groupIdMap = THREAD_LOCAL_LONG_MAP.get();
+ int size = groupIdMap.size();
+ groupIdMap.clear();
+ if (size > MAX_CACHING_MAP_SIZE) {
+ groupIdMap.trim();
}
+ _rawKeyHolder = new LongMapBasedHolder(groupIdMap);
} else {
_globalGroupIdUpperBound = Math.min((int) cardinalityProduct,
numGroupsLimit);
if (cardinalityProduct > arrayBasedThreshold) {
- Object mapInternal =
mapBasedRawKeyHolders.computeIfAbsent(IntMapBasedHolder.class.getName(),
- o -> new IntMapBasedHolder(INITIAL_MAP_SIZE).getInternal());
- _rawKeyHolder = new IntMapBasedHolder(mapInternal);
- if (((Int2IntOpenHashMap) mapInternal).size() >
MAX_CACHING_MAP_SIZE) {
- mapBasedRawKeyHolders
- .put(ArrayMapBasedHolder.class.getName(), new
ArrayMapBasedHolder(INITIAL_MAP_SIZE).getInternal());
- }
+ // IntMapBasedHolder
+ IntGroupIdMap groupIdMap = THREAD_LOCAL_INT_MAP.get();
+ groupIdMap.clear();
Review comment:
What is the difference between trim() and clear()? Can we use the same
API for both?
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
##########
@@ -831,6 +777,156 @@ private String getGroupKey(IntArray rawKey) {
return groupKeyBuilder.toString();
}
+ /**
+ * Fast int-to-int hashmap with {@link #INVALID_ID} as the default return
value.
+ * <p>Different from {@link it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap},
this map uses one single array to store
+ * keys and values to reduce the cache miss.
+ */
+ @VisibleForTesting
+ public static class IntGroupIdMap {
+ private static final float LOAD_FACTOR = 0.75f;
+
+ private int[] _keyValueHolder;
+ private int _capacity;
+ private int _mask;
+ private int _maxNumEntries;
+ private int _size;
+
+ public IntGroupIdMap() {
+ _capacity = 1 << 10;
+ int holderSize = _capacity << 1;
Review comment:
So it looks like the keyValueHolder array has 2048 int slots (1024
key/value pairs), and the slot index is taken from the low-order bits of
the mixed hash of the incoming key (mask = 2047). We resize once more than
768 (1024 * 0.75) entries are stored. Right? How did we come up with these
values of 1024, 2048, etc.?
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
##########
@@ -831,6 +777,156 @@ private String getGroupKey(IntArray rawKey) {
return groupKeyBuilder.toString();
}
+ /**
+ * Fast int-to-int hashmap with {@link #INVALID_ID} as the default return
value.
+ * <p>Different from {@link it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap},
this map uses one single array to store
+ * keys and values to reduce the cache miss.
+ */
+ @VisibleForTesting
+ public static class IntGroupIdMap {
+ private static final float LOAD_FACTOR = 0.75f;
+
+ private int[] _keyValueHolder;
+ private int _capacity;
+ private int _mask;
+ private int _maxNumEntries;
+ private int _size;
+
+ public IntGroupIdMap() {
+ _capacity = 1 << 10;
+ int holderSize = _capacity << 1;
+ _keyValueHolder = new int[holderSize];
+ _mask = holderSize - 1;
+ _maxNumEntries = (int) (_capacity * LOAD_FACTOR);
+ }
+
+ public int size() {
+ return _size;
+ }
+
+ /**
+ * Returns the group id for the given raw key. Create a new group id if
the raw key does not exist and the group id
+ * upper bound is not reached.
+ */
+ public int getGroupId(int rawKey, int groupIdUpperBound) {
+ // NOTE: Key 0 is reserved as the null key. Use (rawKey + 1) as the
internal key because rawKey can never be -1.
+ int internalKey = rawKey + 1;
+ int index = (HashCommon.mix(internalKey) << 1) & _mask;
+ int key = _keyValueHolder[index];
+
+ // Handle hash hit separately for better performance
+ if (key == internalKey) {
+ return _keyValueHolder[index + 1];
+ }
+ if (key == 0) {
+ return _size < groupIdUpperBound ? addNewGroup(internalKey, index) :
INVALID_ID;
Review comment:
So `key == 0` implies we hashed to an empty slot in the array, right?
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
##########
@@ -831,6 +777,156 @@ private String getGroupKey(IntArray rawKey) {
return groupKeyBuilder.toString();
}
+ /**
+ * Fast int-to-int hashmap with {@link #INVALID_ID} as the default return
value.
+ * <p>Different from {@link it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap},
this map uses one single array to store
+ * keys and values to reduce the cache miss.
+ */
+ @VisibleForTesting
+ public static class IntGroupIdMap {
+ private static final float LOAD_FACTOR = 0.75f;
+
+ private int[] _keyValueHolder;
+ private int _capacity;
+ private int _mask;
+ private int _maxNumEntries;
+ private int _size;
+
+ public IntGroupIdMap() {
+ _capacity = 1 << 10;
+ int holderSize = _capacity << 1;
+ _keyValueHolder = new int[holderSize];
+ _mask = holderSize - 1;
+ _maxNumEntries = (int) (_capacity * LOAD_FACTOR);
+ }
+
+ public int size() {
+ return _size;
+ }
+
+ /**
+ * Returns the group id for the given raw key. Create a new group id if
the raw key does not exist and the group id
+ * upper bound is not reached.
+ */
+ public int getGroupId(int rawKey, int groupIdUpperBound) {
+ // NOTE: Key 0 is reserved as the null key. Use (rawKey + 1) as the
internal key because rawKey can never be -1.
+ int internalKey = rawKey + 1;
+ int index = (HashCommon.mix(internalKey) << 1) & _mask;
+ int key = _keyValueHolder[index];
+
+ // Handle hash hit separately for better performance
+ if (key == internalKey) {
+ return _keyValueHolder[index + 1];
+ }
+ if (key == 0) {
+ return _size < groupIdUpperBound ? addNewGroup(internalKey, index) :
INVALID_ID;
+ }
+
+ // Hash collision
+ while (true) {
+ index = (index + 2) & _mask;
+ key = _keyValueHolder[index];
+ if (key == internalKey) {
+ return _keyValueHolder[index + 1];
+ }
+ if (key == 0) {
+ return _size < groupIdUpperBound ? addNewGroup(internalKey, index) :
INVALID_ID;
+ }
+ }
+ }
+
+ private int addNewGroup(int internalKey, int index) {
+ int groupId = _size++;
+ _keyValueHolder[index] = internalKey;
+ _keyValueHolder[index + 1] = groupId;
+ if (_size > _maxNumEntries) {
+ expand();
+ }
+ return groupId;
+ }
+
+ private void expand() {
+ _capacity <<= 1;
+ int holderSize = _capacity << 1;
+ int[] oldKeyValueHolder = _keyValueHolder;
+ _keyValueHolder = new int[holderSize];
+ _mask = holderSize - 1;
+ _maxNumEntries <<= 1;
+ int oldIndex = 0;
+ for (int i = 0; i < _size; i++) {
+ while (oldKeyValueHolder[oldIndex] == 0) {
+ oldIndex += 2;
+ }
+ int key = oldKeyValueHolder[oldIndex];
+ int value = oldKeyValueHolder[oldIndex + 1];
+ int newIndex = (HashCommon.mix(key) << 1) & _mask;
+ if (_keyValueHolder[newIndex] != 0) {
+ do {
+ newIndex = (newIndex + 2) & _mask;
+ } while (_keyValueHolder[newIndex] != 0);
+ }
+ _keyValueHolder[newIndex] = key;
+ _keyValueHolder[newIndex + 1] = value;
+ oldIndex += 2;
+ }
+ }
+
+ public Iterator<Entry> iterator() {
+ return new Iterator<Entry>() {
+ private final Entry _entry = new Entry();
+ private int _index;
+ private int _numRemainingEntries = _size;
+
+ @Override
+ public boolean hasNext() {
+ return _numRemainingEntries > 0;
+ }
+
+ @Override
+ public Entry next() {
+ int key;
+ while ((key = _keyValueHolder[_index]) == 0) {
+ _index += 2;
+ }
+ _entry._rawKey = key - 1;
+ _entry._groupId = _keyValueHolder[_index + 1];
+ _index += 2;
+ _numRemainingEntries--;
+ return _entry;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ /**
Review comment:
There is no real trimming here like in the combiner, where we sort and
trim. We are just zeroing out the keyValueHolder array, or allocating a
new array if size > MAX_CACHING_MAP_SIZE.
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
##########
@@ -831,6 +777,156 @@ private String getGroupKey(IntArray rawKey) {
return groupKeyBuilder.toString();
}
+ /**
+ * Fast int-to-int hashmap with {@link #INVALID_ID} as the default return
value.
+ * <p>Different from {@link it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap},
this map uses one single array to store
+ * keys and values to reduce the cache miss.
+ */
+ @VisibleForTesting
+ public static class IntGroupIdMap {
+ private static final float LOAD_FACTOR = 0.75f;
+
+ private int[] _keyValueHolder;
+ private int _capacity;
+ private int _mask;
+ private int _maxNumEntries;
+ private int _size;
+
+ public IntGroupIdMap() {
+ _capacity = 1 << 10;
+ int holderSize = _capacity << 1;
+ _keyValueHolder = new int[holderSize];
+ _mask = holderSize - 1;
+ _maxNumEntries = (int) (_capacity * LOAD_FACTOR);
+ }
+
+ public int size() {
+ return _size;
+ }
+
+ /**
+ * Returns the group id for the given raw key. Create a new group id if
the raw key does not exist and the group id
+ * upper bound is not reached.
+ */
+ public int getGroupId(int rawKey, int groupIdUpperBound) {
+ // NOTE: Key 0 is reserved as the null key. Use (rawKey + 1) as the
internal key because rawKey can never be -1.
+ int internalKey = rawKey + 1;
+ int index = (HashCommon.mix(internalKey) << 1) & _mask;
+ int key = _keyValueHolder[index];
+
+ // Handle hash hit separately for better performance
+ if (key == internalKey) {
+ return _keyValueHolder[index + 1];
+ }
+ if (key == 0) {
+ return _size < groupIdUpperBound ? addNewGroup(internalKey, index) :
INVALID_ID;
+ }
+
+ // Hash collision
+ while (true) {
+ index = (index + 2) & _mask;
+ key = _keyValueHolder[index];
+ if (key == internalKey) {
+ return _keyValueHolder[index + 1];
+ }
+ if (key == 0) {
+ return _size < groupIdUpperBound ? addNewGroup(internalKey, index) :
INVALID_ID;
+ }
+ }
+ }
+
+ private int addNewGroup(int internalKey, int index) {
+ int groupId = _size++;
+ _keyValueHolder[index] = internalKey;
+ _keyValueHolder[index + 1] = groupId;
+ if (_size > _maxNumEntries) {
+ expand();
+ }
+ return groupId;
+ }
+
+ private void expand() {
+ _capacity <<= 1;
+ int holderSize = _capacity << 1;
+ int[] oldKeyValueHolder = _keyValueHolder;
+ _keyValueHolder = new int[holderSize];
+ _mask = holderSize - 1;
+ _maxNumEntries <<= 1;
+ int oldIndex = 0;
+ for (int i = 0; i < _size; i++) {
+ while (oldKeyValueHolder[oldIndex] == 0) {
+ oldIndex += 2;
+ }
+ int key = oldKeyValueHolder[oldIndex];
+ int value = oldKeyValueHolder[oldIndex + 1];
+ int newIndex = (HashCommon.mix(key) << 1) & _mask;
+ if (_keyValueHolder[newIndex] != 0) {
+ do {
+ newIndex = (newIndex + 2) & _mask;
+ } while (_keyValueHolder[newIndex] != 0);
+ }
+ _keyValueHolder[newIndex] = key;
+ _keyValueHolder[newIndex + 1] = value;
+ oldIndex += 2;
+ }
+ }
+
+ public Iterator<Entry> iterator() {
+ return new Iterator<Entry>() {
+ private final Entry _entry = new Entry();
+ private int _index;
+ private int _numRemainingEntries = _size;
+
+ @Override
+ public boolean hasNext() {
+ return _numRemainingEntries > 0;
+ }
+
+ @Override
+ public Entry next() {
+ int key;
+ while ((key = _keyValueHolder[_index]) == 0) {
+ _index += 2;
+ }
+ _entry._rawKey = key - 1;
+ _entry._groupId = _keyValueHolder[_index + 1];
+ _index += 2;
+ _numRemainingEntries--;
+ return _entry;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ /**
+ * Clears the map and trims the map if the size is larger than the {@link
#MAX_CACHING_MAP_SIZE}.
+ */
+ public void clear() {
+ if (_size == 0) {
+ return;
+ }
+ if (_size <= MAX_CACHING_MAP_SIZE) {
+ Arrays.fill(_keyValueHolder, 0);
+ } else {
+ _capacity = 1 << 10;
Review comment:
(nit) This is the same as the initialization in the constructor. Consider
creating an init() function and moving this code into it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]