[ 
https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363732#comment-16363732
 ] 

ASF GitHub Bot commented on FLINK-8479:
---------------------------------------

GitHub user florianschmidt1994 opened a pull request:

    https://github.com/apache/flink/pull/5482

    [FLINK-8480][DataStream] Add Java API for timebounded stream join

    ## What is the purpose of the change
    
    * Add a Java API to the DataStream API to join two streams based on
      user-defined time boundaries
    * Design doc can be found here:
      https://docs.google.com/document/d/16GMH5VM6JJiWj_N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.bgl260hr56g6
    
    ## Brief change log
    * Add option `.between(Time, Time)` to streams that are already joined and
      have their key selectors `where` and `equalTo` defined
    * Add new inner class `TimeBounded` to `JoinedStreams`, which exposes
      `process(TimeBoundedJoinFunction)` as well as the optional
      `upperBoundExclusive(boolean)` and `lowerBoundExclusive(boolean)` to the user
    * Add new integration test `TimeboundedJoinITCase`
    * **Depends on [FLINK-8479] to be merged**
    
    Full example usage:
    
    ```java
    streamOne
        .join(streamTwo)
        .where(new MyKeySelector())
        .equalTo(new MyKeySelector())
        .between(Time.milliseconds(-1), Time.milliseconds(1))
        .process(new UdfTimeBoundedJoinFunction())
        .addSink(new ResultSink());
    ```
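    
    The description does not spell out how the two bounds are applied, so the
    following is only a rough sketch of one plausible reading: the bounds are
    assumed to be relative to the left element's timestamp, i.e. two elements
    with equal keys are joined when
    `left.ts + lower <= right.ts <= left.ts + upper`, with a comparison
    becoming strict when the corresponding bound is marked exclusive. The
    class and method names are hypothetical, timestamps are plain
    milliseconds, and no Flink classes are involved.
    
    ```java
    // Plain-Java illustration only: TimeBoundedJoinSketch and withinBounds
    // are hypothetical names, not part of this pull request or of Flink.
    class TimeBoundedJoinSketch {
    
        /**
         * Assumed join condition: leftTs + lower <= rightTs <= leftTs + upper,
         * with either comparison made strict when that bound is exclusive.
         */
        static boolean withinBounds(long leftTs, long rightTs,
                                    long lower, long upper,
                                    boolean lowerExclusive, boolean upperExclusive) {
            long diff = rightTs - leftTs;
            boolean aboveLower = lowerExclusive ? diff > lower : diff >= lower;
            boolean belowUpper = upperExclusive ? diff < upper : diff <= upper;
            return aboveLower && belowUpper;
        }
    
        public static void main(String[] args) {
            // Bounds from the example above: between(-1 ms, 1 ms), both inclusive.
            System.out.println(withinBounds(10, 11, -1, 1, false, false)); // true
            System.out.println(withinBounds(10, 12, -1, 1, false, false)); // false
            // Same pair as the first call, but with an exclusive upper bound.
            System.out.println(withinBounds(10, 11, -1, 1, false, true));  // false
        }
    }
    ```
    
    Under this reading, `upperBoundExclusive(true)` would drop pairs whose
    timestamps differ by exactly the upper bound, and `lowerBoundExclusive(true)`
    would do the same at the lower bound.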
    
    ## Verifying this change
    This change added tests and can be verified as follows:
    - Added integration tests in `TimeboundedJoinITCase` that validate
      parameter translation and execution
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with
        `@Public(Evolving)`: yes
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): yes
      - Anything that affects deployment or recovery: no
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? JavaDocs


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/florianschmidt1994/flink flink-8480-timebounded-join-java-api

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5482.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5482
    
----
commit 34451540116d8bdd284fd01016a4cc74d8564d37
Author: Florian Schmidt <florian.schmidt.1994@...>
Date:   2018-01-18T14:47:14Z

    [FLINK-8479] Implement TimeBoundedStreamJoinOperator
    
    This operator is the basis for performing an inner join on two
    streams using a time criterion defined as a lower and upper bound

commit fe65b1ead0511b0df5d640c728f5ce9e273d7ed5
Author: Florian Schmidt <florian.schmidt.1994@...>
Date:   2018-02-13T14:48:40Z

    [FLINK-8480][DataStream] Add java api for timebounded stream joins
    
    This commit adds a java implementation for timebounded stream joins.
    The usage looks roughly like the following:
    
    ```java
    streamOne
        .join(streamTwo)
        .where(new Tuple2KeyExtractor())
        .equalTo(new Tuple2KeyExtractor())
        .between(Time.milliseconds(0), Time.milliseconds(1))
        .process(new CombineToStringJoinFunction())
        .addSink(new ResultSink());
    ```
    
    This change adds the functionality in JoinedStreams.java and adds
    integration tests in TimeboundedJoinITCase.java

----


> Implement time-bounded inner join of streams as a TwoInputStreamOperator
> ------------------------------------------------------------------------
>
>                 Key: FLINK-8479
>                 URL: https://issues.apache.org/jira/browse/FLINK-8479
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Florian Schmidt
>            Assignee: Florian Schmidt
>            Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
