alamb opened a new pull request #1776:
URL: https://github.com/apache/arrow-datafusion/pull/1776


   # Which issue does this PR close?
   
   Closes https://github.com/apache/arrow-datafusion/issues/424
   
   Along with https://github.com/apache/arrow-datafusion/pull/1732, fixes 
https://github.com/apache/arrow-datafusion/issues/423 (the last part).
   
   
   # Rationale for this change
   Repartitioning the input to an operator that relies on its input being 
sorted is incorrect: the repartitioning intermixes the partitions and 
effectively "unsorts" the input stream.
   
   We found this in IOx: 
https://github.com/influxdata/influxdb_iox/pull/3633#issuecomment-1030126757
   
   Here is a picture showing the problem:
   
   ```
       ┌─────────────────────────────────┐
       │                                 │
       │     SortPreservingMergeExec     │
       │                                 │
       └─────────────────────────────────┘
                 ▲      ▲       ▲            Input may not
                 │      │       │             be sorted!
         ┌───────┘      │       └───────┐
         │              │               │
         │              │               │
   ┌───────────┐  ┌───────────┐   ┌───────────┐
   │           │  │           │   │           │
   │ batch A1  │  │ batch A3  │   │ batch B3  │
   │           │  │           │   │           │
   ├───────────┤  ├───────────┤   ├───────────┤
   │           │  │           │   │           │
   │ batch B2  │  │ batch B1  │   │ batch A2  │
   │           │  │           │   │           │
   └───────────┘  └───────────┘   └───────────┘
         ▲              ▲               ▲
         │              │               │
         └─────────┐    │    ┌──────────┘
                   │    │    │                  Output
                   │    │    │                batches are
       ┌─────────────────────────────────┐   repartitioned
       │       RepartitionExec(3)        │    and may not
       │           RoundRobin            │   remain sorted
       │                                 │
       └─────────────────────────────────┘
                   ▲         ▲
                   │         │                Inputs are
             ┌─────┘         └─────┐            sorted
             │                     │
             │                     │
             │                     │
       ┌───────────┐         ┌───────────┐
       │           │         │           │
       │ batch A1  │         │ batch B1  │
       │           │         │           │
       ├───────────┤         ├───────────┤
       │           │         │           │
       │ batch A2  │         │ batch B2  │
       │           │         │           │
       ├───────────┤         ├───────────┤
       │           │         │           │
       │ batch A3  │         │ batch B3  │
       │           │         │           │
       └───────────┘         └───────────┘
   
   
        Sorted Input          Sorted Input
              A                     B
   ```
   
   The input streams must instead be passed through unchanged, so that each 
input to the merge remains sorted:
   
   ```
   ┌─────────────────────────────────┐
   │                                 │
   │     SortPreservingMergeExec     │
   │                                 │
   └─────────────────────────────────┘
               ▲         ▲
               │         │         Inputs are
         ┌─────┘         └─────┐   sorted, as
         │                     │    required
         │                     │
         │                     │
   ┌───────────┐         ┌───────────┐
   │           │         │           │
   │ batch A1  │         │ batch B1  │
   │           │         │           │
   ├───────────┤         ├───────────┤
   │           │         │           │
   │ batch A2  │         │ batch B2  │
   │           │         │           │
   ├───────────┤         ├───────────┤
   │           │         │           │
   │ batch A3  │         │ batch B3  │
   │           │         │           │
   └───────────┘         └───────────┘
   
   
    Sorted Input          Sorted Input
          A                     B
   ```
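
The two pictures above can be reproduced with a small standalone sketch. It is a toy model, not DataFusion code: a batch is just a vector of sort-key values, and `round_robin` and `is_sorted` are hypothetical helpers written for this illustration. The arrival order of batches (alternating between the inputs) is also an assumption; the real operator makes no promise about it.

```rust
/// A batch is modeled as a plain vector of sort-key values (toy model).
type Batch = Vec<i64>;

/// Returns true if the rows of `batches`, read in order, are ascending.
fn is_sorted(batches: &[Batch]) -> bool {
    let rows: Vec<i64> = batches.iter().flatten().copied().collect();
    rows.windows(2).all(|w| w[0] <= w[1])
}

/// Toy round-robin repartitioning: batches arrive from the inputs in
/// alternating order (one possible arrival order) and are dealt out to
/// `n` output partitions in turn.
fn round_robin(inputs: &[Vec<Batch>], n: usize) -> Vec<Vec<Batch>> {
    let mut outputs = vec![Vec::new(); n];
    let longest = inputs.iter().map(|p| p.len()).max().unwrap_or(0);
    let mut next = 0;
    for i in 0..longest {
        for input in inputs {
            if let Some(batch) = input.get(i) {
                outputs[next % n].push(batch.clone());
                next += 1;
            }
        }
    }
    outputs
}

fn main() {
    // Two inputs, each sorted on its own (A1..A3 and B1..B3).
    let a = vec![vec![1, 2], vec![3, 4], vec![5, 6]];
    let b = vec![vec![10, 20], vec![30, 40], vec![50, 60]];
    let inputs = vec![a, b];
    assert!(inputs.iter().all(|p| is_sorted(p)));

    // After repartitioning, at least one output partition is out of order.
    let outputs = round_robin(&inputs, 3);
    for (i, partition) in outputs.iter().enumerate() {
        println!("partition {}: {:?} sorted={}", i, partition, is_sorted(partition));
    }
    assert!(outputs.iter().any(|p| !is_sorted(p)));
}
```

In this run the second output partition receives batch B1 followed by batch A3, so a merge that assumes each of its inputs is sorted would produce wrong results.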
   
   # What changes are included in this PR?
   1. Add several "metadata" functions to `ExecutionPlan` that describe its 
sortedness and the invariants required for its input
   2. Teach the repartitioning optimizer pass to respect the invariants
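
As a rough illustration of how these two changes fit together, here is a minimal sketch on a toy plan type. Only the name `output_ordering` comes from this PR; the `Plan` enum, the simplified signatures, and the `relies_on_input_order`, `maintains_input_order`, and `add_repartition` names are assumptions made for the example and should not be read as the actual DataFusion API.

```rust
/// Simplified stand-in for a physical plan node (not DataFusion's real types).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { sorted_by: Option<String> },
    Filter { input: Box<Plan> },
    SortPreservingMerge { input: Box<Plan> },
    Repartition { input: Box<Plan>, partitions: usize },
}

impl Plan {
    /// Sort order of this node's output, if any (signature simplified).
    fn output_ordering(&self) -> Option<&str> {
        match self {
            Plan::Scan { sorted_by } => sorted_by.as_deref(),
            Plan::Filter { input } | Plan::SortPreservingMerge { input } => {
                input.output_ordering()
            }
            // Round-robin repartitioning gives no ordering guarantee.
            Plan::Repartition { .. } => None,
        }
    }

    /// Hypothetical invariant: this node is only correct on sorted input.
    fn relies_on_input_order(&self) -> bool {
        matches!(self, Plan::SortPreservingMerge { .. })
    }

    /// Hypothetical invariant: output rows keep the order of the input rows.
    fn maintains_input_order(&self) -> bool {
        matches!(self, Plan::Filter { .. } | Plan::SortPreservingMerge { .. })
    }
}

/// Toy optimizer pass: wrap scans in a round-robin repartition, but only
/// where no ancestor depends on the order being preserved.
/// `can_reorder` says whether this node's output may be reordered.
fn add_repartition(plan: Plan, partitions: usize, can_reorder: bool) -> Plan {
    let can_reorder_children = if plan.relies_on_input_order() {
        false
    } else {
        can_reorder || !plan.maintains_input_order()
    };
    match plan {
        leaf @ Plan::Scan { .. } => {
            if can_reorder {
                Plan::Repartition { input: Box::new(leaf), partitions }
            } else {
                leaf
            }
        }
        Plan::Filter { input } => Plan::Filter {
            input: Box::new(add_repartition(*input, partitions, can_reorder_children)),
        },
        Plan::SortPreservingMerge { input } => Plan::SortPreservingMerge {
            input: Box::new(add_repartition(*input, partitions, can_reorder_children)),
        },
        already @ Plan::Repartition { .. } => already,
    }
}

fn main() {
    let scan = Plan::Scan { sorted_by: Some("time".to_string()) };
    let merge = Plan::SortPreservingMerge { input: Box::new(scan.clone()) };
    let filter = Plan::Filter { input: Box::new(scan) };
    assert_eq!(merge.output_ordering(), Some("time"));

    // The merge relies on sorted input, so its subtree is left alone.
    println!("{:?}", add_repartition(merge, 3, true));
    // Nothing above the filter needs an order, so the scan is repartitioned.
    println!("{:?}", add_repartition(filter, 3, true));
}
```

The point of the sketch is the top-down flag: once a node declares that it relies on its input order, nothing below it may be reordered until some node breaks the order-preserving chain.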
   
   # Are there any user-facing changes?
   Yes: all `ExecutionPlan`s are now required to implement `output_ordering`, as 
described by @andygrove here: 
https://github.com/apache/arrow-datafusion/issues/424#issuecomment-847857451
   
   The rationale for not providing a default implementation (returning `None`) is to 
force anyone who writes `impl ExecutionPlan` to think about sort orders. If they do 
not, (very!) subtle bugs are possible as DataFusion starts to rely more on 
sortedness for optimizations.
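
The effect of omitting the default can be seen in a small sketch. `ToyPlan`, `SortKey`, `SortedScan`, and `RoundRobin` are hypothetical names used only for illustration, and the signature is a simplification rather than the real `ExecutionPlan` method.

```rust
/// Hypothetical, simplified sort key: a column name plus a direction.
#[derive(Debug, Clone, PartialEq)]
struct SortKey {
    column: String,
    descending: bool,
}

/// Toy stand-in for `ExecutionPlan`. `output_ordering` deliberately has no
/// default body, so an `impl` that omits it does not compile.
trait ToyPlan {
    fn output_ordering(&self) -> Option<&[SortKey]>;
}

/// An operator that knows its output is sorted must say so.
struct SortedScan {
    keys: Vec<SortKey>,
}

impl ToyPlan for SortedScan {
    fn output_ordering(&self) -> Option<&[SortKey]> {
        Some(self.keys.as_slice())
    }
}

/// An operator with no ordering guarantee has to state `None` explicitly.
struct RoundRobin;

impl ToyPlan for RoundRobin {
    fn output_ordering(&self) -> Option<&[SortKey]> {
        None
    }
}

fn main() {
    let scan = SortedScan {
        keys: vec![SortKey { column: "time".to_string(), descending: false }],
    };
    assert_eq!(scan.output_ordering().map(|k| k.len()), Some(1));
    assert!(RoundRobin.output_ordering().is_none());
}
```

With a default body returning `None`, both impls could have left the method out and nobody would have been prompted to check whether `None` was actually the right answer.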
   
   cc @tustvold @Dandandan 



