godfreyhe commented on code in PR #21219:
URL: https://github.com/apache/flink/pull/21219#discussion_r1042030876
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala:
##########
@@ -570,6 +585,52 @@ class TemporalJoinTest extends TableTestBase {
util.verifyExplainInsert(sql, ExplainDetail.CHANGELOG_MODE)
}
+ @Test
+ def testTemporalJoinUpsertSourceWithPostFilter(): Unit = {
+ val sqlQuery = "SELECT * " +
+ "FROM Orders AS o JOIN " +
+ "UpsertRates FOR SYSTEM_TIME AS OF o.rowtime AS r " +
+ "ON o.currency = r.currency WHERE valid = 'true'"
+
+ util.verifyExplain(sqlQuery, ExplainDetail.CHANGELOG_MODE)
+ }
+
+ @Test
+ def testTemporalJoinUpsertSourceWithPreFilter(): Unit = {
+ util.tableEnv.executeSql(s"""
+ |CREATE TEMPORARY VIEW V1 AS
+ |SELECT * FROM UpsertRates WHERE valid = 'true'
+ |""".stripMargin)
+
+ /**
+ * The problem is: there's exists a filter on an upsert changelog
input(changelogMode=[I,UA,D]),
+ * the UB message must exists for correctness.
+ *
+ * Intermediate plan with modify kind:
+ * {{{
+ * +- TemporalJoin(joinType=[InnerJoin], ..., changelogMode=[I])
+ * :- Exchange(distribution=[hash[currency]], changelogMode=[I])
+ * : +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime],
changelogMode=[I])
+ * : +- Calc(select=[amount, currency, rowtime, ...
changelogMode=[I])
+ * : +- TableSourceScan(table= Orders ... changelogMode=[I])
+ * +- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
+ * +- Calc(select=[currency, ... where=[=(valid, _UTF-16LE'true')],
changelogMode=[I,UA,D])
+ * +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime],
changelogMode=[I,UA,D])
+ * +- TableSourceScan(table= UpsertRates, ...
changelogMode=[I,UA,D])
+ * }}}
+ */
+
+ val sqlQuery = "SELECT * " +
+ "FROM Orders AS o JOIN " +
+ "V1 FOR SYSTEM_TIME AS OF o.rowtime AS r " +
+ "ON o.currency = r.currency"
+
+ expectExceptionThrown(
+ sqlQuery,
+ "Can't generate a valid execution plan for the given query",
Review Comment:
The exception message is not clear
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]