[ 
https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345166#comment-16345166
 ] 

ASF GitHub Bot commented on FLINK-8479:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5342#discussion_r164766198
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
    @@ -0,0 +1,398 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.state.MapState;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ValueState;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
    +import org.apache.flink.api.common.typeutils.base.ListSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
    +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
    +import org.apache.flink.streaming.api.operators.TimestampedCollector;
    +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
    +
    +// TODO: Make bucket granularity adaptable
    +/**
    + * A TwoInputStreamOperator to execute time-bounded stream inner joins.
    + *
    + * <p>By using a configurable lower and upper bound this operator will 
emit exactly those pairs
    + * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both 
the lower and the
    + * upper bound can be configured to be either inclusive or exclusive.
    + *
    + * <p>As soon as elements are joined they are passed to a user-defined 
{@link JoinedProcessFunction},
    + * as a {@link Tuple2}, with f0 being the left element and f1 being the 
right element
    + *
    + * @param <T1> The type of the elements in the left stream
    + * @param <T2> The type of the elements in the right stream
    + * @param <OUT> The output type created by the user-defined function
    + */
    +public class TimeBoundedStreamJoinOperator<T1, T2, OUT>
    +   extends AbstractUdfStreamOperator<OUT, JoinedProcessFunction<T1, T2, 
