[ 
https://issues.apache.org/jira/browse/FLINK-11916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-11916:
-----------------------------------
      Labels: auto-deprioritized-major auto-deprioritized-minor  (was: 
auto-deprioritized-major stale-minor)
    Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Minor, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Join with a Temporal Table should throw exception for left join
> ---------------------------------------------------------------
>
>                 Key: FLINK-11916
>                 URL: https://issues.apache.org/jira/browse/FLINK-11916
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>            Reporter: Hequn Cheng
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> InĀ {{TemporalJoinITCase.testProcessTimeInnerJoin}}, if we change the inner 
> join to left join the test works fine. We may need to throw an exception if 
> we only support inner join.
> CC [~pnowojski]
> The problem can be reproduced with the following sql:
> {code:java}
> @Test
>   def testEventTimeInnerJoin(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val tEnv = StreamTableEnvironment.create(env)
>     env.setStateBackend(getStateBackend)
>     StreamITCase.clear
>     env.setParallelism(1)
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     val sqlQuery =
>       """
>         |SELECT
>         |  o.amount * r.rate AS amount
>         |FROM
>         |  Orders AS o left join
>         |  LATERAL TABLE (Rates(o.rowtime)) AS r on true
>         |WHERE r.currency = o.currency
>         |""".stripMargin
>     val ordersData = new mutable.MutableList[(Long, String, Timestamp)]
>     ordersData.+=((2L, "Euro", new Timestamp(2L)))
>     ordersData.+=((1L, "US Dollar", new Timestamp(3L)))
>     ordersData.+=((50L, "Yen", new Timestamp(4L)))
>     ordersData.+=((3L, "Euro", new Timestamp(5L)))
>     val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)]
>     ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L)))
>     ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L)))
>     ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L)))
>     ratesHistoryData.+=(("Euro", 116L, new Timestamp(5L)))
>     ratesHistoryData.+=(("Euro", 119L, new Timestamp(7L)))
>     var expectedOutput = new mutable.HashSet[String]()
>     expectedOutput += (2 * 114).toString
>     expectedOutput += (3 * 116).toString
>     val orders = env
>       .fromCollection(ordersData)
>       .assignTimestampsAndWatermarks(new TimestampExtractor[Long, String]())
>       .toTable(tEnv, 'amount, 'currency, 'rowtime.rowtime)
>     val ratesHistory = env
>       .fromCollection(ratesHistoryData)
>       .assignTimestampsAndWatermarks(new TimestampExtractor[String, Long]())
>       .toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime)
>     tEnv.registerTable("Orders", orders)
>     tEnv.registerTable("RatesHistory", ratesHistory)
>     tEnv.registerTable("FilteredRatesHistory", 
> tEnv.scan("RatesHistory").filter('rate > 110L))
>     tEnv.registerFunction(
>       "Rates",
>       tEnv.scan("FilteredRatesHistory").createTemporalTableFunction('rowtime, 
> 'currency))
>     tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))
>     // Scan from registered table to test for interplay between
>     // LogicalCorrelateToTemporalTableJoinRule and TableScanRule
>     val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
>     result.addSink(new StreamITCase.StringSink[Row])
>     env.execute()
>     assertEquals(expectedOutput, StreamITCase.testResults.toSet)
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to