metesynnada opened a new pull request, #5322:
URL: https://github.com/apache/arrow-datafusion/pull/5322

   # Which issue does this PR close?
   
   Closes #5321.
   
   This is a large pull request, with over 5.4k additions. However, we made sure to include detailed comments and extensive testing, and the changes break down as follows:
   
   - 308 lines changed in `Cargo.lock`.
   - 2,300 lines of additional testing code.
   - Over 1,000 lines of new or improved comments.
   
   # Rationale for this change
   
   Symmetric Hash Join (SHJ) extends DataFusion's join capabilities by efficiently supporting join filter expressions with order guarantees. This use case commonly arises in time-series contexts, e.g. in use cases involving sliding windows. While an ordinary hash join remains the preferable option when both sources are finite, a `PipelineFixer` sub-rule changes the join type to SHJ when both sources are unbounded.
   
   To see why this feature matters, consider that typical stream processing libraries like Apache Flink and Apache Spark perform the join operation using watermarks. Let's examine a query taken from the Apache Spark docstring:
   
   ```sql
   SELECT * FROM left_table, right_table
   WHERE
       left_key = right_key AND
       left_time > right_time - INTERVAL 12 MINUTES AND
       left_time < right_time + INTERVAL 2 HOURS
   ```
   
   > In this query, say each join side has a time column, named "left_time" and "right_time", and there is a join condition "left_time > right_time - 8 min". While processing, say the watermark on the right input is "12:34". This means that henceforth, only right input rows with "right_time > 12:34" will be processed, and any older rows will be considered "too late" and therefore dropped. Then, the left side buffer only needs to keep rows where "left_time > right_time - 8 min > 12:34 - 8m > 12:26". That is, the left state watermark is 12:26, and any rows older than that can be dropped from the state. In other words, the operator will discard all states where the timestamp in the state value (input rows) < the state watermark.
   > 
   
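   The state-watermark arithmetic in the quote can be sketched in a few lines of Rust (the helper name below is illustrative; it is neither Spark's nor DataFusion's API):

   ```rust
   use std::time::Duration;

   /// Given the watermark on the right input and a join condition of the form
   /// `left_time > right_time - delay`, the left-side state watermark is just
   /// `right_watermark - delay`: buffered left rows older than this can never
   /// match a current or future right row, so they can be dropped.
   fn left_state_watermark(right_watermark: Duration, delay: Duration) -> Duration {
       right_watermark - delay
   }

   fn main() {
       // The right watermark is 12:34 (as seconds since midnight) and the
       // condition is `left_time > right_time - 8 min`.
       let right_watermark = Duration::from_secs((12 * 60 + 34) * 60);
       let delay = Duration::from_secs(8 * 60);
       let wm = left_state_watermark(right_watermark, delay);
       // Prints "12:26", matching the quoted example.
       println!("{}:{:02}", wm.as_secs() / 3600, (wm.as_secs() % 3600) / 60);
   }
   ```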
   In reality, this is only part of the picture. In theory, range-based pruning can be done with any sorted column (not just the watermark column) and with any join filter condition that is amenable to this type of data pruning. However, Apache Spark overfits to timestamps and ties the pruning operation to a watermark. Let's follow a different approach and examine the following query from a more general, first-principles perspective:
   
   ```sql
   SELECT * FROM left_table, right_table
   WHERE
     left_key = right_key AND
     left_sorted > right_sorted - 3 AND
     left_sorted < right_sorted + 10
   ```
   
   If sort orders of the two columns (`left_sorted` and `right_sorted`) are 
ascending, and the join condition is `left_sorted > right_sorted - 3`, and the 
latest value on the right input is 1234, then the left side buffer only has to 
keep rows where `left_sorted > 1231` and any rows coming before this boundary 
can be dropped from the buffer. Note that this example is in no way special; similar scenarios arise with a variety of orderings and join filter expressions.
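   The pruning rule above can be expressed directly (this is an illustrative sketch, not the actual SHJ implementation in this PR):

   ```rust
   /// Prune a left-side buffer of sorted values using the lower-bound filter
   /// `left_sorted > right_sorted - 3` and the latest value seen on the
   /// (ascending) right input. Since future right values can only be larger,
   /// any left row failing the bound now can never match and may be dropped.
   fn prune_left_buffer(buffer: &mut Vec<i64>, latest_right: i64) {
       let bound = latest_right - 3;
       buffer.retain(|&left_sorted| left_sorted > bound);
   }

   fn main() {
       let mut buffer = vec![1225, 1229, 1231, 1232, 1240];
       prune_left_buffer(&mut buffer, 1234);
       // Rows with `left_sorted <= 1231` were dropped.
       assert_eq!(buffer, vec![1232, 1240]);
   }
   ```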
   
   ***[Please refer to the blog post for more 
information.](https://synnada.notion.site/General-purpose-Stream-Joins-via-Pruning-Symmetric-Hash-Joins-2fe26d3127a241e294a0217b1f18603a)***
   
   # What changes are included in this PR?
   
   The main features included in this PR are:
   
   - An initial library for interval arithmetic, which includes basic 
arithmetic operations (addition and subtraction) and comparison operations 
(greater than and less than) for integer types, and supports the logical 
conjunction operator.
   - An API for performing interval calculations, which can be used for other 
purposes, such as range pruning in Parquet. Within the context of this PR, we 
use this functionality to calculate filter expression bounds for pruning 
purposes.
   - A constraint propagation module to construct expression DAGs from 
`PhysicalExpr` trees and update column bounds efficiently for data pruning 
purposes.
   - An initial implementation of SHJ, which is limited to the partitioned mode 
and does not yet have full support for output order information.
   - A new `PipelineFixer` sub-rule that utilizes SHJ instead of an ordinary hash join when joining two (unbounded) streams.
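   To give a flavor of the interval arithmetic library, here is a minimal sketch of closed-interval addition, subtraction, a certainty-aware greater-than comparison, and logical conjunction (the type and method names are illustrative, not the PR's actual API):

   ```rust
   /// A closed integer interval `[lower, upper]`.
   #[derive(Debug, Clone, Copy, PartialEq)]
   struct Interval {
       lower: i64,
       upper: i64,
   }

   impl Interval {
       fn add(self, rhs: Interval) -> Interval {
           Interval { lower: self.lower + rhs.lower, upper: self.upper + rhs.upper }
       }
       fn sub(self, rhs: Interval) -> Interval {
           Interval { lower: self.lower - rhs.upper, upper: self.upper - rhs.lower }
       }
       /// Evaluates `self > rhs` as a boolean interval: `(true, true)` if
       /// certainly true, `(false, false)` if certainly false, and
       /// `(false, true)` if the bounds cannot decide it.
       fn gt(self, rhs: Interval) -> (bool, bool) {
           if self.lower > rhs.upper {
               (true, true)
           } else if self.upper <= rhs.lower {
               (false, false)
           } else {
               (false, true)
           }
       }
   }

   /// Logical conjunction of two boolean intervals.
   fn and(a: (bool, bool), b: (bool, bool)) -> (bool, bool) {
       (a.0 && b.0, a.1 && b.1)
   }

   fn main() {
       let left = Interval { lower: 100, upper: 200 };
       let right = Interval { lower: 150, upper: 250 };
       assert_eq!(left.add(right), Interval { lower: 250, upper: 450 });
       assert_eq!(left.sub(right), Interval { lower: -150, upper: 50 });
       // The bounds overlap, so `left > right` is uncertain...
       assert_eq!(left.gt(right), (false, true));
       // ...and so is its conjunction with a certainly-true operand.
       assert_eq!(and(left.gt(right), (true, true)), (false, true));
   }
   ```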
   
   In order to keep this PR at a manageable size, some features have been excluded for now, but will be added in the future. These include:
   
   - Improved support for interval arithmetic, such as support for open/closed 
intervals, multiply/divide operations, additional comparison and logical 
operators, floating point numbers, and time intervals.
   - Improved constraint propagation, including the ability to determine monotonicity properties of complex `PhysicalExpr`s.
   - An improved SHJ algorithm, including support for collect left/right/all 
modes, intermediate buffers for complex expressions, and an improved output 
ordering flag.
   
   ## Performance Gains
   
   SHJ not only makes sliding windows pipeline-friendly, it also improves execution throughput in many non-streaming scenarios thanks to data pruning. Data pruning lowers memory requirements, improves cache efficiency, and opens the door to executing joins entirely in memory for large datasets with short sliding-window join filters. ***[You can find a detailed performance analysis for various scenarios here.](https://www.notion.so/synnada/General-purpose-Stream-Joins-via-Pruning-Symmetric-Hash-Joins-2fe26d3127a241e294a0217b1f18603a?pvs=4#0da4085680de4424b588cf67b981ce82)***
   
   # Are these changes tested?
   
   Yes, both deterministic and fuzz unit tests are added.
   
   # Are there any user-facing changes?
   
   No backwards-incompatible changes. 
   
   This change simply enables new use cases in streaming applications. Below, we list several usage patterns we may start to see more often in the wild now that we have stream join capabilities:
   
   - Marking sources as infinite and providing a schema:
   
   ```rust
   let fifo_options = CsvReadOptions::new()
       .schema(schema.as_ref())
       .has_header(false) // Optional
       .mark_infinite(true);
   ```
   
   - Specifying an ordering for columns whose order is known a priori:
   
   ```rust
   let file_sort_order = [datafusion_expr::col("col_name")]
       .into_iter()
       .map(|e| {
           let ascending = true;
           let nulls_first = false;
           e.sort(ascending, nulls_first)
       })
       .collect::<Vec<_>>();
   ```
   
   More examples on table registration can be found in the subroutines we 
employ in the `unbounded_file_with_symmetric_join` test under 
`datafusion/core/tests/fifo.rs`.
   
   On the query side, one will be able to execute a query like
   
   ```sql
   SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left_table as t1 
   FULL JOIN right_table as t2 
   ON t1.a2 = t2.a2 
        AND t1.a1 > t2.a1 + 3 
        AND t1.a1 < t2.a1 + 10
   ```
   
   in a streaming fashion, so we may see some new usage patterns arise at the 
query level too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
