jonahgao opened a new pull request, #11536:
URL: https://github.com/apache/datafusion/pull/11536

   ## Which issue does this PR close?
   
   
   Closes #11409.
   
   ## Rationale for this change
   
   After certain optimizations such as `ProjectionPushdown`, the children of `InterleaveExec` can no longer meet the interleave [condition](https://github.com/apache/datafusion/blob/b19744968770c4ab426d065dec3cc5147534e87a/datafusion/physical-plan/src/union.rs#L363) because their `output_partitioning` values become inconsistent with one another.
   In this situation, we could consider falling back from `InterleaveExec` to `UnionExec` instead of returning an error.
   
   For example, in issue #11409:
   **Before** optimization:
   ```sh
   # InterleaveExec's children[0]
   ProjectionExec: expr=[v2@2 as v2, v0@1 as v0], OutPartitioning=Hash([UnKnownColumn { name: "v0@0" }], 2)
     ProjectionExec: expr=[v0@0 as v0, v0@1 as v0, v2@2 as v2], OutPartitioning=Hash([Column { name: "v0", index: 0 }], 2)
       HashJoinExec: mode=Partitioned, join_type=Inner, on=[(v0@0, CAST(t1.v0 AS Float64)@2)], OutPartitioning=Hash([Column { name: "CAST(t1.v0 AS Float64)", index: 3 }], 2)
         RepartitionExec: partitioning=Hash([v0@0], 2), input_partitions=1, OutPartitioning=Hash([Column { name: "v0", index: 0 }], 2)
           MemoryExec: partitions=1, partition_sizes=[1], OutPartitioning=UnknownPartitioning(1)
         RepartitionExec: partitioning=Hash([CAST(t1.v0 AS Float64)@2], 2), input_partitions=1, OutPartitioning=Hash([Column { name: "CAST(t1.v0 AS Float64)", index: 2 }], 2)
           ProjectionExec: expr=[v0@0 as v0, v2@1 as v2, CAST(v0@0 AS Float64) as CAST(t1.v0 AS Float64)], OutPartitioning=UnknownPartitioning(1)
             MemoryExec: partitions=1, partition_sizes=[4], OutPartitioning=UnknownPartitioning(1)
   
   # InterleaveExec's children[1]
   ProjectionExec: expr=[v2@2 as v2, v0@1 as v0], OutPartitioning=Hash([UnKnownColumn { name: "v0@0" }], 2)
     ProjectionExec: expr=[v0@0 as v0, v0@1 as v0, v2@2 as v2], OutPartitioning=Hash([Column { name: "v0", index: 0 }], 2)
       ProjectionExec: expr=[v0@3 as v0, v0@0 as v0, v2@1 as v2, CAST(t1.v0 AS Float64)@2 as CAST(t1.v0 AS Float64)], OutPartitioning=Hash([Column { name: "v0", index: 0 }], 2)
         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(CAST(t1.v0 AS Float64)@2, v0@0)], OutPartitioning=Hash([Column { name: "v0", index: 3 }], 2)
           RepartitionExec: partitioning=Hash([CAST(t1.v0 AS Float64)@2], 2), input_partitions=2, OutPartitioning=Hash([Column { name: "CAST(t1.v0 AS Float64)", index: 2 }], 2)
             ProjectionExec: expr=[v0@0 as v0, v2@1 as v2, CAST(v0@0 AS Float64) as CAST(t1.v0 AS Float64)], OutPartitioning=RoundRobinBatch(2)
               RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, OutPartitioning=RoundRobinBatch(2)
                 FilterExec: v2@1 IS NULL, OutPartitioning=UnknownPartitioning(1)
                   MemoryExec: partitions=1, partition_sizes=[4], OutPartitioning=UnknownPartitioning(1)
           RepartitionExec: partitioning=Hash([v0@0], 2), input_partitions=1, OutPartitioning=Hash([Column { name: "v0", index: 0 }], 2)
             MemoryExec: partitions=1, partition_sizes=[1], OutPartitioning=UnknownPartitioning(1)
   ```
   **After:**
   ```sh
   # InterleaveExec's children[0] OutputPartitioning: Hash([UnKnownColumn { name: "CAST(t1.v0 AS Float64)@3" }], 2)
   ProjectionExec: expr=[v2@1 as v2, v0@0 as v0], OutPartitioning=Hash([UnKnownColumn { name: "CAST(t1.v0 AS Float64)@3" }], 2)
     HashJoinExec: mode=Partitioned, join_type=Inner, on=[(v0@0, CAST(t1.v0 AS Float64)@2)], projection=[v0@1, v2@2], OutPartitioning=Hash([UnKnownColumn { name: "CAST(t1.v0 AS Float64)@3" }], 2)
       RepartitionExec: partitioning=Hash([v0@0], 2), input_partitions=1, OutPartitioning=Hash([Column { name: "v0", index: 0 }], 2)
         MemoryExec: partitions=1, partition_sizes=[1], OutPartitioning=UnknownPartitioning(1)
       RepartitionExec: partitioning=Hash([CAST(t1.v0 AS Float64)@2], 2), input_partitions=1, OutPartitioning=Hash([Column { name: "CAST(t1.v0 AS Float64)", index: 2 }], 2)
         ProjectionExec: expr=[v0@0 as v0, v2@1 as v2, CAST(v0@0 AS Float64) as CAST(t1.v0 AS Float64)], OutPartitioning=UnknownPartitioning(1)
           MemoryExec: partitions=1, partition_sizes=[4], OutPartitioning=UnknownPartitioning(1)
   
   # InterleaveExec's children[1] OutputPartitioning: Hash([UnKnownColumn { name: "v0@3" }], 2)
   ProjectionExec: expr=[v2@1 as v2, v0@0 as v0], OutPartitioning=Hash([UnKnownColumn { name: "v0@3" }], 2)
     HashJoinExec: mode=Partitioned, join_type=Inner, on=[(CAST(t1.v0 AS Float64)@2, v0@0)], projection=[v0@0, v2@1], OutPartitioning=Hash([UnKnownColumn { name: "v0@3" }], 2)
       RepartitionExec: partitioning=Hash([CAST(t1.v0 AS Float64)@2], 2), input_partitions=2, OutPartitioning=Hash([Column { name: "CAST(t1.v0 AS Float64)", index: 2 }], 2)
         ProjectionExec: expr=[v0@0 as v0, v2@1 as v2, CAST(v0@0 AS Float64) as CAST(t1.v0 AS Float64)], OutPartitioning=RoundRobinBatch(2)
           RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, OutPartitioning=RoundRobinBatch(2)
             FilterExec: v2@1 IS NULL, OutPartitioning=UnknownPartitioning(1)
               MemoryExec: partitions=1, partition_sizes=[4], OutPartitioning=UnknownPartitioning(1)
       RepartitionExec: partitioning=Hash([v0@0], 2), input_partitions=1, OutPartitioning=Hash([Column { name: "v0", index: 0 }], 2)
         MemoryExec: partitions=1, partition_sizes=[1], OutPartitioning=UnknownPartitioning(1)
   ```
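
The plans above can be reduced to a small, self-contained model of the proposed fallback. This is only a sketch, not the code in this PR: the `Partitioning` and `Combine` types and the `choose_combine` helper are hypothetical stand-ins, and `can_interleave` here assumes the interleave condition is "every child reports the same hash partitioning", which is how the linked check reads to me.

```rust
// Toy model of the fallback idea. These types are hypothetical stand-ins
// for illustration; they are NOT DataFusion's real types.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    /// Hash partitioning on the given expressions into `n` partitions.
    Hash(Vec<String>, usize),
    RoundRobinBatch(usize),
    Unknown(usize),
}

/// Which operator would be used to combine the children (hypothetical).
#[derive(Debug, PartialEq)]
enum Combine {
    Interleave,
    Union,
}

/// Assumed interleave condition: the first child is hash partitioned and
/// every child reports exactly the same partitioning.
fn can_interleave(children: &[Partitioning]) -> bool {
    let Some(reference) = children.first() else {
        return false;
    };
    matches!(reference, Partitioning::Hash(_, _))
        && children.iter().all(|p| p == reference)
}

/// Fall back to a plain union instead of failing when the condition
/// no longer holds (for example, after an optimizer rewrite).
fn choose_combine(children: &[Partitioning]) -> Combine {
    if can_interleave(children) {
        Combine::Interleave
    } else {
        Combine::Union
    }
}
```

In this model, the two "Before" children (both `Hash` on `v0@0`) would still be interleaved, while the two "After" children (`Hash` on `CAST(t1.v0 AS Float64)@3` versus `Hash` on `v0@3`) would fall back to a union rather than produce an error.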
   
   
   ## What changes are included in this PR?
   
   
   ## Are these changes tested?
   Yes
   
   ## Are there any user-facing changes?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

