msirek opened a new issue, #8241:
URL: https://github.com/apache/arrow-datafusion/issues/8241
### Describe the bug
Datafusion does not use three-valued logic (true, false, unknown) when
evaluating NOT IN predicates that require a join, causing incorrect results
in the presence of nulls.
An expression without a join evaluates correctly:
```sql
select 1 NOT IN (3, null) IS NULL;
+----------------------------------------------------------------------------------+
| Int64(1) NOT IN (Map { iter: Iter([Literal(Int64(3)), Literal(NULL)]) }) IS NULL |
+----------------------------------------------------------------------------------+
| true                                                                             |
+----------------------------------------------------------------------------------+
```
The expression `1 NOT IN (3, null)` evaluates to unknown (null), so when it is
used as a WHERE or ON clause filter, the row should be disqualified:
```sql
select 'qualified filter' where 1 NOT IN (3, null);
0 rows in set. Query took 0.007 seconds.
```
The equivalent query using a relation instead of a pure scalar expression
returns the wrong result:
```sql
select 'qualified filter' where 1 NOT IN (select * from (values (3), (null)) my_tab);
+--------------------------+
| Utf8("qualified filter") |
+--------------------------+
| qualified filter         |
+--------------------------+
```
### To Reproduce
Here is an example involving base tables:
```sql
create table t1 (a int);
create table t2 (a int);
insert into t1 values (1);
insert into t2 values (3), (null);
-- The correct answer is no rows
select * from t1 where a not in (select a from t2);
+---+
| a |
+---+
| 1 |
+---+
1 row in set. Query took 0.015 seconds.
❯ explain select * from t1 where a not in (select a from t2);
+---------------+-------------------------------------------------------------------------+
| plan_type     | plan                                                                    |
+---------------+-------------------------------------------------------------------------+
| logical_plan  | LeftAnti Join: t1.a = __correlated_sq_1.a                               |
|               |   TableScan: t1 projection=[a]                                          |
|               |   SubqueryAlias: __correlated_sq_1                                      |
|               |     TableScan: t2 projection=[a]                                        |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192                             |
|               |   HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(a@0, a@0)]   |
|               |     CoalesceBatchesExec: target_batch_size=8192                         |
|               |       RepartitionExec: partitioning=Hash([a@0], 32), input_partitions=1 |
|               |         MemoryExec: partitions=1, partition_sizes=[1]                   |
|               |     CoalesceBatchesExec: target_batch_size=8192                         |
|               |       RepartitionExec: partitioning=Hash([a@0], 32), input_partitions=1 |
|               |         MemoryExec: partitions=1, partition_sizes=[1]                   |
|               |                                                                         |
+---------------+-------------------------------------------------------------------------+
```
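The hash anti join returns `1` because, in an equi-join, `NULL = x` is never true. A null build key can therefore never match anything and is effectively ignored. The simplified model below reproduces the observed result. It is an assumption about the semantics, not DataFusion's `HashJoinExec` code, and `hash_anti_join` is a hypothetical name.

```rust
use std::collections::HashSet;

/// Simplified equi-join anti join (hypothetical helper): keep probe rows
/// whose key has no equal build key. NULL build keys are dropped because
/// `NULL = x` is never true, and a NULL probe key can never find a match.
fn hash_anti_join(probe: &[Option<i64>], build: &[Option<i64>]) -> Vec<Option<i64>> {
    // `flatten` skips the NULL (None) build keys entirely.
    let keys: HashSet<i64> = build.iter().flatten().copied().collect();
    probe
        .iter()
        .copied()
        .filter(|p| match p {
            Some(v) => !keys.contains(v),
            None => true, // no equality match is possible for a NULL key
        })
        .collect()
}
```

With `t1 = [1]` and `t2 = [3, NULL]`, `hash_anti_join(&[Some(1)], &[Some(3), None])` keeps `Some(1)`. This matches the incorrect output above.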
### Expected behavior
`NOT IN` should honor three-valued logic.
From the ANSI SQL spec, given a row value predicand RVC and an in predicate
value IPV, `RVC NOT IN IPV` is equivalent to `NOT (RVC IN IPV)`.
For the given example,
```
1 NOT IN (3, null) -> NOT (1 IN (3, null))
1 IN (3, null) -> 1 = 3 OR 1 = null
NOT (1 IN (3, null)) -> 1 != 3 AND 1 != null
-> true AND null
-> null
```
A filter which evaluates to null should not return the row.
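The derivation above can be checked with a minimal Rust sketch of SQL's three-valued `IN` / `NOT IN`. It models a nullable integer as `Option<i64>` and a truth value as `Option<bool>`, with `None` meaning unknown. The helper names (`sql_in`, `sql_not_in`, `keeps_row`) are illustrative only and are not DataFusion APIs.

```rust
/// A SQL truth value: Some(true), Some(false), or None for UNKNOWN.
type Tri = Option<bool>;

/// SQL `x = y`: UNKNOWN when either operand is NULL.
fn sql_eq(x: Option<i64>, y: Option<i64>) -> Tri {
    match (x, y) {
        (Some(a), Some(b)) => Some(a == b),
        _ => None,
    }
}

/// Three-valued OR: TRUE wins, otherwise UNKNOWN wins over FALSE.
fn or3(a: Tri, b: Tri) -> Tri {
    match (a, b) {
        (Some(true), _) | (_, Some(true)) => Some(true),
        (Some(false), Some(false)) => Some(false),
        _ => None,
    }
}

/// `x IN (l1, l2, ...)` expands to `x = l1 OR x = l2 OR ...`;
/// an empty list is FALSE.
fn sql_in(x: Option<i64>, list: &[Option<i64>]) -> Tri {
    list.iter().fold(Some(false), |acc, &y| or3(acc, sql_eq(x, y)))
}

/// `x NOT IN (...)` is `NOT (x IN (...))`; NOT leaves UNKNOWN unchanged.
fn sql_not_in(x: Option<i64>, list: &[Option<i64>]) -> Tri {
    sql_in(x, list).map(|v| !v)
}

/// A WHERE or ON filter keeps a row only when the predicate is TRUE.
fn keeps_row(pred: Tri) -> bool {
    pred == Some(true)
}
```

`sql_not_in(Some(1), &[Some(3), None])` evaluates to `None` (unknown), so `keeps_row` rejects the row. The correct answer is therefore no rows.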
### Additional context
Velox handles nulls in NOT IN via a [null-aware
antijoin](https://facebookincubator.github.io/velox/develop/anti-join.html).
Substrait is [adding null-aware
antijoins](https://github.com/substrait-io/substrait/pull/572) as physical join
methods an engine can support.
Datafusion could add null-aware antijoin support as a solution.
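A null-aware antijoin for a single-column `a NOT IN (subquery)` must implement semantics that follow directly from the three-valued logic in the expected behavior. The sketch below illustrates those semantics on a single partition, assuming `Option<i64>` keys. It is not Velox's implementation or a proposed DataFusion API, and `null_aware_anti_join` is a hypothetical name.

```rust
use std::collections::HashSet;

/// Hypothetical single-partition null-aware anti join for
/// `probe.a NOT IN (SELECT b FROM build)`.
fn null_aware_anti_join(probe: &[Option<i64>], build: &[Option<i64>]) -> Vec<Option<i64>> {
    // NOT IN over an empty set is TRUE for every row, even a NULL key.
    if build.is_empty() {
        return probe.to_vec();
    }
    // A NULL build key makes every comparison at best UNKNOWN: no rows pass.
    if build.iter().any(|b| b.is_none()) {
        return Vec::new();
    }
    let keys: HashSet<i64> = build.iter().flatten().copied().collect();
    probe
        .iter()
        .copied()
        // A NULL probe key against a non-empty set is UNKNOWN, so drop it;
        // a non-NULL key passes only if it has no match.
        .filter(|p| matches!(p, Some(v) if !keys.contains(v)))
        .collect()
}
```

The three branches correspond to the empty subquery, a subquery containing a null, and the ordinary case. Only the last can be handled like a regular hash anti join.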
A less performant alternative would be to use a cross antijoin, with a
modified join predicate, as is done in CockroachDB:
```sql
explain(opt) select * from t1 where a not in (select a from t2);
info
----------------------------------------
anti-join (cross)
├── scan t1
├── scan t2
└── filters
└── (t1.a = t2.a) IS NOT false
```
which perhaps would translate into a `NestedLoopJoinExec` in Datafusion:
```sql
explain select * from t1 where a not in (select a from t2);
+---------------+-----------------------------------------------------------------------+
| plan_type     | plan                                                                  |
+---------------+-----------------------------------------------------------------------+
| logical_plan  | LeftAnti Join: Filter: (t1.a = __correlated_sq_1.a) IS NOT false      |
|               |   TableScan: t1 projection=[a]                                        |
|               |   SubqueryAlias: __correlated_sq_1                                    |
|               |     TableScan: t2 projection=[a]                                      |
| physical_plan | NestedLoopJoinExec: join_type=LeftAnti, filter=a@0 = a@1 IS NOT false |
|               |   MemoryExec: partitions=1, partition_sizes=[1]                       |
|               |   MemoryExec: partitions=1, partition_sizes=[1]                       |
|               |                                                                       |
+---------------+-----------------------------------------------------------------------+
```
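The `IS NOT false` filter works because it treats an unknown comparison like a match. Any subquery row that equals the outer key, or whose comparison with it is null, eliminates that outer row. The minimal nested-loop sketch below uses hypothetical helper names and `Option<i64>` keys. It shows both the semantics and the O(|t1| × |t2|) cost that makes this the less performant option.

```rust
/// SQL `x = y`: None (UNKNOWN) if either side is NULL.
fn sql_eq(x: Option<i64>, y: Option<i64>) -> Option<bool> {
    match (x, y) {
        (Some(a), Some(b)) => Some(a == b),
        _ => None,
    }
}

/// `pred IS NOT FALSE` is true for both TRUE and UNKNOWN.
fn is_not_false(pred: Option<bool>) -> bool {
    pred != Some(false)
}

/// Hypothetical nested-loop anti join: a probe row survives only if no
/// build row satisfies the join filter `(probe = build) IS NOT FALSE`.
fn cross_anti_join(probe: &[Option<i64>], build: &[Option<i64>]) -> Vec<Option<i64>> {
    probe
        .iter()
        .copied()
        .filter(|&p| !build.iter().any(|&b| is_not_false(sql_eq(p, b))))
        .collect()
}
```

For `t1 = [1]` and `t2 = [3, NULL]`, the NULL build row makes the filter true, so the result is empty. With an empty `t2`, every outer row is kept, which again matches `NOT IN` semantics.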
A null-aware antijoin would need to broadcast the subquery rows with a null
key to all partitions so that each partition could "see" the null. A null key
in the subquery result would cause no rows to be returned. Processing is more
complicated for tuple types, e.g. `(a, b) IN (SELECT a, b ...)`, where one of
the keys could be null and the other non-null, but Datafusion doesn't currently
support tuple types.
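In the tuple case, SQL defines row-value equality `(a, b) = (c, d)` as `a = c AND b = d` under three-valued logic. A subquery row with a single null key therefore does not necessarily make the comparison unknown. The sketch below follows that definition. The names are illustrative only, and DataFusion does not currently support tuple `IN`.

```rust
/// A SQL truth value: Some(true), Some(false), or None for UNKNOWN.
type Tri = Option<bool>;
/// A two-column key; None is NULL.
type Key = (Option<i64>, Option<i64>);

fn sql_eq(x: Option<i64>, y: Option<i64>) -> Tri {
    match (x, y) {
        (Some(a), Some(b)) => Some(a == b),
        _ => None,
    }
}

/// Three-valued AND: FALSE wins, otherwise UNKNOWN wins over TRUE.
fn and3(a: Tri, b: Tri) -> Tri {
    match (a, b) {
        (Some(false), _) | (_, Some(false)) => Some(false),
        (Some(true), Some(true)) => Some(true),
        _ => None,
    }
}

/// Three-valued OR: TRUE wins, otherwise UNKNOWN wins over FALSE.
fn or3(a: Tri, b: Tri) -> Tri {
    match (a, b) {
        (Some(true), _) | (_, Some(true)) => Some(true),
        (Some(false), Some(false)) => Some(false),
        _ => None,
    }
}

/// Row-value equality `(a, b) = (c, d)` is `a = c AND b = d`.
fn tuple_eq(l: Key, r: Key) -> Tri {
    and3(sql_eq(l.0, r.0), sql_eq(l.1, r.1))
}

/// `(a, b) NOT IN (list)` is `NOT ((a, b) = r1 OR (a, b) = r2 OR ...)`.
fn tuple_not_in(l: Key, list: &[Key]) -> Tri {
    list.iter()
        .fold(Some(false), |acc, &r| or3(acc, tuple_eq(l, r)))
        .map(|v| !v)
}
```

`(1, 2) NOT IN ((3, NULL))` is true because `1 = 3` is false. In contrast, `(1, 2) NOT IN ((1, NULL))` is unknown. Whether a null in the subquery disqualifies an outer row therefore depends on the outer row's other key values.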
Proposed distribution/partitioning changes to support null-aware antijoin:
1. Add a new `HashPartitionedBroadcastNull` distribution method:
```rust
/// How data is distributed amongst partitions. See [`Partitioning`] for more
/// details.
#[derive(Debug, Clone)]
pub enum Distribution {
    /// Unspecified distribution
    UnspecifiedDistribution,
    /// A single partition is required
    SinglePartition,
    /// Requires children to be distributed in such a way that the same
    /// values of the keys end up in the same partition
    HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
    /// Requires children to be distributed in such a way that the same
    /// values of the keys end up in the same partition, and the first row
    /// having a key with nulls is broadcast to all partitions (typically
    /// used for the inner relation of an antijoin)
    HashPartitionedBroadcastNull(Vec<Arc<dyn PhysicalExpr>>),
}
```
Is this better than modifying `HashPartitioned` with a new flag, e.g.
`HashPartitioned(Vec<Arc<dyn PhysicalExpr>>, bool)`, to control whether nulls
are broadcast?
2. Add a new flag to `Partitioning::Hash`:
```diff
 pub enum Partitioning {
     /// Allocate batches using a round-robin algorithm and the specified
     /// number of partitions
     RoundRobinBatch(usize),
-    /// Allocate rows based on a hash of one of more expressions and the
-    /// specified number of partitions
-    Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
+    /// Allocate rows based on a hash of one or more expressions, the
+    /// specified number of partitions, and whether the first null in the
+    /// input should be broadcast to all partitions
+    Hash(Vec<Arc<dyn PhysicalExpr>>, usize, bool),
     /// Unknown partitioning scheme with a known number of partitions
     UnknownPartitioning(usize),
 }
```
Same question here. Does it make more sense to add a flag to `Hash`
partitioning, or make a totally separate enum value?
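The following is a rough sketch of what the proposed `bool` on `Partitioning::Hash` might mean for a repartitioner. Every row is routed by the hash of its key, except that when the flag is set, the first null key is copied to every partition. The function name `hash_repartition` is hypothetical. The sketch uses the standard library's `DefaultHasher` rather than the hashing `RepartitionExec` actually uses.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Pick a partition for a key by hashing it (NULLs hash like any value,
/// so all NULL keys land in the same partition).
fn partition_of(key: &Option<i64>, n: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % n as u64) as usize
}

/// Hypothetical hash repartitioner with an optional "broadcast first null".
fn hash_repartition(
    keys: &[Option<i64>],
    n: usize,
    broadcast_first_null: bool,
) -> Vec<Vec<Option<i64>>> {
    assert!(n > 0, "need at least one partition");
    let mut parts: Vec<Vec<Option<i64>>> = vec![Vec::new(); n];
    let mut null_sent = false;
    for &key in keys {
        if broadcast_first_null && key.is_none() && !null_sent {
            // Copy the first NULL-keyed row to every partition.
            for part in parts.iter_mut() {
                part.push(key);
            }
            null_sent = true;
        } else {
            // Later NULLs and all non-NULL keys are routed by hash.
            parts[partition_of(&key, n)].push(key);
        }
    }
    parts
}
```

With the flag off, all null keys hash to a single partition, so an antijoin running in any other partition would never see them. With the flag on, each partition's antijoin can apply the "null in the subquery" rule locally.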
Alternative:
- Instead of broadcasting the nulls, create a new mode for
`RepartitionExec` that broadcasts a special marker row indicating a null key
was seen and then ends the repartition operation. The antijoin would then have
to detect that special row and quit early.
  - This approach may not work well for the tuple NOT IN case, which
    must broadcast every key combination containing a null. For example, all of
    the following would need to be broadcast: (1, null), (2, null), (null, 1), ...
    Only if (null, null) is seen could the repartition end early. Also, a
    filter in the join may filter out a given row with a null, so the
    presence of a null in the `RepartitionExec` doesn't necessarily mean the null
    will be processed by the join.
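The broadcast rule for the tuple case can be stated as two predicates on a subquery key. Any key containing a null must be broadcast, because it may compare as unknown with outer rows in any partition. Only an all-null key is unknown against every outer row and would allow the repartition to end early. This is a hypothetical sketch with illustrative names. As noted above, it ignores join filters that may discard the null row.

```rust
/// A subquery key must be broadcast if any of its columns is NULL:
/// e.g. (1, NULL) is UNKNOWN against every outer row with first column 1,
/// and those rows may live in any partition.
fn must_broadcast(key: &[Option<i64>]) -> bool {
    key.iter().any(|col| col.is_none())
}

/// Only an all-NULL key, e.g. (NULL, NULL), is UNKNOWN against every outer
/// row, so only it could end the repartition early (ignoring join filters).
fn allows_early_end(key: &[Option<i64>]) -> bool {
    !key.is_empty() && key.iter().all(|col| col.is_none())
}
```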