Repository: apex-malhar
Updated Branches:
  refs/heads/master 96d216e80 -> aeb10f33d


APEXMALHAR-2240 Implement Windowed Merge Operator for join support.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/92bd7323
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/92bd7323
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/92bd7323

Branch: refs/heads/master
Commit: 92bd732325c9d8b8d6241aea398bb62aaaec7b91
Parents: 96d216e
Author: Shunxin <[email protected]>
Authored: Mon Oct 10 18:47:59 2016 -0700
Committer: Shunxin <[email protected]>
Committed: Mon Oct 10 19:07:24 2016 -0700

----------------------------------------------------------------------
 .../malhar/lib/window/JoinAccumulation.java     |  67 ------
 .../malhar/lib/window/MergeAccumulation.java    |  40 ++++
 .../lib/window/MergeWindowedOperator.java       |  30 +++
 .../malhar/lib/window/accumulation/CoGroup.java |  45 ++++
 .../lib/window/accumulation/InnerJoin.java      | 115 ++++++++++
 .../lib/window/accumulation/PojoInnerJoin.java  | 187 +++++++++++++++++
 .../impl/AbstractWindowedMergeOperator.java     | 123 +++++++++++
 .../window/impl/AbstractWindowedOperator.java   |  87 ++++----
 .../impl/KeyedWindowedMergeOperatorImpl.java    | 120 +++++++++++
 .../window/impl/KeyedWindowedOperatorImpl.java  | 134 ++++++------
 .../window/impl/WindowedMergeOperatorImpl.java  | 112 ++++++++++
 .../lib/window/accumulation/CoGroupTest.java    |  60 ++++++
 .../lib/window/accumulation/InnerJoinTest.java  |  57 +++++
 .../window/accumulation/PojoInnerJoinTest.java  | 133 ++++++++++++
 ...yedWindowedMergeOperatorTestApplication.java | 208 +++++++++++++++++++
 .../window/impl/WindowedMergeOperatorTest.java  | 165 +++++++++++++++
 .../WindowedMergeOperatorTestApplication.java   | 203 ++++++++++++++++++
 17 files changed, 1719 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/JoinAccumulation.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/JoinAccumulation.java 
