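[streaming] GroupedTimeDiscretizer added for lighter time policy thread management (a single shared trigger thread serves all groups, rather than a new active thread being started per group)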
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87086882 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87086882 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87086882 Branch: refs/heads/master Commit: 870868828e7329c03547520bdb1762b4a1763514 Parents: aef52e8 Author: Gyula Fora <gyf...@apache.org> Authored: Sun Feb 8 22:59:22 2015 +0100 Committer: mbalassi <mbala...@apache.org> Committed: Mon Feb 16 13:06:08 2015 +0100 ---------------------------------------------------------------------- .../api/datastream/WindowedDataStream.java | 32 ++++-- .../windowing/GroupedStreamDiscretizer.java | 12 +-- .../windowing/GroupedTimeDiscretizer.java | 101 +++++++++++++++++++ .../operator/windowing/StreamDiscretizer.java | 22 +++- .../api/windowing/policy/TimeTriggerPolicy.java | 12 ++- .../scala/examples/join/WindowJoin.scala | 19 +++- 6 files changed, 167 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/87086882/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java index 7c2dc47..7a214fe 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java @@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator; import 
org.apache.flink.streaming.api.function.aggregation.SumAggregator; import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.invokable.operator.windowing.GroupedStreamDiscretizer; +import org.apache.flink.streaming.api.invokable.operator.windowing.GroupedTimeDiscretizer; import org.apache.flink.streaming.api.invokable.operator.windowing.StreamDiscretizer; import org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindow; import org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindowTypeInfo; @@ -40,6 +41,7 @@ import org.apache.flink.streaming.api.windowing.helper.WindowingHelper; import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; +import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy; import org.apache.flink.streaming.util.keys.KeySelectorUtil; @@ -210,15 +212,7 @@ public class WindowedDataStream<OUT> { private DiscretizedStream<OUT> discretize(boolean isMap) { - StreamInvokable<OUT, StreamWindow<OUT>> discretizer; - - if (discretizerKey == null) { - discretizer = new StreamDiscretizer<OUT>(getTrigger(), getEvicter()); - } else { - discretizer = new GroupedStreamDiscretizer<OUT>(discretizerKey, - (CloneableTriggerPolicy<OUT>) getTrigger(), - (CloneableEvictionPolicy<OUT>) getEvicter()); - } + StreamInvokable<OUT, StreamWindow<OUT>> discretizer = getDiscretizer(); int parallelism = isLocal || (discretizerKey != null) ? 
dataStream.environment .getDegreeOfParallelism() : 1; @@ -229,6 +223,22 @@ public class WindowedDataStream<OUT> { } + private StreamInvokable<OUT, StreamWindow<OUT>> getDiscretizer() { + if (discretizerKey == null) { + return new StreamDiscretizer<OUT>(getTrigger(), getEvicter()); + } else if (getTrigger() instanceof TimeTriggerPolicy + && ((TimeTriggerPolicy<OUT>) getTrigger()).timestampWrapper.isDefaultTimestamp()) { + return new GroupedTimeDiscretizer<OUT>(discretizerKey, + (TimeTriggerPolicy<OUT>) getTrigger(), + (CloneableEvictionPolicy<OUT>) getEvicter()); + } else { + return new GroupedStreamDiscretizer<OUT>(discretizerKey, + (CloneableTriggerPolicy<OUT>) getTrigger(), + (CloneableEvictionPolicy<OUT>) getEvicter()); + } + + } + /** * Applies a reduce transformation on the windowed data stream by reducing * the current window at every trigger.The user can also extend the @@ -276,8 +286,8 @@ public class WindowedDataStream<OUT> { * The reduce function that will be applied to the windows. 
* @return The transformed DataStream */ - public <R> WindowedDataStream<R> mapWindow( - GroupReduceFunction<OUT, R> reduceFunction, TypeInformation<R> outType) { + public <R> WindowedDataStream<R> mapWindow(GroupReduceFunction<OUT, R> reduceFunction, + TypeInformation<R> outType) { return discretize(true).mapWindow(reduceFunction, outType); } http://git-wip-us.apache.org/repos/asf/flink/blob/87086882/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java index efd2e06..e90726d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java @@ -33,12 +33,12 @@ public class GroupedStreamDiscretizer<IN> extends StreamInvokable<IN, StreamWind */ private static final long serialVersionUID = -3469545957144404137L; - private KeySelector<IN, ?> keySelector; - private Configuration parameters; - private CloneableTriggerPolicy<IN> triggerPolicy; - private CloneableEvictionPolicy<IN> evictionPolicy; + protected KeySelector<IN, ?> keySelector; + protected Configuration parameters; + protected CloneableTriggerPolicy<IN> triggerPolicy; + protected CloneableEvictionPolicy<IN> evictionPolicy; - private Map<Object, StreamDiscretizer<IN>> groupedDiscretizers; + protected Map<Object, StreamDiscretizer<IN>> groupedDiscretizers; public 
GroupedStreamDiscretizer(KeySelector<IN, ?> keySelector, CloneableTriggerPolicy<IN> triggerPolicy, CloneableEvictionPolicy<IN> evictionPolicy) { @@ -92,7 +92,7 @@ public class GroupedStreamDiscretizer<IN> extends StreamInvokable<IN, StreamWind * @param key * The key of the new group. */ - private StreamDiscretizer<IN> makeNewGroup(Object key) throws Exception { + protected StreamDiscretizer<IN> makeNewGroup(Object key) throws Exception { StreamDiscretizer<IN> groupDiscretizer = new StreamDiscretizer<IN>(triggerPolicy.clone(), evictionPolicy.clone()); http://git-wip-us.apache.org/repos/asf/flink/blob/87086882/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java new file mode 100644 index 0000000..63901a8 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.invokable.operator.windowing; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy; +import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy; + +public class GroupedTimeDiscretizer<IN> extends GroupedStreamDiscretizer<IN> { + + private static final long serialVersionUID = -3469545957144404137L; + + private TimeTriggerPolicy<IN> timeTriggerPolicy; + private Thread policyThread; + + public GroupedTimeDiscretizer(KeySelector<IN, ?> keySelector, + TimeTriggerPolicy<IN> triggerPolicy, CloneableEvictionPolicy<IN> evictionPolicy) { + + super(keySelector, triggerPolicy, evictionPolicy); + this.timeTriggerPolicy = triggerPolicy; + } + + @Override + protected StreamDiscretizer<IN> makeNewGroup(Object key) throws Exception { + + StreamDiscretizer<IN> groupDiscretizer = new StreamDiscretizer<IN>(triggerPolicy.clone(), + evictionPolicy.clone()); + + groupDiscretizer.collector = taskContext.getOutputCollector(); + // We omit the groupDiscretizer.open(...) 
call here to avoid starting + // new active threads + return groupDiscretizer; + } + + @Override + public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { + super.open(parameters); + + Runnable runnable = new TimeCheck(); + policyThread = new Thread(runnable); + policyThread.start(); + } + + private void removeUnusedGroups(int threshold) { + List<Object> toRemove = new ArrayList<Object>(); + + for (Entry<Object, StreamDiscretizer<IN>> entry : groupedDiscretizers.entrySet()) { + if (entry.getValue().emptyCount > threshold) { + toRemove.add(entry.getKey()); + } + } + + for (Object key : toRemove) { + groupedDiscretizers.remove(key); + } + } + + private class TimeCheck implements Runnable { + + @Override + public void run() { + while (true) { + // wait for the specified granularity + try { + Thread.sleep(timeTriggerPolicy.granularity); + } catch (InterruptedException e) { + // ignore it... + } + + for (StreamDiscretizer<IN> group : groupedDiscretizers.values()) { + TimeTriggerPolicy<IN> groupTrigger = (TimeTriggerPolicy<IN>) group.triggerPolicy; + Object fake = groupTrigger.activeFakeElementEmission(null); + if (fake != null) { + group.triggerOnFakeElement(fake); + } + } + + removeUnusedGroups(10); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/87086882/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java index d5b4354..ac02af3 100644 --- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java @@ -34,12 +34,13 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>> */ private static final long serialVersionUID = -8038984294071650730L; - private TriggerPolicy<IN> triggerPolicy; - private EvictionPolicy<IN> evictionPolicy; + protected TriggerPolicy<IN> triggerPolicy; + protected EvictionPolicy<IN> evictionPolicy; private boolean isActiveTrigger; private boolean isActiveEviction; private Thread activePolicyThread; protected LinkedList<IN> buffer; + public int emptyCount = 0; public StreamDiscretizer(TriggerPolicy<IN> triggerPolicy, EvictionPolicy<IN> evictionPolicy) { super(null); @@ -113,13 +114,24 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>> emitWindow(); } + protected synchronized void externalTriggerOnFakeElement(Object input) { + emitWindow(); + activeEvict(input); + } + /** * This method emits the content of the buffer as a new {@link StreamWindow} + * if not empty */ protected void emitWindow() { - StreamWindow<IN> currentWindow = new StreamWindow<IN>(); - currentWindow.addAll(buffer); - collector.collect(currentWindow); + if (!buffer.isEmpty()) { + StreamWindow<IN> currentWindow = new StreamWindow<IN>(); + currentWindow.addAll(buffer); + collector.collect(currentWindow); + emptyCount = 0; + } else { + emptyCount++; + } } private void activeEvict(Object input) { http://git-wip-us.apache.org/repos/asf/flink/blob/87086882/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java index 1e91b8e..fb249cf 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java @@ -41,8 +41,8 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>, private static final long serialVersionUID = -5122753802440196719L; protected long startTime; - protected long granularity; - protected TimestampWrapper<DATA> timestampWrapper; + public long granularity; + public TimestampWrapper<DATA> timestampWrapper; protected long delay; /** @@ -141,13 +141,17 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>, * @param callback * The callback object. 
*/ - private synchronized void activeFakeElementEmission(ActiveTriggerCallback callback) { + public synchronized Object activeFakeElementEmission(ActiveTriggerCallback callback) { // start time is excluded, but end time is included: >= if (System.currentTimeMillis() >= startTime + granularity) { startTime += granularity; - callback.sendFakeElement(startTime - 1); + if (callback != null) { + callback.sendFakeElement(startTime - 1); + } + return startTime - 1; } + return null; } http://git-wip-us.apache.org/repos/asf/flink/blob/87086882/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala index 6ea7be0..abd6d50 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala @@ -34,12 +34,21 @@ object WindowJoin { def main(args: Array[String]) { - val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setDegreeOfParallelism(1) + val env = StreamExecutionEnvironment.getExecutionEnvironment - val split = env.generateSequence(1, 10).split(x => x % 3 toString) - - split.select("0").merge(split.select("1")).print + //Create streams for names and ages by mapping the inputs to the corresponding objects + val names = env.fromCollection(nameStream).map(x => Name(x._1, x._2)) + val ages = env.fromCollection(ageStream).map(x => Age(x._1, x._2)) + + //Join the two input streams by id on the last 2 seconds every second and 
create new + //Person objects containing both name and age + val joined = + names.join(ages).onWindow(2, TimeUnit.SECONDS) + .every(1, TimeUnit.SECONDS) + .where("id") + .equalTo("id") { (n, a) => Person(n.name, a.age) } + + joined print env.execute("WindowJoin") }