jonahgao opened a new pull request, #11536: URL: https://github.com/apache/datafusion/pull/11536
## Which issue does this PR close?

<!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. -->

Closes #11409.

## Rationale for this change

After certain optimizations such as `ProjectionPushdown`, the children of `InterleaveExec` can no longer meet the interleave [condition](https://github.com/apache/datafusion/blob/b19744968770c4ab426d065dec3cc5147534e87a/datafusion/physical-plan/src/union.rs#L363) because their output_partitioning becomes inconsistent.

Perhaps in this situation, we could consider falling back to `UnionExec` from `InterleaveExec` instead of throwing an error.

For example, in the issue #11409

**Before** optimization:

```sh
# InterleaveExec's children[0]
ProjectionExec: expr=[v2@2 as v2, v0@1 as v0], OutPartitioning=Hash([UnKnownColumn { name: "v0@0" }], 2)
  ProjectionExec: expr=[v0@0 as v0, v0@1 as v0, v2@2 as v2], OutPartitioning=Hash([Column { name: "v0", index: 0 }], 2)
    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(v0@0, CAST(t1.v0 AS Float64)@2)], OutPartitioning=Hash([Column { name: "CAST(t1.v0 AS Float64)", index: 3 }], 2)
      RepartitionExec: partitioning=Hash([v0@0], 2), input_partitions=1, OutPartitioning=Hash([Column { name: "v0", index: 0 }], 2)
        MemoryExec: partitions=1, partition_sizes=[1], OutPartitioning=UnknownPartitioning(1)
      RepartitionExec: partitioning=Hash([CAST(t1.v0 AS Float64)@2], 2), input_partitions=1, OutPartitioning=Hash([Column { name: "CAST(t1.v0 AS Float64)", index: 2 }], 2)
        ProjectionExec: expr=[v0@0 as v0, v2@1 as v2, CAST(v0@0 AS Float64) as CAST(t1.v0 AS Float64)], OutPartitioning=UnknownPartitioning(1)
          MemoryExec: partitions=1, partition_sizes=[4], OutPartitioning=UnknownPartitioning(1)

# InterleaveExec's children[1]
ProjectionExec: expr=[v2@2 as v2, v0@1 as v0], OutPartitioning=Hash([UnKnownColumn { name: "v0@0" }], 2)
  ProjectionExec: expr=[v0@0 as v0, v0@1 as v0, v2@2 as v2], OutPartitioning=Hash([Column { name: "v0", index: 0 }], 2)
    ProjectionExec: expr=[v0@3 as v0, v0@0 as v0, v2@1 as v2, CAST(t1.v0 AS Float64)@2 as CAST(t1.v0 AS Float64)], OutPartitioning=Hash([Column { name: "v0", index: 0 }], 2)
      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(CAST(t1.v0 AS Float64)@2, v0@0)], OutPartitioning=Hash([Column { name: "v0", index: 3 }], 2)
        RepartitionExec: partitioning=Hash([CAST(t1.v0 AS Float64)@2], 2), input_partitions=2, OutPartitioning=Hash([Column { name: "CAST(t1.v0 AS Float64)", index: 2 }], 2)
          ProjectionExec: expr=[v0@0 as v0, v2@1 as v2, CAST(v0@0 AS Float64) as CAST(t1.v0 AS Float64)], OutPartitioning=RoundRobinBatch(2)
            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, OutPartitioning=RoundRobinBatch(2)
              FilterExec: v2@1 IS NULL, OutPartitioning=UnknownPartitioning(1)
                MemoryExec: partitions=1, partition_sizes=[4], OutPartitioning=UnknownPartitioning(1)
        RepartitionExec: partitioning=Hash([v0@0], 2), input_partitions=1, OutPartitioning=Hash([Column { name: "v0", index: 0 }], 2)
          MemoryExec: partitions=1, partition_sizes=[1], OutPartitioning=UnknownPartitioning(1)
```

**After:**

```sh
# InterleaveExec's children[0]
OutputPartitioning: Hash([UnKnownColumn { name: "CAST(t1.v0 AS Float64)@3" }], 2)
ProjectionExec: expr=[v2@1 as v2, v0@0 as v0], OutPartitioning=Hash([UnKnownColumn { name: "CAST(t1.v0 AS Float64)@3" }], 2)
  HashJoinExec: mode=Partitioned, join_type=Inner, on=[(v0@0, CAST(t1.v0 AS Float64)@2)], projection=[v0@1, v2@2], OutPartitioning=Hash([UnKnownColumn { name: "CAST(t1.v0 AS Float64)@3" }], 2)
    RepartitionExec: partitioning=Hash([v0@0], 2), input_partitions=1, OutPartitioning=Hash([Column { name: "v0", index: 0 }], 2)
      MemoryExec: partitions=1, partition_sizes=[1], OutPartitioning=UnknownPartitioning(1)
    RepartitionExec: partitioning=Hash([CAST(t1.v0 AS Float64)@2], 2), input_partitions=1, OutPartitioning=Hash([Column { name: "CAST(t1.v0 AS Float64)", index: 2 }], 2)
      ProjectionExec: expr=[v0@0 as v0, v2@1 as v2, CAST(v0@0 AS Float64) as CAST(t1.v0 AS Float64)], OutPartitioning=UnknownPartitioning(1)
        MemoryExec: partitions=1, partition_sizes=[4], OutPartitioning=UnknownPartitioning(1)

# InterleaveExec children[1]
OutputPartitioning: Hash([UnKnownColumn { name: "v0@3" }], 2)
ProjectionExec: expr=[v2@1 as v2, v0@0 as v0], OutPartitioning=Hash([UnKnownColumn { name: "v0@3" }], 2)
  HashJoinExec: mode=Partitioned, join_type=Inner, on=[(CAST(t1.v0 AS Float64)@2, v0@0)], projection=[v0@0, v2@1], OutPartitioning=Hash([UnKnownColumn { name: "v0@3" }], 2)
    RepartitionExec: partitioning=Hash([CAST(t1.v0 AS Float64)@2], 2), input_partitions=2, OutPartitioning=Hash([Column { name: "CAST(t1.v0 AS Float64)", index: 2 }], 2)
      ProjectionExec: expr=[v0@0 as v0, v2@1 as v2, CAST(v0@0 AS Float64) as CAST(t1.v0 AS Float64)], OutPartitioning=RoundRobinBatch(2)
        RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, OutPartitioning=RoundRobinBatch(2)
          FilterExec: v2@1 IS NULL, OutPartitioning=UnknownPartitioning(1)
            MemoryExec: partitions=1, partition_sizes=[4], OutPartitioning=UnknownPartitioning(1)
    RepartitionExec: partitioning=Hash([v0@0], 2), input_partitions=1, OutPartitioning=Hash([Column { name: "v0", index: 0 }], 2)
      MemoryExec: partitions=1, partition_sizes=[1], OutPartitioning=UnknownPartitioning(1)
```

## What changes are included in this PR?

## Are these changes tested?

Yes

## Are there any user-facing changes?

No

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org