tarun11Mavani commented on code in PR #18760:
URL: https://github.com/apache/pinot/pull/18760#discussion_r3466398850


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/BitmapResultExtractionStrategy.java:
##########
@@ -42,14 +49,32 @@ public List<RoaringBitmap> 
extractIntermediateResult(DictIdsWrapper dictIdsWrapp
       }
       return result;
     }
-    Dictionary dictionary = dictIdsWrapper._dictionary;
     List<RoaringBitmap> result = new ArrayList<>(_numSteps);
-    for (RoaringBitmap dictIdBitmap : dictIdsWrapper._stepsBitmaps) {
-      result.add(convertToValueBitmap(dictionary, dictIdBitmap));
+    if (dictIdsWrapper.isMultiKey()) {
+      for (RoaringBitmap compositeIdBitmap : dictIdsWrapper._stepsBitmaps) {
+        result.add(convertCompositeToValueBitmap(dictIdsWrapper, 
compositeIdBitmap));
+      }
+    } else {
+      Dictionary dictionary = dictIdsWrapper._dictionaries[0];
+      for (RoaringBitmap dictIdBitmap : dictIdsWrapper._stepsBitmaps) {
+        result.add(convertToValueBitmap(dictionary, dictIdBitmap));
+      }
     }
     return result;
   }
 
