Move window operators and tests to windowing package

The API package is also called windowing; this harmonizes the package
names.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05d2138f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05d2138f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05d2138f

Branch: refs/heads/master
Commit: 05d2138f081ff5fa274dab571b9327f96be693aa
Parents: a606c4a
Author: Aljoscha Krettek <[email protected]>
Authored: Thu Sep 24 16:33:09 2015 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Sep 28 17:04:16 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/KeyedWindowDataStream.java   |   2 +-
 ...ractAlignedProcessingTimeWindowOperator.java | 223 +++++++
 .../windowing/AbstractKeyedTimePanes.java       |  76 +++
 .../windowing/AccumulatingKeyedTimePanes.java   | 126 ++++
 ...ccumulatingProcessingTimeWindowOperator.java |  48 ++
 .../windowing/AggregatingKeyedTimePanes.java    | 103 +++
 ...AggregatingProcessingTimeWindowOperator.java |  47 ++
 .../runtime/operators/windowing/KeyMap.java     | 651 +++++++++++++++++++
 .../operators/windowing/PolicyToOperator.java   |  82 +++
 .../operators/windowing/package-info.java       |  22 +
 ...ractAlignedProcessingTimeWindowOperator.java | 223 -------
 .../windows/AbstractKeyedTimePanes.java         |  76 ---
 .../windows/AccumulatingKeyedTimePanes.java     | 126 ----
 ...ccumulatingProcessingTimeWindowOperator.java |  48 --
 .../windows/AggregatingKeyedTimePanes.java      | 103 ---
 ...AggregatingProcessingTimeWindowOperator.java |  47 --
 .../runtime/operators/windows/KeyMap.java       | 651 -------------------
 .../operators/windows/PolicyToOperator.java     |  82 ---
 .../runtime/operators/windows/package-info.java |  22 -
 ...AlignedProcessingTimeWindowOperatorTest.java | 547 ++++++++++++++++
 ...AlignedProcessingTimeWindowOperatorTest.java | 550 ++++++++++++++++
 .../operators/windowing/CollectingOutput.java   |  80 +++
 .../windowing/KeyMapPutIfAbsentTest.java        | 121 ++++
 .../operators/windowing/KeyMapPutTest.java      | 136 ++++
 .../runtime/operators/windowing/KeyMapTest.java | 344 ++++++++++
 ...AlignedProcessingTimeWindowOperatorTest.java | 547 ----------------
 ...AlignedProcessingTimeWindowOperatorTest.java | 551 ----------------
 .../operators/windows/CollectingOutput.java     |  80 ---
 .../windows/KeyMapPutIfAbsentTest.java          | 121 ----
 .../operators/windows/KeyMapPutTest.java        | 136 ----
 .../runtime/operators/windows/KeyMapTest.java   | 344 ----------
 31 files changed, 3157 insertions(+), 3158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
