Hi all,

I have a question about selecting the best plan for a join in Flink. For
instance, given the logical plan below:

LogicalProject(c=[$2], g=[$6])
  LogicalFilter(condition=[AND(=($1, +($7, 1)), =(-($0, 1), +($3, 2)))])
    LogicalJoin(condition=[true], joinType=[inner])
      LogicalTableScan(table=[[_DataSetTable_0]])
      LogicalTableScan(table=[[_DataSetTable_1]]),

there could be two kinds of physical plans generated:

DataSetCalc(select=[c, g])
  DataSetJoin(where=[AND(=(b, +(h, 1)), =(-(a, 1), +(d, 2)))], join=[a, b, c, d, g, h], joinType=[InnerJoin])
    DataSetScan(table=[[_DataSetTable_0]])
    DataSetCalc(select=[d, g, h])
      DataSetScan(table=[[_DataSetTable_1]]),

and

DataSetCalc(select=[c, g])
  DataSetJoin(where=[AND(=(b, $f30), =($f3, $f4))], join=[b, c, $f3, g, $f30, $f4], joinType=[InnerJoin])
    DataSetCalc(select=[b, c, -(a, 1) AS $f3])
      DataSetScan(table=[[_DataSetTable_0]])
    DataSetCalc(select=[g, +(h, 1) AS $f3, +(d, 2) AS $f4])
      DataSetScan(table=[[_DataSetTable_1]]).

Normally, the former plan is estimated to be cheaper by the cost function
below, which I think is a fairly classic one.

override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
    val leftRowCnt = metadata.getRowCount(getLeft)
    val leftRowSize = estimateRowSize(getLeft.getRowType)

    val rightRowCnt = metadata.getRowCount(getRight)
    val rightRowSize = estimateRowSize(getRight.getRowType)

    val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
    val cpuCost = leftRowCnt + rightRowCnt
    val rowCnt = leftRowCnt + rightRowCnt

    planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
}
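
To make the formula concrete, here is a minimal standalone sketch of the same arithmetic. It uses no Calcite or Flink classes; the names Input, Cost and JoinCostSketch, as well as the row counts and row sizes, are hypothetical and chosen only for illustration. What it is meant to show is that the result depends only on the input row counts and row sizes, so the join condition (equi-predicate or not) never enters the computation.

```scala
// Standalone model of the cost formula above (hypothetical names and numbers).
object JoinCostSketch {
  // Estimated row count and per-row size (in bytes) of one join input.
  final case class Input(rowCnt: Double, rowSize: Double)

  // The three components that are passed to makeCost in the original method.
  final case class Cost(rows: Double, cpu: Double, io: Double)

  // Same arithmetic as computeSelfCost: note that no join condition is consulted.
  def joinSelfCost(left: Input, right: Input): Cost = {
    val ioCost  = (left.rowCnt * left.rowSize) + (right.rowCnt * right.rowSize)
    val cpuCost = left.rowCnt + right.rowCnt
    val rowCnt  = left.rowCnt + right.rowCnt
    Cost(rowCnt, cpuCost, ioCost)
  }

  def main(args: Array[String]): Unit = {
    // Assumed inputs: 1000 rows of 12 bytes on each side.
    val left  = Input(rowCnt = 1000, rowSize = 12)
    val right = Input(rowCnt = 1000, rowSize = 12)
    println(joinSelfCost(left, right))
  }
}
```

Under these assumptions, two join nodes whose inputs have the same row counts and row sizes get the same self-cost, whatever their join conditions look like.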

However, since equi-predicates are so important for improving parallelism
in Flink, the latter plan (with the equi-predicates in the join node) should
actually be selected, regardless of its cost value. Is there any approach I
can take to achieve this *without breaking* the cost model?

Thanks,
Xingcan
