rohityadav1993 opened a new issue, #18667:
URL: https://github.com/apache/pinot/issues/18667

   # Proposal
   **Sorted Merge join**
   When the data from left input and right input is sorted by the join columns 
we can leverage [sorted merge 
join](https://en.wikipedia.org/wiki/Sort-merge_join) for a more scalable join 
computation by avoiding hash based joins (e.g. [ClickHouse full sort-merge 
join](https://clickhouse.com/blog/clickhouse-fully-supports-joins-full-sort-partial-merge-part3#full-sorting-merge-join)).
   
   **Sorted key group by**
   When data is sorted by the group-by keys in the aggregation query, we can 
avoid hash aggregations by computing the aggregation in a streaming manner 
(e.g. [MySQL GROUP BY 
Optimization](https://dev.mysql.com/doc/refman/8.4/en/group-by-optimization.html)).
   
   ## Why avoid hash join
   
   Hash join materializes the entire right side into a hash table before 
probing begins, so memory grows linearly with right-side row count 
https://github.com/apache/pinot/blob/9e2335de3c3f252eef1147b7af7f5e1d16fde403/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java#L186-L189.
 For multi-column join keys, Pinot falls back from primitive-typed lookup 
tables (`IntLookupTable`, `LongLookupTable`) to a generic `ObjectLookupTable` 
that allocates a composite key object per row, increasing GC pressure at scale. 
When data is already partitioned and sorted on the join key, a streaming merge 
join avoids the hash table entirely.
   
   ## Why avoid hash aggregation
   Hash-based GROUP BY in Pinot's MSE (MultistageGroupByExecutor) assigns each 
distinct group key a slot in a hash map — Long2IntOpenHashMap for single LONG 
keys, Object2IntOpenHashMap<FixedIntArray> for multi-column keys — and 
accumulates aggregation state for every group  simultaneously 
https://github.com/apache/pinot/blob/9e2335de3c3f252eef1147b7af7f5e1d16fde403/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java#L447-L451
 Memory grows with the number of distinct groups, which is why queries with 
high-cardinality keys require SET numGroupsLimit = 50000000 to avoid hitting 
the default cap.
                                                                                
                                
   When input rows are already sorted by the group-by key, none of that is 
necessary. E.g. MySQL calls this loose/tight index scan 
(https://dev.mysql.com/doc/refman/8.4/en/group-by-optimization.html): once rows 
are ordered by key, you only need to hold state for the current key — flush the 
accumulator to output when the key changes, reset, and continue. Memory is O(1) 
regardless of group count, and there are no hash collisions, no map resizes, 
and no FixedIntArray allocations per group.
   
   **Note:** *Can create two separate issues but starting with one as there are 
commonalities in improvement at leaf stage to achieve either.*
   
   ---
   ## POC
   
   We did a POC on a table (`user_events`) sorted on `correlation_id` within 
each segment and partitioned across servers using a Murmur hash on 
`correlation_id` (128 buckets). This ensures all events for a given user land 
on the same server.
   
   **Design choices in `SortedMergeJoinOperator`:**
   - Lazy block reading: blocks are fetched from left and right inputs on 
demand — only one block per side is held in memory at a time.
   - `LeafStageSortJoinRule.addSortToLeafStage()` injects a `PhysicalSort` with 
no fetch limit and no offset, so the leaf ORDER BY is unbounded by design.
   
   <details>
   
   <summary>Funnel count query using sorted join:</summary>
   
   ```sql
   SET useMultistageEngine = true;
   SET usePhysicalOptimizer = true;
   SET maxRowsInJoin = 1234194304;
   SET timeoutMs = 120000;
   
   WITH joined AS (
     SELECT /*+ joinOptions(join_strategy='sorted') */
       CAST(
         DATETIMECONVERT(
           CAST(step1.occurred_at_min AS BIGINT) * 1000,
           '1:MILLISECONDS:EPOCH', '1:MILLISECONDS:EPOCH', '600:SECONDS'
         ) AS BIGINT
       ) AS ts,
       step1.correlation_id AS u1,
       step2.correlation_id AS u2
     FROM user_events
       /*+ tableOptions(partition_key='correlation_id', 
partition_function='Murmur', partition_size='128') */
       AS step1
     LEFT JOIN user_events
       /*+ tableOptions(partition_key='correlation_id', 
partition_function='Murmur', partition_size='128') */
       AS step2
       ON step1.correlation_id = step2.correlation_id
       AND step2.name = 'addToCart'
       AND step2.occurred_at_min BETWEEN {start_epoch} AND {end_epoch}
     WHERE step1.name = 'checkout'
       AND step1.occurred_at_min BETWEEN {start_epoch} AND {end_epoch}
   )
   SELECT ts, COUNT(DISTINCT u1) AS step1, COUNT(DISTINCT u2) AS step2
   FROM joined
   GROUP BY ts ORDER BY ts LIMIT 100000
   ```
   
   Physical plan (`EXPLAIN PLAN FOR` with `usePhysicalOptimizer=true`):
   
   ```
   PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])
     PhysicalSort(sort0=[$0], dir0=[ASC], fetch=[100000])
       PhysicalAggregate(group=[{1}], agg#0=[DISTINCTCOUNT($0)], ...)
         PhysicalJoin(condition=[=($0, $2)], joinType=[left])
           PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])
             PhysicalSort(sort0=[$0], dir0=[ASC])          ← pushed into leaf 
by LeafStageSortJoinRule
               PhysicalProject(correlation_id, ts=[DATETIMECONVERT(...)])
                 PhysicalFilter(name=checkout AND ...)
                   PhysicalTableScan(user_events)
           PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])
             PhysicalSort(sort0=[$0], dir0=[ASC])          ← pushed into leaf 
by LeafStageSortJoinRule
               PhysicalProject(correlation_id)
                 PhysicalFilter(name=addToCart AND ...)
                   PhysicalTableScan(user_events)
   ```
   `IDENTITY_EXCHANGE` on both sides confirms partition co-location — no 
cross-server shuffle for the join input.
   </details>
   
   <details>
   <summary> Alternate single-scan query:</summary>
   
   ```sql
   SET useMultistageEngine = true;
   SET numGroupsLimit = 50000000;
   SET timeoutMs = 30000;
   
   SET useMultistageEngine = true;
   SET numGroupsLimit = 50000000;
   SET timeoutMs = 30000;
   
   WITH per_id AS (
     SELECT
     /*+ aggOptions(is_partitioned_by_group_by_keys='true') */
     FLOOR(occurred_at_min / 600) * 600 AS ts,
     correlation_id,
       MAX(CASE WHEN name = 'addToCart'  THEN 1 ELSE 0 END) AS has_step1,
       MAX(CASE WHEN name = 'checkout'   THEN 1 ELSE 0 END) AS has_step2
     FROM user_events
     WHERE occurred_at_min BETWEEN {start_epoch} AND {end_epoch}
       AND (name = 'addToCart' OR name = 'checkout')
     GROUP BY ts, correlation_id
   )
   SELECT
     ts,
     SUM(has_step1) AS step1_count,
     SUM(CASE WHEN has_step1 = 1 AND has_step2 = 1 THEN 1 ELSE 0 END) AS 
step2_count
   FROM per_id
   GROUP BY ts ORDER BY ts LIMIT 200
   ```
   
   Explain plan
   ```
   LogicalSort(sort0=[$0], dir0=[ASC], offset=[0], fetch=[200])
     PinotLogicalSortExchange(distribution=[hash], collation=[[0]], 
isSortOnSender=[false], isSortOnReceiver=[true])
       LogicalSort(sort0=[$0], dir0=[ASC], fetch=[200])
         PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
agg#1=[COUNT($2)], aggType=[FINAL])
           PinotLogicalExchange(distribution=[hash[0]])
             PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
agg#1=[COUNT() FILTER $2], aggType=[LEAF])
               LogicalProject(ts=[$0], has_step1=[$2], $f3=[AND(=($2, 1), =($3, 
1))])
                 PinotLogicalAggregate(group=[{0, 1}], agg#0=[MAX($2)], 
agg#1=[MAX($3)], aggType=[DIRECT])
                   LogicalProject(ts=[*(FLOOR(/($31, 600)), 600)], 
correlation_id=[$14], ...)
                     LogicalFilter(...)
                       PinotLogicalTableScan(table=[[default, user_events]]) 
--- Notice this is a single scan
   ```
   
   This uses a single `TableScan` (vs two in the join).
   </details>
   
   ---
   
   ## Challenges
   
   ### 1. Leaf stage ORDER BY in sorted join will materialize all segments into 
a single block
   
   `LeafStageSortJoinRule` injects a `PhysicalSort` with no fetch limit or 
offset into the leaf stage so both join inputs arrive sorted. 
`ServerPlanRequestVisitor.visitSort()` only sets a LIMIT when `fetch >= 0`, so 
the injected sort — which has no fetch — generates an **unbounded** V1 `ORDER 
BY` with no LIMIT clause. `CombinePlanNode` selects 
`MinMaxValueBasedSelectionOrderByCombineOperator` for this ORDER BY, which 
merges results from all segments into a single block before returning. At large 
data volumes this exceeds the leaf stage's CPU budget, triggering 
`EarlyTerminationException` from `ThreadAccountant` inside 
`SelectionOperatorUtils.mergeWithOrdering()`, which propagates as a spurious 
`Cancelled by sender` to the broker.
   
   **Suggestions:**
   - New `MinMaxValueBasedSelectionOrderByCombineOperator` to extend 
`BaseStreamingCombineOperator`, using a bounded queue and emitting sorted 
blocks incrementally via a k-way heap merge across segments. The existing 
MinMax segment-pruning optimization can be preserved. Consuming or unsorted 
segments would sort in memory per segment before joining the heap.
   - **Implement k-way merge in `SortedMailboxReceiveOperator` when the sending 
stage is known to be sorted (already noted as a TODO in that class), rather 
than accumulating all rows and re-sorting
   
https://github.com/apache/pinot/blob/b50a421740aa0749d63a12e2740595dccac21c37/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java#L92
   
   ```mermaid
   flowchart TD
       subgraph leaf["Leaf Stage — per server"]
           S1["Segment 1\n(sorted by join key)"]
           S2["Segment 2\n(sorted by join key)"]
           SN["Segment N\n(sorted or sort-in-memory)"]
           S1 & S2 & SN --> KH["K-way Heap 
Merge\nMinMaxValueBased→StreamingCombineOperator\npreserves collation, emits 
sorted blocks\nincrementally — MinMax pruning retained"]
       end
   
       KH --> MS["MailboxSendOperator\n(sorted blocks sent downstream)"]
   
       MS -->|"IDENTITY_EXCHANGE\n(co-located — no network shuffle)"| MR
   
       subgraph recv["Intermediate Stage"]
           MR["SortedMailboxReceiveOperator\nOption A: k-way merge across 
mailboxes\n\nOption B: receives already-merged stream\nfrom streaming combine"]
       end
   
       MR --> SMJ["SortedMergeJoinOperator\n(streaming two-pointer join\nlazy 
block loading — one block per side in memory)"]
    ```
   
   
   ### 2. Join queries that reduce per key can be rewritten as sorted GROUP BY, 
but MSE GROUP BY is not sort-aware
   
   Some join queries — including the funnel query above — can be expressed as a 
single-scan GROUP BY (shown in the alternate query). MSE aggregation is also 
required for correctness. This is not just a rewrite convenience: SSE 
`FUNNELCOUNT(settings('partitioned','sorted'))` is not equivalent here because 
SSE processes each segment independently. When a user's funnel events span two 
segments on the same server, `SortedAggregationResult` resets its per-key state 
at the segment boundary and `PartitionedMergeStrategy` merges results by plain 
addition — producing incorrect counts.
   
   With `/*+ aggOptions(is_partitioned_by_group_by_keys='true') */`, the 
alternate query achieves partition co-location by avoiding PinotLogicalExchange 
before first PinotLogicalAggregate, but the intermediate and final GROUP BY 
stages still use hash-based aggregation. When data is sorted on the group-by 
keys, a streaming sorted aggregation — accumulating state per key and flushing 
when the key changes — would eliminate hash table construction and the 
associated group count limits, mirroring how ClickHouse's 
`MergingSortedTransform` handles aggregation on MergeTree data. In the past we 
have seen hash based group by aggergation having issues with misconfigured 
hashtable size when cardinality is high.
   
   **Suggestions:**
   - Implement a sorted-input-aware aggregation operator in MSE that streams 
over sorted blocks without a hash table, activated when the leaf stage 
advertises sorted collation on the group-by keys.
   ```mermaid
   flowchart TB
       subgraph current["Current — two hash map layers"]
           direction TB
   
           subgraph leaf["V1 Leaf Stage (per server, across segments)"]
               direction TB
               S1["Seg 1\nGROUP BY scan\n→ GroupByResultsBlock\n(per-segment 
hash map)"]
               S2["Seg 2\nGROUP BY scan\n→ GroupByResultsBlock"]
               SN["Seg N\nGROUP BY scan\n→ GroupByResultsBlock"]
               S1 & S2 & SN --> GBC["GroupByCombineOperator\nextends 
BaseSingleBlockCombineOperator\n(NOT streaming)\nmerges all segments 
into\nshared IndexedTable hash map\n→ single GroupByResultsBlock"]
           end
   
           GBC -->|"mailbox send"| MSE
   
           subgraph mse["MSE Intermediate Stage"]
               direction TB
               
MSE["MultistageGroupByExecutor\ngenerateGroupByKeys()\nObject2IntOpenHashMap or 
Long2IntOpenHashMap\nFixedIntArray per multi-column key\ngrows O(distinct 
groups)\n→ SET numGroupsLimit = 50000000 needed"]
           end
       end
   
       subgraph proposed["Proposed — sorted streaming (data sorted by GROUP BY 
key)"]
           direction TB
   
           subgraph pleaf["V1 Leaf Stage (streaming)"]
               direction TB
               T1["Seg 1\n(sorted by key)"] & T2["Seg 2\n(sorted by key)"] & 
TN["Seg N\n(sorted by key)"] --> KH["K-way Heap Merge\n(streaming 
combine)\nemits groups in key order\nno IndexedTable — O(1) per key"]
           end
   
           KH -->|"sorted blocks via mailbox"| SMSE
   
           subgraph smse["MSE Intermediate Stage (streaming)"]
               direction TB
               SMSE["Sorted Streaming Aggregator\nCompare key vs previous key"]
               SMSE -->|"same key"| ACC["Accumulate into\ncurrent state\nO(1) 
memory"]
               SMSE -->|"key changes"| EMIT["Emit completed group\nreset state"]
               ACC --> SMSE
               EMIT --> SMSE
           end
       end
   ```
   
   
   Looking for community input on the preferred direction and would be happy to 
contribute the finalised approach.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to