[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386643#comment-16386643
 ] 

ASF GitHub Bot commented on FLINK-8480:
---------------------------------------

Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5482#discussion_r172303424
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ---
    @@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector<T2, KEY> 
keySelector)  {
                        public <W extends Window> WithWindow<T1, T2, KEY, W> 
window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
                                return new WithWindow<>(input1, input2, 
keySelector1, keySelector2, keyType, assigner, null, null);
                        }
    +
    +                   /**
    +                    * Specifies the time boundaries over which the join 
operation works, so that
    +                    * <pre>leftElement.timestamp + lowerBound <= 
rightElement.timestamp <= leftElement.timestamp + upperBound</pre>
    +                    * By default both the lower and the upper bound are 
inclusive. This can be configured
    +                    * with {@link 
TimeBounded#lowerBoundExclusive(boolean)} and
    +                    * {@link TimeBounded#upperBoundExclusive(boolean)}
    +                    *
    +                    * @param lowerBound The lower bound. Needs to be 
smaller than or equal to the upperBound
    +                    * @param upperBound The upper bound. Needs to be 
bigger than or equal to the lowerBound
    +                    */
    +                   public TimeBounded<T1, T2, KEY> between(Time 
lowerBound, Time upperBound) {
    +
    +                           TimeCharacteristic timeCharacteristic =
    +                                   
input1.getExecutionEnvironment().getStreamTimeCharacteristic();
    +
    +                           if (timeCharacteristic != 
TimeCharacteristic.EventTime) {
    +                                   throw new 
RuntimeException("Time-bounded stream joins are only supported in event time");
    +                           }
    +
    +                           checkNotNull(lowerBound, "A lower bound needs 
to be provided for a time-bounded join");
    +                           checkNotNull(upperBound, "An upper bound needs 
to be provided for a time-bounded join");
    +                           return new TimeBounded<>(
    +                                   input1,
    +                                   input2,
    +                                   lowerBound.toMilliseconds(),
    +                                   upperBound.toMilliseconds(),
    +                                   true,
    +                                   true,
    +                                   keySelector1,
    +                                   keySelector2
    +                           );
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Joined streams that have keys for both sides as well as the time 
boundaries over which
    +    * elements should be joined defined.
    +    *
    +    * @param <IN1> Input type of elements from the first stream
    +    * @param <IN2> Input type of elements from the second stream
    +    * @param <KEY> The type of the key
    +    */
    +   public static class TimeBounded<IN1, IN2, KEY> {
    +
    +           private static final String TIMEBOUNDED_JOIN_FUNC_NAME = 
"TimeBoundedJoin";
    +
    +           private final DataStream<IN1> left;
    +           private final DataStream<IN2> right;
    +
    +           private final long lowerBound;
    +           private final long upperBound;
    +
    +           private final KeySelector<IN1, KEY> keySelector1;
    +           private final KeySelector<IN2, KEY> keySelector2;
    +
    +           private boolean lowerBoundInclusive;
    +           private boolean upperBoundInclusive;
    +
    +           public TimeBounded(
    +                   DataStream<IN1> left,
    +                   DataStream<IN2> right,
    +                   long lowerBound,
    +                   long upperBound,
    +                   boolean lowerBoundInclusive,
    +                   boolean upperBoundInclusive,
    +                   KeySelector<IN1, KEY> keySelector1,
    +                   KeySelector<IN2, KEY> keySelector2) {
    +
    +                   this.left = Preconditions.checkNotNull(left);
    +                   this.right = Preconditions.checkNotNull(right);
    +
    +                   this.lowerBound = lowerBound;
    +                   this.upperBound = upperBound;
    +
    +                   this.lowerBoundInclusive = lowerBoundInclusive;
    +                   this.upperBoundInclusive = upperBoundInclusive;
    +
    +                   this.keySelector1 = 
Preconditions.checkNotNull(keySelector1);
    +                   this.keySelector2 = 
Preconditions.checkNotNull(keySelector2);
    +           }
    +
    +           /**
    +            * Configure whether the upper bound should be considered 
exclusive or inclusive.
    +            */
    +           public TimeBounded<IN1, IN2, KEY> upperBoundExclusive(boolean 
exclusive) {
    +                   this.upperBoundInclusive = !exclusive;
    +                   return this;
    +           }
    +
    +           /**
    +            * Configure whether the lower bound should be considered 
exclusive or inclusive.
    +            */
    +           public TimeBounded<IN1, IN2, KEY> lowerBoundExclusive(boolean 
exclusive) {
    +                   this.lowerBoundInclusive = !exclusive;
    +                   return this;
    +           }
    +
    +           /**
    +            * Completes the join operation with the user function that is 
executed for each joined pair
    +            * of elements.
    +            * @param udf The user-defined function
    +            * @param <OUT> The output type
    +            * @return Returns a DataStream
    +            */
    +           public <OUT> DataStream<OUT> 
process(TimeBoundedJoinFunction<IN1, IN2, OUT> udf) {
    +
    +                   ConnectedStreams<IN1, IN2> connected = 
left.connect(right);
    +
    +                   udf = left.getExecutionEnvironment().clean(udf);
    +
    +                   TypeInformation<OUT> resultType = 
TypeExtractor.getBinaryOperatorReturnType(
    +                           udf,
    +                           TimeBoundedJoinFunction.class,    // 
TimeBoundedJoinFunction<IN1, IN2, OUT>
    +                           0,                                //            
                                  0    1    2
    +                           1,
    +                           2,
    +                           new int[]{0},                   // lambda input 
1 type arg indices
    +                           new int[]{1},                   // lambda input 
2 type arg indices
    +                           TypeExtractor.NO_INDEX,         // output arg 
indices
    +                           left.getType(),                 // input 1 type 
information
    +                           right.getType(),                // input 2 type 
information
    +                           TIMEBOUNDED_JOIN_FUNC_NAME,
    +                           false
    +                   );
    +
    +                   long bucketGranularity = 
Time.seconds(1).toMilliseconds();
    --- End diff --
    
    Does the bucket granularity (hard-coded to 1 second here) need to be documented?


> Implement Java API to expose join functionality of 
> TimeBoundedStreamJoinOperator
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-8480
>                 URL: https://issues.apache.org/jira/browse/FLINK-8480
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Florian Schmidt
>            Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to