[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170192#comment-16170192
 ] 

ASF GitHub Bot commented on FLINK-6233:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4625#discussion_r139401824
  
    --- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
 ---
    @@ -383,13 +384,158 @@ class JoinHarnessTest extends HarnessTestBase{
         val expectedOutput = new ConcurrentLinkedQueue[Object]()
     
         expectedOutput.add(new StreamRecord(
    -      CRow(Row.of(2: JInt, "aaa2", 2: JInt, "bbb7"), true), 7))
    +      CRow(Row.of(2L: JLong, "aaa2", 2L: JLong, "bbb7"), true), 7))
         expectedOutput.add(new StreamRecord(
    -      CRow(Row.of(1: JInt, "aaa3", 1: JInt, "bbb12"), true), 12))
    +      CRow(Row.of(1L: JLong, "aaa3", 1L: JLong, "bbb12"), true), 12))
     
         verify(expectedOutput, result, new RowResultSortComparator())
     
         testHarness.close()
       }
     
    +  /** a.c1 >= b.rowtime - 10 and a.rowtime <= b.rowtime + 20 **/
    +  @Test
    +  def testCommonRowTimeJoin() {
    +
    +    val joinProcessFunc = new RowTimeBoundedStreamInnerJoin(
    +      -10, 20, 0, rT, rT, "TestJoinFunction", funcCode, 0, 0)
    +
    +    val operator: KeyedCoProcessOperator[String, CRow, CRow, CRow] =
    +      new KeyedCoProcessOperator[String, CRow, CRow, CRow](joinProcessFunc)
    +    val testHarness: KeyedTwoInputStreamOperatorTestHarness[String, CRow, 
CRow, CRow] =
    +      new KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow](
    +        operator,
    +        new TupleRowKeySelector[String](1),
    +        new TupleRowKeySelector[String](1),
    +        BasicTypeInfo.STRING_TYPE_INFO,
    +        1, 1, 0)
    +
    +    testHarness.open()
    +
    +    // Advance
    +    testHarness.processWatermark1(new Watermark(1))
    +    testHarness.processWatermark2(new Watermark(1))
    +
    +    // Test late data
    +    testHarness.processElement1(new StreamRecord[CRow](
    +      CRow(Row.of(1L: JLong, "k1"), true), 0))
    +
    +    assertEquals(0, testHarness.numEventTimeTimers())
    +
    +    testHarness.processElement1(new StreamRecord[CRow](
    +      CRow(Row.of(2L: JLong, "k1"), true), 0))
    +    testHarness.processElement2(new StreamRecord[CRow](
    +      CRow(Row.of(2L: JLong, "k1"), true), 0))
    +
    +    assertEquals(2, testHarness.numEventTimeTimers())
    +    assertEquals(4, testHarness.numKeyedStateEntries())
    +
    +    testHarness.processElement1(new StreamRecord[CRow](
    +      CRow(Row.of(5L: JLong, "k1"), true), 0))
    +    testHarness.processElement2(new StreamRecord[CRow](
    +      CRow(Row.of(15L: JLong, "k1"), true), 0))
    +
    +    testHarness.processWatermark1(new Watermark(20))
    +    testHarness.processWatermark2(new Watermark(20))
    +
    +    assertEquals(4, testHarness.numKeyedStateEntries())
    +
    +    testHarness.processElement1(new StreamRecord[CRow](
    +      CRow(Row.of(35L: JLong, "k1"), true), 0))
    +
    +    testHarness.processWatermark1(new Watermark(38))
    +    testHarness.processWatermark2(new Watermark(38))
    +
    +    testHarness.processElement1(new StreamRecord[CRow](
    +      CRow(Row.of(40L: JLong, "k2"), true), 0))
    +    testHarness.processElement2(new StreamRecord[CRow](
    +      CRow(Row.of(39L: JLong, "k2"), true), 0))
    +
    +    assertEquals(6, testHarness.numKeyedStateEntries())
    +
    +    testHarness.processWatermark1(new Watermark(61))
    +    testHarness.processWatermark2(new Watermark(61))
    +
    +    assertEquals(4, testHarness.numKeyedStateEntries())
    +
    +    val expectedOutput = new ConcurrentLinkedQueue[Object]()
    --- End diff --
    
    Add multiple rows for the same key and time to validate that this case is 
correctly handled. It might make sense to add another string field to the data 
with a unique value ("left1", ...) to make the input and output records easier 
to compare.


> Support rowtime inner equi-join between two streams in the SQL API
> ------------------------------------------------------------------
>
>                 Key: FLINK-6233
>                 URL: https://issues.apache.org/jira/browse/FLINK-6233
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: hongyuhong
>            Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-joins on rowtime 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins
> * The ON clause should include an equi-join condition
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL 
> '1' HOUR}} can only use a rowtime that is a system attribute. The time 
> condition only supports bounded time ranges like {{o.rowtime BETWEEN s.rowtime 
> - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}; unbounded ranges 
> like {{o.rowtime < s.rowtime}} are not supported. The condition must include 
> the rowtime attributes of both streams, so {{o.rowtime between rowtime () and 
> rowtime () + 1}} should also not be supported.
> A row-time stream join will not be able to handle late data, because 
> inserting a row into the sorted order would shift all other computations. 
> This would be too expensive to maintain. Therefore, we will throw an error if 
> a user tries to use a row-time stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to