[streaming] GroupedTimeDiscretizer added for lighter time policy thread management


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87086882
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87086882
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87086882

Branch: refs/heads/master
Commit: 870868828e7329c03547520bdb1762b4a1763514
Parents: aef52e8
Author: Gyula Fora <gyf...@apache.org>
Authored: Sun Feb 8 22:59:22 2015 +0100
Committer: mbalassi <mbala...@apache.org>
Committed: Mon Feb 16 13:06:08 2015 +0100

----------------------------------------------------------------------
 .../api/datastream/WindowedDataStream.java      |  32 ++++--
 .../windowing/GroupedStreamDiscretizer.java     |  12 +--
 .../windowing/GroupedTimeDiscretizer.java       | 101 +++++++++++++++++++
 .../operator/windowing/StreamDiscretizer.java   |  22 +++-
 .../api/windowing/policy/TimeTriggerPolicy.java |  12 ++-
 .../scala/examples/join/WindowJoin.scala        |  19 +++-
 6 files changed, 167 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/87086882/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 7c2dc47..7a214fe 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import 
org.apache.flink.streaming.api.invokable.operator.windowing.GroupedStreamDiscretizer;
+import 
org.apache.flink.streaming.api.invokable.operator.windowing.GroupedTimeDiscretizer;
 import 
org.apache.flink.streaming.api.invokable.operator.windowing.StreamDiscretizer;
 import 
org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindow;
 import 
