Hi Fabian, thanks for your response ... here. That sounds like a plan. I'll try it.
Thanks,
Xingcan

On Thu, Nov 2, 2017 at 7:13 PM, Fabian Hueske wrote:
> Hi Xingcan,
>
> you could restrict the rule that translates the join operator by adding a
> condition that checks if the join condition contains an equality predicate
> on expressions that do not access a field (i.e., expressions that compute a
> value).
> This would prevent such plans from being translated and only leave the plan
> with pushed-down expressions.
>
> Best, Fabian
>
> 2017-10-28 11:36 GMT+02:00 Xingcan Cui:
>
> > Hi all,
> >
> > I have a question about selecting the best plan for a join in Flink. For
> > instance, given the logical plan below:
> >
> > LogicalProject(c=[$2], g=[$6])
> >   LogicalFilter(condition=[AND(=($1, +($7, 1)), =(-($0, 1), +($3, 2)))])
> >     LogicalJoin(condition=[true], joinType=[inner])
> >       LogicalTableScan(table=[[_DataSetTable_0]])
> >       LogicalTableScan(table=[[_DataSetTable_1]])
> >
> > two kinds of physical plans could be generated:
> >
> > DataSetCalc(select=[c, g])
> >   DataSetJoin(where=[AND(=(b, +(h, 1)), =(-(a, 1), +(d, 2)))], join=[a, b, c, d, g, h], joinType=[InnerJoin])
> >     DataSetScan(table=[[_DataSetTable_0]])
> >     DataSetCalc(select=[d, g, h])
> >       DataSetScan(table=[[_DataSetTable_1]])
> >
> > and
> >
> > DataSetCalc(select=[c, g])
> >   DataSetJoin(where=[AND(=(b, $f30), =($f3, $f4))], join=[b, c, $f3, g, $f30, $f4], joinType=[InnerJoin])
> >     DataSetCalc(select=[b, c, -(a, 1) AS $f3])
> >       DataSetScan(table=[[_DataSetTable_0]])
> >     DataSetCalc(select=[g, +(h, 1) AS $f3, +(d, 2) AS $f4])
> >       DataSetScan(table=[[_DataSetTable_1]])
> >
> > Normally, the former is rated as more efficient by the cost estimation
> > method below, which I think is fairly classic.
> >
> > override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
> >   val leftRowCnt = metadata.getRowCount(getLeft)
> >   val leftRowSize = estimateRowSize(getLeft.getRowType)
> >
> >   val rightRowCnt = metadata.getRowCount(getRight)
> >   val rightRowSize = estimateRowSize(getRight.getRowType)
> >
> >   val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
> >   val cpuCost = leftRowCnt + rightRowCnt
> >   val rowCnt = leftRowCnt + rightRowCnt
> >
> >   planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
> > }
> >
> > However, since equi-predicates are so important for parallelism in Flink,
> > the latter plan (with the equi-predicates in the join node) should actually
> > be selected, regardless of the cost value. Is there any approach I can take
> > to solve this *without breaking* the cost model?
> >
> > Thanks,
> > Xingcan
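The quoted cost formula never looks at the join condition: it only uses input row counts and row sizes. A minimal sketch that mirrors the same arithmetic with plain numbers makes this visible. The `Cost` case class, the `joinSelfCost` helper, and all row counts and row sizes below are hypothetical; the real method reads them from `RelMetadataQuery` and `estimateRowSize`. Both plans' join nodes have three input columns per side, so under the assumption of equal row counts and equal column widths they get the same self-cost.

```scala
// Hypothetical stand-in for the (rows, cpu, io) triple that
// planner.getCostFactory.makeCost builds in the quoted method.
final case class Cost(rows: Double, cpu: Double, io: Double)

object JoinCostSketch {
  // Same arithmetic as the quoted computeSelfCost; row counts and row sizes
  // are passed in directly instead of being read from metadata.
  // Note that there is no parameter for the join condition at all.
  def joinSelfCost(leftRowCnt: Double, leftRowSize: Double,
                   rightRowCnt: Double, rightRowSize: Double): Cost = {
    val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
    val cpuCost = leftRowCnt + rightRowCnt
    val rowCnt = leftRowCnt + rightRowCnt
    Cost(rowCnt, cpuCost, ioCost)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical inputs: 1000 rows per side, three columns per row in both
    // plans, each column assumed to take 4 bytes.
    val rowSize = 3 * 4.0
    // Plan 1 join: where=[AND(=(b, +(h, 1)), =(-(a, 1), +(d, 2)))]
    val plan1 = joinSelfCost(1000, rowSize, 1000, rowSize)
    // Plan 2 join: where=[AND(=(b, $f30), =($f3, $f4))]
    val plan2 = joinSelfCost(1000, rowSize, 1000, rowSize)
    println(plan1)          // Cost(2000.0,2000.0,24000.0)
    println(plan1 == plan2) // true: the join condition does not affect the cost
  }
}
```

Under these assumptions the tie is broken elsewhere in the plan: the second plan carries one extra DataSetCalc (on the left input), so if the remaining nodes cost the same, any positive cost for that node makes the first plan cheaper overall.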

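Fabian's suggestion can be sketched as a predicate over the join condition. In a Calcite-based planner the check would inspect `RexNode`s (for example, telling a `RexInputRef` apart from a `RexCall`) inside the translation rule's `matches` method; to keep the sketch self-contained, it instead uses a small hypothetical expression type (`Expr`, `FieldRef`, `Literal`, `Call`) and a hypothetical `joinRuleMatches` helper, not the actual Flink rule. Field indexes follow the `join=[...]` field lists of the two physical plans.

```scala
// Hypothetical, simplified stand-in for Calcite's RexNode tree.
sealed trait Expr
final case class FieldRef(index: Int) extends Expr                   // like $1
final case class Literal(value: Int) extends Expr                    // like 1
final case class Call(op: String, operands: List[Expr]) extends Expr // like +($5, 1)

object EquiJoinCheck {
  // Flattens nested ANDs into their top-level conjuncts.
  def conjuncts(e: Expr): List[Expr] = e match {
    case Call("AND", ops) => ops.flatMap(conjuncts)
    case other            => List(other)
  }

  // An operand "computes a value" if it is a call such as +(h, 1)
  // rather than a plain field reference or a literal.
  private def computes(e: Expr): Boolean = e match {
    case _: Call => true
    case _       => false
  }

  // True if some conjunct is an equality with a computed operand.
  def hasComputedEquality(cond: Expr): Boolean =
    conjuncts(cond).exists {
      case Call("=", List(l, r)) => computes(l) || computes(r)
      case _                     => false
    }

  // Hypothetical extra condition for the join translation rule.
  def joinRuleMatches(cond: Expr): Boolean = !hasComputedEquality(cond)

  // Plan 1: AND(=(b, +(h, 1)), =(-(a, 1), +(d, 2))) over [a, b, c, d, g, h]
  val plan1Cond: Expr = Call("AND", List(
    Call("=", List(FieldRef(1), Call("+", List(FieldRef(5), Literal(1))))),
    Call("=", List(Call("-", List(FieldRef(0), Literal(1))),
                   Call("+", List(FieldRef(3), Literal(2)))))))

  // Plan 2: AND(=(b, $f30), =($f3, $f4)) over [b, c, $f3, g, $f30, $f4]
  val plan2Cond: Expr = Call("AND", List(
    Call("=", List(FieldRef(0), FieldRef(4))),
    Call("=", List(FieldRef(2), FieldRef(5)))))

  def main(args: Array[String]): Unit = {
    println(joinRuleMatches(plan1Cond)) // false: the rule declines plan 1
    println(joinRuleMatches(plan2Cond)) // true: the rule accepts plan 2
  }
}
```

With this check the rule declines the first plan, whose condition `=(b, +(h, 1))` compares a field with a computed value, and accepts the second, so only the plan with the pushed-down expressions reaches the cost model and the cost formula itself stays unchanged. The sketch only examines top-level AND conjuncts; conditions with nested ORs would need their own handling.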