Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4625#discussion_r139400726
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
---
@@ -383,13 +384,158 @@ class JoinHarnessTest extends HarnessTestBase{
val expectedOutput = new ConcurrentLinkedQueue[Object]()
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "aaa2", 2: JInt, "bbb7"), true), 7))
+ CRow(Row.of(2L: JLong, "aaa2", 2L: JLong, "bbb7"), true), 7))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa3", 1: JInt, "bbb12"), true), 12))
+ CRow(Row.of(1L: JLong, "aaa3", 1L: JLong, "bbb12"), true), 12))
verify(expectedOutput, result, new RowResultSortComparator())
testHarness.close()
}
+ /** a.c1 >= b.rowtime - 10 and a.rowtime <= b.rowtime + 20 **/
+ @Test
+ def testCommonRowTimeJoin() {
+
+ val joinProcessFunc = new RowTimeBoundedStreamInnerJoin(
+ -10, 20, 0, rT, rT, "TestJoinFunction", funcCode, 0, 0)
+
+ val operator: KeyedCoProcessOperator[String, CRow, CRow, CRow] =
+ new KeyedCoProcessOperator[String, CRow, CRow, CRow](joinProcessFunc)
+ val testHarness: KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow] =
+ new KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow](
+ operator,
+ new TupleRowKeySelector[String](1),
+ new TupleRowKeySelector[String](1),
+ BasicTypeInfo.STRING_TYPE_INFO,
+ 1, 1, 0)
+
+ testHarness.open()
+
+ // Advance
+ testHarness.processWatermark1(new Watermark(1))
+ testHarness.processWatermark2(new Watermark(1))
+
+ // Test late data
--- End diff --
A few more comments like this one would be good.
---