+  private RoaringBitmap convertCompositeToValueBitmap(DictIdsWrapper wrapper, 
RoaringBitmap compositeIdBitmap) {
+    RoaringBitmap valueBitmap = new RoaringBitmap();
+    PeekableIntIterator iterator = compositeIdBitmap.getIntIterator();
+    int numKeys = wrapper._dictionaries.length;
+    int[] dictIds = new int[numKeys];
+    while (iterator.hasNext()) {
+      wrapper.reverseCompositeId(iterator.next(), dictIds);
+      valueBitmap.add(DictIdsWrapper.toCompositeString(wrapper._dictionaries, 
dictIds).hashCode());

Review Comment:
   Makes sense. I'll replace `toCompositeString().hashCode()` with a direct 
hash-combining loop over the dictionary values. 
   



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/DictIdsWrapper.java:
##########
@@ -18,19 +18,151 @@
  */
 package org.apache.pinot.core.query.aggregation.function.funnel;
 
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.roaringbitmap.RoaringBitmap;
 
 
+/**
+ * Holds per-step RoaringBitmaps keyed by correlation dictionary IDs.
+ *
+ * <p>For single-key CORRELATE_BY, stores raw dictionary IDs directly in the 
bitmaps (compact, fits in one
+ * RoaringBitmap container for typical segment sizes).
+ *
+ * <p>For multi-key CORRELATE_BY, composite IDs are assigned via stride-based 
arithmetic (when the combined key
+ * space fits in int) or a HashMap fallback for large key spaces.
+ */
 final class DictIdsWrapper {
-  final Dictionary _dictionary;
+  final Dictionary[] _dictionaries;
   final RoaringBitmap[] _stepsBitmaps;
 
+  // Stride-based composite mapping (non-null only for multi-key when product 
of dict sizes fits in int)
+  private final int[] _strides;
+
+  // HashMap-based composite mapping (non-null only for multi-key when stride 
overflows int)
+  private final Map<IntArrayList, Integer> _compositeKeyMap;
+  private final List<int[]> _compositeKeyReverse;
+  private final IntArrayList _lookupKey;
+
   DictIdsWrapper(int numSteps, Dictionary dictionary) {
-    _dictionary = dictionary;
+    _dictionaries = new Dictionary[]{dictionary};
     _stepsBitmaps = new RoaringBitmap[numSteps];
     for (int n = 0; n < numSteps; n++) {
       _stepsBitmaps[n] = new RoaringBitmap();
     }
+    _strides = null;
+    _compositeKeyMap = null;
+    _compositeKeyReverse = null;
+    _lookupKey = null;
+  }
+
+  DictIdsWrapper(int numSteps, Dictionary[] dictionaries) {
+    _dictionaries = dictionaries;
+    _stepsBitmaps = new RoaringBitmap[numSteps];
+    for (int n = 0; n < numSteps; n++) {
+      _stepsBitmaps[n] = new RoaringBitmap();
+    }
+
+    if (dictionaries.length > 1) {
+      long totalSpace = 1;
+      boolean fitsInInt = true;
+      for (Dictionary d : dictionaries) {
+        totalSpace *= d.length();
+        if (totalSpace > Integer.MAX_VALUE) {
+          fitsInInt = false;
+          break;
+        }
+      }
+
+      if (fitsInInt) {
+        _strides = new int[dictionaries.length];
+        _strides[dictionaries.length - 1] = 1;
+        for (int k = dictionaries.length - 2; k >= 0; k--) {
+          _strides[k] = _strides[k + 1] * dictionaries[k + 1].length();
+        }
+        _compositeKeyMap = null;
+        _compositeKeyReverse = null;
+        _lookupKey = null;
+      } else {
+        _strides = null;
+        _compositeKeyMap = new HashMap<>();
+        _compositeKeyReverse = new ArrayList<>();
+        _lookupKey = new IntArrayList(dictionaries.length);
+      }
+    } else {
+      _strides = null;
+      _compositeKeyMap = null;
+      _compositeKeyReverse = null;
+      _lookupKey = null;
+    }
+  }
+
+  boolean isMultiKey() {
+    return _dictionaries.length > 1;
+  }
+
+  boolean isHashMapPath() {
+    return _compositeKeyMap != null;
+  }
+
+  /**
+   * Maps a tuple of per-column dictionary IDs to a single composite int 
suitable for RoaringBitmap.
+   * Only used for multi-key; for single-key, callers should add the dictId 
directly.
+   */
+  int getCompositeCorrelationId(int[] dictIds) {
+    if (_strides != null) {
+      int id = 0;
+      for (int k = 0; k < dictIds.length; k++) {
+        id += dictIds[k] * _strides[k];
+      }
+      return id;
+    }
+    _lookupKey.clear();
+    for (int dictId : dictIds) {
+      _lookupKey.add(dictId);
+    }
+    Integer existingId = _compositeKeyMap.get(_lookupKey);
+    if (existingId != null) {
+      return existingId;
+    }
+    IntArrayList insertKey = new IntArrayList(dictIds);

Review Comment:
   Agreed. 
   If we adopt the approach from the comment above, this entire HashMap path 
goes away. If we keep it, I can at least eliminate the double store 
(IntArrayList + dictIds.clone()). Will address based on how we resolve comment 2.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SortedAggregationResult.java:
##########
@@ -50,7 +78,50 @@ public void add(int step, int correlationId) {
     _correlatedSteps[step] = true;
   }
 
+  /**
+   * Multi-key add. Data must be sorted by correlationIds[0] (primary key).
+   * Secondary keys are tracked via HashMap within each primary-key group.
+   *
+   * <p>Within a primary-key group, rows for the same (primary, secondary) 
combination may appear
+   * non-contiguously (e.g. interleaved with other secondary keys). This is 
handled correctly
+   * because the HashMap accumulates all step observations per secondary key 
regardless of row order.
+   */
+  public void addMultiKey(int step, int[] correlationIds) {
+    int primaryId = correlationIds[0];
+    if (primaryId != _lastPrimaryId) {
+      flushMultiKeyGroup();
+      _lastPrimaryId = primaryId;
+      _secondaryKeySteps.clear();
+    }
+
+    _lookupKey.clear();
+    for (int k = 1; k < correlationIds.length; k++) {
+      _lookupKey.add(correlationIds[k]);
+    }
+
+    boolean[] steps = _secondaryKeySteps.get(_lookupKey);
+    if (steps == null) {
+      steps = new boolean[_numSteps];
+      _secondaryKeySteps.put(new IntArrayList(_lookupKey), steps);

Review Comment:
   The clone is needed because `_lookupKey` is a reusable buffer that is 
cleared and refilled (clear() + add()) on every row. Without the clone, the key 
stored in the HashMap would be mutated in place on the next row. 



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/SortedAggregationResult.java:
##########
@@ -50,7 +78,50 @@ public void add(int step, int correlationId) {
     _correlatedSteps[step] = true;
   }
 
+  /**
+   * Multi-key add. Data must be sorted by correlationIds[0] (primary key).
+   * Secondary keys are tracked via HashMap within each primary-key group.
+   *
+   * <p>Within a primary-key group, rows for the same (primary, secondary) 
combination may appear
+   * non-contiguously (e.g. interleaved with other secondary keys). This is 
handled correctly
+   * because the HashMap accumulates all step observations per secondary key 
regardless of row order.
+   */
+  public void addMultiKey(int step, int[] correlationIds) {
+    int primaryId = correlationIds[0];
+    if (primaryId != _lastPrimaryId) {
+      flushMultiKeyGroup();
+      _lastPrimaryId = primaryId;
+      _secondaryKeySteps.clear();
+    }
+
+    _lookupKey.clear();

Review Comment:
   `correlationIds` includes the primary key at index 0 (we only need the 
secondary keys from index 1 onward), and `wrap()` aliases the underlying array, 
which gets mutated on the next row.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/DictIdsWrapper.java:
##########
@@ -18,19 +18,151 @@
  */
 package org.apache.pinot.core.query.aggregation.function.funnel;
 
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.roaringbitmap.RoaringBitmap;
 
 
+/**
+ * Holds per-step RoaringBitmaps keyed by correlation dictionary IDs.
+ *
+ * <p>For single-key CORRELATE_BY, stores raw dictionary IDs directly in the 
bitmaps (compact, fits in one
+ * RoaringBitmap container for typical segment sizes).
+ *
+ * <p>For multi-key CORRELATE_BY, composite IDs are assigned via stride-based 
arithmetic (when the combined key
+ * space fits in int) or a HashMap fallback for large key spaces.
+ */
 final class DictIdsWrapper {
-  final Dictionary _dictionary;
+  final Dictionary[] _dictionaries;
   final RoaringBitmap[] _stepsBitmaps;
 
+  // Stride-based composite mapping (non-null only for multi-key when product 
of dict sizes fits in int)
+  private final int[] _strides;
+
+  // HashMap-based composite mapping (non-null only for multi-key when stride 
overflows int)
+  private final Map<IntArrayList, Integer> _compositeKeyMap;
+  private final List<int[]> _compositeKeyReverse;
+  private final IntArrayList _lookupKey;
+
   DictIdsWrapper(int numSteps, Dictionary dictionary) {
-    _dictionary = dictionary;
+    _dictionaries = new Dictionary[]{dictionary};
     _stepsBitmaps = new RoaringBitmap[numSteps];
     for (int n = 0; n < numSteps; n++) {
       _stepsBitmaps[n] = new RoaringBitmap();
     }
+    _strides = null;
+    _compositeKeyMap = null;
+    _compositeKeyReverse = null;
+    _lookupKey = null;
+  }
+
+  DictIdsWrapper(int numSteps, Dictionary[] dictionaries) {
+    _dictionaries = dictionaries;
+    _stepsBitmaps = new RoaringBitmap[numSteps];
+    for (int n = 0; n < numSteps; n++) {
+      _stepsBitmaps[n] = new RoaringBitmap();
+    }
+
+    if (dictionaries.length > 1) {
+      long totalSpace = 1;
+      boolean fitsInInt = true;
+      for (Dictionary d : dictionaries) {
+        totalSpace *= d.length();
+        if (totalSpace > Integer.MAX_VALUE) {
+          fitsInInt = false;
+          break;
+        }
+      }
+
+      if (fitsInInt) {
+        _strides = new int[dictionaries.length];
+        _strides[dictionaries.length - 1] = 1;
+        for (int k = dictionaries.length - 2; k >= 0; k--) {
+          _strides[k] = _strides[k + 1] * dictionaries[k + 1].length();
+        }
+        _compositeKeyMap = null;
+        _compositeKeyReverse = null;
+        _lookupKey = null;
+      } else {
+        _strides = null;
+        _compositeKeyMap = new HashMap<>();
+        _compositeKeyReverse = new ArrayList<>();
+        _lookupKey = new IntArrayList(dictionaries.length);
+      }
+    } else {
+      _strides = null;
+      _compositeKeyMap = null;
+      _compositeKeyReverse = null;
+      _lookupKey = null;
+    }
+  }
+
+  boolean isMultiKey() {
+    return _dictionaries.length > 1;
+  }
+
+  boolean isHashMapPath() {
+    return _compositeKeyMap != null;
+  }
+
+  /**
+   * Maps a tuple of per-column dictionary IDs to a single composite int 
suitable for RoaringBitmap.
+   * Only used for multi-key; for single-key, callers should add the dictId 
directly.
+   */
+  int getCompositeCorrelationId(int[] dictIds) {

Review Comment:
   To make sure I understand: with `((long) primaryDictId << 32) | 
hash32(secondaryDictIds)`, the result is a 64-bit value, but RoaringBitmap only 
stores 32-bit ints. Are you suggesting we: 
   (a) use Roaring64Bitmap instead, 
   (b) hash/cast the long down to 32 bits for the bitmap, 
   or (c) something else? 
   Also — this makes the set strategy and partitioned-bitmap strategy 
approximate for multi-key (they're exact today since composite IDs are 
collision-free). We need to agree on this and document it. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to