Hi Fabian,

thanks for your response ... here. That sounds like a plan. I'll try it.

Thanks,
Xingcan

On Thu, Nov 2, 2017 at 7:13 PM, Fabian Hueske wrote:

> Hi Xingcan,
>
> you could restrict the rule that translates the join operator by adding a
> condition that checks whether the join condition contains an equality
> predicate on expressions that do not just access a field (i.e., expressions
> that compute a value), and refuses to match if it does.
> This would prevent such plans from being translated and leave only the plan
> with pushed-down expressions.
>
> Best, Fabian
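Fabian's suggestion can be sketched as a check that the join translation rule evaluates before it matches. The sketch below is a minimal, self-contained illustration that does not use Calcite: `Expr`, `FieldRef`, `Literal`, `Call`, and `EquiJoinCheck` are hypothetical stand-ins for Calcite's `RexNode` classes and for the rule's match condition. In a real rule, this check would presumably live in an override of `RelOptRule.matches(call: RelOptRuleCall)`.

```scala
// Hypothetical toy expression model standing in for Calcite's RexNode types.
sealed trait Expr
final case class FieldRef(index: Int) extends Expr
final case class Literal(value: Int) extends Expr
final case class Call(op: String, operands: List[Expr]) extends Expr

// Hypothetical match check for the join translation rule.
object EquiJoinCheck {

  // Flattens nested ANDs into the list of top-level conjuncts.
  def conjuncts(cond: Expr): List[Expr] = cond match {
    case Call("AND", ops) => ops.flatMap(conjuncts)
    case other            => List(other)
  }

  // True for an equality whose operands are not both plain field accesses,
  // i.e. at least one side computes a value (e.g. +(h, 1)).
  def isComputedEquality(e: Expr): Boolean = e match {
    case Call("=", List(l, r)) =>
      !l.isInstanceOf[FieldRef] || !r.isInstanceOf[FieldRef]
    case _ => false
  }

  // The rule matches only if no equality predicate compares computed values.
  def ruleMatches(joinCondition: Expr): Boolean =
    !conjuncts(joinCondition).exists(isComputedEquality)
}
```

Encoding the two join conditions from the quoted message below (field indices taken from their `join=[...]` lists), `ruleMatches` returns `false` for `AND(=(b, +(h, 1)), =(-(a, 1), +(d, 2)))` and `true` for `AND(=(b, $f30), =($f3, $f4))`. Only the plan with pushed-down expressions then remains translatable, and the cost model itself is left untouched.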
>
> 2017-10-28 11:36 GMT+02:00 Xingcan Cui:
>
> > Hi all,
> >
> > I have a question about selecting the best plan for joins in Flink. For
> > instance, given the logical plan below:
> >
> > LogicalProject(c=[$2], g=[$6])
> >   LogicalFilter(condition=[AND(=($1, +($7, 1)), =(-($0, 1), +($3, 2)))])
> >     LogicalJoin(condition=[true], joinType=[inner])
> >       LogicalTableScan(table=[[_DataSetTable_0]])
> >       LogicalTableScan(table=[[_DataSetTable_1]]),
> >
> > two kinds of physical plans could be generated:
> >
> > DataSetCalc(select=[c, g])
> >   DataSetJoin(where=[AND(=(b, +(h, 1)), =(-(a, 1), +(d, 2)))], join=[a, b, c, d, g, h], joinType=[InnerJoin])
> >     DataSetScan(table=[[_DataSetTable_0]])
> >     DataSetCalc(select=[d, g, h])
> >       DataSetScan(table=[[_DataSetTable_1]]),
> >
> > and
> >
> > DataSetCalc(select=[c, g])
> >   DataSetJoin(where=[AND(=(b, $f30), =($f3, $f4))], join=[b, c, $f3, g, $f30, $f4], joinType=[InnerJoin])
> >     DataSetCalc(select=[b, c, -(a, 1) AS $f3])
> >       DataSetScan(table=[[_DataSetTable_0]])
> >     DataSetCalc(select=[g, +(h, 1) AS $f3, +(d, 2) AS $f4])
> >       DataSetScan(table=[[_DataSetTable_1]]).
> >
> > Normally, the former plan is estimated to be cheaper by the cost function
> > below, which I think is fairly classic.
> >
> > override def computeSelfCost(
> >     planner: RelOptPlanner,
> >     metadata: RelMetadataQuery): RelOptCost = {
> >   val leftRowCnt = metadata.getRowCount(getLeft)
> >   val leftRowSize = estimateRowSize(getLeft.getRowType)
> >
> >   val rightRowCnt = metadata.getRowCount(getRight)
> >   val rightRowSize = estimateRowSize(getRight.getRowType)
> >
> >   // I/O cost: estimated bytes of both inputs
> >   val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
> >   // CPU cost: one unit per input row
> >   val cpuCost = leftRowCnt + rightRowCnt
> >   // Row-count component of the cost: sum of both input row counts
> >   val rowCnt = leftRowCnt + rightRowCnt
> >
> >   planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
> > }
> >
> > However, since equi-predicates provide the keys on which Flink partitions
> > the join inputs for parallel execution, the latter plan (with
> > equi-predicates in the join node) should actually be selected, regardless
> > of its cost. Is there any approach I can take to solve this *without
> > breaking* the cost model?
> >
> > Thanks,
> > Xingcan
> >
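To see why the first plan wins, the `computeSelfCost` formula quoted above can be restated over plain numbers. In this minimal sketch, `Cost` is a hypothetical stand-in for Calcite's `RelOptCost` (same rows/cpu/io order as `makeCost`), and the row counts and row sizes are assumed inputs rather than real metadata.

```scala
// Hypothetical stand-in for Calcite's RelOptCost: (rows, cpu, io), in the
// same argument order as makeCost(rowCnt, cpuCost, ioCost) above.
final case class Cost(rows: Double, cpu: Double, io: Double)

object JoinCost {
  // Same arithmetic as the quoted computeSelfCost, with row counts and
  // estimated row sizes passed in directly instead of read from metadata.
  def joinSelfCost(
      leftRowCnt: Double,
      leftRowSize: Double,
      rightRowCnt: Double,
      rightRowSize: Double): Cost = {
    val ioCost = leftRowCnt * leftRowSize + rightRowCnt * rightRowSize
    val cpuCost = leftRowCnt + rightRowCnt
    val rowCnt = leftRowCnt + rightRowCnt
    Cost(rowCnt, cpuCost, ioCost)
  }
}
```

For example, `JoinCost.joinSelfCost(1000, 24, 500, 16)` yields `Cost(1500.0, 1500.0, 32000.0)`. The formula depends only on input row counts and row widths, never on the join condition. Assuming all fields share one type, both `DataSetJoin` nodes above receive three-column inputs with the same row counts, so their self costs are equal; the second plan presumably looks more expensive mainly because of the extra `DataSetCalc` on its left input.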
>
