[
https://issues.apache.org/jira/browse/FLINK-8470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16337332#comment-16337332
]
ASF GitHub Bot commented on FLINK-8470:
---------------------------------------
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/5342#discussion_r163499952
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
---
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import static
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ * <p>
+ * By using a configurable lower and upper bound this operator will emit
exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both
the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * @param <T1> The type of the elements in the left stream
+ * @param <T2> The type of the elements in the right stream
+ */
+public class TimeBoundedStreamJoinOperator<T1, T2>
+ extends AbstractStreamOperator<Tuple2<T1, T2>>
+ implements TwoInputStreamOperator<T1, T2, Tuple2<T1, T2>> {
+
+ private final long lowerBound;
+ private final long upperBound;
+
+ private final long inverseLowerBound;
+ private final long inverseUpperBound;
+
+ private final boolean lowerBoundInclusive;
+ private final boolean upperBoundInclusive;
+
+ private final long bucketGranularity = 1;
+
+ private static final String LEFT_BUFFER = "LEFT_BUFFER";
+ private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+ private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+ private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+ private transient ValueState<Long> lastCleanupRightBuffer;
+ private transient ValueState<Long> lastCleanupLeftBuffer;
+
+ private transient MapState<Long, List<Tuple3<T1, Long, Boolean>>>
leftBuffer;
+ private transient MapState<Long, List<Tuple3<T2, Long, Boolean>>>
rightBuffer;
+
+ private transient TimestampedCollector<Tuple2<T1, T2>> collector;
+
+ /**
+ * Creates a new TimeBoundedStreamJoinOperator
+ *
+ * @param lowerBound The lower bound for evaluating if
elements should be joined
+ * @param upperBound The upper bound for evaluating if
elements should be joined
+ * @param lowerBoundInclusive Whether or not to include elements where
the timestamp matches
+ * the lower bound
+ * @param upperBoundInclusive Whether or not to include elements where
the timestamp matches
+ * the upper bound
+ */
+ public TimeBoundedStreamJoinOperator(long lowerBound,
+
long upperBound,
+
boolean lowerBoundInclusive,
+
boolean upperBoundInclusive) {
+
+ this.lowerBound = lowerBound;
+ this.upperBound = upperBound;
+
+ this.inverseLowerBound = -1 * upperBound;
+ this.inverseUpperBound = -1 * lowerBound;
+
+ this.lowerBoundInclusive = lowerBoundInclusive;
+ this.upperBoundInclusive = upperBoundInclusive;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ collector = new TimestampedCollector<>(output);
+
+ this.leftBuffer = getRuntimeContext().getMapState(new
MapStateDescriptor<>(
+ LEFT_BUFFER,
+ LONG_TYPE_INFO,
+ TypeInformation.of(new TypeHint<List<Tuple3<T1, Long,
Boolean>>>() {
+ })
+ ));
+
+ this.rightBuffer = getRuntimeContext().getMapState(new
MapStateDescriptor<>(
--- End diff --
Same as above.
> DelayTrigger and DelayAndCountTrigger in Flink Streaming Window API
> -------------------------------------------------------------------
>
> Key: FLINK-8470
> URL: https://issues.apache.org/jira/browse/FLINK-8470
> Project: Flink
> Issue Type: New Feature
> Components: Streaming
> Affects Versions: 2.0.0
> Reporter: Vijay Kansal
> Priority: Major
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> In Flink streaming API, we do not have any in-built window trigger(s)
> available for the below use cases:
> 1. DelayTrigger: Window function should trigger in case the 1st element
> belonging to this window arrived more than maxDelay(ms) before the current
> processing time.
> 2. DelayAndCountTrigger: Window function should trigger in case the 1st
> element belonging to this window arrived more than maxDelay(ms) before the
> current processing time or there are more than maxCount elements in the
> window.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)