index dfb7032..37151d7 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
-import org.apache.flink.streaming.runtime.operators.windows.PolicyToOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.PolicyToOperator;
 
 /**
  * A KeyedWindowDataStream represents a data stream where elements are grouped 
by key, and 

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
new file mode 100644
index 0000000..6c4e53a
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.commons.math3.util.ArithmeticUtils;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+
+public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, 
OUT> extends AbstractStreamOperator<OUT> 
+               implements OneInputStreamOperator<IN, OUT>, Triggerable {
+       
+       private static final long serialVersionUID = 3245500864882459867L;
+       
+       private static final long MIN_SLIDE_TIME = 50;
+       
+       // ----- fields for operator parametrization -----
+       
+       private final Function function;
+       private final KeySelector<IN, KEY> keySelector;
+       
+       private final long windowSize;
+       private final long windowSlide;
+       private final long paneSize;
+       private final int numPanesPerWindow;
+       
+       // ----- fields for operator functionality -----
+       
+       private transient AbstractKeyedTimePanes<IN, KEY, ?, OUT> panes;
+       
+       private transient TimestampedCollector<OUT> out;
+       
+       private transient long nextEvaluationTime;
+       private transient long nextSlideTime;
+       
+       protected AbstractAlignedProcessingTimeWindowOperator(
+                       Function function,
+                       KeySelector<IN, KEY> keySelector,
+                       long windowLength,
+                       long windowSlide)
+       {
+               if (function == null || keySelector == null) {
+                       throw new NullPointerException();
+               }
+               if (windowLength < MIN_SLIDE_TIME) {
+                       throw new IllegalArgumentException("Window length must 
be at least " + MIN_SLIDE_TIME + " msecs");
+               }
+               if (windowSlide < MIN_SLIDE_TIME) {
+                       throw new IllegalArgumentException("Window slide must 
be at least " + MIN_SLIDE_TIME + " msecs");
+               }
+               if (windowLength < windowSlide) {
+                       throw new IllegalArgumentException("The window size 
must be larger than the window slide");
+               }
+               
+               final long paneSlide = ArithmeticUtils.gcd(windowLength, 
windowSlide);
+               if (paneSlide < MIN_SLIDE_TIME) {
+                       throw new IllegalArgumentException(String.format(
+                                       "Cannot compute window of size %d msecs 
sliding by %d msecs. " +
+                                                       "The unit of grouping 
is too small: %d msecs", windowLength, windowSlide, paneSlide));
+               }
+               
+               this.function = function;
+               this.keySelector = keySelector;
+               this.windowSize = windowLength;
+               this.windowSlide = windowSlide;
+               this.paneSize = paneSlide;
+               this.numPanesPerWindow = MathUtils.checkedDownCast(windowLength 
/ paneSlide);
+       }
+       
+       
+       protected abstract AbstractKeyedTimePanes<IN, KEY, ?, OUT> createPanes(
+                       KeySelector<IN, KEY> keySelector, Function function);
+
+       // 
------------------------------------------------------------------------
+       //  startup and shutdown
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               out = new TimestampedCollector<>(output);
+               
+               // create the panes that gather the elements per slide
+               panes = createPanes(keySelector, function);
+               
+               // decide when to first compute the window and when to slide it
+               // the values should align with the start of time (that is, the 
UNIX epoch, not the big bang)
+               final long now = System.currentTimeMillis();
+               nextEvaluationTime = now + windowSlide - (now % windowSlide);
+               nextSlideTime = now + paneSize - (now % paneSize);
+               
+               getRuntimeContext().registerTimer(Math.min(nextEvaluationTime, 
nextSlideTime), this);
+       }
+
+       @Override
+       public void close() throws Exception {
+               final long finalWindowTimestamp = nextEvaluationTime;
+
+               // early stop the triggering thread, so it does not attempt to 
return any more data
+               stopTriggers();
+
+               // emit the remaining data
+               computeWindow(finalWindowTimestamp);
+       }
+
+       @Override
+       public void dispose() {
+               // acquire the lock during shutdown, to prevent trigger calls 
at the same time
+               // fail-safe stop of the triggering thread (in case of an error)
+               stopTriggers();
+
+               // release the panes
+               panes.dispose();
+       }
+       
+       private void stopTriggers() {
+               // reset the action timestamps. this makes sure any pending 
triggers will not evaluate
+               nextEvaluationTime = -1L;
+               nextSlideTime = -1L;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Receiving elements and triggers
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public void processElement(StreamRecord<IN> element) throws Exception {
+               panes.addElementToLatestPane(element.getValue());
+       }
+
+       @Override
+       public void processWatermark(Watermark mark) {
+               // this operator does not react to watermarks
+       }
+
+       @Override
+       public void trigger(long timestamp) throws Exception {
+               // first we check if we actually trigger the window function
+               if (timestamp == nextEvaluationTime) {
+                       // compute and output the results
+                       computeWindow(timestamp);
+
+                       nextEvaluationTime += windowSlide;
+               }
+
+               // check if we slide the panes by one. this may happen in 
addition to the
+               // window computation, or just by itself
+               if (timestamp == nextSlideTime) {
+                       panes.slidePanes(numPanesPerWindow);
+                       nextSlideTime += paneSize;
+               }
+
+               long nextTriggerTime = Math.min(nextEvaluationTime, 
nextSlideTime);
+               getRuntimeContext().registerTimer(nextTriggerTime, this);
+       }
+       
+       private void computeWindow(long timestamp) throws Exception {
+               out.setTimestamp(timestamp);
+               panes.truncatePanes(numPanesPerWindow);
+               panes.evaluateWindow(out);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Property access (for testing)
+       // 
------------------------------------------------------------------------
+
+       public long getWindowSize() {
+               return windowSize;
+       }
+
+       public long getWindowSlide() {
+               return windowSlide;
+       }
+
+       public long getPaneSize() {
+               return paneSize;
+       }
+       
+       public int getNumPanesPerWindow() {
+               return numPanesPerWindow;
+       }
+
+       public long getNextEvaluationTime() {
+               return nextEvaluationTime;
+       }
+
+       public long getNextSlideTime() {
+               return nextSlideTime;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public String toString() {
+               return "Window (processing time) (length=" + windowSize + ", 
slide=" + windowSlide + ')';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
new file mode 100644
index 0000000..fae024b
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayDeque;
+
+
+public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
+       
+       protected KeyMap<Key, Aggregate> latestPane = new KeyMap<>();
+
+       protected final ArrayDeque<KeyMap<Key, Aggregate>> previousPanes = new 
ArrayDeque<>();
+
+       // 
------------------------------------------------------------------------
+
+       public abstract void addElementToLatestPane(Type element) throws 
Exception;
+
+       public abstract void evaluateWindow(Collector<Result> out) throws 
Exception;
+       
+       
+       public void dispose() {
+               // since all is heap data, there is no need to clean up anything
+               latestPane = null;
+               previousPanes.clear();
+       }
+       
+       
+       public void slidePanes(int panesToKeep) {
+               if (panesToKeep > 1) {
+                       // the current pane becomes the latest previous pane
+                       previousPanes.addLast(latestPane);
+
+                       // truncate the history
+                       while (previousPanes.size() >= panesToKeep) {
+                               previousPanes.removeFirst();
+                       }
+               }
+
+               // we need a new latest pane
+               latestPane = new KeyMap<>();
+       }
+       
+       public void truncatePanes(int numToRetain) {
+               while (previousPanes.size() >= numToRetain) {
+                       previousPanes.removeFirst();
+               }
+       }
+       
+       protected void traverseAllPanes(KeyMap.TraversalEvaluator<Key, 
Aggregate> traversal, long traversalPass) throws Exception{
+               // gather all panes in an array (faster iterations)
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               KeyMap<Key, Aggregate>[] panes = previousPanes.toArray(new 
KeyMap[previousPanes.size() + 1]);
+               panes[panes.length - 1] = latestPane;
+
+               // let the maps make a coordinated traversal and evaluate the 
window function per contained key
+               KeyMap.traverseMaps(panes, traversal, traversalPass);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
new file mode 100644
index 0000000..d85c53e
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.util.UnionIterator;
+import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+
+
+public class AccumulatingKeyedTimePanes<Type, Key, Result> extends 
AbstractKeyedTimePanes<Type, Key, ArrayList<Type>, Result> {
+       
+       private final KeySelector<Type, Key> keySelector;
+
+       private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = 
getListFactory();
+
+       private final KeyedWindowFunction<Type, Result, Key> function;
+       
+       private long evaluationPass;
+
+       // 
------------------------------------------------------------------------
+       
+       public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, 
KeyedWindowFunction<Type, Result, Key> function) {
+               this.keySelector = keySelector;
+               this.function = function;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void addElementToLatestPane(Type element) throws Exception {
+               Key k = keySelector.getKey(element);
+               ArrayList<Type> elements = latestPane.putIfAbsent(k, 
listFactory);
+               elements.add(element);
+       }
+
+       @Override
+       public void evaluateWindow(Collector<Result> out) throws Exception {
+               if (previousPanes.isEmpty()) {
+                       // optimized path for single pane case (tumbling window)
+                       for (KeyMap.Entry<Key, ArrayList<Type>> entry : 
latestPane) {
+                               function.evaluate(entry.getKey(), 
entry.getValue(), out);
+                       }
+               }
+               else {
+                       // general code path for multi-pane case
+                       WindowFunctionTraversal<Key, Type, Result> evaluator = 
new WindowFunctionTraversal<>(function, out);
+                       traverseAllPanes(evaluator, evaluationPass);
+               }
+               
+               evaluationPass++;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Running a window function in a map traversal
+       // 
------------------------------------------------------------------------
+       
+       static final class WindowFunctionTraversal<Key, Type, Result> 
implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
+
+               private final KeyedWindowFunction<Type, Result, Key> function;
+               
+               private final UnionIterator<Type> unionIterator;
+               
+               private final Collector<Result> out;
+               
+               private Key currentKey;
+
+               WindowFunctionTraversal(KeyedWindowFunction<Type, Result, Key> 
function, Collector<Result> out) {
+                       this.function = function;
+                       this.out = out;
+                       this.unionIterator = new UnionIterator<>();
+               }
+
+
+               @Override
+               public void startNewKey(Key key) {
+                       unionIterator.clear();
+                       currentKey = key;
+               }
+
+               @Override
+               public void nextValue(ArrayList<Type> value) {
+                       unionIterator.addList(value);
+               }
+
+               @Override
+               public void keyDone() throws Exception {
+                       function.evaluate(currentKey, unionIterator, out);
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Lazy factory for lists (put if absent)
+       // 
------------------------------------------------------------------------
+       
+       @SuppressWarnings("unchecked")
+       private static <V> KeyMap.LazyFactory<ArrayList<V>> getListFactory() {
+               return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY;
+       }
+
+       private static final KeyMap.LazyFactory<?> LIST_FACTORY = new 
KeyMap.LazyFactory<ArrayList<?>>() {
+
+               @Override
+               public ArrayList<?> create() {
+                       return new ArrayList<>(4);
+               }
+       };
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
new file mode 100644
index 0000000..4df308d
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+
+
+/**
+ * Processing-time window operator whose panes are {@link AccumulatingKeyedTimePanes},
+ * evaluated with a {@link KeyedWindowFunction}.
+ *
+ * @param <KEY> type of the key
+ * @param <IN> type of the incoming elements
+ * @param <OUT> type of the results of the window function
+ */
+public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
+               extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> {
+
+       private static final long serialVersionUID = 7305948082830843475L;
+
+       /**
+        * @param function the window function to run per key
+        * @param keySelector extracts the key from each element
+        * @param windowLength length of the window, passed on to the base class
+        * @param windowSlide slide of the window, passed on to the base class
+        */
+       public AccumulatingProcessingTimeWindowOperator(
+                       KeyedWindowFunction<IN, OUT, KEY> function,
+                       KeySelector<IN, KEY> keySelector,
+                       long windowLength,
+                       long windowSlide)
+       {
+               super(function, keySelector, windowLength, windowSlide);
+       }
+
+       @Override
+       protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(
+                       KeySelector<IN, KEY> keySelector, Function function)
+       {
+               // the only constructor of this class accepts a KeyedWindowFunction
+               @SuppressWarnings("unchecked")
+               final KeyedWindowFunction<IN, OUT, KEY> typedFunction =
+                               (KeyedWindowFunction<IN, OUT, KEY>) function;
+
+               return new AccumulatingKeyedTimePanes<IN, KEY, OUT>(keySelector, typedFunction);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
new file mode 100644
index 0000000..48f4eb1
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.util.Collector;
+
+
+public class AggregatingKeyedTimePanes<Type, Key> extends 
AbstractKeyedTimePanes<Type, Key, Type, Type> {
+       
+       private final KeySelector<Type, Key> keySelector;
+       
+       private final ReduceFunction<Type> reducer;
+       
+       private long evaluationPass;
+
+       // 
------------------------------------------------------------------------
+       
+       public AggregatingKeyedTimePanes(KeySelector<Type, Key> keySelector, 
ReduceFunction<Type> reducer) {
+               this.keySelector = keySelector;
+               this.reducer = reducer;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void addElementToLatestPane(Type element) throws Exception {
+               Key k = keySelector.getKey(element);
+               latestPane.putOrAggregate(k, element, reducer);
+       }
+
+       @Override
+       public void evaluateWindow(Collector<Type> out) throws Exception {
+               if (previousPanes.isEmpty()) {
+                       // optimized path for single pane case
+                       for (KeyMap.Entry<Key, Type> entry : latestPane) {
+                               out.collect(entry.getValue());
+                       }
+               }
+               else {
+                       // general code path for multi-pane case
+                       AggregatingTraversal<Key, Type> evaluator = new 
AggregatingTraversal<>(reducer, out);
+                       traverseAllPanes(evaluator, evaluationPass);
+               }
+               
+               evaluationPass++;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  The maps traversal that performs the final aggregation
+       // 
------------------------------------------------------------------------
+       
+       static final class AggregatingTraversal<Key, Type> implements 
KeyMap.TraversalEvaluator<Key, Type> {
+
+               private final ReduceFunction<Type> function;
+               
+               private final Collector<Type> out;
+               
+               private Type currentValue;
+
+               AggregatingTraversal(ReduceFunction<Type> function, 
Collector<Type> out) {
+                       this.function = function;
+                       this.out = out;
+               }
+
+               @Override
+               public void startNewKey(Key key) {
+                       currentValue = null;
+               }
+
+               @Override
+               public void nextValue(Type value) throws Exception {
+                       if (currentValue != null) {
+                               currentValue = function.reduce(currentValue, 
value);
+                       }
+                       else {
+                               currentValue = value;
+                       }
+               }
+
+               @Override
+               public void keyDone() throws Exception {
+                       out.collect(currentValue);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
new file mode 100644
index 0000000..99457bb
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+
+public class AggregatingProcessingTimeWindowOperator<KEY, IN> 
+               extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, 
IN>  {
+
+       private static final long serialVersionUID = 7305948082830843475L;
+
+       
+       public AggregatingProcessingTimeWindowOperator(
+                       ReduceFunction<IN> function,
+                       KeySelector<IN, KEY> keySelector,
+                       long windowLength,
+                       long windowSlide)
+       {
+               super(function, keySelector, windowLength, windowSlide);
+       }
+
+       @Override
+       protected AggregatingKeyedTimePanes<IN, KEY> 
createPanes(KeySelector<IN, KEY> keySelector, Function function) {
+               @SuppressWarnings("unchecked")
+               ReduceFunction<IN> windowFunction = (ReduceFunction<IN>) 
function;
+               
+               return new AggregatingKeyedTimePanes<IN, KEY>(keySelector, 
windowFunction);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
new file mode 100644
index 0000000..3f44c4a
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
@@ -0,0 +1,651 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.runtime.util.MathUtils;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * A special Hash Map implementation that can be traversed efficiently in sync 
with other
+ * hash maps.
+ * <p>
+ * The differences between this hash map and Java's "java.util.HashMap" are:
+ * <ul>
+ *     <li>A different hashing scheme. This implementation uses extensible 
hashing, meaning that
+ *         each hash table growth takes one more lower hash code bit into 
account, and values that were
+ *         formerly in the same bucket will afterwards be in the two adjacent 
buckets.</li>
+ *     <li>This allows an efficient traversal of multiple hash maps together, 
even though the maps are
+ *         of different sizes.</li>
+ *     <li>The map offers functions such as "putIfAbsent()" and 
"putOrAggregate()"</li>
+ *     <li>The map supports no removal/shrinking.</li>
+ * </ul>
+ */
+public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
+       
+       /** The minimum table capacity, 64 entries */
+       private static final int MIN_CAPACITY = 0x40;
+       
+       /** The maximum possible table capacity, the largest positive power of
+        * two in the 32bit signed integer value range */
+       private static final int MAX_CAPACITY = 0x40000000;
+       
+       /** The number of bits used for table addressing when table is at max 
capacity */
+       private static final int FULL_BIT_RANGE = 
MathUtils.log2strict(MAX_CAPACITY);
+       
+       // 
------------------------------------------------------------------------
+       
+       /** The hash index, as an array of entries */
+       private Entry<K, V>[] table;
+       
+       /** The number of bits by which the hash code is shifted right, to find 
the bucket */
+       private int shift;
+       
+       /** The number of elements in the hash table */
+       private int numElements;
+       
+       /** The number of elements above which the hash table needs to grow */
+       private int rehashThreshold;
+       
+       /** The base-2 logarithm of the table capacity */ 
+       private int log2size;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new hash table with the default initial capacity.
+        */
+       public KeyMap() {
+               this(0);
+       }
+
+       /**
+        * Creates a new table with a capacity tailored to the given expected 
number of elements.
+        * 
+        * @param expectedNumberOfElements The number of elements to tailor the 
capacity to.
+        */
+       public KeyMap(int expectedNumberOfElements) {
+               if (expectedNumberOfElements < 0) {
+                       throw new IllegalArgumentException("Invalid capacity: " 
+ expectedNumberOfElements);
+               }
+               
+               // round up to the next power of two
+               // guard against too small capacity and integer overflows
+               int capacity = Integer.highestOneBit(expectedNumberOfElements) 
<< 1;
+               capacity = capacity >= 0 ? Math.max(MIN_CAPACITY, capacity) : 
MAX_CAPACITY;
+
+               // this also acts as a sanity check
+               log2size = MathUtils.log2strict(capacity);
+               shift = FULL_BIT_RANGE - log2size;
+               table = allocateTable(capacity);
+               rehashThreshold = getRehashThreshold(capacity);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Gets and Puts
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Inserts the given value, mapped under the given key. If the table 
already contains a value for
+        * the key, the value is replaced and returned. If no value is 
contained, yet, the function
+        * returns null.
+        * 
+        * @param key The key to insert.
+        * @param value The value to insert.
+        * @return The previously mapped value for the key, or null, if no 
value was mapped for the key.
+        * 
+        * @throws java.lang.NullPointerException Thrown, if the key is null.
+        */
+       public final V put(K key, V value) {
+               final int hash = hash(key);
+               final int slot = indexOf (hash);
+               
+               // search the chain from the slot
+               for (Entry<K, V> e = table[slot]; e != null; e = e.next) {
+                       Object k;
+                       if (e.hashCode == hash && ((k = e.key) == key || 
key.equals(k))) {
+                               // found match
+                               V old = e.value;
+                               e.value = value;
+                               return old;
+                       }
+               }
+
+               // no match, insert a new value
+               insertNewEntry(hash, key, value, slot);
+               return null;
+       }
+
+       /**
+        * Inserts a value for the given key, if no value is yet contained for 
that key. Otherwise,
+        * returns the value currently contained for the key.
+        * <p>
+        * The value that is inserted in case that the key is not contained, 
yet, is lazily created
+        * using the given factory.
+        *
+        * @param key The key to insert.
+        * @param factory The factory that produces the value, if no value is 
contained, yet, for the key.
+        * @return The value in the map after this operation (either the 
previously contained value, or the
+        *         newly created value).
+        * 
+        * @throws java.lang.NullPointerException Thrown, if the key is null.
+        */
+       public final V putIfAbsent(K key, LazyFactory<V> factory) {
+               final int hash = hash(key);
+               final int slot = indexOf(hash);
+
+               // search the chain from the slot
+               for (Entry<K, V> entry = table[slot]; entry != null; entry = 
entry.next) {
+                       if (entry.hashCode == hash && entry.key.equals(key)) {
+                               // found match
+                               return entry.value;
+                       }
+               }
+
+               // no match, insert a new value
+               V value = factory.create();
+               insertNewEntry(hash, key, value, slot);
+
+               // return the created value
+               return value;
+       }
+
+       /**
+        * Inserts or aggregates a value into the hash map. If the hash map 
does not yet contain the key,
+        * this method inserts the value. If the table already contains the key 
(and a value) this
+        * method will use the given ReduceFunction function to combine the 
existing value and the
+        * given value to a new value, and store that value for the key. 
+        * 
+        * @param key The key to map the value.
+        * @param value The new value to insert, or aggregate with the existing 
value.
+        * @param aggregator The aggregator to use if a value is already 
contained.
+        * 
+        * @return The value in the map after this operation: Either the given 
value, or the aggregated value.
+        * 
+        * @throws java.lang.NullPointerException Thrown, if the key is null.
+        * @throws Exception The method forwards exceptions from the 
aggregation function.
+        */
+       public final V putOrAggregate(K key, V value, ReduceFunction<V> 
aggregator) throws Exception {
+               final int hash = hash(key);
+               final int slot = indexOf(hash);
+
+               // search the chain from the slot
+               for (Entry<K, V> entry = table[slot]; entry != null; entry = 
entry.next) {
+                       if (entry.hashCode == hash && entry.key.equals(key)) {
+                               // found match
+                               entry.value = aggregator.reduce(entry.value, 
value);
+                               return entry.value;
+                       }
+               }
+
+               // no match, insert a new value
+               insertNewEntry(hash, key, value, slot);
+               // return the original value
+               return value;
+       }
+
+       /**
+        * Looks up the value mapped under the given key. Returns null if no 
value is mapped under this key.
+        * 
+        * @param key The key to look up.
+        * @return The value associated with the key, or null, if no value is 
found for the key.
+        * 
+        * @throws java.lang.NullPointerException Thrown, if the key is null.
+        */
+       public V get(K key) {
+               final int hash = hash(key);
+               final int slot = indexOf(hash);
+               
+               // search the chain from the slot
+               for (Entry<K, V> entry = table[slot]; entry != null; entry = 
entry.next) {
+                       if (entry.hashCode == hash && entry.key.equals(key)) {
+                               return entry.value;
+                       }
+               }
+               
+               // not found
+               return null;
+       }
+
+       private void insertNewEntry(int hashCode, K key, V value, int position) 
{
+               Entry<K,V> e = table[position];
+               table[position] = new Entry<>(key, value, hashCode, e);
+               numElements++;
+
+               // rehash if necessary
+               if (numElements > rehashThreshold) {
+                       growTable();
+               }
+       }
+       
+       private int indexOf(int hashCode) {
+               return (hashCode >> shift) & (table.length - 1);
+       }
+
+       /**
+        * Creates an iterator over the entries of this map.
+        * 
+        * @return An iterator over the entries of this map.
+        */
+       @Override
+       public Iterator<Entry<K, V>> iterator() {
+               return new Iterator<Entry<K, V>>() {
+                       
+                       private final Entry<K, V>[] tab = KeyMap.this.table;
+                       
+                       private Entry<K, V> nextEntry;
+                       
+                       private int nextPos = 0;
+                       
+                       @Override
+                       public boolean hasNext() {
+                               if (nextEntry != null) {
+                                       return true;
+                               }
+                               else {
+                                       while (nextPos < tab.length) {
+                                               Entry<K, V> e = tab[nextPos++];
+                                               if (e != null) {
+                                                       nextEntry = e;
+                                                       return true;
+                                               }
+                                       }
+                                       return false;
+                               }
+                       }
+
+                       @Override
+                       public Entry<K, V> next() {
+                               if (nextEntry != null || hasNext()) {
+                                       Entry<K, V> e = nextEntry;
+                                       nextEntry = nextEntry.next;
+                                       return e;
+                               }
+                               else {
+                                       throw new NoSuchElementException();
+                               }
+                       }
+
+                       @Override
+                       public void remove() {
+                               throw new UnsupportedOperationException();
+                       }
+               };
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Properties
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Gets the number of elements currently in the map.
+        * @return The number of elements currently in the map.
+        */
+       public int size() {
+               return numElements;
+       }
+
+       /**
+        * Checks whether the map is empty.
+        * @return True, if the map is empty, false otherwise.
+        */
+       public boolean isEmpty() {
+               return numElements == 0;
+       }
+
+       /**
+        * Gets the current table capacity, i.e., the number of slots in the 
hash table, without
+        * any overflow chaining.
+        * @return The number of slots in the hash table.
+        */
+       public int getCurrentTableCapacity() {
+               return table.length;
+       }
+
+       /**
+        * Gets the base-2 logarithm of the hash table capacity, as returned by
+        * {@link #getCurrentTableCapacity()}.
+        * 
+        * @return The base-2 logarithm of the hash table capacity.
+        */
+       public int getLog2TableCapacity() {
+               return log2size;
+       }
+       
+       public int getRehashThreshold() {
+               return rehashThreshold;
+       }
+       
+       public int getShift() {
+               return shift;
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+       
+       @SuppressWarnings("unchecked")
+       private Entry<K, V>[] allocateTable(int numElements) {
+               return (Entry<K, V>[]) new Entry<?, ?>[numElements];
+       }
+       
+       private void growTable() {
+               final int newSize = table.length << 1;
+                               
+               // only grow if there is still space to grow the table
+               if (newSize > 0) {
+                       final Entry<K, V>[] oldTable = this.table;
+                       final Entry<K, V>[] newTable = allocateTable(newSize);
+
+                       final int newShift = shift - 1;
+                       final int newMask = newSize - 1;
+                       
+                       // go over all slots from the table. since we hash to 
adjacent positions in
+                       // the new hash table, this is actually cache efficient
+                       for (Entry<K, V> entry : oldTable) {
+                               // traverse the chain for each slot
+                               while (entry != null) {
+                                       final int newPos = (entry.hashCode >> 
newShift) & newMask;
+                                       Entry<K, V> nextEntry = entry.next;
+                                       entry.next = newTable[newPos];
+                                       newTable[newPos] = entry;
+                                       entry = nextEntry;
+                               }
+                       }
+                       
+                       this.table = newTable;
+                       this.shift = newShift;
+                       this.rehashThreshold = getRehashThreshold(newSize);
+                       this.log2size += 1;
+               }
+       }
+       
+       private static int hash(Object key) {
+               int code = key.hashCode();
+               
+               // we need a strong hash function that generates diverse upper 
bits
+               // this hash function is more expensive than the "scramble" 
used by "java.util.HashMap",
+               // but required for this sort of hash table
+               code = (code + 0x7ed55d16) + (code << 12);
+               code = (code ^ 0xc761c23c) ^ (code >>> 19);
+               code = (code + 0x165667b1) + (code << 5);
+               code = (code + 0xd3a2646c) ^ (code << 9);
+               code = (code + 0xfd7046c5) + (code << 3);
+               return (code ^ 0xb55a4f09) ^ (code >>> 16);
+       }
+       
+       private static int getRehashThreshold(int capacity) {
+               // divide before multiply, to avoid overflow
+               return capacity / 4 * 3;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Testing Utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * For testing only: Actively counts the number of entries, rather than 
using the
+        * counter variable. This method has linear complexity, rather than 
constant.
+        * 
+        * @return The counted number of entries.
+        */
+       int traverseAndCountElements() {
+               int num = 0;
+               
+               for (Entry<?, ?> entry : table) {
+                       while (entry != null) {
+                               num++;
+                               entry = entry.next;
+                       }
+               }
+               
+               return num;
+       }
+
+       /**
+        * For testing only: Gets the length of the longest overflow chain.
+        * This method has linear complexity.
+        * 
+        * @return The length of the longest overflow chain.
+        */
+       int getLongestChainLength() {
+               int maxLen = 0;
+
+               for (Entry<?, ?> entry : table) {
+                       int thisLen = 0;
+                       while (entry != null) {
+                               thisLen++;
+                               entry = entry.next;
+                       }
+                       maxLen = Math.max(maxLen, thisLen);
+               }
+
+               return maxLen;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * An entry in the hash table.
+        * 
+        * @param <K> Type of the key.
+        * @param <V> Type of the value.
+        */
+       public static final class Entry<K, V> {
+               
+               final K key;
+               final int hashCode;
+               
+               V value;
+               Entry<K, V> next;
+               long touchedTag;
+
+               Entry(K key, V value, int hashCode, Entry<K, V> next) {
+                       this.key = key;
+                       this.value = value;
+                       this.next = next;
+                       this.hashCode = hashCode;
+               }
+
+               public K getKey() {
+                       return key;
+               }
+
+               public V getValue() {
+                       return value;
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Performs a traversal over the logical multi-map that results from 
the union of the
+        * given maps. This method does not actually build a union of the maps, 
but traverses the hash maps
+        * together.
+        * 
+        * @param maps The array of maps whose union should be traversed.
+        * @param visitor The visitor that is called for each key and all 
values.
+        * @param touchedTag A tag that is used to mark elements that have been 
touched in this specific
+        *                   traversal. Each successive traversal should supply 
a larger value for this
+        *                   tag than the previous one.
+        * 
+        * @param <K> The type of the map's key.
+        * @param <V> The type of the map's value.
+        */
+       public static <K, V> void traverseMaps(
+                                       final KeyMap<K, V>[] maps,
+                                       final TraversalEvaluator<K, V> visitor,
+                                       final long touchedTag)
+               throws Exception
+       {
+               // we need to work on the maps in descending size
+               Arrays.sort(maps, CapacityDescendingComparator.INSTANCE);
+               
+               final int[] shifts = new int[maps.length];
+               final int[] lowBitsMask = new int[maps.length];
+               final int numSlots = maps[0].table.length;
+               final int numTables = maps.length;
+               
+               // figure out how much each hash table collapses the entries
+               for (int i = 0; i < numTables; i++) {
+                       shifts[i] = maps[0].log2size - maps[i].log2size;
+                       lowBitsMask[i] = (1 << shifts[i]) - 1;
+               }
+               
+               // go over all slots (based on the largest hash table)
+               for (int pos = 0; pos < numSlots; pos++) {
+                       
+                       // for each slot, go over all tables, until the table 
does not have that slot any more
+                       // for tables where multiple slots collapse into one, 
we visit that one when we process the
+                       // latest of all slots that collapse to that one
+                       int mask;
+                       for (int rootTable = 0;
+                                       rootTable < numTables && ((mask = 
lowBitsMask[rootTable]) & pos) == mask;
+                                       rootTable++)
+                       {
+                               // use that table to gather keys and start 
collecting keys from the following tables
+                               // go over all entries of that slot in the table
+                               Entry<K, V> entry = maps[rootTable].table[pos 
>> shifts[rootTable]];
+                               while (entry != null) {
+                                       // take only entries that have not been 
collected as part of other tables
+                                       if (entry.touchedTag < touchedTag) {
+                                               entry.touchedTag = touchedTag;
+                                               
+                                               final K key = entry.key;
+                                               final int hashCode = 
entry.hashCode;
+                                               visitor.startNewKey(key);
+                                               visitor.nextValue(entry.value);
+                                               
+                                               addEntriesFromChain(entry.next, 
visitor, key, touchedTag, hashCode);
+                                               
+                                               // go over the other hash 
tables and collect their entries for the key
+                                               for (int followupTable = 
rootTable + 1; followupTable < numTables; followupTable++) {
+                                                       Entry<K, V> 
followupEntry = maps[followupTable].table[pos >> shifts[followupTable]];
+                                                       if (followupEntry != 
null) {
+                                                               
addEntriesFromChain(followupEntry, visitor, key, touchedTag, hashCode);
+                                                       }
+                                               }
+
+                                               visitor.keyDone();
+                                       }
+                                       
+                                       entry = entry.next;
+                               }
+                       }
+               }
+       }
+       
+       private static <K, V> void addEntriesFromChain(
+                       Entry<K, V> entry,
+                       TraversalEvaluator<K, V> visitor,
+                       K key,
+                       long touchedTag,
+                       int hashCode) throws Exception
+       {
+               while (entry != null) {
+                       if (entry.touchedTag < touchedTag && entry.hashCode == 
hashCode && entry.key.equals(key)) {
+                               entry.touchedTag = touchedTag;
+                               visitor.nextValue(entry.value);
+                       }
+                       entry = entry.next;
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Comparator that defines a descending order on maps depending on 
their table capacity
+        * and number of elements.
+        */
+       static final class CapacityDescendingComparator implements 
Comparator<KeyMap<?, ?>> {
+               
+               static final CapacityDescendingComparator INSTANCE = new 
CapacityDescendingComparator();
+               
+               private CapacityDescendingComparator() {}
+
+
+               @Override
+               public int compare(KeyMap<?, ?> o1, KeyMap<?, ?> o2) {
+                       // this sorts descending
+                       int cmp = o2.getLog2TableCapacity() - 
o1.getLog2TableCapacity();
+                       if (cmp != 0) {
+                               return cmp;
+                       }
+                       else {
+                               return o2.size() - o1.size();
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A factory for lazy/on-demand instantiation of values.
+        *
+        * @param <V> The type created by the factory.
+        */
+       public static interface LazyFactory<V> {
+
+               /**
+                * The factory method; creates the value.
+                * @return The value.
+                */
+               V create();
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A visitor for a traversal over the union of multiple hash maps. The 
visitor is
+        * called for each key in the union of the maps and all values 
associated with that key
+        * (one per map, but multiple across maps). 
+        * 
+        * @param <K> The type of the key.
+        * @param <V> The type of the value.
+        */
+       public static interface TraversalEvaluator<K, V> {
+
+               /**
+                * Called whenever the traversal starts with a new key.
+                * 
+                * @param key The key traversed.
+                * @throws Exception Method forwards all exceptions.
+                */
+               void startNewKey(K key) throws Exception;
+
+               /**
+                * Called for each value found for the current key.
+                * 
+                * @param value The next value.
+                * @throws Exception Method forwards all exceptions.
+                */
+               void nextValue(V value) throws Exception;
+
+               /**
+                * Called when the traversal for the current key is complete.
+                * 
+                * @throws Exception Method forwards all exceptions.
+                */
+               void keyDone() throws Exception;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
new file mode 100644
index 0000000..b34d0bc
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.windowpolicy.EventTime;
+import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime;
+import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+
+/**
+ * This class implements the conversion from window policies to concrete operator
+ * implementations.
+ */
+public class PolicyToOperator {
+
+       /**
+        * Entry point to create an operator for the given window policies and the window function.
+        */
+       public static <IN, OUT, KEY> OneInputStreamOperator<IN, OUT> createOperatorForPolicies(
+                       WindowPolicy window, WindowPolicy slide, Function function, KeySelector<IN, KEY> keySelector)
+       {
+               if (window == null || function == null) {
+                       throw new NullPointerException();
+               }
+               
+               // -- case 1: both policies are processing time policies
+               if (window instanceof ProcessingTime && (slide == null || slide instanceof ProcessingTime)) {
+                       final long windowLength = ((ProcessingTime) window).toMilliseconds();
+                       final long windowSlide = slide == null ? windowLength : ((ProcessingTime) slide).toMilliseconds();
+                       
+                       if (function instanceof ReduceFunction) {
+                               @SuppressWarnings("unchecked")
+                               ReduceFunction<IN> reducer = (ReduceFunction<IN>) function;
+
+                               @SuppressWarnings("unchecked")
+                               OneInputStreamOperator<IN, OUT> op = (OneInputStreamOperator<IN, OUT>)
+                                               new AggregatingProcessingTimeWindowOperator<KEY, IN>(
+                                                               reducer, keySelector, windowLength, windowSlide);
+                               return op;
+                       }
+                       else if (function instanceof KeyedWindowFunction) {
+                               @SuppressWarnings("unchecked")
+                               KeyedWindowFunction<IN, OUT, KEY> wf = (KeyedWindowFunction<IN, OUT, KEY>) function;
+
+                               return new AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>(
+                                                               wf, keySelector, windowLength, windowSlide);
+                       }
+               }
+
+               // -- case 2: both policies are event time policies
+               if (window instanceof EventTime && (slide == null || slide instanceof EventTime)) {
+                       // add event time implementation
+               }
+               
+               throw new UnsupportedOperationException("The windowing mechanism does not yet support " + window.toString(slide));
+       }
+       
+       // ------------------------------------------------------------------------
+       
+       /** Don't instantiate */
+       private PolicyToOperator() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java
new file mode 100644
index 0000000..55749a1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the operators that implement the various window operations
+ * on data streams. 
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java
deleted file mode 100644
index 2e926bc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.commons.math3.util.ArithmeticUtils;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.util.MathUtils;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-
-public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> extends AbstractStreamOperator<OUT> 
-               implements OneInputStreamOperator<IN, OUT>, Triggerable {
-       
-       private static final long serialVersionUID = 3245500864882459867L;
-       
-       private static final long MIN_SLIDE_TIME = 50;
-       
-       // ----- fields for operator parametrization -----
-       
-       private final Function function;
-       private final KeySelector<IN, KEY> keySelector;
-       
-       private final long windowSize;
-       private final long windowSlide;
-       private final long paneSize;
-       private final int numPanesPerWindow;
-       
-       // ----- fields for operator functionality -----
-       
-       private transient AbstractKeyedTimePanes<IN, KEY, ?, OUT> panes;
-       
-       private transient TimestampedCollector<OUT> out;
-       
-       private transient long nextEvaluationTime;
-       private transient long nextSlideTime;
-       
-       protected AbstractAlignedProcessingTimeWindowOperator(
-                       Function function,
-                       KeySelector<IN, KEY> keySelector,
-                       long windowLength,
-                       long windowSlide)
-       {
-               if (function == null || keySelector == null) {
-                       throw new NullPointerException();
-               }
-               if (windowLength < MIN_SLIDE_TIME) {
-                       throw new IllegalArgumentException("Window length must be at least " + MIN_SLIDE_TIME + " msecs");
-               }
-               if (windowSlide < MIN_SLIDE_TIME) {
-                       throw new IllegalArgumentException("Window slide must be at least " + MIN_SLIDE_TIME + " msecs");
-               }
-               if (windowLength < windowSlide) {
-                       throw new IllegalArgumentException("The window size must be larger than the window slide");
-               }
-               
-               final long paneSlide = ArithmeticUtils.gcd(windowLength, windowSlide);
-               if (paneSlide < MIN_SLIDE_TIME) {
-                       throw new IllegalArgumentException(String.format(
-                                       "Cannot compute window of size %d msecs sliding by %d msecs. " +
-                                                       "The unit of grouping is too small: %d msecs", windowLength, windowSlide, paneSlide));
-               }
-               
-               this.function = function;
-               this.keySelector = keySelector;
-               this.windowSize = windowLength;
-               this.windowSlide = windowSlide;
-               this.paneSize = paneSlide;
-               this.numPanesPerWindow = MathUtils.checkedDownCast(windowLength / paneSlide);
-       }
-       
-       
-       protected abstract AbstractKeyedTimePanes<IN, KEY, ?, OUT> createPanes(
-                       KeySelector<IN, KEY> keySelector, Function function);
-
-       // ------------------------------------------------------------------------
-       //  startup and shutdown
-       // ------------------------------------------------------------------------
-
-       @Override
-       public void open(Configuration parameters) throws Exception {
-               out = new TimestampedCollector<>(output);
-               
-               // create the panes that gather the elements per slide
-               panes = createPanes(keySelector, function);
-               
-               // decide when to first compute the window and when to slide it
-               // the values should align with the start of time (that is, the UNIX epoch, not the big bang)
-               final long now = System.currentTimeMillis();
-               nextEvaluationTime = now + windowSlide - (now % windowSlide);
-               nextSlideTime = now + paneSize - (now % paneSize);
-               
-               getRuntimeContext().registerTimer(Math.min(nextEvaluationTime, nextSlideTime), this);
-       }
-
-       @Override
-       public void close() throws Exception {
-               final long finalWindowTimestamp = nextEvaluationTime;
-
-               // early stop the triggering thread, so it does not attempt to return any more data
-               stopTriggers();
-
-               // emit the remaining data
-               computeWindow(finalWindowTimestamp);
-       }
-
-       @Override
-       public void dispose() {
-               // acquire the lock during shutdown, to prevent trigger calls at the same time
-               // fail-safe stop of the triggering thread (in case of an error)
-               stopTriggers();
-
-               // release the panes
-               panes.dispose();
-       }
-       
-       private void stopTriggers() {
-               // reset the action timestamps. this makes sure any pending triggers will not evaluate
-               nextEvaluationTime = -1L;
-               nextSlideTime = -1L;
-       }
-
-       // ------------------------------------------------------------------------
-       //  Receiving elements and triggers
-       // ------------------------------------------------------------------------
-       
-       @Override
-       public void processElement(StreamRecord<IN> element) throws Exception {
-               panes.addElementToLatestPane(element.getValue());
-       }
-
-       @Override
-       public void processWatermark(Watermark mark) {
-               // this operator does not react to watermarks
-       }
-
-       @Override
-       public void trigger(long timestamp) throws Exception {
-               // first we check if we actually trigger the window function
-               if (timestamp == nextEvaluationTime) {
-                       // compute and output the results
-                       computeWindow(timestamp);
-
-                       nextEvaluationTime += windowSlide;
-               }
-
-               // check if we slide the panes by one. this may happen in addition to the
-               // window computation, or just by itself
-               if (timestamp == nextSlideTime) {
-                       panes.slidePanes(numPanesPerWindow);
-                       nextSlideTime += paneSize;
-               }
-
-               long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime);
-               getRuntimeContext().registerTimer(nextTriggerTime, this);
-       }
-       
-       private void computeWindow(long timestamp) throws Exception {
-               out.setTimestamp(timestamp);
-               panes.truncatePanes(numPanesPerWindow);
-               panes.evaluateWindow(out);
-       }
-
-       // ------------------------------------------------------------------------
-       //  Property access (for testing)
-       // ------------------------------------------------------------------------
-
-       public long getWindowSize() {
-               return windowSize;
-       }
-
-       public long getWindowSlide() {
-               return windowSlide;
-       }
-
-       public long getPaneSize() {
-               return paneSize;
-       }
-       
-       public int getNumPanesPerWindow() {
-               return numPanesPerWindow;
-       }
-
-       public long getNextEvaluationTime() {
-               return nextEvaluationTime;
-       }
-
-       public long getNextSlideTime() {
-               return nextSlideTime;
-       }
-
-       // ------------------------------------------------------------------------
-       //  Utilities
-       // ------------------------------------------------------------------------
-       
-       @Override
-       public String toString() {
-               return "Window (processing time) (length=" + windowSize + ", slide=" + windowSlide + ')';
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java
deleted file mode 100644
index a49b2e6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayDeque;
-
-
-public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
-       
-       protected KeyMap<Key, Aggregate> latestPane = new KeyMap<>();
-
-       protected final ArrayDeque<KeyMap<Key, Aggregate>> previousPanes = new ArrayDeque<>();
-
-       // ------------------------------------------------------------------------
-
-       public abstract void addElementToLatestPane(Type element) throws Exception;
-
-       public abstract void evaluateWindow(Collector<Result> out) throws Exception;
-       
-       
-       public void dispose() {
-               // since all is heap data, there is no need to clean up anything
-               latestPane = null;
-               previousPanes.clear();
-       }
-       
-       
-       public void slidePanes(int panesToKeep) {
-               if (panesToKeep > 1) {
-                       // the current pane becomes the latest previous pane
-                       previousPanes.addLast(latestPane);
-
-                       // truncate the history
-                       while (previousPanes.size() >= panesToKeep) {
-                               previousPanes.removeFirst();
-                       }
-               }
-
-               // we need a new latest pane
-               latestPane = new KeyMap<>();
-       }
-       
-       public void truncatePanes(int numToRetain) {
-               while (previousPanes.size() >= numToRetain) {
-                       previousPanes.removeFirst();
-               }
-       }
-       
-       protected void traverseAllPanes(KeyMap.TraversalEvaluator<Key, Aggregate> traversal, long traversalPass) throws Exception{
-               // gather all panes in an array (faster iterations)
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               KeyMap<Key, Aggregate>[] panes = previousPanes.toArray(new KeyMap[previousPanes.size() + 1]);
-               panes[panes.length - 1] = latestPane;
-
-               // let the maps make a coordinated traversal and evaluate the window function per contained key
-               KeyMap.traverseMaps(panes, traversal, traversalPass);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
deleted file mode 100644
index 1212123..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.util.UnionIterator;
-import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-
-
-public class AccumulatingKeyedTimePanes<Type, Key, Result> extends 
AbstractKeyedTimePanes<Type, Key, ArrayList<Type>, Result> {
-       
-       private final KeySelector<Type, Key> keySelector;
-
-       private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = 
getListFactory();
-
-       private final KeyedWindowFunction<Type, Result, Key> function;
-       
-       private long evaluationPass;
-
-       // 
------------------------------------------------------------------------
-       
-       public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, 
KeyedWindowFunction<Type, Result, Key> function) {
-               this.keySelector = keySelector;
-               this.function = function;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void addElementToLatestPane(Type element) throws Exception {
-               Key k = keySelector.getKey(element);
-               ArrayList<Type> elements = latestPane.putIfAbsent(k, 
listFactory);
-               elements.add(element);
-       }
-
-       @Override
-       public void evaluateWindow(Collector<Result> out) throws Exception {
-               if (previousPanes.isEmpty()) {
-                       // optimized path for single pane case (tumbling window)
-                       for (KeyMap.Entry<Key, ArrayList<Type>> entry : 
latestPane) {
-                               function.evaluate(entry.getKey(), 
entry.getValue(), out);
-                       }
-               }
-               else {
-                       // general code path for multi-pane case
-                       WindowFunctionTraversal<Key, Type, Result> evaluator = 
new WindowFunctionTraversal<>(function, out);
-                       traverseAllPanes(evaluator, evaluationPass);
-               }
-               
-               evaluationPass++;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Running a window function in a map traversal
-       // 
------------------------------------------------------------------------
-       
-       static final class WindowFunctionTraversal<Key, Type, Result> 
implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
-
-               private final KeyedWindowFunction<Type, Result, Key> function;
-               
-               private final UnionIterator<Type> unionIterator;
-               
-               private final Collector<Result> out;
-               
-               private Key currentKey;
-
-               WindowFunctionTraversal(KeyedWindowFunction<Type, Result, Key> 
function, Collector<Result> out) {
-                       this.function = function;
-                       this.out = out;
-                       this.unionIterator = new UnionIterator<>();
-               }
-
-
-               @Override
-               public void startNewKey(Key key) {
-                       unionIterator.clear();
-                       currentKey = key;
-               }
-
-               @Override
-               public void nextValue(ArrayList<Type> value) {
-                       unionIterator.addList(value);
-               }
-
-               @Override
-               public void keyDone() throws Exception {
-                       function.evaluate(currentKey, unionIterator, out);
-               }
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  Lazy factory for lists (put if absent)
-       // 
------------------------------------------------------------------------
-       
-       @SuppressWarnings("unchecked")
-       private static <V> KeyMap.LazyFactory<ArrayList<V>> getListFactory() {
-               return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY;
-       }
-
-       private static final KeyMap.LazyFactory<?> LIST_FACTORY = new 
KeyMap.LazyFactory<ArrayList<?>>() {
-
-               @Override
-               public ArrayList<?> create() {
-                       return new ArrayList<>(4);
-               }
-       };
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
deleted file mode 100644
index fb9d163..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
-
-
-public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> 
-               extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, 
OUT>  {
-
-       private static final long serialVersionUID = 7305948082830843475L;
-
-       
-       public AccumulatingProcessingTimeWindowOperator(
-                       KeyedWindowFunction<IN, OUT, KEY> function,
-                       KeySelector<IN, KEY> keySelector,
-                       long windowLength,
-                       long windowSlide)
-       {
-               super(function, keySelector, windowLength, windowSlide);
-       }
-
-       @Override
-       protected AccumulatingKeyedTimePanes<IN, KEY, OUT> 
createPanes(KeySelector<IN, KEY> keySelector, Function function) {
-               @SuppressWarnings("unchecked")
-               KeyedWindowFunction<IN, OUT, KEY> windowFunction = 
(KeyedWindowFunction<IN, OUT, KEY>) function;
-               
-               return new AccumulatingKeyedTimePanes<>(keySelector, 
windowFunction);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java
deleted file mode 100644
index 730c984..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.util.Collector;
-
-
-public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes<Type, Key, Type, Type> {
-       
-       private final KeySelector<Type, Key> keySelector;
-       
-       private final ReduceFunction<Type> reducer;
-       
-       private long evaluationPass;
-
-       // ------------------------------------------------------------------------
-       
-       public AggregatingKeyedTimePanes(KeySelector<Type, Key> keySelector, ReduceFunction<Type> reducer) {
-               this.keySelector = keySelector;
-               this.reducer = reducer;
-       }
-
-       // ------------------------------------------------------------------------
-
-       @Override
-       public void addElementToLatestPane(Type element) throws Exception {
-               Key k = keySelector.getKey(element);
-               latestPane.putOrAggregate(k, element, reducer);
-       }
-
-       @Override
-       public void evaluateWindow(Collector<Type> out) throws Exception {
-               if (previousPanes.isEmpty()) {
-                       // optimized path for single pane case
-                       for (KeyMap.Entry<Key, Type> entry : latestPane) {
-                               out.collect(entry.getValue());
-                       }
-               }
-               else {
-                       // general code path for multi-pane case
-                       AggregatingTraversal<Key, Type> evaluator = new AggregatingTraversal<>(reducer, out);
-                       traverseAllPanes(evaluator, evaluationPass);
-               }
-               
-               evaluationPass++;
-       }
-
-       // ------------------------------------------------------------------------
-       //  The maps traversal that performs the final aggregation
-       // ------------------------------------------------------------------------
-       
-       static final class AggregatingTraversal<Key, Type> implements KeyMap.TraversalEvaluator<Key, Type> {
-
-               private final ReduceFunction<Type> function;
-               
-               private final Collector<Type> out;
-               
-               private Type currentValue;
-
-               AggregatingTraversal(ReduceFunction<Type> function, Collector<Type> out) {
-                       this.function = function;
-                       this.out = out;
-               }
-
-               @Override
-               public void startNewKey(Key key) {
-                       currentValue = null;
-               }
-
-               @Override
-               public void nextValue(Type value) throws Exception {
-                       if (currentValue != null) {
-                               currentValue = function.reduce(currentValue, value);
-                       }
-                       else {
-                               currentValue = value;
-                       }
-               }
-
-               @Override
-               public void keyDone() throws Exception {
-                       out.collect(currentValue);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java
deleted file mode 100644
index 8bed749..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-
-public class AggregatingProcessingTimeWindowOperator<KEY, IN> 
-               extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN>  {
-
-       private static final long serialVersionUID = 7305948082830843475L;
-
-       
-       public AggregatingProcessingTimeWindowOperator(
-                       ReduceFunction<IN> function,
-                       KeySelector<IN, KEY> keySelector,
-                       long windowLength,
-                       long windowSlide)
-       {
-               super(function, keySelector, windowLength, windowSlide);
-       }
-
-       @Override
-       protected AggregatingKeyedTimePanes<IN, KEY> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
-               @SuppressWarnings("unchecked")
-               ReduceFunction<IN> windowFunction = (ReduceFunction<IN>) function;
-               
-               return new AggregatingKeyedTimePanes<IN, KEY>(keySelector, windowFunction);
-       }
-}

Reply via email to