[ 
https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15299817#comment-15299817
 ] 

ASF GitHub Bot commented on FLINK-3941:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2025#discussion_r64549056
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---
    @@ -69,16 +73,23 @@ class DataSetUnion(
           rows + metadata.getRowCount(child)
         }
     
    -    planner.getCostFactory.makeCost(rowCnt, 0, 0)
    +    planner.getCostFactory.makeCost(
    +      rowCnt,
    +      if (all) 0 else rowCnt,
    +      if (all) 0 else rowCnt)
       }
     
       override def translateToPlan(
           tableEnv: BatchTableEnvironment,
           expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
     
    -    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -    leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
    +    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    if (all) {
    +      leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
    +    } else {
    +      leftDataSet.union(rightDataSet).distinct().asInstanceOf[DataSet[Any]]
    --- End diff --
    
    This method should be called by the optimizer when a new `DataSetUnion` 
node is created, to estimate the cost of the subplan (the new node plus all of 
its recursive input nodes). If a cheaper plan produces the same result, the 
more expensive plan is discarded.
    
    So, you have a plan with a non-all union operator (`UNION` rather than 
`UNION ALL`). The optimizer knows the cost of this plan. Then the 
`UnionToDistinctRule` is applied and creates a new union operator and a new 
distinct operator. The `computeSelfCost` method is called for both operators 
to estimate the cost of the new plan, and the cheaper of the two plans is 
kept. (This is slightly simplified, because the optimization rules are applied 
to `LogicalRel` nodes, but the cost estimation happens on the `DataSetRel` 
nodes.)
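    
    As a rough illustration of the cost comparison described above, here is a 
minimal, self-contained sketch in plain Scala. It does not use the Calcite or 
Flink classes: `Cost`, `unionSelfCost`, and `cheaper` are hypothetical names, 
and comparing plans by the sum of the three cost components is an assumption 
made only for this example, not how the optimizer actually ranks costs. The 
cost triple follows the argument order used in the diff (row count, CPU, I/O).

```scala
object UnionCostSketch {
  // Simplified stand-in for an optimizer cost triple (row count, CPU, I/O).
  // This is NOT Calcite's RelOptCost; it only mimics its three components.
  final case class Cost(rows: Double, cpu: Double, io: Double) {
    // Cost of a subplan = cost of a node plus the cost of its inputs.
    def +(that: Cost): Cost =
      Cost(rows + that.rows, cpu + that.cpu, io + that.io)

    // Assumed ranking for illustration only: a plain sum of all components.
    def total: Double = rows + cpu + io
  }

  // Mirrors the arguments passed to makeCost in the diff:
  // a UNION ALL only forwards rows, while a duplicate-eliminating union
  // is also charged CPU and I/O proportional to the row count.
  def unionSelfCost(inputRowCounts: Seq[Double], all: Boolean): Cost = {
    val rowCnt = inputRowCounts.sum
    Cost(rowCnt, if (all) 0.0 else rowCnt, if (all) 0.0 else rowCnt)
  }

  // Of two plans that compute the same result, keep the cheaper one.
  def cheaper(a: Cost, b: Cost): Cost = if (a.total <= b.total) a else b
}
```

    With inputs of 100 and 50 rows, `unionSelfCost(Seq(100.0, 50.0), all = 
true)` charges only for the 150 forwarded rows, while `all = false` also 
charges 150 units each for CPU and I/O, so a plan built on the distinct union 
is ranked as more expensive under this toy model.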


> Add support for UNION (with duplicate elimination)
> --------------------------------------------------
>
>                 Key: FLINK-3941
>                 URL: https://issues.apache.org/jira/browse/FLINK-3941
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API
>    Affects Versions: 1.1.0
>            Reporter: Fabian Hueske
>            Assignee: Yijie Shen
>            Priority: Minor
>
> Currently, only UNION ALL is supported by Table API and SQL.
> UNION (with duplicate elimination) can be supported by applying a 
> {{DataSet.distinct()}} after the union on all fields. This issue includes:
> - Extending {{DataSetUnion}}
> - Relaxing {{DataSetUnionRule}} to translate non-all unions.
> - Extending the Table API with a union() method.
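
The difference between the two union flavours named in the issue can be 
sketched with plain Scala collections. This is only an analogy for the 
semantics, under the assumption that rows are compared on all fields; it does 
not use the Flink `DataSet` API, and `Row`, `unionAll`, and `union` are 
hypothetical names introduced for this example.

```scala
object UnionSemanticsSketch {
  // A row with two fields; equality is defined over all fields.
  type Row = (Int, String)

  // UNION ALL: concatenate both inputs and keep duplicates.
  def unionAll(left: Seq[Row], right: Seq[Row]): Seq[Row] =
    left ++ right

  // UNION: the same concatenation followed by duplicate elimination
  // on all fields, analogous to applying distinct() after the union.
  def union(left: Seq[Row], right: Seq[Row]): Seq[Row] =
    (left ++ right).distinct
}
```

Given `Seq((1, "a"), (2, "b"))` and `Seq((2, "b"), (3, "c"))`, `unionAll` 
keeps the row `(2, "b")` twice, whereas `union` keeps it once.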



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