org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindowTypeInfo;
@@ -40,6 +41,7 @@ import 
org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
@@ -210,15 +212,7 @@ public class WindowedDataStream<OUT> {
 
        private DiscretizedStream<OUT> discretize(boolean isMap) {
 
-               StreamInvokable<OUT, StreamWindow<OUT>> discretizer;
-
-               if (discretizerKey == null) {
-                       discretizer = new StreamDiscretizer<OUT>(getTrigger(), 
getEvicter());
-               } else {
-                       discretizer = new 
GroupedStreamDiscretizer<OUT>(discretizerKey,
-                                       (CloneableTriggerPolicy<OUT>) 
getTrigger(),
-                                       (CloneableEvictionPolicy<OUT>) 
getEvicter());
-               }
+               StreamInvokable<OUT, StreamWindow<OUT>> discretizer = 
getDiscretizer();
 
                int parallelism = isLocal || (discretizerKey != null) ? 
dataStream.environment
                                .getDegreeOfParallelism() : 1;
@@ -229,6 +223,22 @@ public class WindowedDataStream<OUT> {
 
        }
 
+       private StreamInvokable<OUT, StreamWindow<OUT>> getDiscretizer() {
+               if (discretizerKey == null) {
+                       return new StreamDiscretizer<OUT>(getTrigger(), 
getEvicter());
+               } else if (getTrigger() instanceof TimeTriggerPolicy
+                               && ((TimeTriggerPolicy<OUT>) 
getTrigger()).timestampWrapper.isDefaultTimestamp()) {
+                       return new GroupedTimeDiscretizer<OUT>(discretizerKey,
+                                       (TimeTriggerPolicy<OUT>) getTrigger(),
+                                       (CloneableEvictionPolicy<OUT>) 
getEvicter());
+               } else {
+                       return new GroupedStreamDiscretizer<OUT>(discretizerKey,
+                                       (CloneableTriggerPolicy<OUT>) 
getTrigger(),
+                                       (CloneableEvictionPolicy<OUT>) 
getEvicter());
+               }
+
+       }
+
        /**
         * Applies a reduce transformation on the windowed data stream by reducing
         * the current window at every trigger.The user can also extend the
@@ -276,8 +286,8 @@ public class WindowedDataStream<OUT> {
         *            The reduce function that will be applied to the windows.
         * @return The transformed DataStream
         */
-       public <R> WindowedDataStream<R> mapWindow(
-                       GroupReduceFunction<OUT, R> reduceFunction, 
TypeInformation<R> outType) {
+       public <R> WindowedDataStream<R> mapWindow(GroupReduceFunction<OUT, R> 
reduceFunction,
+                       TypeInformation<R> outType) {
 
                return discretize(true).mapWindow(reduceFunction, outType);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/87086882/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
index efd2e06..e90726d 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
@@ -33,12 +33,12 @@ public class GroupedStreamDiscretizer<IN> extends 
StreamInvokable<IN, StreamWind
         */
        private static final long serialVersionUID = -3469545957144404137L;
 
-       private KeySelector<IN, ?> keySelector;
-       private Configuration parameters;
-       private CloneableTriggerPolicy<IN> triggerPolicy;
-       private CloneableEvictionPolicy<IN> evictionPolicy;
+       protected KeySelector<IN, ?> keySelector;
+       protected Configuration parameters;
+       protected CloneableTriggerPolicy<IN> triggerPolicy;
+       protected CloneableEvictionPolicy<IN> evictionPolicy;
 
-       private Map<Object, StreamDiscretizer<IN>> groupedDiscretizers;
+       protected Map<Object, StreamDiscretizer<IN>> groupedDiscretizers;
 
        public GroupedStreamDiscretizer(KeySelector<IN, ?> keySelector,
                        CloneableTriggerPolicy<IN> triggerPolicy, 
CloneableEvictionPolicy<IN> evictionPolicy) {
@@ -92,7 +92,7 @@ public class GroupedStreamDiscretizer<IN> extends 
StreamInvokable<IN, StreamWind
         * @param key
         *            The key of the new group.
         */
-       private StreamDiscretizer<IN> makeNewGroup(Object key) throws Exception 
{
+       protected StreamDiscretizer<IN> makeNewGroup(Object key) throws 
Exception {
 
                StreamDiscretizer<IN> groupDiscretizer = new 
StreamDiscretizer<IN>(triggerPolicy.clone(),
                                evictionPolicy.clone());

http://git-wip-us.apache.org/repos/asf/flink/blob/87086882/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java
new file mode 100644
index 0000000..63901a8
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
+
+public class GroupedTimeDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
+
+       private static final long serialVersionUID = -3469545957144404137L;
+
+       private TimeTriggerPolicy<IN> timeTriggerPolicy;
+       private Thread policyThread;
+
+       public GroupedTimeDiscretizer(KeySelector<IN, ?> keySelector,
+                       TimeTriggerPolicy<IN> triggerPolicy, 
CloneableEvictionPolicy<IN> evictionPolicy) {
+
+               super(keySelector, triggerPolicy, evictionPolicy);
+               this.timeTriggerPolicy = triggerPolicy;
+       }
+
+       @Override
+       protected StreamDiscretizer<IN> makeNewGroup(Object key) throws 
Exception {
+
+               StreamDiscretizer<IN> groupDiscretizer = new 
StreamDiscretizer<IN>(triggerPolicy.clone(),
+                               evictionPolicy.clone());
+
+               groupDiscretizer.collector = taskContext.getOutputCollector();
+               // We omit the groupDiscretizer.open(...) call here to avoid starting
+               // new active threads
+               return groupDiscretizer;
+       }
+
+       @Override
+       public void open(org.apache.flink.configuration.Configuration 
parameters) throws Exception {
+               super.open(parameters);
+
+               Runnable runnable = new TimeCheck();
+               policyThread = new Thread(runnable);
+               policyThread.start();
+       }
+
+       private void removeUnusedGroups(int threshold) {
+               List<Object> toRemove = new ArrayList<Object>();
+
+               for (Entry<Object, StreamDiscretizer<IN>> entry : 
groupedDiscretizers.entrySet()) {
+                       if (entry.getValue().emptyCount > threshold) {
+                               toRemove.add(entry.getKey());
+                       }
+               }
+
+               for (Object key : toRemove) {
+                       groupedDiscretizers.remove(key);
+               }
+       }
+
+       private class TimeCheck implements Runnable {
+
+               @Override
+               public void run() {
+                       while (true) {
+                               // wait for the specified granularity
+                               try {
+                                       
Thread.sleep(timeTriggerPolicy.granularity);
+                               } catch (InterruptedException e) {
+                                       // ignore it...
+                               }
+
+                               for (StreamDiscretizer<IN> group : 
groupedDiscretizers.values()) {
+                                       TimeTriggerPolicy<IN> groupTrigger = 
(TimeTriggerPolicy<IN>) group.triggerPolicy;
+                                       Object fake = 
groupTrigger.activeFakeElementEmission(null);
+                                       if (fake != null) {
+                                               
group.triggerOnFakeElement(fake);
+                                       }
+                               }
+
+                               removeUnusedGroups(10);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/87086882/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
index d5b4354..ac02af3 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
@@ -34,12 +34,13 @@ public class StreamDiscretizer<IN> extends 
StreamInvokable<IN, StreamWindow<IN>>
         */
        private static final long serialVersionUID = -8038984294071650730L;
 
-       private TriggerPolicy<IN> triggerPolicy;
-       private EvictionPolicy<IN> evictionPolicy;
+       protected TriggerPolicy<IN> triggerPolicy;
+       protected EvictionPolicy<IN> evictionPolicy;
        private boolean isActiveTrigger;
        private boolean isActiveEviction;
        private Thread activePolicyThread;
        protected LinkedList<IN> buffer;
+       public int emptyCount = 0;
 
        public StreamDiscretizer(TriggerPolicy<IN> triggerPolicy, 
EvictionPolicy<IN> evictionPolicy) {
                super(null);
@@ -113,13 +114,24 @@ public class StreamDiscretizer<IN> extends 
StreamInvokable<IN, StreamWindow<IN>>
                emitWindow();
        }
 
+       protected synchronized void externalTriggerOnFakeElement(Object input) {
+               emitWindow();
+               activeEvict(input);
+       }
+
        /**
         * This method emits the content of the buffer as a new {@link StreamWindow}
+        * if not empty
         */
        protected void emitWindow() {
-               StreamWindow<IN> currentWindow = new StreamWindow<IN>();
-               currentWindow.addAll(buffer);
-               collector.collect(currentWindow);
+               if (!buffer.isEmpty()) {
+                       StreamWindow<IN> currentWindow = new StreamWindow<IN>();
+                       currentWindow.addAll(buffer);
+                       collector.collect(currentWindow);
+                       emptyCount = 0;
+               } else {
+                       emptyCount++;
+               }
        }
 
        private void activeEvict(Object input) {

http://git-wip-us.apache.org/repos/asf/flink/blob/87086882/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
index 1e91b8e..fb249cf 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
@@ -41,8 +41,8 @@ public class TimeTriggerPolicy<DATA> implements 
ActiveTriggerPolicy<DATA>,
        private static final long serialVersionUID = -5122753802440196719L;
 
        protected long startTime;
-       protected long granularity;
-       protected TimestampWrapper<DATA> timestampWrapper;
+       public long granularity;
+       public TimestampWrapper<DATA> timestampWrapper;
        protected long delay;
 
        /**
@@ -141,13 +141,17 @@ public class TimeTriggerPolicy<DATA> implements 
ActiveTriggerPolicy<DATA>,
         * @param callback
         *            The callback object.
         */
-       private synchronized void 
activeFakeElementEmission(ActiveTriggerCallback callback) {
+       public synchronized Object 
activeFakeElementEmission(ActiveTriggerCallback callback) {
 
                // start time is excluded, but end time is included: >=
                if (System.currentTimeMillis() >= startTime + granularity) {
                        startTime += granularity;
-                       callback.sendFakeElement(startTime - 1);
+                       if (callback != null) {
+                               callback.sendFakeElement(startTime - 1);
+                       }
+                       return startTime - 1;
                }
+               return null;
 
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/87086882/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
index 6ea7be0..abd6d50 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
@@ -34,12 +34,21 @@ object WindowJoin {
 
   def main(args: Array[String]) {
 
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setDegreeOfParallelism(1)
+   val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-    val split = env.generateSequence(1, 10).split(x => x % 3 toString)
-    
-    split.select("0").merge(split.select("1")).print
+    //Create streams for names and ages by mapping the inputs to the corresponding objects
+    val names = env.fromCollection(nameStream).map(x => Name(x._1, x._2))
+    val ages = env.fromCollection(ageStream).map(x => Age(x._1, x._2))
+
+    //Join the two input streams by id on the last 2 seconds every second and create new
+    //Person objects containing both name and age
+    val joined =
+      names.join(ages).onWindow(2, TimeUnit.SECONDS)
+                      .every(1, TimeUnit.SECONDS)
+                      .where("id")
+                      .equalTo("id") { (n, a) => Person(n.name, a.age) }
+
+    joined print
 
     env.execute("WindowJoin")
   }

Reply via email to