[
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170183#comment-16170183
]
ASF GitHub Bot commented on FLINK-6233:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4625#discussion_r139400962
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
---
@@ -383,13 +384,158 @@ class JoinHarnessTest extends HarnessTestBase{
val expectedOutput = new ConcurrentLinkedQueue[Object]()
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "aaa2", 2: JInt, "bbb7"), true), 7))
+ CRow(Row.of(2L: JLong, "aaa2", 2L: JLong, "bbb7"), true), 7))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa3", 1: JInt, "bbb12"), true), 12))
+ CRow(Row.of(1L: JLong, "aaa3", 1L: JLong, "bbb12"), true), 12))
verify(expectedOutput, result, new RowResultSortComparator())
testHarness.close()
}
+ /** a.c1 >= b.rowtime - 10 and a.rowtime <= b.rowtime + 20 **/
+ @Test
+ def testCommonRowTimeJoin() {
+
+ val joinProcessFunc = new RowTimeBoundedStreamInnerJoin(
+ -10, 20, 0, rT, rT, "TestJoinFunction", funcCode, 0, 0)
+
+ val operator: KeyedCoProcessOperator[String, CRow, CRow, CRow] =
+ new KeyedCoProcessOperator[String, CRow, CRow, CRow](joinProcessFunc)
+ val testHarness: KeyedTwoInputStreamOperatorTestHarness[String, CRow,
CRow, CRow] =
+ new KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow](
+ operator,
+ new TupleRowKeySelector[String](1),
+ new TupleRowKeySelector[String](1),
+ BasicTypeInfo.STRING_TYPE_INFO,
+ 1, 1, 0)
+
+ testHarness.open()
+
+ // Advance
+ testHarness.processWatermark1(new Watermark(1))
+ testHarness.processWatermark2(new Watermark(1))
+
+ // Test late data
+ testHarness.processElement1(new StreamRecord[CRow](
+ CRow(Row.of(1L: JLong, "k1"), true), 0))
+
+ assertEquals(0, testHarness.numEventTimeTimers())
+
+ testHarness.processElement1(new StreamRecord[CRow](
+ CRow(Row.of(2L: JLong, "k1"), true), 0))
+ testHarness.processElement2(new StreamRecord[CRow](
+ CRow(Row.of(2L: JLong, "k1"), true), 0))
+
+ assertEquals(2, testHarness.numEventTimeTimers())
+ assertEquals(4, testHarness.numKeyedStateEntries())
+
+ testHarness.processElement1(new StreamRecord[CRow](
+ CRow(Row.of(5L: JLong, "k1"), true), 0))
+ testHarness.processElement2(new StreamRecord[CRow](
+ CRow(Row.of(15L: JLong, "k1"), true), 0))
+
+ testHarness.processWatermark1(new Watermark(20))
+ testHarness.processWatermark2(new Watermark(20))
+
+ assertEquals(4, testHarness.numKeyedStateEntries())
+
+ testHarness.processElement1(new StreamRecord[CRow](
+ CRow(Row.of(35L: JLong, "k1"), true), 0))
+
+ testHarness.processWatermark1(new Watermark(38))
+ testHarness.processWatermark2(new Watermark(38))
+
+ testHarness.processElement1(new StreamRecord[CRow](
+ CRow(Row.of(40L: JLong, "k2"), true), 0))
+ testHarness.processElement2(new StreamRecord[CRow](
+ CRow(Row.of(39L: JLong, "k2"), true), 0))
+
+ assertEquals(6, testHarness.numKeyedStateEntries())
+
+ testHarness.processWatermark1(new Watermark(61))
+ testHarness.processWatermark2(new Watermark(61))
+
+ assertEquals(4, testHarness.numKeyedStateEntries())
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+ expectedOutput.add(new StreamRecord(
+ CRow(Row.of(2L: JLong, "k1", 2L: JLong, "k1"), true), 0))
+ expectedOutput.add(new StreamRecord(
+ CRow(Row.of(5L: JLong, "k1", 2L: JLong, "k1"), true), 0))
+ expectedOutput.add(new StreamRecord(
+ CRow(Row.of(5L: JLong, "k1", 15L: JLong, "k1"), true), 0))
+ expectedOutput.add(new StreamRecord(
+ CRow(Row.of(35L: JLong, "k1", 15L: JLong, "k1"), true), 0))
+ expectedOutput.add(new StreamRecord(
+ CRow(Row.of(40L: JLong, "k2", 39L: JLong, "k2"), true), 0))
+
+ val result = testHarness.getOutput
+ verify(expectedOutput, result, new RowResultSortComparator())
+ testHarness.close()
+ }
+
+ /** a.rowtime >= b.rowtime - 10 and a.rowtime <= b.rowtime - 7 **/
+ @Test
+ def testNegativeRowTimeJoin() {
+
+ val joinProcessFunc = new RowTimeBoundedStreamInnerJoin(
+ -10, -7, 0, rT, rT, "TestJoinFunction", funcCode, 0, 0)
+
+ val operator: KeyedCoProcessOperator[String, CRow, CRow, CRow] =
+ new KeyedCoProcessOperator[String, CRow, CRow, CRow](joinProcessFunc)
+ val testHarness: KeyedTwoInputStreamOperatorTestHarness[String, CRow,
CRow, CRow] =
+ new KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow](
+ operator,
+ new TupleRowKeySelector[String](1),
+ new TupleRowKeySelector[String](1),
+ BasicTypeInfo.STRING_TYPE_INFO,
+ 1, 1, 0)
+
+ testHarness.open()
+
+ // Advance
--- End diff --
Add more inline comments.
> Support rowtime inner equi-join between two streams in the SQL API
> ------------------------------------------------------------------
>
> Key: FLINK-6233
> URL: https://issues.apache.org/jira/browse/FLINK-6233
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: hongyuhong
> Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-join on proc time
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime
> FROM Orders AS o
> JOIN Shipments AS s
> ON o.orderId = s.orderId
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only support inner join
> * The ON clause should include equi-join condition
> * The time-condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL
> '1' HOUR}} only can use rowtime that is a system attribute, the time
> condition only support bounded time range like {{o.rowtime BETWEEN s.rowtime
> - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not support
> unbounded like {{o.rowtime < s.rowtime}} , and should include both two
> stream's rowtime attribute, {{o.rowtime between rowtime () and rowtime () +
> 1}} should also not be supported.
> An row-time streams join will not be able to handle late data, because this
> would mean in insert a row into a sorted order shift all other computations.
> This would be too expensive to maintain. Therefore, we will throw an error if
> a user tries to use an row-time stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin).
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)