twalthr commented on a change in pull request #16739:
URL: https://github.com/apache/flink/pull/16739#discussion_r686556951
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.scala
##########
@@ -38,40 +38,40 @@ class StreamPhysicalIntervalJoinRule
override def matches(call: RelOptRuleCall): Boolean = {
val join: FlinkLogicalJoin = call.rel(0)
- val joinRowType = join.getRowType
- // TODO support SEMI/ANTI join
- if (!join.getJoinType.projectsRight) {
+ if (!satisfyIntervalJoin(join)) {
return false
}
- val (windowBounds, _) = extractWindowBounds(join)
+ // validate the join
+ val windowBounds = extractWindowBounds(join)._1.get
- if (windowBounds.isDefined) {
- if (windowBounds.get.isEventTime) {
- val leftTimeAttributeType = join.getLeft.getRowType
- .getFieldList
- .get(windowBounds.get.getLeftTimeIdx).getType
- val rightTimeAttributeType = join.getRight.getRowType
- .getFieldList
- .get(windowBounds.get.getRightTimeIdx).getType
- if (leftTimeAttributeType.getSqlTypeName !=
rightTimeAttributeType.getSqlTypeName) {
- throw new ValidationException(
- String.format("Interval join with rowtime attribute requires same
rowtime types," +
- " but the types are %s and %s.",
+ if (windowBounds.isEventTime) {
+ val leftTimeAttributeType = join.getLeft.getRowType
+ .getFieldList
+ .get(windowBounds.getLeftTimeIdx).getType
+ val rightTimeAttributeType = join.getRight.getRowType
+ .getFieldList
+ .get(windowBounds.getRightTimeIdx).getType
+ if (leftTimeAttributeType.getSqlTypeName !=
rightTimeAttributeType.getSqlTypeName) {
+ throw new ValidationException(
+ String.format("Interval join with rowtime attribute requires same
rowtime types," +
+ " but the types are %s and %s.",
leftTimeAttributeType.toString, rightTimeAttributeType.toString))
- }
- true
- } else {
- // Check that no event-time attributes are in the input because the
processing time window
- // join does not correctly hold back watermarks.
- // We rely on projection pushdown to remove unused attributes before
the join.
- !joinRowType.getFieldList.exists(f =>
FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
}
} else {
- // the given join does not have valid window bounds. We cannot translate
it.
- false
+ // Check that no event-time attributes are in the input because the
processing time window
+ // join does not correctly hold back watermarks.
+ // We rely on projection pushdown to remove unused attributes before the
join.
+ val joinRowType = join.getRowType
+ val containsRowTime = joinRowType.getFieldList.exists(f =>
isRowtimeIndicatorType(f.getType))
+ if (containsRowTime) {
+ throw new TableException(
Review comment:
Is this only a `Precondition.checkState` or can users really trigger
this exception?
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.scala
##########
@@ -40,51 +41,39 @@ class StreamPhysicalJoinRule
override def matches(call: RelOptRuleCall): Boolean = {
val join: FlinkLogicalJoin = call.rel(0)
- if (!join.getJoinType.projectsRight) {
- // SEMI/ANTI join always converts to StreamExecJoin now
- return true
- }
val left: FlinkLogicalRel = call.rel(1).asInstanceOf[FlinkLogicalRel]
val right: FlinkLogicalRel = call.rel(2).asInstanceOf[FlinkLogicalRel]
- val joinRowType = join.getRowType
-
- if (left.isInstanceOf[FlinkLogicalSnapshot]) {
- throw new TableException(
- "Temporal table join only support apply FOR SYSTEM_TIME AS OF on the
right table.")
- }
- // this rule shouldn't match temporal table join
- if (right.isInstanceOf[FlinkLogicalSnapshot] ||
- TemporalJoinUtil.containsTemporalJoinCondition(join.getCondition)) {
+ if (!satisfyRegularJoin(join, right)) {
return false
}
- val (windowBounds, remainingPreds) = extractWindowBounds(join)
- if (windowBounds.isDefined) {
- return false
- }
-
- if (satisfyWindowJoin(join)) {
- return false
+ // validate the join
+ if (left.isInstanceOf[FlinkLogicalSnapshot]) {
+ throw new TableException(
+ "Temporal table join only support apply FOR SYSTEM_TIME AS OF on the
right table.")
}
- // remaining predicate must not access time attributes
- val remainingPredsAccessTime = remainingPreds.isDefined &&
- IntervalJoinUtil.accessesTimeAttribute(remainingPreds.get, joinRowType)
+ // INITIAL_TEMPORAL_JOIN_CONDITION should not appear in physical phase in
case which fallback
+ // to regular join
+ checkState(!containsInitialTemporalJoinCondition(join.getCondition))
- val rowTimeAttrInOutput = joinRowType.getFieldList
+ val rowTimeAttrInOutput = join.getRowType.getFieldList
.exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
if (rowTimeAttrInOutput) {
throw new TableException(
Review comment:
Same question as above. Shouldn't this be rather a check state. The goal
of this effort to get rid of this exception, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]