msirek opened a new issue, #8241:
URL: https://github.com/apache/arrow-datafusion/issues/8241

   ### Describe the bug
   
   DataFusion does not use three-valued logic (true, false, unknown) when evaluating `NOT IN` predicates that require a join, which causes incorrect results in the presence of nulls.
   An expression without a join evaluates correctly:
   ```sql
   select 1 NOT IN (3, null) IS NULL;
   
   +----------------------------------------------------------------------------------+
   | Int64(1) NOT IN (Map { iter: Iter([Literal(Int64(3)), Literal(NULL)]) }) IS NULL |
   +----------------------------------------------------------------------------------+
   | true                                                                             |
   +----------------------------------------------------------------------------------+
   ```
   The expression `1 NOT IN (3, null)` is unknown (null), which, if used as a `WHERE` or `ON` clause filter, should disqualify the row:
   ```sql
   select 'qualified filter' where 1 NOT IN (3, null);
   0 rows in set. Query took 0.007 seconds.
   ```
   A similar query that involves a relation instead of a pure scalar expression returns wrong results:
   ```sql
   select 'qualified filter' where 1 NOT IN (select * from (values (3), (null)) my_tab);
   +--------------------------+
   | Utf8("qualified filter") |
   +--------------------------+
   | qualified filter         |
   +--------------------------+
   ```
   
   
   ### To Reproduce
   
   Here is an example involving base tables:
   
   ```sql
   create table t1 (a int);
   create table t2 (a int);
   insert into t1 values (1);
   insert into t2 values (3), (null);
   
   -- The correct answer is no rows
   select * from t1 where a not in (select a from t2);
   +---+
   | a |
   +---+
   | 1 |
   +---+
   1 row in set. Query took 0.015 seconds.
   
   ❯ explain select * from t1 where a not in (select a from t2);
   
   +---------------+-------------------------------------------------------------------------+
   | plan_type     | plan                                                                    |
   +---------------+-------------------------------------------------------------------------+
   | logical_plan  | LeftAnti Join: t1.a = __correlated_sq_1.a                               |
   |               |   TableScan: t1 projection=[a]                                          |
   |               |   SubqueryAlias: __correlated_sq_1                                      |
   |               |     TableScan: t2 projection=[a]                                        |
   | physical_plan | CoalesceBatchesExec: target_batch_size=8192                             |
   |               |   HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(a@0, a@0)]   |
   |               |     CoalesceBatchesExec: target_batch_size=8192                         |
   |               |       RepartitionExec: partitioning=Hash([a@0], 32), input_partitions=1  |
   |               |         MemoryExec: partitions=1, partition_sizes=[1]                   |
   |               |     CoalesceBatchesExec: target_batch_size=8192                         |
   |               |       RepartitionExec: partitioning=Hash([a@0], 32), input_partitions=1  |
   |               |         MemoryExec: partitions=1, partition_sizes=[1]                   |
   |               |                                                                         |
   +---------------+-------------------------------------------------------------------------+
   ```
   
   ### Expected behavior
   
   `NOT IN` should honor three-valued logic.
   
   From the ANSI SQL spec, given a row value predicand, RVC, and an in predicate value, IPV, `RVC NOT IN IPV` is equivalent to `NOT ( RVC IN IPV )`.
   For the given example,
   ```
   1 NOT IN (3, null) -> NOT (1 IN (3, null))
   
   1 IN (3, null)      -> 1 = 3 OR 1 = null
   NOT(1 IN (3, null)) -> 1 != 3 AND 1 != NULL
                       -> true AND null
                       -> null
   ```
   A filter which evaluates to null should not return the row.
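
   The derivation above can be sketched in Rust by modeling an SQL truth value as `Option<bool>`, with `None` standing for unknown. This is only an illustration of three-valued logic under that assumption; the names `sql_eq`, `sql_or`, `sql_not`, `sql_not_in`, and `filter_keeps_row` are hypothetical and are not DataFusion APIs.

   ```rust
   /// SQL truth value: Some(true), Some(false), or None for unknown.
   /// (Hypothetical alias for illustration only.)
   type Tri = Option<bool>;

   /// SQL equality: unknown if either operand is NULL.
   fn sql_eq(a: Option<i64>, b: Option<i64>) -> Tri {
       match (a, b) {
           (Some(x), Some(y)) => Some(x == y),
           _ => None,
       }
   }

   /// Three-valued OR: true wins, false needs both sides false, otherwise unknown.
   fn sql_or(a: Tri, b: Tri) -> Tri {
       match (a, b) {
           (Some(true), _) | (_, Some(true)) => Some(true),
           (Some(false), Some(false)) => Some(false),
           _ => None,
       }
   }

   /// Three-valued NOT: unknown stays unknown.
   fn sql_not(a: Tri) -> Tri {
       a.map(|v| !v)
   }

   /// `value NOT IN (list)` evaluated as NOT (value = l1 OR value = l2 OR ...).
   fn sql_not_in(value: Option<i64>, list: &[Option<i64>]) -> Tri {
       let is_in = list
           .iter()
           .fold(Some(false), |acc, item| sql_or(acc, sql_eq(value, *item)));
       sql_not(is_in)
   }

   /// A WHERE or ON filter keeps a row only when the predicate is definitely true.
   fn filter_keeps_row(predicate: Tri) -> bool {
       predicate == Some(true)
   }
   ```

   With these definitions, `sql_not_in(Some(1), &[Some(3), None])` evaluates to `None`, so `filter_keeps_row` rejects the row, which matches the scalar query shown earlier.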
   
   
   
   
   
   ### Additional context
   
   Velox handles nulls in `NOT IN` via a [null-aware antijoin](https://facebookincubator.github.io/velox/develop/anti-join.html).
   Substrait is [adding null-aware antijoins](https://github.com/substrait-io/substrait/pull/572) as physical join methods an engine can support.
   DataFusion could add null-aware antijoin support as a solution.
   
   A less performant alternative would be to use a cross antijoin with a modified join predicate, as is done in CockroachDB:
   ```sql
   explain(opt) select * from t1 where a not in (select a from t2);
                     info
   ----------------------------------------
     anti-join (cross)
      ├── scan t1
      ├── scan t2
      └── filters
           └── (t1.a = t2.a) IS NOT false
   ```
   This would perhaps translate into a `NestedLoopJoinExec` in DataFusion:
   ```sql
   explain select * from t1 where a not in (select a from t2);
   +---------------+-----------------------------------------------------------------------+
   | plan_type     | plan                                                                  |
   +---------------+-----------------------------------------------------------------------+
   | logical_plan  | LeftAnti Join:  Filter: (t1.a = __correlated_sq_1.a) IS NOT false     |
   |               |   TableScan: t1 projection=[a]                                        |
   |               |   SubqueryAlias: __correlated_sq_1                                    |
   |               |     TableScan: t2 projection=[a]                                      |
   | physical_plan | NestedLoopJoinExec: join_type=LeftAnti, filter=a@0 = a@1 IS NOT false |
   |               |   MemoryExec: partitions=1, partition_sizes=[1]                       |
   |               |   MemoryExec: partitions=1, partition_sizes=[1]                       |
   |               |                                                                       |
   +---------------+-----------------------------------------------------------------------+
   ```
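
   To show why the `IS NOT false` predicate gives the expected answer, here is a minimal in-memory sketch of a nested-loop antijoin over nullable integer keys. It assumes a single key column and plain slices as inputs; `sql_eq` and `cross_anti_join` are hypothetical helper names, not DataFusion or CockroachDB code.

   ```rust
   /// SQL equality: unknown (None) if either operand is NULL.
   fn sql_eq(a: Option<i64>, b: Option<i64>) -> Option<bool> {
       match (a, b) {
           (Some(x), Some(y)) => Some(x == y),
           _ => None,
       }
   }

   /// Nested-loop antijoin with the predicate `(l = r) IS NOT false`.
   /// A left row is dropped as soon as one right row compares as true or unknown.
   fn cross_anti_join(left: &[Option<i64>], right: &[Option<i64>]) -> Vec<Option<i64>> {
       left.iter()
           .copied()
           .filter(|l| {
               // `IS NOT false` holds when the comparison is true or unknown.
               let has_match = right.iter().any(|r| sql_eq(*l, *r) != Some(false));
               !has_match
           })
           .collect()
   }
   ```

   For `t1 = [1]` and `t2 = [3, NULL]`, the comparison `1 = NULL` is unknown, so the predicate matches and the antijoin returns no rows. The cost is a comparison of every left row against every right row.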
   
   A null-aware antijoin would need to broadcast the subquery rows with a null key to all partitions so that each partition could "see" the null. A null in the subquery would cause no rows to be returned. Processing is more complicated for tuple types, e.g. `(a, b) NOT IN (SELECT a, b ...)`, where one of the keys could be null and the other non-null, but DataFusion doesn't currently support tuple types.
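
   As a rough single-partition sketch of the semantics a null-aware antijoin needs for one key column (not DataFusion's or Velox's implementation), the function below builds a hash set from the subquery side and special-cases nulls. The name `null_aware_anti_join` is hypothetical. It also keeps every outer row when the subquery is empty, since `NOT IN` over an empty set is true.

   ```rust
   use std::collections::HashSet;

   /// Null-aware left antijoin on a single nullable integer key.
   /// `left` is the outer relation, `right` is the NOT IN subquery result.
   fn null_aware_anti_join(left: &[Option<i64>], right: &[Option<i64>]) -> Vec<Option<i64>> {
       // An empty subquery makes NOT IN true for every outer row, NULL included.
       if right.is_empty() {
           return left.to_vec();
       }
       // Any NULL on the subquery side makes the predicate false or unknown
       // for every outer row, so nothing qualifies.
       if right.iter().any(|v| v.is_none()) {
           return Vec::new();
       }
       // Build side: the non-null subquery keys.
       let keys: HashSet<i64> = right.iter().flatten().copied().collect();
       left.iter()
           .filter(|v| match v {
               // Keep the row only if its key has no match.
               Some(x) => !keys.contains(x),
               // NULL NOT IN (non-empty set) is unknown, so the row is dropped.
               None => false,
           })
           .copied()
           .collect()
   }
   ```

   In a partitioned plan, the early exit on a null is the part that requires every partition to learn about the null, which is what motivates the broadcast.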
   
   Proposed distribution/partitioning changes to support null-aware antijoin:
   
   1. Add a new `HashPartitionedBroadcastNull` distribution method:
   ```rust
   /// How data is distributed amongst partitions. See [`Partitioning`] for more
   /// details.
   #[derive(Debug, Clone)]
   pub enum Distribution {
       /// Unspecified distribution
       UnspecifiedDistribution,
       /// A single partition is required
       SinglePartition,
       /// Requires children to be distributed in such a way that the same
       /// values of the keys end up in the same partition
       HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
       /// Requires children to be distributed in such a way that the same
       /// values of the keys end up in the same partition, and the first row
       /// having a key with nulls is broadcast to all partitions (typically
       /// used for the inner relation of antijoin)
       HashPartitionedBroadcastNull(Vec<Arc<dyn PhysicalExpr>>),
   }
   ```
   Is this better than modifying `HashPartitioned` with a new flag, e.g. `HashPartitioned(Vec<Arc<dyn PhysicalExpr>>, bool)`, to control whether nulls are distributed?
   
   2. Add a new flag to `Partitioning::Hash`:
   ```diff
    pub enum Partitioning {
        /// Allocate batches using a round-robin algorithm and the specified number of partitions
        RoundRobinBatch(usize),
   -    /// Allocate rows based on a hash of one of more expressions and the specified number of
   -    /// partitions
   -    Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
   +    /// Allocate rows based on a hash of one of more expressions, the specified number of
   +    /// partitions, and whether the first null in the input should be broadcast to all partitions
   +    Hash(Vec<Arc<dyn PhysicalExpr>>, usize, bool),
        /// Unknown partitioning scheme with a known number of partitions
        UnknownPartitioning(usize),
    }
   ```
   Same question here. Does it make more sense to add a flag to `Hash` partitioning, or to make a totally separate enum variant?
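
   To make the intended repartitioning behavior concrete, here is a small standalone sketch of hash partitioning with an optional "broadcast the first null" flag. It is not DataFusion code: `hash_partition_broadcast_null` is a hypothetical function, `DefaultHasher` stands in for whatever hash the engine uses, and two details are my own assumptions: null keys after the first are dropped when broadcasting (they add no information for an antijoin), and without the flag all null keys go to partition 0.

   ```rust
   use std::collections::hash_map::DefaultHasher;
   use std::hash::{Hash, Hasher};

   /// Distribute nullable keys over `num_partitions` partitions.
   /// When `broadcast_null` is set, the first null key is copied to every partition.
   fn hash_partition_broadcast_null(
       rows: &[Option<i64>],
       num_partitions: usize,
       broadcast_null: bool,
   ) -> Vec<Vec<Option<i64>>> {
       assert!(num_partitions > 0, "need at least one partition");
       let mut partitions: Vec<Vec<Option<i64>>> = vec![Vec::new(); num_partitions];
       let mut null_broadcast_done = false;

       for row in rows {
           match row {
               Some(key) => {
                   // Equal keys hash to the same partition.
                   let mut hasher = DefaultHasher::new();
                   key.hash(&mut hasher);
                   let idx = (hasher.finish() % num_partitions as u64) as usize;
                   partitions[idx].push(Some(*key));
               }
               None if broadcast_null => {
                   // Assumption: only the first null is broadcast, later ones are dropped.
                   if !null_broadcast_done {
                       for partition in partitions.iter_mut() {
                           partition.push(None);
                       }
                       null_broadcast_done = true;
                   }
               }
               None => {
                   // Assumption: plain hash partitioning sends null keys to partition 0.
                   partitions[0].push(None);
               }
           }
       }
       partitions
   }
   ```

   Whether this lives behind a flag or a separate variant, the observable difference is the same: with broadcasting, every partition of the inner relation contains a null key whenever the input contains one.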
   
   Alternative:
   
   - Instead of broadcasting the nulls, create a new mode for `RepartitionExec` that broadcasts a special marker row indicating that a null key was seen and then ends the repartition operation. The antijoin would then have to detect that special row and quit early.
       - This approach may not work well for the tuple `NOT IN` case. That case must broadcast every key combination containing a null. For example, all of the following would need to be broadcast: `(1, null)`, `(2, null)`, `(null, 1)`, and so on. Only if `(null, null)` is seen could the repartition end early. Also, there may be a filter in the join that filters out a given row with a null, so the presence of a null in the `RepartitionExec` doesn't necessarily mean the null will be processed by the join.
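
   The tuple case can be illustrated with a small sketch of two-column `NOT IN` under three-valued logic. It is only meant to show why a partially null key such as `(1, null)` disqualifies some outer rows but not others, while `(null, null)` disqualifies all of them. The names `Pair`, `sql_and`, `pair_eq`, and `pair_not_in_keeps_row` are hypothetical.

   ```rust
   /// SQL truth value: Some(true), Some(false), or None for unknown.
   type Tri = Option<bool>;
   /// A two-column key where either column may be NULL.
   type Pair = (Option<i64>, Option<i64>);

   /// SQL equality: unknown if either operand is NULL.
   fn sql_eq(a: Option<i64>, b: Option<i64>) -> Tri {
       match (a, b) {
           (Some(x), Some(y)) => Some(x == y),
           _ => None,
       }
   }

   /// Three-valued AND: false wins, true needs both sides true, otherwise unknown.
   fn sql_and(a: Tri, b: Tri) -> Tri {
       match (a, b) {
           (Some(false), _) | (_, Some(false)) => Some(false),
           (Some(true), Some(true)) => Some(true),
           _ => None,
       }
   }

   /// Row equality: column-wise equality combined with AND.
   fn pair_eq(l: Pair, r: Pair) -> Tri {
       sql_and(sql_eq(l.0, r.0), sql_eq(l.1, r.1))
   }

   /// `l NOT IN (right)` is true only if every comparison is definitely false.
   fn pair_not_in_keeps_row(l: Pair, right: &[Pair]) -> bool {
       right.iter().all(|r| pair_eq(l, *r) == Some(false))
   }
   ```

   For the outer row `(1, 2)`, the subquery row `(1, null)` yields unknown and drops the row, `(2, null)` yields false and has no effect, and `(null, null)` yields unknown for any outer row.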
   