b/library/src/main/java/org/apache/apex/malhar/lib/window/JoinAccumulation.java
deleted file mode 100644
index 69240e0..0000000
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/window/JoinAccumulation.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window;
-
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * This is the interface for accumulation when joining multiple streams.
- *
- * @since 3.5.0
- */
[email protected]
-public interface JoinAccumulation<InputT1, InputT2, InputT3, InputT4, InputT5, 
AccumT, OutputT> extends Accumulation<InputT1, AccumT, OutputT>
-{
-  /**
-   * Accumulate the second input type to the accumulated value
-   *
-   * @param accumulatedValue
-   * @param input
-   * @return
-   */
-  AccumT accumulate2(AccumT accumulatedValue, InputT2 input);
-
-  /**
-   * Accumulate the third input type to the accumulated value
-   *
-   * @param accumulatedValue
-   * @param input
-   * @return
-   */
-  AccumT accumulate3(AccumT accumulatedValue, InputT3 input);
-
-  /**
-   * Accumulate the fourth input type to the accumulated value
-   *
-   * @param accumulatedValue
-   * @param input
-   * @return
-   */
-  AccumT accumulate4(AccumT accumulatedValue, InputT4 input);
-
-  /**
-   * Accumulate the fifth input type to the accumulated value
-   *
-   * @param accumulatedValue
-   * @param input
-   * @return
-   */
-  AccumT accumulate5(AccumT accumulatedValue, InputT5 input);
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java
new file mode 100644
index 0000000..71f4408
--- /dev/null
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This is the interface for accumulation when joining multiple streams.
+ *
+ * @since 3.5.0
+ */
[email protected]
+public interface MergeAccumulation<InputT1, InputT2, AccumT, OutputT> extends 
Accumulation<InputT1, AccumT, OutputT>
+{
+  /**
+   * Accumulate the second input type to the accumulated value
+   *
+   * @param accumulatedValue
+   * @param input
+   * @return
+   */
+  AccumT accumulate2(AccumT accumulatedValue, InputT2 input);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java
new file mode 100644
index 0000000..89a70a4
--- /dev/null
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+/**
+ * Interface for Join Windowed Operator.
+ */
+public interface MergeWindowedOperator<InputT1, InputT2>
+    extends WindowedOperator<InputT1>
+{
+  void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple);
+
+  void processWatermark2(ControlTuple.Watermark watermark);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/CoGroup.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/CoGroup.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/CoGroup.java
new file mode 100644
index 0000000..e22d582
--- /dev/null
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/CoGroup.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * CoGroup Join Accumulation.
+ */
+public class CoGroup<T> extends InnerJoin<T>
+{
+  public CoGroup()
+  {
+    //for kryo
+  }
+
+
+  @Override
+  public List<List<T>> getOutput(List<Set<T>> accumulatedValue)
+  {
+    List<List<T>> result = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      result.add(new ArrayList<T>(accumulatedValue.get(i)));
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoin.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoin.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoin.java
new file mode 100644
index 0000000..fd250a2
--- /dev/null
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoin.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.apex.malhar.lib.window.MergeAccumulation;
+
+/**
+ * Combine Join Accumulation, inner-joins tuples with same type from different 
streams.
+ */
+public class InnerJoin<T> implements MergeAccumulation<T, T, List<Set<T>>, 
List<List<T>>>
+{
+
+  public InnerJoin()
+  {
+    //for kryo
+  }
+
+  @Override
+  public List<Set<T>> accumulate(List<Set<T>> accumulatedValue, T input)
+  {
+    return accumulateWithIndex(0, accumulatedValue, input);
+  }
+
+  @Override
+  public List<Set<T>> accumulate2(List<Set<T>> accumulatedValue, T input)
+  {
+    return accumulateWithIndex(1, accumulatedValue, input);
+  }
+
+
+  public List<Set<T>> accumulateWithIndex(int index, List<Set<T>> 
accumulatedValue, T input)
+  {
+    Set<T> accuSet = accumulatedValue.get(index);
+    accuSet.add(input);
+    accumulatedValue.set(index, accuSet);
+    return accumulatedValue;
+  }
+
+  @Override
+  public List<Set<T>> defaultAccumulatedValue()
+  {
+    ArrayList<Set<T>> accu = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      accu.add(new HashSet<T>());
+    }
+    return accu;
+  }
+
+  @Override
+  public List<Set<T>> merge(List<Set<T>> accumulatedValue1, List<Set<T>> 
accumulatedValue2)
+  {
+    for (int i = 0; i < 2; i++) {
+      Set<T> accuSet1 = accumulatedValue1.get(i);
+      Set<T> accuSet2 = accumulatedValue2.get(i);
+      accuSet1.addAll(accuSet2);
+      accumulatedValue1.set(i, accuSet1);
+    }
+    return accumulatedValue1;
+  }
+
+  @Override
+  public List<List<T>> getOutput(List<Set<T>> accumulatedValue)
+  {
+    List<List<T>> result = new ArrayList<>();
+
+    // TODO: May need to revisit (use state manager).
+    result = getAllCombo(accumulatedValue, result, new ArrayList<T>());
+
+    return result;
+  }
+
+
+  public List<List<T>> getAllCombo(List<Set<T>> accu, List<List<T>> result, 
List<T> curList)
+  {
+    if (curList.size() == 2) {
+      result.add(curList);
+      return result;
+    } else {
+      for (T item : accu.get(curList.size())) {
+        List<T> tempList = new ArrayList<>(curList);
+        tempList.add(item);
+        result = getAllCombo(accu, result, tempList);
+      }
+      return result;
+    }
+  }
+
+
+  @Override
+  public List<List<T>> getRetraction(List<List<T>> value)
+  {
+    return new ArrayList<>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
new file mode 100644
index 0000000..4b3bc69
--- /dev/null
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.lang.reflect.Field;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.window.MergeAccumulation;
+
+import com.google.common.base.Throwables;
+
+/**
+ * Inner join Accumulation for Pojo Streams.
+ */
+public class PojoInnerJoin<InputT1, InputT2>
+    implements MergeAccumulation<InputT1, InputT2, List<List<Map<String, 
Object>>>, List<Map<String, Object>>>
+{
+  protected String[] keys;
+
+  public PojoInnerJoin()
+  {
+    throw new IllegalArgumentException("Please specify number of streams that 
are joining.");
+  }
+
+  public PojoInnerJoin(int num, String... keys)
+  {
+    if (keys.length != 2) {
+      throw new IllegalArgumentException("Wrong number of keys.");
+    }
+
+    this.keys = Arrays.copyOf(keys, keys.length);
+  }
+
+  @Override
+  public List<List<Map<String, Object>>> accumulate(List<List<Map<String, 
Object>>> accumulatedValue, InputT1 input)
+  {
+    try {
+      return accumulateWithIndex(0, accumulatedValue, input);
+    } catch (NoSuchFieldException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public List<List<Map<String, Object>>> accumulate2(List<List<Map<String, 
Object>>> accumulatedValue, InputT2 input)
+  {
+    try {
+      return accumulateWithIndex(1, accumulatedValue, input);
+    } catch (NoSuchFieldException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+
+  @Override
+  public List<List<Map<String, Object>>> defaultAccumulatedValue()
+  {
+    List<List<Map<String, Object>>> accu = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      accu.add(new ArrayList<Map<String, Object>>());
+    }
+    return accu;
+  }
+
+
+  private List<List<Map<String, Object>>> accumulateWithIndex(int index, 
List<List<Map<String, Object>>> accu, Object input) throws NoSuchFieldException
+  {
+    // TODO: If a stream never sends out any tuple during one window, a wrong 
key would not be detected.
+
+    input.getClass().getDeclaredField(keys[index]);
+
+    List<Map<String, Object>> curList = accu.get(index);
+    Map map = pojoToMap(input);
+    curList.add(map);
+    accu.set(index, curList);
+
+    return accu;
+  }
+
+  private Map<String, Object> pojoToMap(Object input)
+  {
+    Map<String, Object> map = new HashMap<>();
+
+    Field[] fields = input.getClass().getDeclaredFields();
+
+    for (Field field : fields) {
+      String[] words = field.getName().split("\\.");
+      String fieldName = words[words.length - 1];
+      field.setAccessible(true);
+      try {
+        Object value = field.get(input);
+        map.put(fieldName, value);
+      } catch (IllegalAccessException e) {
+        throw Throwables.propagate(e);
+      }
+    }
+    return map;
+  }
+
+  @Override
+  public List<List<Map<String, Object>>> merge(List<List<Map<String, Object>>> 
accumulatedValue1, List<List<Map<String, Object>>> accumulatedValue2)
+  {
+    for (int i = 0; i < 2; i++) {
+      List<Map<String, Object>> curList = accumulatedValue1.get(i);
+      curList.addAll(accumulatedValue2.get(i));
+      accumulatedValue1.set(i, curList);
+    }
+    return accumulatedValue1;
+  }
+
+  @Override
+  public List<Map<String, Object>> getOutput(List<List<Map<String, Object>>> 
accumulatedValue)
+  {
+    List<Map<String, Object>> result = new ArrayList<>();
+
+    // TODO: May need to revisit (use state manager).
+    result = getAllCombo(0, accumulatedValue, result, null);
+
+    return result;
+  }
+
+
+  private List<Map<String, Object>> getAllCombo(int streamIndex, 
List<List<Map<String, Object>>> accu, List<Map<String, Object>> result, 
Map<String, Object> curMap)
+  {
+    if (streamIndex == 2) {
+      if (curMap != null) {
+        result.add(curMap);
+      }
+      return result;
+    } else {
+      for (Map<String, Object> map : accu.get(streamIndex)) {
+        if (streamIndex == 0) {
+          Map<String, Object> tempMap = new HashMap<>(map);
+          result = getAllCombo(streamIndex + 1, accu, result, tempMap);
+        } else if (curMap == null) {
+          return result;
+        } else {
+          Map<String, Object> tempMap = new HashMap<>(curMap);
+          tempMap = joinTwoMapsWithKeys(tempMap, keys[0], map, 
keys[streamIndex]);
+          result = getAllCombo(streamIndex + 1, accu, result, tempMap);
+        }
+      }
+      return result;
+    }
+  }
+
+  private Map<String, Object> joinTwoMapsWithKeys(Map<String, Object> map1, 
String key1, Map<String, Object> map2, String key2)
+  {
+    if (!map1.get(key1).equals(map2.get(key2))) {
+      return null;
+    } else {
+      for (String field : map2.keySet()) {
+        if (!field.equals(key2)) {
+          map1.put(field, map2.get(field));
+        }
+      }
+      return map1;
+    }
+  }
+
+  @Override
+  public List<Map<String, Object>> getRetraction(List<Map<String, Object>> 
value)
+  {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java
new file mode 100644
index 0000000..05a2495
--- /dev/null
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.MergeAccumulation;
+import org.apache.apex.malhar.lib.window.MergeWindowedOperator;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+
+import com.google.common.base.Function;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+
+
+/**
+ * Abstract Windowed Merge Operator.
+ */
+public abstract class AbstractWindowedMergeOperator<InputT1, InputT2, OutputT, 
DataStorageT extends WindowedStorage,
+    RetractionStorageT extends WindowedStorage, AccumulationT extends
+    MergeAccumulation>
+    extends AbstractWindowedOperator<InputT1, OutputT, DataStorageT, 
RetractionStorageT, AccumulationT>
+    implements MergeWindowedOperator<InputT1, InputT2>
+{
+
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(AbstractWindowedMergeOperator.class);
+  private Function<InputT2, Long> timestampExtractor2;
+
+  private long latestWatermark1 = -1;  // latest watermark from stream 1
+  private long latestWatermark2 = -1;  // latest watermark from stream 2
+
+  public final transient DefaultInputPort<Tuple<InputT2>> input2 = new 
DefaultInputPort<Tuple<InputT2>>()
+  {
+    @Override
+    public void process(Tuple<InputT2> tuple)
+    {
+      processTuple2(tuple);
+    }
+  };
+
+  // TODO: This port should be removed when Apex Core has native support for 
custom control tuples
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<ControlTuple> controlInput2 = new 
DefaultInputPort<ControlTuple>()
+  {
+    @Override
+    public void process(ControlTuple tuple)
+    {
+      if (tuple instanceof ControlTuple.Watermark) {
+        processWatermark2((ControlTuple.Watermark)tuple);
+      }
+    }
+  };
+
+  public void processTuple2(Tuple<InputT2> tuple)
+  {
+    long timestamp = extractTimestamp(tuple, timestampExtractor2);
+    if (isTooLate(timestamp)) {
+      dropTuple(tuple);
+    } else {
+      Tuple.WindowedTuple<InputT2> windowedTuple = 
getWindowedValueWithTimestamp(tuple, timestamp);
+      // do the accumulation
+      accumulateTuple2(windowedTuple);
+      processWindowState(windowedTuple);
+    }
+  }
+
+  public void setTimestampExtractor2(Function<InputT2, Long> 
timestampExtractor2)
+  {
+    this.timestampExtractor2 = timestampExtractor2;
+  }
+
+
+  @Override
+  public void processWatermark(ControlTuple.Watermark watermark)
+  {
+    latestWatermark1 = watermark.getTimestamp();
+    if (latestWatermark1 >= 0 && latestWatermark2 >= 0) {
+      // Select the smallest timestamp of the latest watermarks as the 
watermark of the operator.
+      long minWatermark = Math.min(latestWatermark1, latestWatermark2);
+      if (this.watermarkTimestamp < 0 || minWatermark != 
this.watermarkTimestamp) {
+        this.watermarkTimestamp = minWatermark;
+      }
+    }
+  }
+
+  @Override
+  public void processWatermark2(ControlTuple.Watermark watermark)
+  {
+    latestWatermark2 = watermark.getTimestamp();
+    if (latestWatermark1 >= 0 && latestWatermark2 >= 0) {
+      long minWatermark = Math.min(latestWatermark1, latestWatermark2);
+      if (this.watermarkTimestamp < 0 || minWatermark != 
this.watermarkTimestamp) {
+        this.watermarkTimestamp = minWatermark;
+      }
+    }
+  }
+
+  @Override
+  protected void processWatermarkAtEndWindow()
+  {
+    if (fixedWatermarkMillis > 0 || this.watermarkTimestamp != 
this.currentWatermark) {
+      super.processWatermarkAtEndWindow();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
index 0ece11e..f965a01 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
@@ -79,16 +79,16 @@ public abstract class AbstractWindowedOperator<InputT, 
OutputT, DataStorageT ext
 
   private Function<InputT, Long> timestampExtractor;
 
-  private long currentWatermark = -1;
-  private long watermarkTimestamp = -1;
+  protected long currentWatermark = -1;
+  protected long watermarkTimestamp = -1;
   private boolean triggerAtWatermark;
-  private long earlyTriggerCount;
+  protected long earlyTriggerCount;
   private long earlyTriggerMillis;
-  private long lateTriggerCount;
+  protected long lateTriggerCount;
   private long lateTriggerMillis;
   private long currentDerivedTimestamp = -1;
   private long timeIncrement;
-  private long fixedWatermarkMillis = -1;
+  protected long fixedWatermarkMillis = -1;
 
   private Map<String, Component<Context.OperatorContext>> components = new 
HashMap<>();
 
@@ -96,7 +96,9 @@ public abstract class AbstractWindowedOperator<InputT, 
OutputT, DataStorageT ext
   protected RetractionStorageT retractionStorage;
   protected AccumulationT accumulation;
 
-  private static final transient Collection<? extends Window> 
GLOBAL_WINDOW_SINGLETON_SET = 
Collections.singleton(Window.GlobalWindow.INSTANCE);
+
+  protected static final transient Collection<? extends Window> 
GLOBAL_WINDOW_SINGLETON_SET = 
Collections.singleton(Window.GlobalWindow.INSTANCE);
+
   private static final transient Logger LOG = 
LoggerFactory.getLogger(AbstractWindowedOperator.class);
 
   public final transient DefaultInputPort<Tuple<InputT>> input = new 
DefaultInputPort<Tuple<InputT>>()
@@ -135,28 +137,32 @@ public abstract class AbstractWindowedOperator<InputT, 
OutputT, DataStorageT ext
    */
   public void processTuple(Tuple<InputT> tuple)
   {
-    long timestamp = extractTimestamp(tuple);
+    long timestamp = extractTimestamp(tuple, timestampExtractor);
     if (isTooLate(timestamp)) {
       dropTuple(tuple);
     } else {
       Tuple.WindowedTuple<InputT> windowedTuple = getWindowedValue(tuple);
       // do the accumulation
       accumulateTuple(windowedTuple);
+      processWindowState(windowedTuple);
+    }
+  }
 
-      for (Window window : windowedTuple.getWindows()) {
-        WindowState windowState = windowStateMap.get(window);
-        windowState.tupleCount++;
-        // process any count based triggers
-        if (windowState.watermarkArrivalTime == -1) {
-          // watermark has not arrived yet, check for early count based trigger
-          if (earlyTriggerCount > 0 && (windowState.tupleCount % 
earlyTriggerCount) == 0) {
-            fireTrigger(window, windowState);
-          }
-        } else {
-          // watermark has arrived, check for late count based trigger
-          if (lateTriggerCount > 0 && (windowState.tupleCount % 
lateTriggerCount) == 0) {
-            fireTrigger(window, windowState);
-          }
+  protected void processWindowState(Tuple.WindowedTuple<? extends Object> 
windowedTuple)
+  {
+    for (Window window : windowedTuple.getWindows()) {
+      WindowState windowState = windowStateMap.get(window);
+      windowState.tupleCount++;
+      // process any count based triggers
+      if (windowState.watermarkArrivalTime == -1) {
+        // watermark has not arrived yet, check for early count based trigger
+        if (earlyTriggerCount > 0 && (windowState.tupleCount % 
earlyTriggerCount) == 0) {
+          fireTrigger(window, windowState);
+        }
+      } else {
+        // watermark has arrived, check for late count based trigger
+        if (lateTriggerCount > 0 && (windowState.tupleCount % 
lateTriggerCount) == 0) {
+          fireTrigger(window, windowState);
         }
       }
     }
@@ -292,15 +298,31 @@ public abstract class AbstractWindowedOperator<InputT, 
OutputT, DataStorageT ext
   @Override
   public Tuple.WindowedTuple<InputT> getWindowedValue(Tuple<InputT> input)
   {
+    long timestamp = extractTimestamp(input, timestampExtractor);
+    return getWindowedValueWithTimestamp(input, timestamp);
+  }
+
+  public <T> Tuple.WindowedTuple<T> getWindowedValueWithTimestamp(Tuple<T> 
input, long timestamp)
+  {
     if (windowOption == null && input instanceof Tuple.WindowedTuple) {
       // inherit the windows from upstream
-      return (Tuple.WindowedTuple<InputT>)input;
+      initializeWindowStates(((Tuple.WindowedTuple<T>)input).getWindows());
+      return (Tuple.WindowedTuple<T>)input;
     } else {
-      return new Tuple.WindowedTuple<>(assignWindows(input), 
extractTimestamp(input), input.getValue());
+      return new Tuple.WindowedTuple<>(assignWindows(input, timestamp), 
timestamp, input.getValue());
     }
   }
 
-  private long extractTimestamp(Tuple<InputT> tuple)
+  protected void initializeWindowStates(Collection<? extends Window> windows)
+  {
+    for (Window window : windows) {
+      if (!windowStateMap.containsWindow(window)) {
+        windowStateMap.put(window, new WindowState());
+      }
+    }
+  }
+
+  protected <T> long extractTimestamp(Tuple<T> tuple, Function<T, Long> 
timestampExtractor)
   {
     if (timestampExtractor == null) {
       if (tuple instanceof Tuple.TimestampedTuple) {
@@ -313,19 +335,14 @@ public abstract class AbstractWindowedOperator<InputT, 
OutputT, DataStorageT ext
     }
   }
 
-  private Collection<? extends Window> assignWindows(Tuple<InputT> inputTuple)
+  protected <T> Collection<? extends Window> assignWindows(Tuple<T> 
inputTuple, long timestamp)
   {
     if (windowOption instanceof WindowOption.GlobalWindow) {
       return GLOBAL_WINDOW_SINGLETON_SET;
     } else {
-      long timestamp = extractTimestamp(inputTuple);
       if (windowOption instanceof WindowOption.TimeWindows) {
         Collection<? extends Window> windows = 
getTimeWindowsForTimestamp(timestamp);
-        for (Window window : windows) {
-          if (!windowStateMap.containsWindow(window)) {
-            windowStateMap.put(window, new WindowState());
-          }
-        }
+        initializeWindowStates(windows);
         return windows;
       } else if (windowOption instanceof WindowOption.SessionWindows) {
         return assignSessionWindows(timestamp, inputTuple);
@@ -335,7 +352,7 @@ public abstract class AbstractWindowedOperator<InputT, 
OutputT, DataStorageT ext
     }
   }
 
-  protected Collection<Window.SessionWindow> assignSessionWindows(long 
timestamp, Tuple<InputT> inputTuple)
+  protected <T> Collection<Window.SessionWindow> assignSessionWindows(long 
timestamp, Tuple<T> inputTuple)
   {
     throw new UnsupportedOperationException("Session window require keyed 
tuples");
   }
@@ -348,7 +365,7 @@ public abstract class AbstractWindowedOperator<InputT, 
OutputT, DataStorageT ext
    * @param timestamp the timestamp
    * @return the windows this timestamp belongs to
    */
-  private Collection<Window.TimeWindow> getTimeWindowsForTimestamp(long 
timestamp)
+  protected Collection<Window.TimeWindow> getTimeWindowsForTimestamp(long 
timestamp)
   {
     List<Window.TimeWindow> windows = new ArrayList<>();
     if (windowOption instanceof WindowOption.TimeWindows) {
@@ -382,7 +399,7 @@ public abstract class AbstractWindowedOperator<InputT, 
OutputT, DataStorageT ext
   }
 
   @Override
-  public void dropTuple(Tuple<InputT> input)
+  public void dropTuple(Tuple input)
   {
     // do nothing
     LOG.debug("Dropping late tuple {}", input);
@@ -464,7 +481,7 @@ public abstract class AbstractWindowedOperator<InputT, 
OutputT, DataStorageT ext
     }
   }
 
-  private void processWatermarkAtEndWindow()
+  protected void processWatermarkAtEndWindow()
   {
     if (fixedWatermarkMillis > 0) {
       watermarkTimestamp = currentDerivedTimestamp - fixedWatermarkMillis;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java
new file mode 100644
index 0000000..a5f17c5
--- /dev/null
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.window.MergeAccumulation;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+
+/**
+ * Keyed Windowed Merge Operator to merge two streams of keyed tuple with a 
key. Please use
+ * {@link WindowedMergeOperatorImpl} for non-keyed merging.
+ *
+ * @param <KeyT> Type of the key used to merge two streams.
+ * @param <InputT1> The type of the value of the keyed input tuple from first 
stream.
+ * @param <InputT2> The type of the value of the keyed input tuple from second 
stream.
+ * @param <AccumT> The type of the accumulated value in the operator state per 
key per window.
+ * @param <OutputT> The type of the value of the keyed output tuple.
+ */
+public class KeyedWindowedMergeOperatorImpl<KeyT, InputT1, InputT2, AccumT, 
OutputT>
+    extends AbstractWindowedMergeOperator<KeyValPair<KeyT, InputT1>, 
KeyValPair<KeyT, InputT2>, KeyValPair<KeyT, OutputT>, 
WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>, 
WindowedStorage.WindowedKeyedStorage<KeyT, OutputT>, MergeAccumulation<InputT1, 
InputT2, AccumT, OutputT>>
+{
+  // TODO: Add session window support.
+
+  private abstract class AccumFunction<T>
+  {
+    abstract AccumT accumulate(AccumT accum, T value);
+  }
+
+  private <T> void accumulateTupleHelper(Tuple.WindowedTuple<KeyValPair<KeyT, 
T>> tuple, AccumFunction<T> accumFn)
+  {
+    final KeyValPair<KeyT, T> kvData = tuple.getValue();
+    KeyT key = kvData.getKey();
+    for (Window window : tuple.getWindows()) {
+      // process each window
+      AccumT accum = dataStorage.get(window, key);
+      if (accum == null) {
+        accum = accumulation.defaultAccumulatedValue();
+      }
+      dataStorage.put(window, key, accumFn.accumulate(accum, 
kvData.getValue()));
+    }
+  }
+
+  @Override
+  public void accumulateTuple(Tuple.WindowedTuple<KeyValPair<KeyT, InputT1>> 
tuple)
+  {
+    accumulateTupleHelper(tuple, new AccumFunction<InputT1>()
+    {
+      @Override
+      AccumT accumulate(AccumT accum, InputT1 value)
+      {
+        return accumulation.accumulate(accum, value);
+      }
+    });
+  }
+
+  @Override
+  public void accumulateTuple2(Tuple.WindowedTuple<KeyValPair<KeyT, InputT2>> 
tuple)
+  {
+    accumulateTupleHelper(tuple, new AccumFunction<InputT2>()
+    {
+      @Override
+      AccumT accumulate(AccumT accum, InputT2 value)
+      {
+        return accumulation.accumulate2(accum, value);
+      }
+    });
+  }
+
+  @Override
+  public void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes)
+  {
+    for (Map.Entry<KeyT, AccumT> entry : dataStorage.entries(window)) {
+      OutputT outputVal = accumulation.getOutput(entry.getValue());
+      if (fireOnlyUpdatedPanes) {
+        OutputT oldValue = retractionStorage.get(window, entry.getKey());
+        if (oldValue != null && oldValue.equals(outputVal)) {
+          continue;
+        }
+      }
+      output.emit(new Tuple.WindowedTuple<>(window, new 
KeyValPair<>(entry.getKey(), outputVal)));
+      if (retractionStorage != null) {
+        retractionStorage.put(window, entry.getKey(), outputVal);
+      }
+    }
+  }
+
+  @Override
+  public void fireRetractionTrigger(Window window, boolean 
firingOnlyUpdatedPanes)
+  {
+    if (triggerOption.getAccumulationMode() != 
TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
+      throw new UnsupportedOperationException();
+    }
+    for (Map.Entry<KeyT, OutputT> entry : retractionStorage.entries(window)) {
+      output.emit(new Tuple.WindowedTuple<>(window, new 
KeyValPair<>(entry.getKey(), accumulation.getRetraction(entry.getValue()))));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
index 6fab7de..a33133b 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
@@ -52,81 +52,85 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, 
AccumT, OutputValT>
 {
 
   @Override
-  protected Collection<Window.SessionWindow> assignSessionWindows(long 
timestamp, Tuple<KeyValPair<KeyT, InputValT>> inputTuple)
+  protected <T> Collection<Window.SessionWindow> assignSessionWindows(long 
timestamp, Tuple<T> inputTuple)
   {
-    KeyT key = inputTuple.getValue().getKey();
-    WindowOption.SessionWindows sessionWindowOption = 
(WindowOption.SessionWindows)windowOption;
-    SessionWindowedStorage<KeyT, AccumT> sessionStorage = 
(SessionWindowedStorage<KeyT, AccumT>)dataStorage;
-    Collection<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> sessionEntries = 
sessionStorage.getSessionEntries(key, timestamp, 
sessionWindowOption.getMinGap().getMillis());
-    Window.SessionWindow<KeyT> sessionWindowToAssign;
-    switch (sessionEntries.size()) {
-      case 0: {
-        // There are no existing windows within the minimum gap. Create a new 
session window
-        Window.SessionWindow<KeyT> sessionWindow = new 
Window.SessionWindow<>(key, timestamp, 1);
-        windowStateMap.put(sessionWindow, new WindowState());
-        sessionWindowToAssign = sessionWindow;
-        break;
-      }
-      case 1: {
-        // There is already one existing window within the minimum gap. See 
whether we need to extend the time of that window
-        Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry = 
sessionEntries.iterator().next();
-        Window.SessionWindow<KeyT> sessionWindow = sessionWindowEntry.getKey();
-        if (sessionWindow.getBeginTimestamp() <= timestamp && timestamp < 
sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis()) {
-          // The session window already covers the event
+    if (!(inputTuple.getValue() instanceof KeyValPair)) {
+      throw new UnsupportedOperationException("Session window require keyed 
tuples");
+    } else {
+      KeyT key = ((KeyValPair<KeyT, ?>)inputTuple.getValue()).getKey();
+      WindowOption.SessionWindows sessionWindowOption = 
(WindowOption.SessionWindows)windowOption;
+      SessionWindowedStorage<KeyT, AccumT> sessionStorage = 
(SessionWindowedStorage<KeyT, AccumT>)dataStorage;
+      Collection<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> sessionEntries 
= sessionStorage.getSessionEntries(key, timestamp, 
sessionWindowOption.getMinGap().getMillis());
+      Window.SessionWindow<KeyT> sessionWindowToAssign;
+      switch (sessionEntries.size()) {
+        case 0: {
+          // There are no existing windows within the minimum gap. Create a 
new session window
+          Window.SessionWindow<KeyT> sessionWindow = new 
Window.SessionWindow<>(key, timestamp, 1);
+          windowStateMap.put(sessionWindow, new WindowState());
           sessionWindowToAssign = sessionWindow;
-        } else {
-          // The session window does not cover the event but is within the min 
gap
+          break;
+        }
+        case 1: {
+          // There is already one existing window within the minimum gap. See 
whether we need to extend the time of that window
+          Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry = 
sessionEntries.iterator().next();
+          Window.SessionWindow<KeyT> sessionWindow = 
sessionWindowEntry.getKey();
+          if (sessionWindow.getBeginTimestamp() <= timestamp && timestamp < 
sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis()) {
+            // The session window already covers the event
+            sessionWindowToAssign = sessionWindow;
+          } else {
+            // The session window does not cover the event but is within the 
min gap
+            if (triggerOption != null &&
+                triggerOption.getAccumulationMode() == 
TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
+              // fire a retraction trigger because the session window will be 
enlarged
+              fireRetractionTrigger(sessionWindow, false);
+            }
+            // create a new session window that covers the timestamp
+            long newBeginTimestamp = 
Math.min(sessionWindow.getBeginTimestamp(), timestamp);
+            long newEndTimestamp = Math.max(sessionWindow.getBeginTimestamp() 
+ sessionWindow.getDurationMillis(), timestamp + 1);
+            Window.SessionWindow<KeyT> newSessionWindow =
+                new Window.SessionWindow<>(key, newBeginTimestamp, 
newEndTimestamp - newBeginTimestamp);
+            windowStateMap.remove(sessionWindow);
+            sessionStorage.migrateWindow(sessionWindow, newSessionWindow);
+            windowStateMap.put(newSessionWindow, new WindowState());
+            sessionWindowToAssign = newSessionWindow;
+          }
+          break;
+        }
+        case 2: {
+          // There are two windows that fall within the minimum gap of the 
timestamp. We need to merge the two windows
+          Iterator<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> iterator = 
sessionEntries.iterator();
+          Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry1 = 
iterator.next();
+          Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry2 = 
iterator.next();
+          Window.SessionWindow<KeyT> sessionWindow1 = 
sessionWindowEntry1.getKey();
+          Window.SessionWindow<KeyT> sessionWindow2 = 
sessionWindowEntry2.getKey();
+          AccumT sessionData1 = sessionWindowEntry1.getValue();
+          AccumT sessionData2 = sessionWindowEntry2.getValue();
           if (triggerOption != null &&
               triggerOption.getAccumulationMode() == 
TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
-            // fire a retraction trigger because the session window will be 
enlarged
-            fireRetractionTrigger(sessionWindow, false);
+            // fire a retraction trigger because the two session windows will 
be merged to a new window
+            fireRetractionTrigger(sessionWindow1, false);
+            fireRetractionTrigger(sessionWindow2, false);
           }
-          // create a new session window that covers the timestamp
-          long newBeginTimestamp = Math.min(sessionWindow.getBeginTimestamp(), 
timestamp);
-          long newEndTimestamp = Math.max(sessionWindow.getBeginTimestamp() + 
sessionWindow.getDurationMillis(), timestamp + 1);
-          Window.SessionWindow<KeyT> newSessionWindow =
-              new Window.SessionWindow<>(key, newBeginTimestamp, 
newEndTimestamp - newBeginTimestamp);
-          windowStateMap.remove(sessionWindow);
-          sessionStorage.migrateWindow(sessionWindow, newSessionWindow);
+          long newBeginTimestamp = 
Math.min(sessionWindow1.getBeginTimestamp(), 
sessionWindow2.getBeginTimestamp());
+          long newEndTimestamp = Math.max(sessionWindow1.getBeginTimestamp() + 
sessionWindow1.getDurationMillis(),
+              sessionWindow2.getBeginTimestamp() + 
sessionWindow2.getDurationMillis());
+
+          Window.SessionWindow<KeyT> newSessionWindow = new 
Window.SessionWindow<>(key, newBeginTimestamp, newEndTimestamp - 
newBeginTimestamp);
+          AccumT newSessionData = accumulation.merge(sessionData1, 
sessionData2);
+          sessionStorage.remove(sessionWindow1);
+          sessionStorage.remove(sessionWindow2);
+          sessionStorage.put(newSessionWindow, key, newSessionData);
+          windowStateMap.remove(sessionWindow1);
+          windowStateMap.remove(sessionWindow2);
           windowStateMap.put(newSessionWindow, new WindowState());
           sessionWindowToAssign = newSessionWindow;
+          break;
         }
-        break;
-      }
-      case 2: {
-        // There are two windows that fall within the minimum gap of the 
timestamp. We need to merge the two windows
-        Iterator<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> iterator = 
sessionEntries.iterator();
-        Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry1 = 
iterator.next();
-        Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry2 = 
iterator.next();
-        Window.SessionWindow<KeyT> sessionWindow1 = 
sessionWindowEntry1.getKey();
-        Window.SessionWindow<KeyT> sessionWindow2 = 
sessionWindowEntry2.getKey();
-        AccumT sessionData1 = sessionWindowEntry1.getValue();
-        AccumT sessionData2 = sessionWindowEntry2.getValue();
-        if (triggerOption != null &&
-            triggerOption.getAccumulationMode() == 
TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
-          // fire a retraction trigger because the two session windows will be 
merged to a new window
-          fireRetractionTrigger(sessionWindow1, false);
-          fireRetractionTrigger(sessionWindow2, false);
-        }
-        long newBeginTimestamp = Math.min(sessionWindow1.getBeginTimestamp(), 
sessionWindow2.getBeginTimestamp());
-        long newEndTimestamp = Math.max(sessionWindow1.getBeginTimestamp() + 
sessionWindow1.getDurationMillis(),
-            sessionWindow2.getBeginTimestamp() + 
sessionWindow2.getDurationMillis());
-
-        Window.SessionWindow<KeyT> newSessionWindow = new 
Window.SessionWindow<>(key, newBeginTimestamp, newEndTimestamp - 
newBeginTimestamp);
-        AccumT newSessionData = accumulation.merge(sessionData1, sessionData2);
-        sessionStorage.remove(sessionWindow1);
-        sessionStorage.remove(sessionWindow2);
-        sessionStorage.put(newSessionWindow, key, newSessionData);
-        windowStateMap.remove(sessionWindow1);
-        windowStateMap.remove(sessionWindow2);
-        windowStateMap.put(newSessionWindow, new WindowState());
-        sessionWindowToAssign = newSessionWindow;
-        break;
+        default:
+          throw new IllegalStateException("There are more than two sessions 
matching one timestamp");
       }
-      default:
-        throw new IllegalStateException("There are more than two sessions 
matching one timestamp");
+      return 
Collections.<Window.SessionWindow>singleton(sessionWindowToAssign);
     }
-    return Collections.<Window.SessionWindow>singleton(sessionWindowToAssign);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java
new file mode 100644
index 0000000..38eeff0
--- /dev/null
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import org.apache.apex.malhar.lib.window.MergeAccumulation;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+
+/**
+ * Windowed Merge Operator to merge two streams together. It aggregates tuple 
from two
+ * input streams, perform merge operation base on its merge accumulation, and 
output one
+ * result stream to downstream.
+ *
+ * @param <InputT1> The type of input tuple from first stream.
+ * @param <InputT2> The type of input tuple from first stream.
+ * @param <AccumT> The type of the accumulated value in the operator state per 
key per window.
+ * @param <OutputT> The type of output tuple.
+ */
+public class WindowedMergeOperatorImpl<InputT1, InputT2, AccumT, OutputT>
+    extends AbstractWindowedMergeOperator<InputT1, InputT2, OutputT, 
WindowedStorage.WindowedPlainStorage<AccumT>, 
WindowedStorage.WindowedPlainStorage<OutputT>, MergeAccumulation<InputT1, 
InputT2, AccumT, OutputT>>
+{
+  private abstract class AccumFunction<T>
+  {
+    abstract AccumT accumulate(AccumT accum, T value);
+  }
+
+  private <T> void accumulateTupleHelper(Tuple.WindowedTuple<T> tuple, 
AccumFunction<T> accumFn)
+  {
+    for (Window window : tuple.getWindows()) {
+      // process each window
+      AccumT accum = dataStorage.get(window);
+      if (accum == null) {
+        accum = accumulation.defaultAccumulatedValue();
+      }
+      dataStorage.put(window, accumFn.accumulate(accum, tuple.getValue()));
+    }
+  }
+
+  @Override
+  public void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple)
+  {
+    accumulateTupleHelper(tuple, new AccumFunction<InputT2>()
+    {
+      @Override
+      AccumT accumulate(AccumT accum, InputT2 value)
+      {
+        return accumulation.accumulate2(accum, value);
+      }
+    });
+  }
+
+  @Override
+  public void accumulateTuple(Tuple.WindowedTuple<InputT1> tuple)
+  {
+    accumulateTupleHelper(tuple, new AccumFunction<InputT1>()
+    {
+      @Override
+      AccumT accumulate(AccumT accum, InputT1 value)
+      {
+        return accumulation.accumulate(accum, value);
+      }
+    });
+  }
+
+  @Override
+  public void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes)
+  {
+    AccumT accumulatedValue = dataStorage.get(window);
+    OutputT outputValue = accumulation.getOutput(accumulatedValue);
+
+    if (fireOnlyUpdatedPanes) {
+      OutputT oldValue = retractionStorage.get(window);
+      if (oldValue != null && oldValue.equals(outputValue)) {
+        return;
+      }
+    }
+    output.emit(new Tuple.WindowedTuple<>(window, outputValue));
+    if (retractionStorage != null) {
+      retractionStorage.put(window, outputValue);
+    }
+  }
+
+  @Override
+  public void fireRetractionTrigger(Window window, boolean 
firingOnlyUpdatedPanes)
+  {
+    if (triggerOption.getAccumulationMode() != 
TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
+      throw new UnsupportedOperationException();
+    }
+    OutputT oldValue = retractionStorage.get(window);
+    if (oldValue != null) {
+      output.emit(new Tuple.WindowedTuple<>(window, 
accumulation.getRetraction(oldValue)));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/CoGroupTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/CoGroupTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/CoGroupTest.java
new file mode 100644
index 0000000..d988081
--- /dev/null
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/CoGroupTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link InnerJoin}.
+ */
+public class CoGroupTest
+{
+
+  @Test
+  public void CoGroupTest()
+  {
+    CoGroup<Long> cg = new CoGroup<Long>();
+    List<Set<Long>> accu = cg.defaultAccumulatedValue();
+
+    Assert.assertEquals(2, accu.size());
+    for (int i = 0; i < 2; i++) {
+      Assert.assertEquals(0, accu.get(i).size());
+    }
+
+    for (long i = 1; i <= 3; i++) {
+      accu = cg.accumulate(accu, i);
+      accu = cg.accumulate2(accu, i * 2);
+    }
+
+    for (int i = 0; i < 2; i++) {
+      Assert.assertEquals(3, accu.get(i).size());
+    }
+
+    Assert.assertEquals(2, cg.getOutput(accu).size());
+    for (int i = 0; i < 2; i++) {
+      Assert.assertEquals(3, cg.getOutput(accu).get(i).size());
+    }
+    Assert.assertTrue(1 == cg.getOutput(accu).get(0).get(0));
+    Assert.assertTrue(4 == cg.getOutput(accu).get(1).get(1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoinTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoinTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoinTest.java
new file mode 100644
index 0000000..1a379a4
--- /dev/null
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoinTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link InnerJoin}.
+ */
+public class InnerJoinTest
+{
+  @Test
+  public void CombineTest()
+  {
+    InnerJoin<Long> cb = new InnerJoin<Long>();
+    List<Set<Long>> accu = cb.defaultAccumulatedValue();
+
+    Assert.assertEquals(2, accu.size());
+    for (int i = 0; i < 2; i++) {
+      Assert.assertEquals(0, accu.get(i).size());
+    }
+
+    for (long i = 1; i <= 3; i++) {
+      accu = cb.accumulate(accu, i);
+      accu = cb.accumulate2(accu, i * 2);
+    }
+
+    for (int i = 0; i < 2; i++) {
+      Assert.assertEquals(3, accu.get(i).size());
+    }
+
+    Assert.assertEquals(9, cb.getOutput(accu).size());
+    for (int i = 0; i < 9; i++) {
+      Assert.assertEquals(2, cb.getOutput(accu).get(i).size());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
new file mode 100644
index 0000000..63690d1
--- /dev/null
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link PojoInnerJoin}.
+ */
+public class PojoInnerJoinTest
+{
+
+  public static class TestPojo1
+  {
+    private int uId;
+    private String uName;
+
+    public TestPojo1()
+    {
+
+    }
+
+    public TestPojo1(int id, String name)
+    {
+      this.uId = id;
+      this.uName = name;
+    }
+
+    public int getuId()
+    {
+      return uId;
+    }
+
+    public void setuId(int uId)
+    {
+      this.uId = uId;
+    }
+
+    public String getuName()
+    {
+      return uName;
+    }
+
+    public void setuName(String uName)
+    {
+      this.uName = uName;
+    }
+  }
+
+  public static class TestPojo2
+  {
+    private int uId;
+    private String dep;
+
+    public TestPojo2()
+    {
+
+    }
+
+    public TestPojo2(int id, String dep)
+    {
+      this.uId = id;
+      this.dep = dep;
+    }
+
+    public int getuId()
+    {
+      return uId;
+    }
+
+    public void setuId(int uId)
+    {
+      this.uId = uId;
+    }
+
+    public String getDep()
+    {
+      return dep;
+    }
+
+    public void setDep(String dep)
+    {
+      this.dep = dep;
+    }
+  }
+
+
+  @Test
+  public void PojoInnerJoinTest()
+  {
+    PojoInnerJoin<TestPojo1, TestPojo2> pij = new PojoInnerJoin<>(2, "uId", 
"uId");
+
+    List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();
+
+    Assert.assertEquals(2, accu.size());
+
+    accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
+    accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
+
+    accu = pij.accumulate2(accu, new TestPojo2(1, "CS"));
+    accu = pij.accumulate2(accu, new TestPojo2(3, "ECE"));
+
+    Map<String, Object> result = new HashMap<>();
+    result.put("uId", 1);
+    result.put("uName", "Josh");
+    result.put("dep", "CS");
+
+    Assert.assertEquals(1, pij.getOutput(accu).size());
+    Assert.assertEquals(result, pij.getOutput(accu).get(0));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/test/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorTestApplication.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorTestApplication.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorTestApplication.java
new file mode 100644
index 0000000..15e3d4e
--- /dev/null
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorTestApplication.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.apex.malhar.lib.window.accumulation.InnerJoin;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Example application using {@link KeyedWindowedMergeOperatorImpl}. 
Generators send streams of key-value pairs of
+ * <{@link String}, {@link Integer}>, Merge operator combines the two streams 
base on the key of the tuple.
+ */
+public class KeyedWindowedMergeOperatorTestApplication implements 
StreamingApplication
+{
+  private static WindowedStorage.WindowedPlainStorage<WindowState> 
windowStateMap = new InMemoryWindowedStorage<>();
+  private static final long windowDuration = 1000;
+  private static final String[] keys = new String[]{"A", "B", "C", "D", "E"};
+
+
+  public static Window.TimeWindow assignTestWindow(long timestamp)
+  {
+    long beginTimestamp = timestamp - timestamp % windowDuration;
+    Window.TimeWindow window = new Window.TimeWindow(beginTimestamp, 
windowDuration);
+    if (!windowStateMap.containsWindow(window)) {
+      windowStateMap.put(window, new WindowState());
+    }
+    return window;
+  }
+
+  public static class NumGen1 extends BaseOperator implements InputOperator
+  {
+    private int i;
+    private long watermarkTime;
+    private long startingTime;
+
+    public final transient 
DefaultOutputPort<Tuple.WindowedTuple<KeyValPair<String, Integer>>> output = 
new DefaultOutputPort<>();
+    public final transient DefaultOutputPort<ControlTuple> 
watermarkDefaultOutputPort = new DefaultOutputPort<>();
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      startingTime = System.currentTimeMillis();
+      watermarkTime = System.currentTimeMillis() + 10000;
+      i = 1;
+    }
+
+    @Override
+    public void emitTuples()
+    {
+      while (i <= 20) {
+        if (System.currentTimeMillis() - startingTime >= (i + 1) * 400) {
+          output.emit(new Tuple.WindowedTuple<KeyValPair<String, 
Integer>>(assignTestWindow(System.currentTimeMillis()), new KeyValPair<String, 
Integer>(keys[i % 5], i)));
+          i++;
+        }
+      }
+    }
+
+    @Override
+    public void endWindow()
+    {
+      if (i <= 20) {
+        watermarkDefaultOutputPort.emit(new WatermarkImpl(watermarkTime));
+      }
+    }
+  }
+
+  public static class NumGen2 extends BaseOperator implements InputOperator
+  {
+    private int i;
+    private long watermarkTime;
+    private long startingTime;
+
+    public final transient 
DefaultOutputPort<Tuple.WindowedTuple<KeyValPair<String, Integer>>> output = 
new DefaultOutputPort<>();
+    public final transient DefaultOutputPort<ControlTuple> 
watermarkDefaultOutputPort = new DefaultOutputPort<>();
+
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      startingTime = System.currentTimeMillis();
+      watermarkTime = System.currentTimeMillis() + 10000;
+      i = 1;
+    }
+
+    @Override
+    public void emitTuples()
+    {
+      while (i <= 20) {
+        if (System.currentTimeMillis() - startingTime >= (i + 1) * 400) {
+          output.emit(new Tuple.WindowedTuple<KeyValPair<String, 
Integer>>(assignTestWindow(System.currentTimeMillis()), new KeyValPair<String, 
Integer>(keys[i % 5], 10 * i)));
+          i++;
+        }
+      }
+    }
+
+    @Override
+    public void endWindow()
+    {
+      if (i <= 20) {
+        watermarkDefaultOutputPort.emit(new WatermarkImpl(watermarkTime));
+      }
+    }
+  }
+
+  public static class Collector extends BaseOperator
+  {
+    public static Map<String, List<List<Integer>>> result = new HashMap<>();
+
+    public final transient 
DefaultOutputPort<Tuple.WindowedTuple<KeyValPair<String, List<List<Integer>>>>> 
output = new DefaultOutputPort<>();
+
+    public final transient 
DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<List<Integer>>>>> 
input = new DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, 
List<List<Integer>>>>>()
+    {
+      @Override
+      public void process(Tuple.WindowedTuple<KeyValPair<String, 
List<List<Integer>>>> tuple)
+      {
+        result.put(tuple.getValue().getKey(), tuple.getValue().getValue());
+        output.emit(tuple);
+      }
+    };
+  }
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    KeyedWindowedMergeOperatorImpl<String, Integer, Integer, 
List<Set<Integer>>, List<List<Integer>>> op
+        = dag.addOperator("Merge", new KeyedWindowedMergeOperatorImpl<String, 
Integer, Integer, List<Set<Integer>>, List<List<Integer>>>());
+
+    //op.setAccumulation(new CoGroup<Integer>());
+    op.setAccumulation(new InnerJoin<Integer>());
+
+    op.setDataStorage(new InMemoryWindowedKeyedStorage<String, 
List<Set<Integer>>>());
+    op.setRetractionStorage(new InMemoryWindowedKeyedStorage<String, 
List<List<Integer>>>());
+    op.setWindowStateStorage(windowStateMap);
+
+    // Can select one of the following window options, or don't select any of 
them.
+    op.setWindowOption(new WindowOption.GlobalWindow());
+    //op.setWindowOption(new WindowOption.TimeWindows(Duration.millis(4000)));
+
+    op.setTriggerOption(new 
TriggerOption().withEarlyFiringsAtEvery(1).accumulatingFiredPanes());
+    op.setAllowedLateness(Duration.millis(500));
+
+    NumGen1 numGen1 = dag.addOperator("numGen1", new NumGen1());
+    NumGen2 numGen2 = dag.addOperator("numGen2", new NumGen2());
+
+    Collector collector = dag.addOperator("collector", new Collector());
+    ConsoleOutputOperator con = dag.addOperator("console", new 
ConsoleOutputOperator());
+
+    dag.addStream("num1", numGen1.output, op.input);
+    dag.addStream("num2", numGen2.output, op.input2);
+    dag.addStream("wm1", numGen1.watermarkDefaultOutputPort, op.controlInput);
+    dag.addStream("wm2", numGen2.watermarkDefaultOutputPort, op.controlInput2);
+
+    dag.addStream("MergedResult", op.output, collector.input);
+    dag.addStream("output", collector.output, con.input);
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    lma.prepareDAG(new KeyedWindowedMergeOperatorTestApplication(), conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.run(20000);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java
new file mode 100644
index 0000000..7dc09d0
--- /dev/null
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.accumulation.CoGroup;
+
+import com.google.common.base.Function;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Unit tests for Windowed Join Operator
+ */
+public class WindowedMergeOperatorTest
+{
+  @Test
+  public void extractTimestampTest()
+  {
+    WindowedMergeOperatorImpl op = createDefaultWindowedMergeOperator();
+    Function<Integer, Long> timestampExtractor = new Function<Integer, Long>()
+    {
+      @Override
+      public Long apply(Integer input)
+      {
+        return (input * 10L);
+      }
+    };
+
+    Assert.assertEquals(1000L, op.extractTimestamp(new 
Tuple.PlainTuple<Integer>(100), timestampExtractor));
+    Assert.assertEquals(2000L, op.extractTimestamp(new 
Tuple.PlainTuple<Integer>(200), timestampExtractor));
+    Assert.assertEquals(200L, op.extractTimestamp(new 
Tuple.TimestampedTuple<Integer>(200L, 10), null));
+  }
+
+
+  @Test
+  public void windowedMergeOperatorMergeTest()
+  {
+    WindowedMergeOperatorImpl<Integer, Integer, List<Set<Integer>>, 
List<List<Integer>>> op = createDefaultWindowedMergeOperator();
+    Window global = Window.GlobalWindow.INSTANCE;
+    op.setDataStorage(new InMemoryWindowedStorage<List<Set<Integer>>>());
+    op.setWindowOption(new WindowOption.GlobalWindow());
+    
op.initializeWindowStates(AbstractWindowedOperator.GLOBAL_WINDOW_SINGLETON_SET);
+
+    op.processTuple(new Tuple.WindowedTuple<Integer>(global, 100));
+    Assert.assertEquals(1, op.dataStorage.get(global).get(0).size());
+    op.processTuple2(new Tuple.WindowedTuple<Integer>(global, 200));
+    Assert.assertEquals(1, op.dataStorage.get(global).get(1).size());
+    op.processTuple(new Tuple.WindowedTuple<Integer>(global, 300));
+    Assert.assertEquals(2, op.dataStorage.get(global).get(0).size());
+    Assert.assertEquals(2, 
op.accumulation.getOutput(op.dataStorage.get(global)).size());
+  }
+
+  @Test
+  public void keyedWindowedMergeOperatorMergeTest()
+  {
+    KeyedWindowedMergeOperatorImpl<String, Integer, Integer, 
List<Set<Integer>>, List<List<Integer>>> op
+        = createDefaultKeyedWindowedMergeOperator();
+    Window global = Window.GlobalWindow.INSTANCE;
+    op.setDataStorage(new InMemoryWindowedKeyedStorage<String, 
List<Set<Integer>>>());
+    op.setWindowOption(new WindowOption.GlobalWindow());
+    
op.initializeWindowStates(AbstractWindowedOperator.GLOBAL_WINDOW_SINGLETON_SET);
+
+    op.processTuple(new Tuple.WindowedTuple<KeyValPair<String, 
Integer>>(global, new KeyValPair<String, Integer>("A", 100)));
+    Assert.assertEquals(1, op.dataStorage.get(global, "A").get(0).size());
+    Assert.assertTrue(op.dataStorage.get(global, "A").get(0).contains(100));
+
+    op.processTuple2(new Tuple.WindowedTuple<KeyValPair<String, 
Integer>>(global, new KeyValPair<String, Integer>("A", 200)));
+    Assert.assertEquals(1, op.dataStorage.get(global, "A").get(1).size());
+    Assert.assertTrue(op.dataStorage.get(global, "A").get(1).contains(200));
+
+    op.processTuple2(new Tuple.WindowedTuple<KeyValPair<String, 
Integer>>(global, new KeyValPair<String, Integer>("B", 300)));
+    Assert.assertEquals(1, op.dataStorage.get(global, "A").get(1).size());
+    Assert.assertEquals(1, op.dataStorage.get(global, "B").get(1).size());
+    Assert.assertTrue(op.dataStorage.get(global, "B").get(1).contains(300));
+
+    Assert.assertEquals(2, 
op.accumulation.getOutput(op.dataStorage.get(global, "A")).size());
+  }
+
+  @Test
+  public void windowedMergeOperatorWatermarkTest()
+  {
+    WindowedMergeOperatorImpl op = createDefaultWindowedMergeOperator();
+    CollectorTestSink<WatermarkImpl> sink = new CollectorTestSink<>();
+    op.controlOutput.setSink(sink);
+
+    // No watermark is generated if the Merge operator haven't seen all 
watermarks from all input streams.
+    op.controlInput.process(new WatermarkImpl(1000000));
+    op.endWindow();
+    Assert.assertEquals(-1, op.currentWatermark);
+    Assert.assertEquals(0, sink.collectedTuples.size());
+
+    // Once both input streams sent watermarks to Merge operator, it should 
generate a watermark and send to downstream.
+    op.controlInput2.process(new WatermarkImpl(200000));
+    op.endWindow();
+    Assert.assertEquals(200000, op.currentWatermark);
+    Assert.assertEquals(1, sink.collectedTuples.size());
+
+    // If the minimum of the latest input watermarks changes, Merge operator 
should also generate a new watermark.
+    op.controlInput2.process(new WatermarkImpl(2100000));
+    op.endWindow();
+    Assert.assertEquals(1000000, op.currentWatermark);
+    Assert.assertEquals(2, sink.collectedTuples.size());
+
+    // Current watermark of Merge operator could only change during 
endWindow() event.
+    op.controlInput.process(new WatermarkImpl(1100000));
+    Assert.assertEquals(1100000, op.currentWatermark);
+    op.endWindow();
+    Assert.assertEquals(3, sink.collectedTuples.size());
+
+    // If the upstreams sent a watermark but the minimum of the latest input 
watermarks doesn't change, the Merge
+    // operator should not generate a new watermark, thus nothing will be sent 
to downstream.
+    op.controlInput.process(new WatermarkImpl(1100000));
+    op.endWindow();
+    Assert.assertEquals(1100000, op.currentWatermark);
+    Assert.assertEquals(3, sink.collectedTuples.size());
+  }
+
+  private WindowedMergeOperatorImpl<Integer, Integer, List<Set<Integer>>, 
List<List<Integer>>> createDefaultWindowedMergeOperator()
+  {
+    WindowedMergeOperatorImpl<Integer, Integer, List<Set<Integer>>, 
List<List<Integer>>> windowedMergeOperator = new WindowedMergeOperatorImpl<>();
+    windowedMergeOperator.setDataStorage(new 
InMemoryWindowedStorage<List<Set<Integer>>>());
+    windowedMergeOperator.setRetractionStorage(new 
InMemoryWindowedStorage<List<List<Integer>>>());
+    windowedMergeOperator.setWindowStateStorage(new 
InMemoryWindowedStorage<WindowState>());
+    windowedMergeOperator.setAccumulation(new CoGroup<Integer>());
+    return windowedMergeOperator;
+  }
+
+  private KeyedWindowedMergeOperatorImpl<String, Integer, Integer, 
List<Set<Integer>>, List<List<Integer>>> 
createDefaultKeyedWindowedMergeOperator()
+  {
+    KeyedWindowedMergeOperatorImpl<String, Integer, Integer, 
List<Set<Integer>>, List<List<Integer>>> windowedMergeOperator = new 
KeyedWindowedMergeOperatorImpl<>();
+    windowedMergeOperator.setDataStorage(new 
InMemoryWindowedKeyedStorage<String, List<Set<Integer>>>());
+    windowedMergeOperator.setRetractionStorage(new 
InMemoryWindowedKeyedStorage<String, List<List<Integer>>>());
+    windowedMergeOperator.setWindowStateStorage(new 
InMemoryWindowedStorage<WindowState>());
+    windowedMergeOperator.setAccumulation(new CoGroup<Integer>());
+    return windowedMergeOperator;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTestApplication.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTestApplication.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTestApplication.java
new file mode 100644
index 0000000..5d80388
--- /dev/null
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTestApplication.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.apex.malhar.lib.window.accumulation.CoGroup;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+/**
+ * Example application to show usage of Windowed Merge Operator. Two 
generators send out two streams of integers,
+ * the Merge operator will do a co-group to Merge all income tuples and send 
output to collector and output to console.
+ * User can choose different window options and see how the application 
behaves.
+ */
+public class WindowedMergeOperatorTestApplication implements 
StreamingApplication
+{
+  private static WindowedStorage.WindowedPlainStorage<WindowState> 
windowStateMap = new InMemoryWindowedStorage<>();
+  private static final long windowDuration = 1000;
+
+
+  public static Window.TimeWindow assignTestWindow(long timestamp)
+  {
+    long beginTimestamp = timestamp - timestamp % windowDuration;
+    Window.TimeWindow window = new Window.TimeWindow(beginTimestamp, 
windowDuration);
+    if (!windowStateMap.containsWindow(window)) {
+      windowStateMap.put(window, new WindowState());
+    }
+    return window;
+  }
+
+  public static class NumGen1 extends BaseOperator implements InputOperator
+  {
+    private int i;
+    private long watermarkTime;
+    private long startingTime;
+
+    public final transient DefaultOutputPort<Tuple.WindowedTuple<Integer>> 
output = new DefaultOutputPort<>();
+    public final transient DefaultOutputPort<ControlTuple> 
watermarkDefaultOutputPort = new DefaultOutputPort<>();
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      startingTime = System.currentTimeMillis();
+      watermarkTime = System.currentTimeMillis() + 10000;
+      i = 1;
+    }
+
+    @Override
+    public void emitTuples()
+    {
+      while (i <= 20) {
+        if (System.currentTimeMillis() - startingTime >= (i + 1) * 400) {
+          output.emit(new 
Tuple.WindowedTuple<Integer>(assignTestWindow(System.currentTimeMillis()), i));
+          i++;
+        }
+      }
+    }
+
+    @Override
+    public void endWindow()
+    {
+      if (i <= 20) {
+        watermarkDefaultOutputPort.emit(new WatermarkImpl(watermarkTime));
+      }
+    }
+  }
+
+  public static class NumGen2 extends BaseOperator implements InputOperator
+  {
+    private int i;
+    private long watermarkTime;
+    private long startingTime;
+
+    public final transient DefaultOutputPort<Tuple.WindowedTuple<Integer>> 
output = new DefaultOutputPort<>();
+    public final transient DefaultOutputPort<ControlTuple> 
watermarkDefaultOutputPort = new DefaultOutputPort<>();
+
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      startingTime = System.currentTimeMillis();
+      watermarkTime = System.currentTimeMillis() + 10000;
+      i = 1;
+    }
+
+    @Override
+    public void emitTuples()
+    {
+      while (i <= 20) {
+        if (System.currentTimeMillis() - startingTime >= (i + 1) * 400) {
+          output.emit(new 
Tuple.WindowedTuple<Integer>(assignTestWindow(System.currentTimeMillis()), 10 * 
i));
+          i++;
+        }
+      }
+    }
+
+    @Override
+    public void endWindow()
+    {
+      if (i <= 20) {
+        watermarkDefaultOutputPort.emit(new WatermarkImpl(watermarkTime));
+      }
+    }
+  }
+
+  public static class Collector extends BaseOperator
+  {
+    public static List<List<List<Integer>>> result = new ArrayList<>();
+
+    public final transient DefaultOutputPort<Tuple<List<List<Integer>>>> 
output = new DefaultOutputPort<>();
+
+    public final transient DefaultInputPort<Tuple<List<List<Integer>>>> input 
= new DefaultInputPort<Tuple<List<List<Integer>>>>()
+    {
+      @Override
+      public void process(Tuple<List<List<Integer>>> tuple)
+      {
+        result.add(tuple.getValue());
+        output.emit(tuple);
+      }
+    };
+  }
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    WindowedMergeOperatorImpl<Integer, Integer, List<Set<Integer>>, 
List<List<Integer>>> op
+        = dag.addOperator("Merge", new WindowedMergeOperatorImpl<Integer, 
Integer, List<Set<Integer>>, List<List<Integer>>>());
+    op.setAccumulation(new CoGroup<Integer>());
+    op.setDataStorage(new InMemoryWindowedStorage<List<Set<Integer>>>());
+    op.setRetractionStorage(new 
InMemoryWindowedStorage<List<List<Integer>>>());
+    op.setWindowStateStorage(windowStateMap);
+
+    // Can select one of the following window options, or don't select any of 
them.
+    //op.setWindowOption(new WindowOption.GlobalWindow());
+    op.setWindowOption(new WindowOption.TimeWindows(Duration.millis(2000)));
+
+    op.setTriggerOption(new 
TriggerOption().withEarlyFiringsAtEvery(1).accumulatingFiredPanes());
+    op.setAllowedLateness(Duration.millis(500));
+
+    NumGen1 numGen1 = dag.addOperator("numGen1", new NumGen1());
+    NumGen2 numGen2 = dag.addOperator("numGen2", new NumGen2());
+
+    Collector collector = dag.addOperator("collector", new Collector());
+    ConsoleOutputOperator con = dag.addOperator("console", new 
ConsoleOutputOperator());
+
+    dag.addStream("num1", numGen1.output, op.input);
+    dag.addStream("num2", numGen2.output, op.input2);
+    dag.addStream("wm1", numGen1.watermarkDefaultOutputPort, op.controlInput);
+    dag.addStream("wm2", numGen2.watermarkDefaultOutputPort, op.controlInput2);
+
+    dag.addStream("MergedResult", op.output, collector.input);
+    dag.addStream("output", collector.output, con.input);
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    lma.prepareDAG(new WindowedMergeOperatorTestApplication(), conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.run(20000);
+  }
+}

Reply via email to