OUT>>
    +   implements TwoInputStreamOperator<T1, T2, OUT> {
    +
    +   private final long lowerBound;
    +   private final long upperBound;
    +
    +   private final long inverseLowerBound;
    +   private final long inverseUpperBound;
    +
    +   private final boolean lowerBoundInclusive;
    +   private final boolean upperBoundInclusive;
    +
    +   private final long bucketGranularity = 1;
    +
    +   private static final String LEFT_BUFFER = "LEFT_BUFFER";
    +   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
    +   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
    +   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
    +
    +   private transient ValueState<Long> lastCleanupRightBuffer;
    +   private transient ValueState<Long> lastCleanupLeftBuffer;
    +
    +   private transient MapState<Long, List<Tuple3<T1, Long, Boolean>>> 
leftBuffer;
    +   private transient MapState<Long, List<Tuple3<T2, Long, Boolean>>> 
rightBuffer;
    +
    +   private final TypeSerializer<T1> leftTypeSerializer;
    +   private final TypeSerializer<T2> rightTypeSerializer;
    +
    +   private transient TimestampedCollector<OUT> collector;
    +
    +   private ContextImpl context;
    +
    +   /**
    +    * Creates a new TimeBoundedStreamJoinOperator.
    +    *
    +    * @param lowerBound          The lower bound for evaluating if 
elements should be joined
    +    * @param upperBound          The upper bound for evaluating if 
elements should be joined
    +    * @param lowerBoundInclusive Whether or not to include elements where 
the timestamp matches
    +    *                            the lower bound
    +    * @param upperBoundInclusive Whether or not to include elements where 
the timestamp matches
    +    *                            the upper bound
    +    * @param udf                 A user-defined {@link 
JoinedProcessFunction} that gets called
    +    *                            whenever two elements of T1 and T2 are 
joined
    +    */
    +   public TimeBoundedStreamJoinOperator(
    +           long lowerBound,
    +           long upperBound,
    +           boolean lowerBoundInclusive,
    +           boolean upperBoundInclusive,
    +           TypeSerializer<T1> leftTypeSerializer,
    +           TypeSerializer<T2> rightTypeSerializer,
    +           JoinedProcessFunction<T1, T2, OUT> udf
    +   ) {
    +
    +           super(udf);
    +
    +           this.lowerBound = lowerBound;
    +           this.upperBound = upperBound;
    +
    +           this.inverseLowerBound = -1 * upperBound;
    +           this.inverseUpperBound = -1 * lowerBound;
    +
    +           this.lowerBoundInclusive = lowerBoundInclusive;
    +           this.upperBoundInclusive = upperBoundInclusive;
    +           this.leftTypeSerializer = leftTypeSerializer;
    +           this.rightTypeSerializer = rightTypeSerializer;
    +   }
    +
    +   @Override
    +   public void open() throws Exception {
    +           super.open();
    +           collector = new TimestampedCollector<>(output);
    +           context = new ContextImpl(userFunction);
    +
    +           Class<Tuple3<T1, Long, Boolean>> leftTypedTuple =
    +                   (Class<Tuple3<T1, Long, Boolean>>) (Class<?>) 
Tuple3.class;
    +
    +           TupleSerializer<Tuple3<T1, Long, Boolean>> leftTupleSerializer 
= new TupleSerializer<>(
    +                   leftTypedTuple,
    +                   new TypeSerializer[]{
    +                           leftTypeSerializer,
    +                           LongSerializer.INSTANCE,
    +                           BooleanSerializer.INSTANCE
    +                   }
    +           );
    +
    +           Class<Tuple3<T2, Long, Boolean>> rightTypedTuple =
    +                   (Class<Tuple3<T2, Long, Boolean>>) (Class<?>) 
Tuple3.class;
    +
    +           TupleSerializer<Tuple3<T2, Long, Boolean>> rightTupleSerializer 
= new TupleSerializer<>(
    +                   rightTypedTuple,
    +                   new TypeSerializer[]{
    +                           rightTypeSerializer,
    +                           LongSerializer.INSTANCE,
    +                           BooleanSerializer.INSTANCE
    +                   }
    +           );
    +
    +           this.leftBuffer = getRuntimeContext().getMapState(new 
MapStateDescriptor<>(
    +                   LEFT_BUFFER,
    +                   LongSerializer.INSTANCE,
    +                   new ListSerializer<>(leftTupleSerializer)
    +           ));
    +
    +           this.rightBuffer = getRuntimeContext().getMapState(new 
MapStateDescriptor<>(
    +                   RIGHT_BUFFER,
    +                   LongSerializer.INSTANCE,
    +                   new ListSerializer<>(rightTupleSerializer)
    +           ));
    +
    +           this.lastCleanupRightBuffer = getRuntimeContext().getState(new 
ValueStateDescriptor<>(
    +                   LAST_CLEANUP_RIGHT,
    +                   LONG_TYPE_INFO
    +           ));
    +
    +           this.lastCleanupLeftBuffer = getRuntimeContext().getState(new 
ValueStateDescriptor<>(
    +                   LAST_CLEANUP_LEFT,
    +                   LONG_TYPE_INFO
    +           ));
    +   }
    +
    +   /**
    +    * Process a {@link StreamRecord} from the left stream. Whenever an 
{@link StreamRecord}
    +    * arrives at the left stream, it will get added to the left buffer. 
Possible join candidates
    +    * for that element will be looked up from the right buffer and if the 
pair lies within the
    +    * user defined boundaries, it gets collected.
    +    *
    +    * @param record An incoming record to be joined
    +    * @throws Exception Can throw an Exception during state access
    +    */
    +   @Override
    +   public void processElement1(StreamRecord<T1> record) throws Exception {
    +
    +           long leftTs = record.getTimestamp();
    +           T1 leftValue = record.getValue();
    +
    +           addToLeftBuffer(leftValue, leftTs);
    +
    +           long min = leftTs + lowerBound;
    +           long max = leftTs + upperBound;
    +
    +           // TODO: Adapt to different bucket sizes here
    +           // Go over all buckets that are within the time bounds
    +           for (long i = min; i <= max; i++) {
    --- End diff --
    
    Yes, but I believe that with the iteration as it is now plus a configurable 
bucket size, a user who knows their data can set the granularity of the 
buckets so that the buckets are not empty. This means big buckets for 
low-throughput streams and smaller ones for "denser" streams. 
    
    I understand that this does not solve the issue when you have skewed keys, 
or when the throughput of the stream fluctuates between low and high, but at 
least we allow the user to configure the bucket granularity.
    
    What do you think?


> Implement time-bounded inner join of streams as a TwoInputStreamOperator
> ------------------------------------------------------------------------
>
>                 Key: FLINK-8479
>                 URL: https://issues.apache.org/jira/browse/FLINK-8479
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Florian Schmidt
>            Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to