[ https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386639#comment-16386639 ]

ASF GitHub Bot commented on FLINK-8480:
---------------------------------------

Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5482#discussion_r172302147
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java ---
    @@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
                        public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
                                return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
                        }
    +
    +                   /**
    +                    * Specifies the time boundaries over which the join operation works, so that
    +                    * <pre>leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound</pre>
    +                    * By default both the lower and the upper bound are inclusive. This can be configured
    +                    * with {@link TimeBounded#lowerBoundExclusive(boolean)} and
    +                    * {@link TimeBounded#upperBoundExclusive(boolean)}
    +                    *
    +                    * @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound
    +                    * @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound
    +                    */
    +                   public TimeBounded<T1, T2, KEY> between(Time lowerBound, Time upperBound) {
    +
    +                           TimeCharacteristic timeCharacteristic =
    +                                   input1.getExecutionEnvironment().getStreamTimeCharacteristic();
    +
    +                           if (timeCharacteristic != TimeCharacteristic.EventTime) {
    +                                   throw new RuntimeException("Time-bounded stream joins are only supported in event time");
    --- End diff --
    
    This should throw an `IllegalStateException` rather than a plain `RuntimeException`. Or, even better, shall we create a Flink-specific exception?
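
For illustration only, here is a minimal sketch of the two options raised in this comment. It is not code from the pull request: the `TimeCharacteristic` enum below is a local stand-in so the example compiles without Flink on the classpath, and `UnsupportedTimeCharacteristicException` is a hypothetical name, not an existing Flink class.

```java
// Sketch only: compares the two suggested replacements for RuntimeException.
class EventTimeCheck {

    // Local stand-in for Flink's TimeCharacteristic enum (assumption: used here
    // only so the example is self-contained).
    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    // Hypothetical Flink-specific exception; the name is invented for this sketch.
    static class UnsupportedTimeCharacteristicException extends RuntimeException {
        UnsupportedTimeCharacteristicException(String message) {
            super(message);
        }
    }

    // Option 1: signal the misconfigured environment with IllegalStateException.
    static void checkWithIllegalState(TimeCharacteristic timeCharacteristic) {
        if (timeCharacteristic != TimeCharacteristic.EventTime) {
            throw new IllegalStateException(
                "Time-bounded stream joins are only supported in event time");
        }
    }

    // Option 2: signal it with a dedicated exception type that callers can catch selectively.
    static void checkWithCustomException(TimeCharacteristic timeCharacteristic) {
        if (timeCharacteristic != TimeCharacteristic.EventTime) {
            throw new UnsupportedTimeCharacteristicException(
                "Time-bounded stream joins are only supported in event time");
        }
    }

    public static void main(String[] args) {
        // Event time passes both checks without throwing.
        checkWithIllegalState(TimeCharacteristic.EventTime);
        checkWithCustomException(TimeCharacteristic.EventTime);

        // Any other time characteristic is rejected.
        try {
            checkWithIllegalState(TimeCharacteristic.ProcessingTime);
        } catch (IllegalStateException e) {
            System.out.println("IllegalStateException: " + e.getMessage());
        }
        try {
            checkWithCustomException(TimeCharacteristic.IngestionTime);
        } catch (UnsupportedTimeCharacteristicException e) {
            System.out.println("Custom exception: " + e.getMessage());
        }
    }
}
```

Both variants keep the original error message. The difference is only in how precisely a caller can catch the failure: `IllegalStateException` is a standard JDK type, while a dedicated type would let callers distinguish this condition from other illegal states.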


> Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-8480
>                 URL: https://issues.apache.org/jira/browse/FLINK-8480
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Florian Schmidt
>            Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)