[
https://issues.apache.org/jira/browse/FLINK-11916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-11916:
-----------------------------------
Labels: auto-deprioritized-major auto-deprioritized-minor (was:
auto-deprioritized-major stale-minor)
Priority: Not a Priority (was: Minor)
This issue was labeled "stale-minor" 7 days ago and has not received any
updates so it is being deprioritized. If this ticket is actually Minor, please
raise the priority and ask a committer to assign you the issue or revive the
public discussion.
> Join with a Temporal Table should throw exception for left join
> ---------------------------------------------------------------
>
> Key: FLINK-11916
> URL: https://issues.apache.org/jira/browse/FLINK-11916
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Reporter: Hequn Cheng
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> In {{TemporalJoinITCase.testProcessTimeInnerJoin}}, if we change the inner
> join to a left join, the test still works fine. We may need to throw an
> exception if we only support inner joins.
> CC [~pnowojski]
> The problem can be reproduced with the following SQL:
> {code:java}
> @Test
> def testEventTimeInnerJoin(): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = StreamTableEnvironment.create(env)
> env.setStateBackend(getStateBackend)
> StreamITCase.clear
> env.setParallelism(1)
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val sqlQuery =
> """
> |SELECT
> | o.amount * r.rate AS amount
> |FROM
> | Orders AS o left join
> | LATERAL TABLE (Rates(o.rowtime)) AS r on true
> |WHERE r.currency = o.currency
> |""".stripMargin
> val ordersData = new mutable.MutableList[(Long, String, Timestamp)]
> ordersData.+=((2L, "Euro", new Timestamp(2L)))
> ordersData.+=((1L, "US Dollar", new Timestamp(3L)))
> ordersData.+=((50L, "Yen", new Timestamp(4L)))
> ordersData.+=((3L, "Euro", new Timestamp(5L)))
> val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)]
> ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L)))
> ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L)))
> ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L)))
> ratesHistoryData.+=(("Euro", 116L, new Timestamp(5L)))
> ratesHistoryData.+=(("Euro", 119L, new Timestamp(7L)))
> var expectedOutput = new mutable.HashSet[String]()
> expectedOutput += (2 * 114).toString
> expectedOutput += (3 * 116).toString
> val orders = env
> .fromCollection(ordersData)
> .assignTimestampsAndWatermarks(new TimestampExtractor[Long, String]())
> .toTable(tEnv, 'amount, 'currency, 'rowtime.rowtime)
> val ratesHistory = env
> .fromCollection(ratesHistoryData)
> .assignTimestampsAndWatermarks(new TimestampExtractor[String, Long]())
> .toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime)
> tEnv.registerTable("Orders", orders)
> tEnv.registerTable("RatesHistory", ratesHistory)
> tEnv.registerTable("FilteredRatesHistory",
> tEnv.scan("RatesHistory").filter('rate > 110L))
> tEnv.registerFunction(
> "Rates",
> tEnv.scan("FilteredRatesHistory").createTemporalTableFunction('rowtime,
> 'currency))
> tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))
> // Scan from registered table to test for interplay between
> // LogicalCorrelateToTemporalTableJoinRule and TableScanRule
> val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
> result.addSink(new StreamITCase.StringSink[Row])
> env.execute()
> assertEquals(expectedOutput, StreamITCase.testResults.toSet)
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)