beyond1920 commented on a change in pull request #16739:
URL: https://github.com/apache/flink/pull/16739#discussion_r686670399
##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.scala
##########
@@ -38,40 +38,40 @@ class StreamPhysicalIntervalJoinRule
   override def matches(call: RelOptRuleCall): Boolean = {
     val join: FlinkLogicalJoin = call.rel(0)
-    val joinRowType = join.getRowType
-    // TODO support SEMI/ANTI join
-    if (!join.getJoinType.projectsRight) {
+    if (!satisfyIntervalJoin(join)) {
       return false
     }
-    val (windowBounds, _) = extractWindowBounds(join)
+    // validate the join
+    val windowBounds = extractWindowBounds(join)._1.get
-    if (windowBounds.isDefined) {
-      if (windowBounds.get.isEventTime) {
-        val leftTimeAttributeType = join.getLeft.getRowType
-          .getFieldList
-          .get(windowBounds.get.getLeftTimeIdx).getType
-        val rightTimeAttributeType = join.getRight.getRowType
-          .getFieldList
-          .get(windowBounds.get.getRightTimeIdx).getType
-        if (leftTimeAttributeType.getSqlTypeName != rightTimeAttributeType.getSqlTypeName) {
-          throw new ValidationException(
-            String.format("Interval join with rowtime attribute requires same rowtime types," +
-              " but the types are %s and %s.",
+    if (windowBounds.isEventTime) {
+      val leftTimeAttributeType = join.getLeft.getRowType
+        .getFieldList
+        .get(windowBounds.getLeftTimeIdx).getType
+      val rightTimeAttributeType = join.getRight.getRowType
+        .getFieldList
+        .get(windowBounds.getRightTimeIdx).getType
+      if (leftTimeAttributeType.getSqlTypeName != rightTimeAttributeType.getSqlTypeName) {
+        throw new ValidationException(
+          String.format("Interval join with rowtime attribute requires same rowtime types," +
+            " but the types are %s and %s.",
             leftTimeAttributeType.toString, rightTimeAttributeType.toString))
-        }
-        true
-      } else {
-        // Check that no event-time attributes are in the input because the processing time window
-        // join does not correctly hold back watermarks.
-        // We rely on projection pushdown to remove unused attributes before the join.
-        !joinRowType.getFieldList.exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
       }
     } else {
-      // the given join does not have valid window bounds. We cannot translate it.
-      false
+      // Check that no event-time attributes are in the input because the processing time window
+      // join does not correctly hold back watermarks.
+      // We rely on projection pushdown to remove unused attributes before the join.
+      val joinRowType = join.getRowType
+      val containsRowTime = joinRowType.getFieldList.exists(f => isRowtimeIndicatorType(f.getType))
+      if (containsRowTime) {
+        throw new TableException(
Review comment:
The exception can actually be triggered by users in some corner cases (for example `IntervalJoinTest#testNoRowtimeAttributeInResult`); a sketch of such a query is given after the quoted plan below. Before this PR, the same case would also throw the following `TableException`, because the logical join could not be translated into any kind of physical join. I think it is more user-friendly to give a clear error message.
`org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

FlinkLogicalCalc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, a0, b0, c0, PROCTIME_MATERIALIZE(proctime0) AS proctime0, rowtime0])
+- FlinkLogicalJoin(condition=[AND(=($0, $5), >=($3, -($8, 5000:INTERVAL SECOND)), <=($3, $8))], joinType=[inner])
   :- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
   +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
`
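For context, here is a minimal, hedged sketch of the kind of query that hits this corner case: a processing-time interval join whose result still selects the event-time attribute. The table names and columns are modeled on the quoted plan (`MyTable` / `MyTable2` with `a, b, c, proctime, rowtime`); the datagen connector and the surrounding driver code are assumptions for illustration only, not taken from `IntervalJoinTest#testNoRowtimeAttributeInResult`.

```scala
// Sketch only: schemas, connector, and driver code are assumptions for illustration.
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object NoRowtimeAttributeInResultSketch {

  // DDL for a table with both a processing-time and an event-time attribute,
  // mirroring the MyTable / MyTable2 scans in the quoted plan.
  private def ddl(name: String): String =
    s"""
       |CREATE TABLE $name (
       |  a INT,
       |  b STRING,
       |  c BIGINT,
       |  rowtime TIMESTAMP(3),
       |  proctime AS PROCTIME(),
       |  WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
       |) WITH ('connector' = 'datagen')
       |""".stripMargin

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)
    tEnv.executeSql(ddl("MyTable"))
    tEnv.executeSql(ddl("MyTable2"))

    // Interval join bounded on processing time, but the SELECT list still
    // contains the event-time attribute t1.rowtime. Before the PR this failed
    // with the generic "Cannot generate a valid execution plan" TableException.
    val result = tEnv.sqlQuery(
      """
        |SELECT t1.a, t1.b, t1.rowtime
        |FROM MyTable t1
        |JOIN MyTable2 t2
        |  ON t1.a = t2.a
        | AND t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND t2.proctime
        |""".stripMargin)

    println(result.explain())
  }
}
```

Calling `explain()` (or executing the query) forces plan translation, which is where an exception from the `matches` check above would surface.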