[ 
https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16121718#comment-16121718
 ] 

ASF GitHub Bot commented on FLINK-6094:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4471#discussion_r132421076
  
    --- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
 ---
    @@ -232,4 +238,111 @@ class JoinHarnessTest extends HarnessTestBase{
         testHarness.close()
       }
     
    +  @Test
    +  def testProcTimeNonWindowInnerJoin() {
    +
    +    val joinReturnType = CRowTypeInfo(new RowTypeInfo(
    +      Array[TypeInformation[_]](
    +        INT_TYPE_INFO,
    +        STRING_TYPE_INFO,
    +        INT_TYPE_INFO,
    +        STRING_TYPE_INFO),
    +      Array("a", "b", "c", "d")))
    +
    +    val joinFun = new FlatJoinRunner[Row, Row, Row](
    +      "TestJoinFunction",
    +      funcCode,
    +      joinReturnType.rowType)
    +
    +    val joinProcessFunc = new ProcTimeNonWindowInnerJoin(
    +      joinFun,
    +      rT,
    +      rT,
    +      joinReturnType,
    +      queryConfig)
    +
    +    val operator: KeyedCoProcessOperator[Integer, CRow, CRow, CRow] =
    +      new KeyedCoProcessOperator[Integer, CRow, CRow, 
CRow](joinProcessFunc)
    +    val testHarness: KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, 
CRow, CRow] =
    +      new KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, 
CRow](
    +        operator,
    +        new TupleRowKeySelector[Integer](0),
    +        new TupleRowKeySelector[Integer](0),
    +        BasicTypeInfo.INT_TYPE_INFO,
    +        1, 1, 0)
    +
    +    testHarness.open()
    +
    +    // left stream input
    +    testHarness.setProcessingTime(1)
    +    testHarness.processElement1(new StreamRecord(
    +      CRow(Row.of(1: JInt, "aaa"), true), 1))
    --- End diff --
    
    If you are not using the `StreamRecord` timestamp field, you can omit it, 
i.e.,
    
    ```
    new StreamRecord(CRow(Row.of(1: JInt, "aaa"), true)))
    ```


> Implement stream-stream proctime non-window  inner join
> -------------------------------------------------------
>
>                 Key: FLINK-6094
>                 URL: https://issues.apache.org/jira/browse/FLINK-6094
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Hequn Cheng
>
> This includes:
> 1.Implement stream-stream proctime non-window  inner join
> 2.Implement the retract process logic for join



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to