Omega359 commented on code in PR #58:
URL: https://github.com/apache/datafusion-site/pull/58#discussion_r1989479511
##########
content/blog/2025-03-11-ordering-analysis.md:
##########
@@ -0,0 +1,381 @@
+---
+layout: post
+title: Using Ordering for Better Plans in Apache DataFusion
+date: 2025-03-11
+author: Mustafa Akur, Andrew Lamb
+categories: [tutorial]
+---
+
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+<!-- see https://github.com/apache/datafusion/issues/11631 for details -->
+
+## Introduction
+In this blog post, we explain when an ordering requirement of an operator is satisfied by its input data. This analysis is essential for order-based optimizations and is often more complex than one might initially think.
+<blockquote style="border-left: 4px solid #007bff; padding: 10px; background-color: #f8f9fa;">
+ <strong>Ordering Requirement</strong> for an operator describes how the input data to that operator must be sorted for the operator to compute the correct result. It is the job of the planner to make sure that these requirements are satisfied during execution (See DataFusion <a href="https://docs.rs/datafusion/latest/datafusion/physical_optimizer/enforce_sorting/struct.EnforceSorting.html" target="_blank">EnforceSorting</a> for an implementation of such rule).
+</blockquote>
+
+There are various use cases, where this type of analysis can be useful such as the following examples.
+### Removing Unnecessary Sorts
+Imagine a user wants to execute the following query:
+```SQL
+SELECT hostname, log_line
+FROM telemetry ORDER BY time ASC limit 10
+```
+If we don't know anything about the `telemetry` table, we need to sort it by `time ASC` and then retrieve the first 10 rows to get the correct result. However, if the table is already ordered by `time ASC`, we can simply retrieve the first 10 rows. This approach executes much faster and uses less memory compared to resorting the entire table, even when the [TopK] operator is used.
+
+[TopK]: https://docs.rs/datafusion/latest/datafusion/physical_plan/struct.TopK.html
+
+In order to avoid the sort, the query optimizer must determine the data is already sorted. For simple queries the analysis is straightforward, but it gets complicated fast. For example, what if your data is sorted by `[hostname, time ASC]` and your query is
+```sql
+SELECT hostname, log_line
+FROM telemetry WHERE hostname = 'app.example.com' ORDER BY time ASC;
+```
+In this case, a sort still isn't needed, but the analysis must reason about the sortedness of the stream when it knows `hostname` has a single value.
+
+### Optimized Operator Implementations
+As another use case, some operators can utilize the ordering information to change its underlying algorithm to execute more efficiently. Consider the following query:
+```SQL
+SELECT COUNT(log_line)
+FROM telemetry GROUP BY hostname;
+```
+Most analytic systems, including DataFusion, by default implement such a query using a hash table keyed on values of `hostname` to store the counts. However, if the `telemetry` table is sorted by `hostname`, there are much more efficient algorithms for grouping on `hostname` values than hashing every value and storing it in memory. However, the more efficient algorithm can only be used when the input is sorted correctly.
To see this in practice, check out the [source](https://github.com/apache/datafusion/tree/main/datafusion/physical-plan/src/aggregates/order) for ordered variant of the `Aggregation` in `DataFusion`.
+
+### Streaming-Friendly Execution
+
+Stream processing aims to produce results immediately as they become available, ensuring minimal latency for real-time workloads. However, some operators need to consume all input data before producing any output. Consider the `Sort` operation: before it can start generating output, the algorithm must first process all input data. As a result, data flow halts whenever such an operator is encountered until all input is consumed. When a physical query plan contains such an operator (`Sort`, `CrossJoin`, ..), we refer to this as pipeline breaking, meaning the query cannot be executed in a streaming fashion.

Review Comment:
```suggestion
Stream processing aims to produce results immediately as they become available ensuring minimal latency for real-time workloads. However, some operators need to consume all input data before producing any output. Consider the `Sort` operation: before it can start generating output, the algorithm must first process all input data. As a result, data flow halts whenever such an operator is encountered until all input is consumed. When a physical query plan contains such an operator (`Sort`, `CrossJoin`, ..) we refer to this as pipeline breaking, meaning the query cannot be executed in a streaming fashion.
```

##########
content/blog/2025-03-11-ordering-analysis.md:
##########
@@ -0,0 +1,381 @@
+
+For a query to be executed in a streaming fashion, we need to satisfy 2 conditions:

Review Comment:
```suggestion
For a query to be executed in a streaming fashion we need to satisfy 2 conditions:
```