metesynnada opened a new pull request, #5322:
URL: https://github.com/apache/arrow-datafusion/pull/5322
# Which issue does this PR close?
Closes #5321.
This is a large pull request, with over 5.4k additions. However, we made
sure to include detailed comments and extensive testing, and the changes made
break down as follows:
- 308 lines changed in `Cargo.lock`.
- 2,300 lines of additional testing code.
- Over 1,000 lines of new or improved comments.
# Rationale for this change
Symmetric Hash Join (SHJ) extends the join capabilities of DataFusion by
supporting filter expressions with order guarantees efficiently. This use case
arises commonly in time-series contexts; e.g. use cases involving sliding
windows. While ordinary hash join remains the preferable option when both
sources are finite, the join type can be changed to SHJ using a `PipelineFixer`
sub-rule when both sources are unbounded.
To see why this feature matters, consider how typical stream processing
engines such as Apache Flink and Apache Spark handle this case: the join
operation is performed using watermarks. Let's examine a query taken from the
Apache Spark docstring:
```sql
SELECT * FROM left_table, right_table
WHERE
left_key = right_key AND
left_time > right_time - INTERVAL 8 MINUTES AND
left_time < right_time + INTERVAL 2 HOURS
```
> In this query, say each join side has a time column, named "left_time"
and "right_time", and there is a join condition "left_time > right_time - 8
min". While processing, say the watermark on the right input is "12:34". This
means that from henceforth, only right inputs rows with "right_time > 12:34"
will be processed, and any older rows will be considered as "too late" and
therefore dropped. Then, the left side buffer only needs to keep rows where
"left_time > right_time - 8 min > 12:34 - 8m > 12:26". That is, the left state
watermark is 12:26, and any rows older than that can be dropped from the state.
In other words, the operator will discard all states where the timestamp in
state value (input rows) < state watermark.
>
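The state-watermark arithmetic in the quoted passage boils down to a single subtraction. A minimal sketch (a hypothetical helper using minutes since midnight for simplicity, not code from this PR):

```rust
// Hypothetical illustration of the watermark arithmetic quoted above.
// Times are represented as minutes since midnight to keep the sketch
// dependency-free.
fn left_state_watermark(right_watermark_min: u32, filter_lag_min: u32) -> u32 {
    // From `left_time > right_time - 8 min` and a right watermark of 12:34,
    // every left row that can still match satisfies left_time > 12:26.
    right_watermark_min - filter_lag_min
}

fn main() {
    let right_watermark = 12 * 60 + 34; // 12:34
    let left_watermark = left_state_watermark(right_watermark, 8);
    assert_eq!(left_watermark, 12 * 60 + 26); // 12:26
    println!("left rows older than minute {left_watermark} can be dropped");
}
```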
However, watermarks capture only part of the picture. In theory, range-based
pruning can be done with any sorted field (not just the watermark field) and
with any arbitrary join filter condition that is amenable to this type of data
pruning. However, Apache Spark overfits to timestamps and associates the
pruning operation with a watermark. Let’s follow a different approach and
examine the following query from a more general, first-principles perspective:
```sql
SELECT * FROM left_table, right_table
WHERE
left_key = right_key AND
left_sorted > right_sorted - 3 AND
left_sorted < right_sorted + 10
```
If sort orders of the two columns (`left_sorted` and `right_sorted`) are
ascending, and the join condition is `left_sorted > right_sorted - 3`, and the
latest value on the right input is 1234, then the left side buffer only has to
keep rows where `left_sorted > 1231` and any rows coming before this boundary
can be dropped from the buffer. Note that this example is in no way specific;
similar scenarios can manifest with a variety of orderings and join filter
expressions.
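The boundary computation above can be sketched in a few lines (a hypothetical helper for illustration only, not part of this PR's API):

```rust
// Hypothetical sketch: computing the prune boundary for the left buffer.
// Given the filter `left_sorted > right_sorted - 3` and the latest value
// seen on the (ascending) right side, any buffered left row at or below
// the boundary can never match a future right row and can be dropped.
fn left_prune_boundary(latest_right_sorted: i64, filter_offset: i64) -> i64 {
    // With right_sorted >= 1234, every retained left row must satisfy
    // left_sorted > 1234 - 3 = 1231.
    latest_right_sorted - filter_offset
}

fn main() {
    let boundary = left_prune_boundary(1234, 3);
    assert_eq!(boundary, 1231);
    println!("left rows with left_sorted <= {boundary} can be dropped");
}
```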
***[Please refer to the blog post for more
information.](https://synnada.notion.site/General-purpose-Stream-Joins-via-Pruning-Symmetric-Hash-Joins-2fe26d3127a241e294a0217b1f18603a)***
# What changes are included in this PR?
The main features included in this PR are:
- An initial library for interval arithmetic, which includes basic
arithmetic operations (addition and subtraction) and comparison operations
(greater than and less than) for integer types, and supports the logical
conjunction operator.
- An API for performing interval calculations, which can be used for other
purposes, such as range pruning in Parquet. Within the context of this PR, we
use this functionality to calculate filter expression bounds for pruning
purposes.
- A constraint propagation module to construct expression DAGs from
`PhysicalExpr` trees and update column bounds efficiently for data pruning
purposes.
- An initial implementation of SHJ, which is limited to the partitioned mode
and does not yet have full support for output order information.
- A new `PipelineFixer` sub-rule that utilizes SHJ instead of an ordinary
hash join when joining two (unbounded) streams.
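To give a flavor of what such an interval arithmetic library computes, here is a minimal, hypothetical sketch of closed integer intervals with addition, subtraction, and a three-valued greater-than comparison (an illustration under simplifying assumptions, not the actual API introduced in this PR):

```rust
// Hypothetical sketch of interval arithmetic over closed integer intervals.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Interval {
    lower: i64,
    upper: i64,
}

impl Interval {
    // [a, b] + [c, d] = [a + c, b + d]
    fn add(self, other: Interval) -> Interval {
        Interval {
            lower: self.lower + other.lower,
            upper: self.upper + other.upper,
        }
    }
    // [a, b] - [c, d] = [a - d, b - c]
    fn sub(self, other: Interval) -> Interval {
        Interval {
            lower: self.lower - other.upper,
            upper: self.upper - other.lower,
        }
    }
    // Three-valued comparison: [1, 1] if certainly true, [0, 0] if
    // certainly false, [0, 1] if undecided.
    fn gt(self, other: Interval) -> Interval {
        if self.lower > other.upper {
            Interval { lower: 1, upper: 1 }
        } else if self.upper <= other.lower {
            Interval { lower: 0, upper: 0 }
        } else {
            Interval { lower: 0, upper: 1 }
        }
    }
}

fn main() {
    let a = Interval { lower: 1, upper: 3 };
    let b = Interval { lower: 10, upper: 20 };
    assert_eq!(a.add(b), Interval { lower: 11, upper: 23 });
    assert_eq!(a.sub(b), Interval { lower: -19, upper: -7 });
    // [10, 20] > [1, 3] is certainly true, so the comparison yields [1, 1].
    assert_eq!(b.gt(a), Interval { lower: 1, upper: 1 });
}
```

Propagating such bounds through a filter expression is what lets the join decide which buffered rows can no longer match.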
In order to have a PR with a manageable size, some features have been
excluded for now, but will be added in the future. These include:
- Improved support for interval arithmetic, such as support for open/closed
intervals, multiply/divide operations, additional comparison and logical
operators, floating point numbers, and time intervals.
- Improved constraint propagation, including the ability to determine
monotonicity properties of complex `PhysicalExpr`s.
- An improved SHJ algorithm, including support for collect left/right/all
modes, intermediate buffers for complex expressions, and an improved output
ordering flag.
## Performance Gains
SHJ not only makes sliding windows pipeline-friendly, it also improves
execution throughput in many non-streaming scenarios thanks to data pruning.
Data pruning lowers memory requirements, improves cache efficiency, and opens
the door to executing joins entirely in memory for large datasets with short
sliding-window join filters. ***[You can find a detailed performance analysis
for various scenarios
here.](https://www.notion.so/synnada/General-purpose-Stream-Joins-via-Pruning-Symmetric-Hash-Joins-2fe26d3127a241e294a0217b1f18603a?pvs=4#0da4085680de4424b588cf67b981ce82)***
# Are these changes tested?
Yes, both deterministic and fuzz (randomized) unit tests are added.
# Are there any user-facing changes?
No backwards-incompatible changes.
This change enables new use cases in streaming applications. Below, we
provide several usage patterns we may start to see more often in the wild now
that we have stream join capabilities:
- Marking sources as infinite and providing a schema:
```rust
let fifo_options = CsvReadOptions::new()
.schema(schema.as_ref())
.has_header(false) // Optional
.mark_infinite(true);
```
- Specifying ordering for columns where an a-priori order is known:
```rust
let file_sort_order = [datafusion_expr::col("col_name")]
.into_iter()
.map(|e| {
let ascending = true;
let nulls_first = false;
e.sort(ascending, nulls_first)
})
.collect::<Vec<_>>();
```
More examples on table registration can be found in the subroutines we
employ in the `unbounded_file_with_symmetric_join` test under
`datafusion/core/tests/fifo.rs`.
On the query side, one will be able to execute a query like
```sql
SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left_table AS t1
FULL JOIN right_table AS t2
ON t1.a2 = t2.a2
AND t1.a1 > t2.a1 + 3
AND t1.a1 < t2.a1 + 10
```
in a streaming fashion, so we may see some new usage patterns arise at the
query level too.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]