[
https://issues.apache.org/jira/browse/FLINK-3942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15355890#comment-15355890
]
ASF GitHub Bot commented on FLINK-3942:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2159#discussion_r69037698
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala ---
@@ -83,66 +82,68 @@ class DataSetIntersect(
tableEnv: BatchTableEnvironment,
expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
- var leftDataSet: DataSet[Any] = null
- var rightDataSet: DataSet[Any] = null
+ val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- expectedType match {
- case None =>
- leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- rightDataSet =
- right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, Some(leftDataSet.getType))
- case _ =>
- leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
- rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
- }
-
- val config = tableEnv.getConfig
-
- val returnType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- val generator = new CodeGenerator(
- config,
- false,
- leftDataSet.getType,
- Some(rightDataSet.getType))
-
- val conversion = generator.generateConverterResultExpression(
- returnType,
- left.getRowType.getFieldNames)
+ val coGroupedDs = leftDataSet.coGroup(rightDataSet)
+ val leftType = leftDataSet.getType
+ val rightType = rightDataSet.getType
- val body = s"""
- |${conversion.code}
- |${generator.collectorTerm}.collect(${conversion.resultTerm});
- |""".stripMargin
+ // If it is atomic type, the field expression need to be "*".
--- End diff --
I think you can use `*` for all cases. Composite types should be
(recursively) expanded to all atomic member types.
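
To illustrate what "recursively expanded to all atomic member types" means, here is a minimal plain-Scala sketch. It is only a model of the idea, not Flink's key-handling code: the object `WildcardKeySketch` and the function `flattenFields` are hypothetical names, and treating every `Product` (tuple or case class) as a composite type is a simplifying assumption.

```scala
// Plain-Scala model only; this is not Flink's key-expansion code.
object WildcardKeySketch {

  // Recursively expand a value into its atomic members.
  // Assumption: any Product (tuple, case class) counts as a composite type;
  // everything else is treated as atomic.
  def flattenFields(value: Any): List[Any] = value match {
    case p: Product => p.productIterator.toList.flatMap(flattenFields)
    case atomic     => List(atomic)
  }

  def main(args: Array[String]): Unit = {
    // Atomic value: the wildcard key is just the value itself.
    println(flattenFields(42))
    // Nested tuple: every atomic member becomes part of the key.
    println(flattenFields((1, ("a", 2.0))))
  }
}
```

Under this model an atomic value and a nested composite are handled by the same wildcard, which is why a single `*` expression could cover both cases.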
> Add support for INTERSECT
> -------------------------
>
> Key: FLINK-3942
> URL: https://issues.apache.org/jira/browse/FLINK-3942
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Affects Versions: 1.1.0
> Reporter: Fabian Hueske
> Assignee: Jark Wu
> Priority: Minor
>
> Currently, the Table API and SQL do not support INTERSECT.
> INTERSECT can be executed as a join on all fields.
> In order to add support for INTERSECT to the Table API and SQL we need to:
> - Implement a {{DataSetIntersect}} class that translates an INTERSECT into a
> DataSet API program using a join on all fields.
> - Implement a {{DataSetIntersectRule}} that translates a Calcite
> {{LogicalIntersect}} into a {{DataSetIntersect}}.
> - Extend the Table API (and validation phase) to provide an intersect()
> method.
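
The idea of evaluating INTERSECT by matching rows on all fields can be sketched in plain Scala collections. This is a hedged illustration, not the code from the pull request: `IntersectSketch`, `coGroup`, and `intersect` are hypothetical helpers, the whole row is used as the grouping key, and the sketch assumes the distinct (non-ALL) INTERSECT semantics, emitting each matching row once.

```scala
// Plain-Scala model of INTERSECT as a co-group keyed on the whole row.
// Hypothetical helpers for illustration; no Flink classes are used.
object IntersectSketch {

  // Group both inputs by the full row and hand each pair of groups to `f`.
  // A side with no matching rows is passed as an empty sequence.
  def coGroup[T, O](left: Seq[T], right: Seq[T])(
      f: (Seq[T], Seq[T]) => Option[O]): Seq[O] = {
    val leftGroups  = left.groupBy(identity)
    val rightGroups = right.groupBy(identity)
    // Visit keys in first-occurrence order so the output is deterministic.
    val keys = (left ++ right).distinct
    keys.flatMap { k =>
      f(leftGroups.getOrElse(k, Seq.empty), rightGroups.getOrElse(k, Seq.empty)).toList
    }
  }

  // Emit a row once if it occurs on both sides (distinct INTERSECT).
  def intersect[T](left: Seq[T], right: Seq[T]): Seq[T] =
    coGroup(left, right) { (l, r) =>
      if (l.nonEmpty && r.nonEmpty) l.headOption else None
    }

  def main(args: Array[String]): Unit = {
    val a = Seq((1, "a"), (2, "b"), (2, "b"), (3, "c"))
    val b = Seq((2, "b"), (3, "c"), (3, "c"), (4, "d"))
    println(intersect(a, b))
  }
}
```

A co-group sees all duplicates of a row from both inputs at once, so it can emit the row a single time, whereas a plain join on all fields would emit one result per matching pair and need a later deduplication step.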