agavra opened a new pull request, #9832:
URL: https://github.com/apache/pinot/pull/9832

   This replaces #9507
   
   **Review Notes**
   
   1. **!!NOTE!!**: there is a v1 change in this PR: if `SERVER_RETURN_FINAL_RESULT` is set, the server now returns the "proper" columns, ordered by the query projection instead of the internal format. You can see this change in the `*CombineOperator` files and in `SelectionOperatorService`.
   2. [ImmutableSortExchangeCopyRule.java](https://github.com/apache/pinot/compare/master...agavra:pinot:sort_pushdown?expand=1#diff-8a22fa3cfc8f05c0581192f941dc92c76d3cf5ebf2250cac528256fa2db0764a) is a generated file that I'm checking in. I generated it in the Calcite code base by running `./gradlew generateSources`, then copied the contents over to Pinot. We could generate it as part of the Pinot build, but that would require pulling extra codegen dependencies into our build, and this file should change extremely infrequently.
   3. The "main" part of the change is the introduction of `PinotSortExchangeCopyRule`, which simply copies a `Sort` past an `Exchange` (dropping the offset and setting the limit to `limit + offset`). This was inspired by the implementation of https://github.com/apache/calcite/blob/406c913b808b3234464d8c81d7352c4040dd281a/core/src/main/java/org/apache/calcite/rel/rules/SortJoinCopyRule.java
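   To make the `limit + offset` arithmetic concrete, here is a minimal, self-contained Java sketch. This is *not* the actual Calcite rule (`pushedDownSort` and `finalSort` are hypothetical names for illustration): it shows why the copied sort below the exchange must fetch `limit + offset` rows while the original sort above it keeps both the limit and the offset.

   ```java
   import java.util.Comparator;
   import java.util.List;
   import java.util.stream.Stream;

   public class SortExchangePushdown {

       // The copy of the Sort placed below the exchange: the offset is
       // dropped and the fetch becomes limit + offset, because any of the
       // top (limit + offset) rows of a partition may survive the final
       // offset applied after the merge.
       static List<Integer> pushedDownSort(List<Integer> partition, int limit, int offset) {
           return partition.stream()
                   .sorted(Comparator.reverseOrder()) // ORDER BY col1 DESC
                   .limit((long) limit + offset)
                   .toList();
       }

       // The original Sort above the exchange keeps both offset and limit.
       static List<Integer> finalSort(Stream<Integer> merged, int limit, int offset) {
           return merged.sorted(Comparator.reverseOrder())
                   .skip(offset)
                   .limit(limit)
                   .toList();
       }

       public static void main(String[] args) {
           int limit = 2, offset = 1; // LIMIT 2 OFFSET 1
           // Two "servers" each keep only their top 3 rows instead of everything
           List<Integer> server1 = pushedDownSort(List.of(5, 1, 9, 3), limit, offset);
           List<Integer> server2 = pushedDownSort(List.of(8, 2, 7), limit, offset);
           List<Integer> result =
                   finalSort(Stream.concat(server1.stream(), server2.stream()), limit, offset);
           System.out.println(result); // same answer as sorting the full data set: [8, 7]
       }
   }
   ```

   Fetching only `limit` rows per partition would be incorrect: the final offset might discard rows from one partition, in which case the row ranked `limit + offset` in another partition could still make it into the result.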
 
   
   **What**
   
   This PR adds support for `SORT` operator push-down. For example, `SELECT * FROM basic ORDER BY col1 DESC LIMIT 2 OFFSET 1` now generates the following Calcite logical plan:
   ```
   LogicalSort(sort0=[$2], dir0=[DESC], offset=[1], fetch=[2])
     LogicalSortExchange(distribution=[hash], collation=[[2 DESC]])
       LogicalSort(sort0=[$2], dir0=[DESC], fetch=[+(2, 1)])
         LogicalTableScan(table=[[basic_order_by_basic]])
   ```
   
   which in turn produces this Pinot query plan:
   ```
   [0]@localhost:50762 MAIL_RECEIVE(RANDOM_DISTRIBUTED)
   ├── [1]@localhost:50761 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost:50762} 
(Subtree Omitted)
   └── [1]@localhost:50760 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost:50762}
      └── [1]@localhost:50760 SORT (LIMIT 2)
         └── [1]@localhost:50760 MAIL_RECEIVE(HASH_DISTRIBUTED)
            └── [2]@localhost:50760 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:50761,[1]@localhost:50760}
               └── [2]@localhost:50760 SORT (LIMIT 3)
                  └── [2]@localhost:50760 TABLE SCAN (basic_order_by_basic) 
{OFFLINE=[basic_order_by_basic_OFFLINE_e625b8e5-ede7-40e3-8696-1992ecee3f9d]}
   ```
   
   Notice that there are now _two_ `SORT` operators, one right after the table 
scan and another right after the mail receive (the operator that consolidates 
responses from both localhost:50760 and localhost:50761).
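   Since each server's rows arrive at the consolidating operator already sorted, the upper `SORT` conceptually only needs to merge the pre-sorted streams and then apply the offset and limit. A hypothetical sketch of that merge (this is an illustration, not Pinot's actual operator; `mergeDescending` is an invented name):

   ```java
   import java.util.ArrayList;
   import java.util.Comparator;
   import java.util.Iterator;
   import java.util.List;
   import java.util.PriorityQueue;

   public class SortedMerge {

       // Pairs a stream's current head value with the rest of that stream.
       record Head(int value, Iterator<Integer> rest) {}

       // K-way merge of descending-sorted inputs, applying OFFSET/LIMIT.
       static List<Integer> mergeDescending(List<List<Integer>> sortedInputs, int limit, int offset) {
           PriorityQueue<Head> heap =
                   new PriorityQueue<>(Comparator.comparingInt(Head::value).reversed());
           for (List<Integer> input : sortedInputs) {
               Iterator<Integer> it = input.iterator();
               if (it.hasNext()) {
                   heap.add(new Head(it.next(), it));
               }
           }
           List<Integer> out = new ArrayList<>();
           int skipped = 0;
           while (!heap.isEmpty() && out.size() < limit) {
               Head head = heap.poll();
               if (skipped < offset) {
                   skipped++; // consume rows covered by OFFSET
               } else {
                   out.add(head.value());
               }
               if (head.rest().hasNext()) {
                   heap.add(new Head(head.rest().next(), head.rest()));
               }
           }
           return out;
       }

       public static void main(String[] args) {
           // Each inner list is what a server's pushed-down SORT (LIMIT 3) produced
           List<Integer> merged =
                   mergeDescending(List.of(List.of(9, 5, 3), List.of(8, 7, 2)), 2, 1);
           System.out.println(merged); // [8, 7]
       }
   }
   ```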
   
   **Why**
   
   Without this change, the leaf nodes have no knowledge of how much data they should send to the multistage intermediate servers. You can imagine why this is a problem: if a query specifies `LIMIT 3` but scans a large data set with low selectivity, the server nodes may respond to the intermediate node with potentially gigabytes of data. Combined with the fact that our current engine has no flow control that reaches down to v1 (leaf server queries complete in their entirety before sending any data back), this meant that before this change such queries were likely to time out (and cost a lot of money in network bandwidth).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

