[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305851#comment-16305851 ]
ASF GitHub Bot commented on FLINK-7797:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5140#discussion_r158737260
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---
@@ -142,50 +143,47 @@ class DataStreamWindowJoin(
s"${joinConditionToString(schema.relDataType, joinCondition, getExpressionString)}), " +
s"join: (${joinSelectionToString(schema.relDataType)})"
- joinType match {
- case JoinRelType.INNER =>
- if (relativeWindowSize < 0) {
- LOG.warn(s"The relative window size $relativeWindowSize is negative," +
- " please check the join conditions.")
- createEmptyInnerJoin(leftDataStream, rightDataStream, returnTypeInfo)
- } else {
- if (isRowTime) {
- createRowTimeInnerJoin(
- leftDataStream,
- rightDataStream,
- returnTypeInfo,
- joinOpName,
- joinFunction.name,
- joinFunction.code,
- leftKeys,
- rightKeys
- )
- } else {
- createProcTimeInnerJoin(
- leftDataStream,
- rightDataStream,
- returnTypeInfo,
- joinOpName,
- joinFunction.name,
- joinFunction.code,
- leftKeys,
- rightKeys
- )
- }
- }
- case JoinRelType.FULL =>
- throw new TableException(
- "Full join between stream and stream is not supported yet.")
- case JoinRelType.LEFT =>
- throw new TableException(
- "Left join between stream and stream is not supported yet.")
- case JoinRelType.RIGHT =>
- throw new TableException(
- "Right join between stream and stream is not supported yet.")
+ val flinkJoinType = joinType match {
+ case JoinRelType.INNER => JoinType.INNER
+ case JoinRelType.FULL => JoinType.FULL_OUTER
+ case JoinRelType.LEFT => JoinType.LEFT_OUTER
+ case JoinRelType.RIGHT => JoinType.RIGHT_OUTER
+ }
+
+ if (relativeWindowSize < 0) {
+ LOG.warn(s"The relative window size $relativeWindowSize is negative," +
+ " please check the join conditions.")
+ createEmptyJoin(leftDataStream, rightDataStream, returnTypeInfo)
--- End diff --
Empty outer joins must be handled differently from empty inner joins: an
empty inner join emits nothing, but for an outer join the records of the outer
side(s) must be preserved and padded with nulls. Hence, we need to pass the
join type and the generated code to createEmptyJoin.
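
To illustrate the point about null padding, here is a minimal sketch over plain Scala collections. It is not the Flink implementation: `JoinKind`, `EmptyJoinSketch`, `emptyJoin`, and the arity parameters are hypothetical names made up for this example, and rows are modeled as `Vector[Any]` rather than Flink's row type. It only shows which records an "empty" join (one where no pair of records can ever match) should still emit for each join type.

```scala
// Hypothetical stand-ins for the join types in the diff; not Flink's JoinType.
sealed trait JoinKind
case object Inner extends JoinKind
case object LeftOuter extends JoinKind
case object RightOuter extends JoinKind
case object FullOuter extends JoinKind

object EmptyJoinSketch {
  // A row is modeled as a plain vector of field values (assumption for this sketch).
  type Row = Vector[Any]

  // Builds the null padding for the side that has no matching record.
  private def nulls(n: Int): Row = Vector.fill[Any](n)(null)

  // Result of a join whose condition can never match any pair of records.
  def emptyJoin(
      kind: JoinKind,
      left: Seq[Row],
      right: Seq[Row],
      leftArity: Int,
      rightArity: Int): Seq[Row] = {
    // Left records keep their fields and get nulls for the right-hand fields.
    val paddedLeft = left.map(_ ++ nulls(rightArity))
    // Right records get nulls for the left-hand fields.
    val paddedRight = right.map(nulls(leftArity) ++ _)
    kind match {
      case Inner      => Seq.empty                 // nothing matches, nothing is emitted
      case LeftOuter  => paddedLeft                // left side is preserved
      case RightOuter => paddedRight               // right side is preserved
      case FullOuter  => paddedLeft ++ paddedRight // both sides are preserved
    }
  }
}
```

In this sketch only the `Inner` case can ignore its inputs entirely, which is why a single join-type-agnostic empty join is not sufficient once outer joins are supported.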
> Add support for windowed outer joins for streaming tables
> ---------------------------------------------------------
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.4.0
> Reporter: Fabian Hueske
> Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER
> joins.