[
https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15299817#comment-15299817
]
ASF GitHub Bot commented on FLINK-3941:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2025#discussion_r64549056
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
---
@@ -69,16 +73,23 @@ class DataSetUnion(
rows + metadata.getRowCount(child)
}
- planner.getCostFactory.makeCost(rowCnt, 0, 0)
+ planner.getCostFactory.makeCost(
+ rowCnt,
+ if (all) 0 else rowCnt,
+ if (all) 0 else rowCnt)
}
override def translateToPlan(
tableEnv: BatchTableEnvironment,
expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
- val leftDataSet =
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val rightDataSet =
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
+ val leftDataSet =
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+ val rightDataSet =
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+ if (all) {
+ leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
+ } else {
+ leftDataSet.union(rightDataSet).distinct().asInstanceOf[DataSet[Any]]
--- End diff --
This method should be called by the optimizer when a new `DataSetUnion` node
is created, to estimate the cost of the subplan (the new node plus all of its
recursive input nodes). If there is a cheaper plan that produces the same
result, the more expensive plan is discarded.
So, you have a plan with a non-all union operator. The optimizer knows the
cost of this plan. Then the `UnionToDistinctRule` is applied, which creates a
new union operator and a new distinct operator. For both, the `computeSelfCost`
method is called to compute the cost estimate of the plan, and the cheaper of
the two plans is kept. (This is a bit simplified, because the optimization
rules are applied to `LogicalRel` nodes but the cost estimation happens on the
`DataSetRel` nodes.)
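To make the comparison concrete, here is a minimal, self-contained sketch of
the idea in plain Scala. It does not use Calcite or Flink classes: `Cost`,
`Plan`, `Scan`, `Union`, `Distinct` and `cheaper` are hypothetical names
invented for illustration, ranking plans by the sum of the three cost
components is a simplification, and the self cost assigned to `Distinct` is an
assumption. Only the self cost of `Union` mirrors the `makeCost` call in the
diff.

```scala
object PlanCostSketch {
  // Hypothetical stand-in for a planner cost: (rows, cpu, io).
  final case class Cost(rows: Double, cpu: Double, io: Double) {
    def +(that: Cost): Cost =
      Cost(rows + that.rows, cpu + that.cpu, io + that.io)
    // Simplification: rank plans by the sum of all three components.
    def total: Double = rows + cpu + io
  }

  sealed trait Plan {
    def inputs: Seq[Plan]
    def rowCount: Double
    def selfCost: Cost
    // Cost of the subplan: this node plus all recursive input nodes.
    def cumulativeCost: Cost =
      inputs.foldLeft(selfCost)((acc, in) => acc + in.cumulativeCost)
  }

  final case class Scan(rowCount: Double) extends Plan {
    def inputs: Seq[Plan] = Seq.empty
    def selfCost: Cost = Cost(rowCount, 0.0, 0.0)
  }

  final case class Union(left: Plan, right: Plan, all: Boolean) extends Plan {
    def inputs: Seq[Plan] = Seq(left, right)
    def rowCount: Double = left.rowCount + right.rowCount
    // Mirrors the diff: a non-all union also pays cpu and io for its rows.
    def selfCost: Cost = {
      val rowCnt = rowCount
      Cost(rowCnt, if (all) 0.0 else rowCnt, if (all) 0.0 else rowCnt)
    }
  }

  final case class Distinct(input: Plan) extends Plan {
    def inputs: Seq[Plan] = Seq(input)
    // Upper bound: no duplicates are assumed to be removed.
    def rowCount: Double = input.rowCount
    // Assumed cost for illustration only.
    def selfCost: Cost = Cost(rowCount, rowCount, rowCount)
  }

  // Keep the cheaper of two equivalent plans.
  def cheaper(a: Plan, b: Plan): Plan =
    if (a.cumulativeCost.total <= b.cumulativeCost.total) a else b

  def main(args: Array[String]): Unit = {
    val nonAllUnion = Union(Scan(100), Scan(50), all = false)
    val unionAllPlusDistinct =
      Distinct(Union(Scan(100), Scan(50), all = true))
    println(s"non-all union:        ${nonAllUnion.cumulativeCost}")
    println(s"union all + distinct: ${unionAllPlusDistinct.cumulativeCost}")
    println(s"kept: ${cheaper(nonAllUnion, unionAllPlusDistinct)}")
  }
}
```

With these toy numbers the single non-all union is kept, because the
alternative pays for the rows of both the union and the distinct operator.
Which plan wins in the real optimizer depends entirely on the cost estimates
that the actual nodes return.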
> Add support for UNION (with duplicate elimination)
> --------------------------------------------------
>
> Key: FLINK-3941
> URL: https://issues.apache.org/jira/browse/FLINK-3941
> Project: Flink
> Issue Type: New Feature
> Components: Table API
> Affects Versions: 1.1.0
> Reporter: Fabian Hueske
> Assignee: Yijie Shen
> Priority: Minor
>
> Currently, only UNION ALL is supported by Table API and SQL.
> UNION (with duplicate elimination) can be supported by applying a
> {{DataSet.distinct()}} after the union on all fields. This issue includes:
> - Extending {{DataSetUnion}}
> - Relaxing {{DataSetUnionRule}} to translate non-all unions.
> - Extending the Table API with a union() method.