metesynnada opened a new pull request, #5880:
URL: https://github.com/apache/arrow-datafusion/pull/5880

   # Which issue does this PR close?
   
   Closes #5878.
   
   # Rationale for this change
   Some optimizer rules depend strongly on one another, so the order in which they run matters. `PipelineFixer` (and maybe more rules in the future) can replace an operator at some level of the `ExecutionPlan`, and the new operator can have a different set of flags (ordering, distribution, or others).
   
   I suggest placing the rules that change executors above (that is, before) the rules that enforce sorting, distribution, etc.
   
   - Before PR
   ```
   [
       "SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=BinaryExpr { left: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Gt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(3) } } }, op: And, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Lt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(10) } } } }",
       "  CoalesceBatchesExec: target_batch_size=8192",
       "    RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=8",
       "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
       "        CsvExec: files={1 group: [[private/var/folders/rf/dhj0s83d57l2_m51k2dmd_ch0000gn/T/.tmpcRbDJD/left.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
       "  CoalesceBatchesExec: target_batch_size=8192",
       "    RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=8",
       "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
       "        CsvExec: files={1 group: [[private/var/folders/rf/dhj0s83d57l2_m51k2dmd_ch0000gn/T/.tmpcRbDJD/right.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
   ]
   ```
   SHJ (`SymmetricHashJoinExec`) explicitly sets `benefits_from_input_partitioning` to `false`. However, this has no effect here because the `RepartitionExec` with `RoundRobinBatch` partitioning is added before the `HashJoin -> SymmetricHashJoin` change.
   
   - After PR
   ```
   [
       "SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=BinaryExpr { left: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Gt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(3) } } }, op: And, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Lt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(10) } } } }",
       "  CoalesceBatchesExec: target_batch_size=8192",
       "    RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
       "      CsvExec: files={1 group: [[private/var/folders/rf/dhj0s83d57l2_m51k2dmd_ch0000gn/T/.tmpdTwdrk/left.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
       "  CoalesceBatchesExec: target_batch_size=8192",
       "    RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
       "      CsvExec: files={1 group: [[private/var/folders/rf/dhj0s83d57l2_m51k2dmd_ch0000gn/T/.tmpdTwdrk/right.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
   ]
   ```
   
   
   # What changes are included in this PR?
   
   This PR reorders the optimizer rules so that the `benefits_from_input_partitioning` API takes effect. If `PipelineFixer` runs above (almost) everything else, the rules that follow can leverage the APIs of the changed executors.
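
The ordering effect described in this PR can be illustrated with a small, self-contained toy model. This is only a sketch and not DataFusion code: the `Plan` enum and the `fix_pipeline`, `repartition`, `count_round_robin`, and `initial_plan` functions are hypothetical stand-ins invented for illustration, and the method named `benefits_from_input_partitioning` merely imitates the flag discussed above. It assumes the repartition rule consults that flag on the join it sees at the time it runs.

```rust
// Toy physical plan. NOT DataFusion's `ExecutionPlan`; just enough
// structure to show why the order of the two rules matters.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan,
    RoundRobin(Box<Plan>),
    HashRepartition(Box<Plan>),
    HashJoin(Box<Plan>, Box<Plan>),
    SymmetricHashJoin(Box<Plan>, Box<Plan>),
}

impl Plan {
    // Hypothetical stand-in for the flag: only the symmetric join opts out.
    fn benefits_from_input_partitioning(&self) -> bool {
        !matches!(self, Plan::SymmetricHashJoin(..))
    }
}

// Toy "pipeline fixer": swaps the join operator at the root.
fn fix_pipeline(plan: Plan) -> Plan {
    match plan {
        Plan::HashJoin(l, r) => Plan::SymmetricHashJoin(l, r),
        other => other,
    }
}

// Inserts a round-robin node below a hash repartition, if there is one.
fn add_round_robin(child: Box<Plan>) -> Box<Plan> {
    match *child {
        Plan::HashRepartition(input) => {
            Box::new(Plan::HashRepartition(Box::new(Plan::RoundRobin(input))))
        }
        other => Box::new(other),
    }
}

// Toy "repartition" rule: only touches operators that report a benefit.
fn repartition(plan: Plan) -> Plan {
    if !plan.benefits_from_input_partitioning() {
        return plan;
    }
    match plan {
        Plan::HashJoin(l, r) => Plan::HashJoin(add_round_robin(l), add_round_robin(r)),
        other => other,
    }
}

// Counts round-robin nodes anywhere in the plan.
fn count_round_robin(plan: &Plan) -> usize {
    match plan {
        Plan::Scan => 0,
        Plan::RoundRobin(c) => 1 + count_round_robin(c),
        Plan::HashRepartition(c) => count_round_robin(c),
        Plan::HashJoin(l, r) | Plan::SymmetricHashJoin(l, r) => {
            count_round_robin(l) + count_round_robin(r)
        }
    }
}

// A hash join whose two inputs are hash-repartitioned scans.
fn initial_plan() -> Plan {
    let side = || Box::new(Plan::HashRepartition(Box::new(Plan::Scan)));
    Plan::HashJoin(side(), side())
}

fn main() {
    // Old order: repartition runs while the root is still a HashJoin.
    let before = fix_pipeline(repartition(initial_plan()));
    // New order: the join is swapped first, so its flag is respected.
    let after = repartition(fix_pipeline(initial_plan()));
    println!("repartition first: {} round-robin node(s)", count_round_robin(&before));
    println!("fixer first: {} round-robin node(s)", count_round_robin(&after));
}
```

In this toy model both orders end with a symmetric join at the root, but only the order that swaps the join first avoids the leftover round-robin nodes, which mirrors the difference between the two plans shown above.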
   
   # Are these changes tested?
   
   Yes.
   
   # Are there any user-facing changes?
   
   No.

