ASF GitHub Bot commented on FLINK-8479:

GitHub user florianschmidt1994 opened a pull request:


    [FLINK-8480][DataStream] Add Java API for timebounded stream join

    ## What is the purpose of the change
    * Add a Java API to the DataStream API to join two streams based on user-defined time boundaries
    * Design doc can be found here 
    ## Brief change log
    * Add the option `.between(Time, Time)` to streams that are already joined and have their key selectors `where` and `equalTo` defined
    * Add new inner class `TimeBounded` to `JoinedStreams`, which exposes `process(TimeBoundedJoinFunction)` as well as the optional `upperBoundExclusive(boolean)` and `lowerBoundExclusive(boolean)` to the user
    * Add new integration test `TimeboundedJoinITCase`
    * **Depends on [FLINK-8479] being merged**
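The bound options in the change log can be read as a predicate over element timestamps. The following is a minimal sketch, assuming the semantics later documented for Flink's interval join (a right element at `rightTs` joins a left element at `leftTs` when `leftTs + lower <= rightTs <= leftTs + upper`) and assuming that `lowerBoundExclusive(true)` / `upperBoundExclusive(true)` turn the corresponding `<=` into `<`. The `TimeBounds` class and its `matches` method are hypothetical and not part of this pull request.

```java
/** Hypothetical helper illustrating time-bound semantics; not the PR's code. */
final class TimeBounds {
    private final long lowerMs;
    private final long upperMs;
    private final boolean lowerExclusive;
    private final boolean upperExclusive;

    TimeBounds(long lowerMs, long upperMs, boolean lowerExclusive, boolean upperExclusive) {
        if (lowerMs > upperMs) {
            throw new IllegalArgumentException("lower bound must not exceed upper bound");
        }
        this.lowerMs = lowerMs;
        this.upperMs = upperMs;
        this.lowerExclusive = lowerExclusive;
        this.upperExclusive = upperExclusive;
    }

    /** True if a right element at rightTs may join a left element at leftTs (assumed semantics). */
    boolean matches(long leftTs, long rightTs) {
        long min = leftTs + lowerMs;
        long max = leftTs + upperMs;
        boolean aboveLower = lowerExclusive ? rightTs > min : rightTs >= min;
        boolean belowUpper = upperExclusive ? rightTs < max : rightTs <= max;
        return aboveLower && belowUpper;
    }

    public static void main(String[] args) {
        // Analogue of .between(Time.milliseconds(-1), Time.milliseconds(1)), both bounds inclusive
        TimeBounds inclusive = new TimeBounds(-1, 1, false, false);
        System.out.println(inclusive.matches(10, 11)); // true
        System.out.println(inclusive.matches(10, 12)); // false

        // Same bounds with the upper bound made exclusive
        TimeBounds upperOpen = new TimeBounds(-1, 1, false, true);
        System.out.println(upperOpen.matches(10, 11)); // false
    }
}
```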
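    Full example usage:

        // leftStream and rightStream are the two DataStreams to join (illustrative names)
        leftStream.join(rightStream)
            .where(new MyKeySelector())
            .equalTo(new MyKeySelector())
            .between(Time.milliseconds(-1), Time.milliseconds(1))
            .process(new UdfTimeBoundedJoinFunction())
            .addSink(new ResultSink());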
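To see which pairs the example above should emit, a plain nested-loop join over keyed, timestamped records can serve as a reference, for instance when deciding on expected results in an integration test. This is a minimal sketch, not the PR's implementation: the `Event` record and `ReferenceTimeBoundedJoin` class are hypothetical, and it assumes both bounds are inclusive and measured relative to the left element's timestamp.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical batch reference for a time-bounded inner join; not the PR's code. */
final class ReferenceTimeBoundedJoin {

    /**
     * Emits "leftPayload,rightPayload" for every pair with equal keys where
     * left.ts + lowerMs <= right.ts <= left.ts + upperMs (both bounds inclusive).
     */
    static List<String> join(List<Event> left, List<Event> right, long lowerMs, long upperMs) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key().equals(r.key());
                boolean inBounds = r.ts() >= l.ts() + lowerMs && r.ts() <= l.ts() + upperMs;
                if (sameKey && inBounds) {
                    out.add(l.payload() + "," + r.payload());
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = List.of(new Event("a", 10, "L1"), new Event("b", 20, "L2"));
        List<Event> right = List.of(
                new Event("a", 9, "R1"),   // 10 - 1: inside the bounds
                new Event("a", 12, "R2"),  // 10 + 2: beyond the upper bound
                new Event("b", 20, "R3"),  // same timestamp as L2: inside
                new Event("a", 20, "R4")); // key "a", but far from L1 at 10
        // Mirrors .between(Time.milliseconds(-1), Time.milliseconds(1))
        System.out.println(join(left, right, -1, 1)); // [L1,R1, L2,R3]
    }
}

// Hypothetical event type: a join key, an event-time timestamp in ms, and a payload.
record Event(String key, long ts, String payload) {}
```

A batch loop like this is quadratic in the input size, so it only suits small test fixtures; a streaming operator has to buffer elements per key and evict them as time advances instead.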
    ## Verifying this change
    This change adds tests and can be verified as follows:
    - Added integration tests in `TimeboundedJoinITCase` that validate parameter translation and execution
    ## Does this pull request potentially affect one of the following parts:
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): yes
      - Anything that affects deployment or recovery: no
      - The S3 file system connector: no
    ## Documentation
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? JavaDocs

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/florianschmidt1994/flink 

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5482

commit 34451540116d8bdd284fd01016a4cc74d8564d37
Author: Florian Schmidt <florian.schmidt.1994@...>
Date:   2018-01-18T14:47:14Z

    [FLINK-8479] Implement TimeBoundedStreamJoinOperator
    This operator is the basis for performing an inner join on two
    streams using a time criterion defined as a lower and an upper bound
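The idea behind such an operator can be sketched without Flink: each input keeps a per-key buffer of elements; an arriving element is probed against the other side's buffer and then stored, and a watermark lets buffered elements be evicted once no future element can still match them. This is a simplified, single-threaded sketch under stated assumptions (inclusive bounds, `lower <= upper`, no late elements, watermarks meaning "no more elements with timestamp `<= w`"); `BufferedTimeBoundedJoin` and all of its methods are hypothetical, it ignores state backends and checkpointing, and it is not the code of `TimeBoundedStreamJoinOperator`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical, single-threaded illustration of a buffered time-bounded inner join. */
final class BufferedTimeBoundedJoin<K, L, R> {

    // A buffered element together with its event-time timestamp (ms).
    private record Timestamped<T>(T value, long ts) {}

    private final long lowerMs;
    private final long upperMs;
    private final BiConsumer<L, R> output;
    private final Map<K, List<Timestamped<L>>> leftBuffer = new HashMap<>();
    private final Map<K, List<Timestamped<R>>> rightBuffer = new HashMap<>();

    BufferedTimeBoundedJoin(long lowerMs, long upperMs, BiConsumer<L, R> output) {
        this.lowerMs = lowerMs;
        this.upperMs = upperMs;
        this.output = output;
    }

    // Probe buffered right elements, then keep the left element for future right elements.
    void processLeft(K key, L value, long ts) {
        for (Timestamped<R> r : rightBuffer.getOrDefault(key, List.of())) {
            if (inBounds(ts, r.ts())) {
                output.accept(value, r.value());
            }
        }
        leftBuffer.computeIfAbsent(key, k -> new ArrayList<>()).add(new Timestamped<>(value, ts));
    }

    // Symmetric to processLeft.
    void processRight(K key, R value, long ts) {
        for (Timestamped<L> l : leftBuffer.getOrDefault(key, List.of())) {
            if (inBounds(l.ts(), ts)) {
                output.accept(l.value(), value);
            }
        }
        rightBuffer.computeIfAbsent(key, k -> new ArrayList<>()).add(new Timestamped<>(value, ts));
    }

    // After watermark w only timestamps > w are expected, so evict elements
    // whose matching window ends at or before w.
    void processWatermark(long watermark) {
        // A left element at tl matches right timestamps up to tl + upper.
        leftBuffer.values().forEach(list -> list.removeIf(e -> e.ts() + upperMs <= watermark));
        // A right element at tr matches left timestamps up to tr - lower.
        rightBuffer.values().forEach(list -> list.removeIf(e -> e.ts() - lowerMs <= watermark));
        leftBuffer.values().removeIf(List::isEmpty);
        rightBuffer.values().removeIf(List::isEmpty);
    }

    int bufferedCount() {
        int n = 0;
        for (List<Timestamped<L>> list : leftBuffer.values()) n += list.size();
        for (List<Timestamped<R>> list : rightBuffer.values()) n += list.size();
        return n;
    }

    private boolean inBounds(long leftTs, long rightTs) {
        return rightTs >= leftTs + lowerMs && rightTs <= leftTs + upperMs;
    }

    public static void main(String[] args) {
        List<String> results = new ArrayList<>();
        // Bounds as in .between(Time.milliseconds(-1), Time.milliseconds(1))
        BufferedTimeBoundedJoin<String, String, String> join =
                new BufferedTimeBoundedJoin<>(-1, 1, (l, r) -> results.add(l + "," + r));
        join.processLeft("a", "L1", 10);
        join.processRight("a", "R1", 11); // 11 is in [9, 11]: emits L1,R1
        join.processRight("a", "R2", 13); // 13 is outside [9, 11]: buffered only
        join.processWatermark(11);        // evicts L1 (10 + 1 <= 11); R1 and R2 stay
        join.processLeft("a", "L2", 12);  // [11, 13] covers R1 and R2
        System.out.println(results);              // [L1,R1, L2,R1, L2,R2]
        System.out.println(join.bufferedCount()); // 3
    }
}
```

Emitting a pair only when the later of its two elements arrives produces each match exactly once, and the eviction conditions follow directly from the bound predicate: once the watermark passes the end of an element's matching window, nothing that can still arrive will join with it.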

commit fe65b1ead0511b0df5d640c728f5ce9e273d7ed5
Author: Florian Schmidt <florian.schmidt.1994@...>
Date:   2018-02-13T14:48:40Z

    [FLINK-8480][DataStream] Add Java API for timebounded stream joins
    This commit adds a Java implementation for timebounded stream joins.
    The usage looks roughly like the following:
        .where(new Tuple2KeyExtractor())
        .equalTo(new Tuple2KeyExtractor())
        .between(Time.milliseconds(0), Time.milliseconds(1))
        .process(new CombineToStringJoinFunction())
        .addSink(new ResultSink());
    This change adds the functionality in JoinedStreams.java and adds
    integration tests in TimeboundedJoinITCase.java


> Implement time-bounded inner join of streams as a TwoInputStreamOperator
> ------------------------------------------------------------------------
>                 Key: FLINK-8479
>                 URL: https://issues.apache.org/jira/browse/FLINK-8479
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Florian Schmidt
>            Assignee: Florian Schmidt
>            Priority: Major

This message was sent by Atlassian JIRA
