agavra opened a new pull request, #9832: URL: https://github.com/apache/pinot/pull/9832
This replaces #9507

**Review Notes**

1. **!!NOTE!!**: there is a v1 change in this PR: if `SERVER_RETURN_FINAL_RESULT` is set, the server will now return the "proper" columns, ordered by the query projection instead of the internal format. You can see this change in the files for `*CombineOperator` and `SelectionOperatorService`.
2. [ImmutableSortExchangeCopyRule.java](https://github.com/apache/pinot/compare/master...agavra:pinot:sort_pushdown?expand=1#diff-8a22fa3cfc8f05c0581192f941dc92c76d3cf5ebf2250cac528256fa2db0764a) is a generated file that I'm checking in. I generated it within the Calcite code base by running `./gradlew generateSources` and copied the contents over to Pinot. We could generate it as part of the Pinot build, but that would require pulling extra codegen dependencies into our build, and this file should change extremely infrequently.
3. The "main" part of the change is the introduction of `PinotSortExchangeCopyRule`, which simply copies a `Sort` past an `Exchange` (dropping the offset, and setting the limit to limit + offset). This was inspired by the implementation of https://github.com/apache/calcite/blob/406c913b808b3234464d8c81d7352c4040dd281a/core/src/main/java/org/apache/calcite/rel/rules/SortJoinCopyRule.java

**What**

This PR supports `SORT` operator push-down.
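The fetch arithmetic used by the copied sort (review note 3 above) can be sketched as follows. This is an illustrative helper, not the actual `PinotSortExchangeCopyRule` code: since `OFFSET` can only be applied correctly at the final, global sort, the sort copied below the exchange drops the offset and fetches `limit + offset` rows so the upper sort has enough rows to skip past.

```java
// Hypothetical sketch: not the real Calcite/Pinot API, just the fetch
// computation for the sort copied below the exchange.
public class SortExchangeCopySketch {
    /** Fetch for the pushed-down sort: offset is dropped, limit absorbs it. */
    static int pushedDownFetch(int fetch, int offset) {
        // The leaf cannot apply OFFSET (each leaf sees only its partition),
        // so it must return fetch + offset rows for the global sort to trim.
        return fetch + offset;
    }

    public static void main(String[] args) {
        // ORDER BY col1 DESC LIMIT 2 OFFSET 1 => leaf sort fetches 2 + 1 = 3
        System.out.println(pushedDownFetch(2, 1)); // prints 3
    }
}
```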
For example, `SELECT * FROM basic ORDER BY col1 DESC LIMIT 2 OFFSET 1` will now generate the following Calcite logical plan:

```
LogicalSort(sort0=[$2], dir0=[DESC], offset=[1], fetch=[2])
  LogicalSortExchange(distribution=[hash], collation=[[2 DESC]])
    LogicalSort(sort0=[$2], dir0=[DESC], fetch=[+(2, 1)])
      LogicalTableScan(table=[[basic_order_by_basic]])
```

which generates this Pinot query plan:

```
[0]@localhost:50762 MAIL_RECEIVE(RANDOM_DISTRIBUTED)
├── [1]@localhost:50761 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost:50762} (Subtree Omitted)
└── [1]@localhost:50760 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost:50762}
    └── [1]@localhost:50760 SORT (LIMIT 2)
        └── [1]@localhost:50760 MAIL_RECEIVE(HASH_DISTRIBUTED)
            └── [2]@localhost:50760 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:50761,[1]@localhost:50760}
                └── [2]@localhost:50760 SORT (LIMIT 3)
                    └── [2]@localhost:50760 TABLE SCAN (basic_order_by_basic) {OFFLINE=[basic_order_by_basic_OFFLINE_e625b8e5-ede7-40e3-8696-1992ecee3f9d]}
```

Notice that there are now _two_ `SORT` operators: one right after the table scan, and another right after the mail receive (the operator that consolidates responses from both localhost:50760 and localhost:50761).

**Why**

Without this change, the leaf nodes have no knowledge of how much data they should send to the multistage intermediate servers. You can imagine why this is a problem: if a query specifies `LIMIT 3` but scans a large data set with low selectivity, the server nodes could respond to the intermediate node with potentially gigabytes of data. Combined with the observation that our current engine has no flow control piped down to v1 (leaf server queries complete in their entirety before sending any data back), this means that before this change such queries were likely to time out (and cost a lot of money in network bandwidth).
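The two-stage sort above can be checked end to end with plain collections. This is an illustrative demonstration (not Pinot code, and the partition values are made up): each leaf sorts its partition and truncates to `LIMIT + OFFSET` rows, and the intermediate stage merges, re-sorts, and only then applies `OFFSET` and `LIMIT`. The result matches a single global sort over all rows.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SortPushdownDemo {
    public static void main(String[] args) {
        int limit = 2, offset = 1;
        // Two hypothetical leaf partitions of a single-column table.
        List<Integer> p0 = List.of(9, 4, 7);
        List<Integer> p1 = List.of(8, 5, 6);

        // Leaf stage: ORDER BY col DESC, fetch = limit + offset, offset dropped.
        List<Integer> leaf0 = p0.stream().sorted(Comparator.reverseOrder())
                .limit(limit + offset).collect(Collectors.toList());
        List<Integer> leaf1 = p1.stream().sorted(Comparator.reverseOrder())
                .limit(limit + offset).collect(Collectors.toList());

        // Intermediate stage: merge, sort again, then apply OFFSET and LIMIT.
        List<Integer> merged = Stream.concat(leaf0.stream(), leaf1.stream())
                .sorted(Comparator.reverseOrder())
                .skip(offset).limit(limit).collect(Collectors.toList());

        // Reference: a single global sort over all rows.
        List<Integer> expected = Stream.concat(p0.stream(), p1.stream())
                .sorted(Comparator.reverseOrder())
                .skip(offset).limit(limit).collect(Collectors.toList());

        System.out.println(merged);                  // [8, 7]
        System.out.println(merged.equals(expected)); // true
    }
}
```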
