Repository: apex-malhar
Updated Branches:
  refs/heads/master fc8b674e3 -> 4b36bf3e5


APEXMALHAR-2417 Adding Pojo Outer join accumulation


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/4b36bf3e
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/4b36bf3e
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/4b36bf3e

Branch: refs/heads/master
Commit: 4b36bf3e59ecea40e7f3cc1a70636ca263c5f21f
Parents: fc8b674
Author: Hitesh-Scorpio <[email protected]>
Authored: Tue Feb 28 11:50:55 2017 +0530
Committer: Hitesh-Scorpio <[email protected]>
Committed: Thu Mar 2 15:31:56 2017 +0530

----------------------------------------------------------------------
 .../window/accumulation/AbstractPojoJoin.java   | 255 +++++++++++++++++
 .../window/accumulation/PojoFullOuterJoin.java  | 103 +++++++
 .../lib/window/accumulation/PojoInnerJoin.java  | 210 +-------------
 .../window/accumulation/PojoLeftOuterJoin.java  |  63 +++++
 .../window/accumulation/PojoRightOuterJoin.java |  63 +++++
 .../window/accumulation/PojoOuterJoinTest.java  | 280 +++++++++++++++++++
 6 files changed, 771 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4b36bf3e/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
new file mode 100644
index 0000000..354dd90
--- /dev/null
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.apex.malhar.lib.window.MergeAccumulation;
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Throwables;
+
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Join Accumulation for Pojo Streams.
+ *
+ */
+@InterfaceStability.Evolving
+public abstract class AbstractPojoJoin<InputT1, InputT2>
+    implements MergeAccumulation<InputT1, InputT2, List<List<Map<String, 
Object>>>, List<?>>
+{
+  protected final String[] keys;
+  protected final Class<?> outClass;
+  private transient Map<String,PojoUtils.Getter> gettersStream1;
+  private transient Map<String,PojoUtils.Getter> gettersStream2;
+  private transient Map<String,PojoUtils.Setter> setters;
+  protected transient Set<String> keySetStream2;
+  protected transient Set<String> keySetStream1;
+
+  /**
+   * Creates an instance with no join keys and no output class.
+   */
+  public AbstractPojoJoin()
+  {
+    this.outClass = null;
+    this.keys = new String[0];
+  }
+
+  public AbstractPojoJoin(Class<?> outClass, String... keys)
+  {
+    if (keys.length % 2 != 0) {
+      throw new IllegalArgumentException("Wrong number of keys.");
+    }
+
+    this.keys = Arrays.copyOf(keys, keys.length);
+    this.outClass = outClass;
+  }
+
+  /**
+   * Builds one {@link PojoUtils.Setter} per declared field of {@link #outClass} and stores them
+   * in {@link #setters}, keyed by field name.
+   * <p>
+   * Primitive field types are mapped to their wrapper classes because joined values are carried
+   * as boxed objects. Compiler-generated (synthetic) fields are skipped: they are not POJO
+   * properties and have no setter to bind to.
+   */
+  private void createSetters()
+  {
+    setters = new HashMap<>();
+    for (Field field : outClass.getDeclaredFields()) {
+      if (field.isSynthetic()) {
+        continue;
+      }
+      Class<?> fieldType = ClassUtils.primitiveToWrapper(field.getType());
+      String fieldName = field.getName();
+      setters.put(fieldName, PojoUtils.createSetter(outClass, fieldName, fieldType));
+    }
+  }
+
+  /**
+   * Builds one {@link PojoUtils.Getter} per declared field of the given input class.
+   * <p>
+   * Primitive field types are mapped to their wrapper classes because the extracted values are
+   * stored as boxed objects. Compiler-generated (synthetic) fields are skipped: they are not
+   * POJO properties and have no getter to bind to.
+   *
+   * @param input class of the tuples arriving on one stream
+   * @return getters keyed by field name
+   */
+  private Map<String,PojoUtils.Getter> createGetters(Class<?> input)
+  {
+    Map<String,PojoUtils.Getter> getters = new HashMap<>();
+    for (Field field : input.getDeclaredFields()) {
+      if (field.isSynthetic()) {
+        continue;
+      }
+      Class<?> fieldType = ClassUtils.primitiveToWrapper(field.getType());
+      String fieldName = field.getName();
+      getters.put(fieldName, PojoUtils.createGetter(input, fieldName, fieldType));
+    }
+    return getters;
+  }
+
+  /**
+   * Adds one tuple of the first stream to the accumulated value.
+   * <p>
+   * The getters for the first stream are created from the runtime class of the first tuple seen.
+   *
+   * @param accumulatedValue two-element list holding the rows of stream 1 and stream 2
+   * @param input            tuple of the first stream
+   * @return the same accumulated value, with the tuple appended to the first stream's rows
+   */
+  @Override
+  public List<List<Map<String, Object>>> accumulate(List<List<Map<String, Object>>> accumulatedValue, InputT1 input)
+  {
+    if (null == gettersStream1) {
+      gettersStream1 = createGetters(input.getClass());
+    }
+
+    final List<List<Map<String, Object>>> updated;
+    try {
+      updated = accumulateWithIndex(0, accumulatedValue, input);
+    } catch (NoSuchFieldException e) {
+      throw Throwables.propagate(e);
+    }
+    return updated;
+  }
+
+  /**
+   * Adds one tuple of the second stream to the accumulated value.
+   * <p>
+   * The getters for the second stream are created from the runtime class of the first tuple seen.
+   *
+   * @param accumulatedValue two-element list holding the rows of stream 1 and stream 2
+   * @param input            tuple of the second stream
+   * @return the same accumulated value, with the tuple appended to the second stream's rows
+   */
+  @Override
+  public List<List<Map<String, Object>>> accumulate2(List<List<Map<String, Object>>> accumulatedValue, InputT2 input)
+  {
+    if (null == gettersStream2) {
+      gettersStream2 = createGetters(input.getClass());
+    }
+
+    final List<List<Map<String, Object>>> updated;
+    try {
+      updated = accumulateWithIndex(1, accumulatedValue, input);
+    } catch (NoSuchFieldException e) {
+      throw Throwables.propagate(e);
+    }
+    return updated;
+  }
+
+
+  /**
+   * Creates the empty accumulated value: a list with one empty row list per stream
+   * (index 0 for the first stream, index 1 for the second).
+   */
+  @Override
+  public List<List<Map<String, Object>>> defaultAccumulatedValue()
+  {
+    List<List<Map<String, Object>>> perStream = new ArrayList<>();
+    perStream.add(new ArrayList<Map<String, Object>>());
+    perStream.add(new ArrayList<Map<String, Object>>());
+    return perStream;
+  }
+
+
+  /**
+   * Converts a tuple to a field-name/value map and appends it to the rows of one stream.
+   *
+   * @param index zero-based stream index (0 = first stream, 1 = second stream)
+   * @param accu  accumulated value to extend
+   * @param input tuple to add
+   * @return {@code accu}, after the row has been appended
+   * @throws NoSuchFieldException declared for the callers' handlers; nothing in this body throws it
+   */
+  private List<List<Map<String, Object>>> accumulateWithIndex(int index,
+      List<List<Map<String, Object>>> accu, Object input) throws NoSuchFieldException
+  {
+    // TODO: If a stream never sends out any tuple during one window, a wrong key would not be detected.
+
+    List<Map<String, Object>> streamRows = accu.get(index);
+    // pojoToMap numbers the streams from 1.
+    Map<String, Object> row = pojoToMap(input, index + 1);
+    streamRows.add(row);
+    accu.set(index, streamRows);
+
+    return accu;
+  }
+
+  private Map<String, Object> pojoToMap(Object input, int streamIndex)
+  {
+    Map<String, Object> map = new HashMap<>();
+    Map<String,PojoUtils.Getter> gettersStream = streamIndex == 1 ? 
gettersStream1 : gettersStream2;
+
+    for (Map.Entry<String, PojoUtils.Getter> getter : 
gettersStream.entrySet()) {
+      try {
+        Object value = getter.getValue().get(input);
+        map.put(getter.getKey(), value);
+      } catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    }
+    return map;
+  }
+
+  @Override
+  public List<List<Map<String, Object>>> merge(List<List<Map<String, Object>>> 
accumulatedValue1, List<List<Map<String, Object>>> accumulatedValue2)
+  {
+    for (int i = 0; i < 2; i++) {
+      List<Map<String, Object>> curList = accumulatedValue1.get(i);
+      curList.addAll(accumulatedValue2.get(i));
+      accumulatedValue1.set(i, curList);
+    }
+    return accumulatedValue1;
+  }
+
+  @Override
+  public List<?> getOutput(List<List<Map<String, Object>>> accumulatedValue)
+  {
+    if (setters == null) {
+      createSetters();
+      keySetStream2 = new HashSet<>();
+      keySetStream1 = new HashSet<>();
+      for (int i = 0; i < keys.length; i = i + 2) {
+        keySetStream1.add(keys[i]);
+        keySetStream2.add(keys[i + 1]);
+      }
+    }
+
+    // TODO: May need to revisit (use state manager).
+    List<Map<String, Object>> result = getAllCombo(0, accumulatedValue, null);
+
+    List<Object> out = new ArrayList<>();
+    for (Map<String, Object> resultMap : result) {
+      Object o;
+      try {
+        o = outClass.newInstance();
+      } catch (Throwable e) {
+        throw Throwables.propagate(e);
+      }
+      for (Map.Entry<String, PojoUtils.Setter> setter : setters.entrySet()) {
+        if (resultMap.get(setter.getKey()) != null) {
+          setter.getValue().set(o, resultMap.get(setter.getKey()));
+        }
+      }
+      out.add(o);
+    }
+
+    return out;
+  }
+
+
+  public List<Map<String, Object>> getAllCombo(int streamIndex, 
List<List<Map<String, Object>>> accu, Map<String, Object> curMap)
+  {
+    List<Map<String, Object>> result = new ArrayList<>();
+    int leftStreamIndex = getLeftStreamIndex();
+    List<Map<String, Object>> leftStream = accu.get(leftStreamIndex);
+    List<Map<String, Object>> rightStream = accu.get((leftStreamIndex + 1) % 
2);
+    for (Map<String, Object> lMap : leftStream) {
+      boolean gotMatch = false;
+      for (Map<String, Object> rMap : rightStream) {
+        Map<String, Object> tempMap = joinTwoMapsWithKeys(lMap, rMap);
+        if (tempMap != null) {
+          result.add(tempMap);
+          gotMatch = true;
+        }
+      }
+      if (!gotMatch) {
+        addNonMatchingResult(result, lMap, rightStream.get(0).keySet());
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Called by {@link #getAllCombo} for a left-side row that matched no row of the other stream.
+   * Implementations decide whether and how the row is added to the result.
+   *
+   * @param result      joined rows collected so far
+   * @param requiredMap the unmatched left-side row
+   * @param nullFields  field names of the other stream, as passed by {@code getAllCombo}
+   */
+  public abstract void addNonMatchingResult(List<Map<String, Object>> result, 
Map<String, Object> requiredMap, Set nullFields);
+
+  /**
+   * Returns the index into the accumulated value (0 or 1) of the stream that
+   * {@link #getAllCombo} iterates as the left side of the join.
+   */
+  public abstract int getLeftStreamIndex();
+
+
+  /**
+   * Joins two rows if all their key fields are equal.
+   * <p>
+   * {@link #keys} holds (stream-1 field, stream-2 field) pairs. Which member of a pair belongs to
+   * which argument depends on {@link #getLeftStreamIndex()}: {@code map1} is a row of the left
+   * stream, {@code map2} a row of the other stream. On a match the non-key fields of
+   * {@code map2} are written into {@code map1}.
+   *
+   * @param map1 row of the left stream; modified and returned on a match
+   * @param map2 row of the other stream; never modified
+   * @return {@code map1} extended with the fields of {@code map2}, or {@code null} if the rows
+   *         do not match. A {@code null} key value on either side never matches.
+   */
+  public Map<String, Object> joinTwoMapsWithKeys(Map<String, Object> map1, Map<String, Object> map2)
+  {
+    // Pick, for each key pair, the entry that belongs to the stream each map came from. With the
+    // second stream as left side, map1 must be probed with the stream-2 key name.
+    final int leftOffset = getLeftStreamIndex() == 0 ? 0 : 1;
+    final int rightOffset = 1 - leftOffset;
+
+    for (int i = 0; i < keys.length; i = i + 2) {
+      Object leftValue = map1.get(keys[i + leftOffset]);
+      if (leftValue == null || !leftValue.equals(map2.get(keys[i + rightOffset]))) {
+        return null;
+      }
+    }
+
+    // The key fields of map2 are not copied; map1 already carries the key values.
+    final Set<String> rightKeyFields = leftOffset == 0 ? keySetStream2 : keySetStream1;
+    for (Map.Entry<String, Object> field : map2.entrySet()) {
+      if (!rightKeyFields.contains(field.getKey())) {
+        map1.put(field.getKey(), field.getValue());
+      }
+    }
+    return map1;
+  }
+
+  /**
+   * Always returns {@code null}; this accumulation computes no retraction value.
+   *
+   * @param value ignored
+   */
+  @Override
+  public List<?> getRetraction(List<?> value)
+  {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4b36bf3e/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
new file mode 100644
index 0000000..8ad0467
--- /dev/null
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Full outer join Accumulation for Pojo Streams.
+ *
+ */
+@InterfaceStability.Evolving
+public class PojoFullOuterJoin<InputT1, InputT2>
+    extends AbstractPojoJoin<InputT1, InputT2>
+{
+  /**
+   * Creates an instance with no join keys and no output class.
+   */
+  public PojoFullOuterJoin()
+  {
+    super();
+  }
+
+  /**
+   * @param num      not used by this class
+   * @param outClass class instantiated for every joined row
+   * @param keys     join key field names as (stream-1 field, stream-2 field) pairs
+   */
+  public PojoFullOuterJoin(int num, Class<?> outClass, String... keys)
+  {
+    super(outClass, keys);
+  }
+
+  /**
+   * Emits an unmatched left row, padded with {@code null} for every non-key field of the
+   * second stream.
+   */
+  @Override
+  public void addNonMatchingResult(List result, Map requiredMap, Set nullFields)
+  {
+    for (Object field : nullFields) {
+      if (!keySetStream2.contains(field)) {
+        requiredMap.put(field.toString(), null);
+      }
+    }
+    result.add(requiredMap);
+  }
+
+  /**
+   * The first stream is the left side of the join.
+   */
+  @Override
+  public int getLeftStreamIndex()
+  {
+    return 0;
+  }
+
+  /**
+   * Full outer join of the accumulated rows: every matching pair yields one joined row, every
+   * unmatched left row and every unmatched right row yields one null-padded row.
+   * <p>
+   * Accumulated rows are never modified; joined and padded rows are built on copies. Unmatched
+   * right rows are tracked by position, so equal duplicate rows are each emitted and the output
+   * order follows the order of the right stream.
+   *
+   * @param streamIndex not used by this implementation
+   * @param accu        two-element list holding the rows of stream 1 and stream 2
+   * @param curMap      not used by this implementation
+   * @return the joined rows: matches and unmatched left rows first, then unmatched right rows
+   */
+  @Override
+  public List<Map<String, Object>> getAllCombo(int streamIndex, List accu, Map curMap)
+  {
+    List<Map<String, Object>> result = new ArrayList<>();
+    int leftStreamIndex = getLeftStreamIndex();
+    @SuppressWarnings("unchecked")
+    List<Map<String, Object>> leftStream = (List<Map<String, Object>>)accu.get(leftStreamIndex);
+    @SuppressWarnings("unchecked")
+    List<Map<String, Object>> rightStream = (List<Map<String, Object>>)accu.get((leftStreamIndex + 1) % 2);
+
+    // Either side may be empty; then there are no field names to pad the other side with.
+    Set<String> leftFields = leftStream.isEmpty() ? new HashSet<String>() : leftStream.get(0).keySet();
+    Set<String> rightFields = rightStream.isEmpty() ? new HashSet<String>() : rightStream.get(0).keySet();
+
+    boolean[] rightMatched = new boolean[rightStream.size()];
+
+    for (Map<String, Object> lMap : leftStream) {
+      boolean gotMatch = false;
+      for (int r = 0; r < rightStream.size(); r++) {
+        // joinTwoMapsWithKeys writes into its first argument, so join on a copy.
+        Map<String, Object> lCopy = new HashMap<>(lMap);
+        Map<String, Object> joined = joinTwoMapsWithKeys(lCopy, rightStream.get(r));
+        if (joined != null) {
+          result.add(joined);
+          gotMatch = true;
+          rightMatched[r] = true;
+        }
+      }
+      if (!gotMatch) {
+        Map<String, Object> lCopy = new HashMap<>(lMap);
+        addNonMatchingResult(result, lCopy, rightFields);
+      }
+    }
+
+    for (int r = 0; r < rightStream.size(); r++) {
+      if (rightMatched[r]) {
+        continue;
+      }
+      Map<String, Object> rCopy = new HashMap<>(rightStream.get(r));
+      for (String field : leftFields) {
+        if (!keySetStream1.contains(field)) {
+          rCopy.put(field, null);
+        }
+      }
+      result.add(rCopy);
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4b36bf3e/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
index 1aa55c2..ceb17dd 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
@@ -18,234 +18,38 @@
  */
 package org.apache.apex.malhar.lib.window.accumulation;
 
-import java.lang.reflect.Field;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.apex.malhar.lib.window.MergeAccumulation;
-import org.apache.commons.lang3.ClassUtils;
-
-import com.google.common.base.Throwables;
-
-import com.datatorrent.lib.util.KeyValPair;
-import com.datatorrent.lib.util.PojoUtils;
-
 /**
  * Inner join Accumulation for Pojo Streams.
  *
  * @since 3.6.0
  */
 public class PojoInnerJoin<InputT1, InputT2>
-    implements MergeAccumulation<InputT1, InputT2, List<List<Map<String, 
Object>>>, List<?>>
+    extends AbstractPojoJoin<InputT1, InputT2>
 {
-  protected final String[] keys;
-  protected final Class<?> outClass;
-  private transient List<KeyValPair<String,PojoUtils.Getter>>  gettersStream1;
-  private transient List<KeyValPair<String,PojoUtils.Getter>>  gettersStream2;
-  private transient List<KeyValPair<String,PojoUtils.Setter>>  setters;
-  private transient Set<String> keySetStream2;
-
   public PojoInnerJoin()
   {
-    keys = new String[]{};
-    outClass = null;
+   super();
   }
 
   public PojoInnerJoin(int num, Class<?> outClass, String... keys)
   {
-    if (keys.length % 2 != 0) {
-      throw new IllegalArgumentException("Wrong number of keys.");
-    }
+    super(outClass,keys);
 
-    this.keys = Arrays.copyOf(keys, keys.length);
-    this.outClass = outClass;
-  }
-
-  private void createSetters()
-  {
-    Field[] fields = outClass.getDeclaredFields();
-    setters = new ArrayList<>();
-    for (Field field : fields) {
-      Class outputField = ClassUtils.primitiveToWrapper(field.getType());
-      String fieldName = field.getName();
-      setters.add(new 
KeyValPair<>(fieldName,PojoUtils.createSetter(outClass,fieldName,outputField)));
-    }
-  }
-
-  private List<KeyValPair<String,PojoUtils.Getter>> createGetters(Class<?> 
input)
-  {
-    Field[] fields = input.getDeclaredFields();
-    List<KeyValPair<String,PojoUtils.Getter>> getters = new ArrayList<>();
-    for (Field field : fields) {
-      Class inputField = ClassUtils.primitiveToWrapper(field.getType());
-      String fieldName = field.getName();
-      getters.add(new KeyValPair<>(fieldName,PojoUtils.createGetter(input, 
fieldName, inputField)));
-    }
-    return getters;
   }
 
   @Override
-  public List<List<Map<String, Object>>> accumulate(List<List<Map<String, 
Object>>> accumulatedValue, InputT1 input)
-  {
-    if (gettersStream1 == null) {
-      gettersStream1 = createGetters(input.getClass());
-    }
-    try {
-      return accumulateWithIndex(0, accumulatedValue, input);
-    } catch (NoSuchFieldException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public List<List<Map<String, Object>>> accumulate2(List<List<Map<String, 
Object>>> accumulatedValue, InputT2 input)
-  {
-    if (gettersStream2 == null) {
-      gettersStream2 = createGetters(input.getClass());
-    }
-    try {
-      return accumulateWithIndex(1, accumulatedValue, input);
-    } catch (NoSuchFieldException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-
-  @Override
-  public List<List<Map<String, Object>>> defaultAccumulatedValue()
-  {
-    List<List<Map<String, Object>>> accu = new ArrayList<>();
-    for (int i = 0; i < 2; i++) {
-      accu.add(new ArrayList<Map<String, Object>>());
-    }
-    return accu;
-  }
-
-
-  private List<List<Map<String, Object>>> accumulateWithIndex(int index, 
List<List<Map<String, Object>>> accu, Object input) throws NoSuchFieldException
-  {
-    // TODO: If a stream never sends out any tuple during one window, a wrong 
key would not be detected.
-
-    input.getClass().getDeclaredField(keys[index]);
-
-    List<Map<String, Object>> curList = accu.get(index);
-    Map map = pojoToMap(input,index + 1);
-    curList.add(map);
-    accu.set(index, curList);
-
-    return accu;
-  }
-
-  private Map<String, Object> pojoToMap(Object input, int streamIndex)
-  {
-    Map<String, Object> map = new HashMap<>();
-    List<KeyValPair<String,PojoUtils.Getter>> gettersStream = streamIndex == 1 
? gettersStream1 : gettersStream2;
-
-    for (KeyValPair<String,PojoUtils.Getter> getter : gettersStream) {
-      try {
-        Object value = getter.getValue().get(input);
-        map.put(getter.getKey(), value);
-      } catch (Exception e) {
-        throw Throwables.propagate(e);
-      }
-    }
-    return map;
-  }
-
-  @Override
-  public List<List<Map<String, Object>>> merge(List<List<Map<String, Object>>> 
accumulatedValue1, List<List<Map<String, Object>>> accumulatedValue2)
-  {
-    for (int i = 0; i < 2; i++) {
-      List<Map<String, Object>> curList = accumulatedValue1.get(i);
-      curList.addAll(accumulatedValue2.get(i));
-      accumulatedValue1.set(i, curList);
-    }
-    return accumulatedValue1;
-  }
-
-  @Override
-  public List<?> getOutput(List<List<Map<String, Object>>> accumulatedValue)
-  {
-    List<Map<String, Object>> result = new ArrayList<>();
-    if (setters == null) {
-      createSetters();
-      keySetStream2 = new HashSet<>();
-      for (int i = 0; i < keys.length; i = i + 2) {
-        keySetStream2.add(keys[i + 1]);
-      }
-    }
-
-    // TODO: May need to revisit (use state manager).
-    result = getAllCombo(0, accumulatedValue, result, null);
-
-    List<Object> out = new ArrayList<>();
-    for (Map<String, Object> resultMap : result) {
-      Object o;
-      try {
-        o = outClass.newInstance();
-      } catch (Throwable e) {
-        throw Throwables.propagate(e);
-      }
-      for (KeyValPair<String, PojoUtils.Setter> setter : setters) {
-        setter.getValue().set(o,resultMap.get(setter.getKey()));
-      }
-      out.add(o);
-    }
-
-    return out;
-  }
-
-
-  private List<Map<String, Object>> getAllCombo(int streamIndex, 
List<List<Map<String, Object>>> accu, List<Map<String, Object>> result, 
Map<String, Object> curMap)
-  {
-    if (streamIndex == 2) {
-      if (curMap != null) {
-        result.add(curMap);
-      }
-      return result;
-    } else {
-      for (Map<String, Object> map : accu.get(streamIndex)) {
-        if (streamIndex == 0) {
-          Map<String, Object> tempMap = new HashMap<>(map);
-          result = getAllCombo(streamIndex + 1, accu, result, tempMap);
-        } else if (curMap == null) {
-          return result;
-        } else {
-          Map<String, Object> tempMap = new HashMap<>(curMap);
-          tempMap = joinTwoMapsWithKeys(tempMap, map);
-          result = getAllCombo(streamIndex + 1, accu, result, tempMap);
-        }
-      }
-      return result;
-    }
-  }
-
-  private Map<String, Object> joinTwoMapsWithKeys(Map<String, Object> map1, 
Map<String, Object> map2)
+  public void addNonMatchingResult(List result, Map requiredMap, Set 
nullFields)
   {
-    for (int i = 0; i < keys.length; i = i + 2) {
-      String key1 = keys[i];
-      String key2 = keys[i + 1];
-      if (!map1.get(key1).equals(map2.get(key2))) {
-        return null;
-      }
-    }
-    for (String field : map2.keySet()) {
-      if (!keySetStream2.contains(field)) {
-        map1.put(field, map2.get(field));
-      }
-    }
-    return map1;
+    return;
   }
 
   @Override
-  public List<?> getRetraction(List<?> value)
+  public int getLeftStreamIndex()
   {
-    return null;
+    return 0;
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4b36bf3e/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
new file mode 100644
index 0000000..0ee3e00
--- /dev/null
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Left Outer join Accumulation for Pojo Streams.
+ *
+ */
+@InterfaceStability.Evolving
+public class PojoLeftOuterJoin<InputT1, InputT2>
+    extends AbstractPojoJoin<InputT1, InputT2>
+{
+  /**
+   * Creates an instance with no join keys and no output class.
+   */
+  public PojoLeftOuterJoin()
+  {
+    super();
+  }
+
+  /**
+   * @param num      not used by this class
+   * @param outClass class instantiated for every joined row
+   * @param keys     join key field names as (stream-1 field, stream-2 field) pairs
+   */
+  public PojoLeftOuterJoin(int num, Class<?> outClass, String... keys)
+  {
+    super(outClass, keys);
+  }
+
+  /**
+   * Emits an unmatched row of the first stream, padded with {@code null} for every offered
+   * field that is not a join key of the second stream.
+   */
+  @Override
+  public void addNonMatchingResult(List result, Map requiredMap, Set nullFields)
+  {
+    for (Object candidate : nullFields) {
+      if (keySetStream2.contains(candidate)) {
+        continue;
+      }
+      requiredMap.put(candidate.toString(), null);
+    }
+    result.add(requiredMap);
+  }
+
+  /**
+   * The first stream is the left side of the join.
+   */
+  @Override
+  public int getLeftStreamIndex()
+  {
+    return 0;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4b36bf3e/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
new file mode 100644
index 0000000..60b0252
--- /dev/null
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Right outer join Accumulation for Pojo Streams.
+ *
+ */
+@InterfaceStability.Evolving
+public class PojoRightOuterJoin<InputT1, InputT2>
+    extends AbstractPojoJoin<InputT1, InputT2>
+{
+  /**
+   * Creates an instance with no join keys and no output class.
+   */
+  public PojoRightOuterJoin()
+  {
+    super();
+  }
+
+  /**
+   * @param num      not used by this class
+   * @param outClass class instantiated for every joined row
+   * @param keys     join key field names as (stream-1 field, stream-2 field) pairs
+   */
+  public PojoRightOuterJoin(int num, Class<?> outClass, String... keys)
+  {
+    super(outClass, keys);
+  }
+
+  /**
+   * Emits an unmatched row of the second stream, padded with {@code null} for every offered
+   * field that is not a join key of the first stream.
+   */
+  @Override
+  public void addNonMatchingResult(List result, Map requiredMap, Set nullFields)
+  {
+    for (Object candidate : nullFields) {
+      if (keySetStream1.contains(candidate)) {
+        continue;
+      }
+      requiredMap.put(candidate.toString(), null);
+    }
+    result.add(requiredMap);
+  }
+
+  /**
+   * The second stream is iterated as the left side, which turns the base-class join into a
+   * right outer join.
+   */
+  @Override
+  public int getLeftStreamIndex()
+  {
+    return 1;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4b36bf3e/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
new file mode 100644
index 0000000..aaa7de3
--- /dev/null
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for POJO outer join accumulations
+ */
+public class PojoOuterJoinTest
+{
+
+  /**
+   * Tuple type the tests feed to the first input ({@code accumulate}) of the joins.
+   */
+  public static class TestPojo1
+  {
+    private int uId;
+    private String uName;
+
+    public TestPojo1()
+    {
+
+    }
+
+    public TestPojo1(int id, String name)
+    {
+      this.uId = id;
+      this.uName = name;
+    }
+
+    public int getUId()
+    {
+      return uId;
+    }
+
+    public void setUId(int uId)
+    {
+      this.uId = uId;
+    }
+
+    public String getUName()
+    {
+      return uName;
+    }
+
+    public void setUName(String uName)
+    {
+      this.uName = uName;
+    }
+  }
+
+  /**
+   * Tuple type the tests feed to the second input ({@code accumulate2}) of the joins.
+   */
+  public static class TestPojo3
+  {
+    private int uId;
+    private String uNickName;
+    private int age;
+
+    public TestPojo3()
+    {
+
+    }
+
+    public TestPojo3(int id, String name, int age)
+    {
+      this.uId = id;
+      this.uNickName = name;
+      this.age = age;
+    }
+
+    public int getUId()
+    {
+      return uId;
+    }
+
+    public void setUId(int uId)
+    {
+      this.uId = uId;
+    }
+
+    public String getUNickName()
+    {
+      return uNickName;
+    }
+
+    public void setUNickName(String uNickName)
+    {
+      this.uNickName = uNickName;
+    }
+
+    public int getAge()
+    {
+      return age;
+    }
+
+    public void setAge(int age)
+    {
+      this.age = age;
+    }
+  }
+
+
+  /**
+   * Output type of the joins under test; it declares the fields of both input types.
+   */
+  public static class TestOutClass
+  {
+    private int uId;
+    private String uName;
+    private String uNickName;
+    private int age;
+
+    public int getUId()
+    {
+      return uId;
+    }
+
+    public void setUId(int uId)
+    {
+      this.uId = uId;
+    }
+
+    public String getUName()
+    {
+      return uName;
+    }
+
+    public void setUName(String uName)
+    {
+      this.uName = uName;
+    }
+
+    public String getUNickName()
+    {
+      return uNickName;
+    }
+
+    public void setUNickName(String uNickName)
+    {
+      this.uNickName = uNickName;
+    }
+
+    public int getAge()
+    {
+      return age;
+    }
+
+    public void setAge(int age)
+    {
+      this.age = age;
+    }
+  }
+
+  /**
+   * Output type without the {@code uNickName} field.
+   * NOTE(review): not referenced by any test method in this class - confirm whether a
+   * multiple-key test is still planned or the class can be removed.
+   */
+  public static class TestOutMultipleKeysClass
+  {
+    private int uId;
+    private String uName;
+    private int age;
+
+    public int getUId()
+    {
+      return uId;
+    }
+
+    public void setUId(int uId)
+    {
+      this.uId = uId;
+    }
+
+    public String getUName()
+    {
+      return uName;
+    }
+
+    public void setUName(String uName)
+    {
+      this.uName = uName;
+    }
+
+    public int getAge()
+    {
+      return age;
+    }
+
+    public void setAge(int age)
+    {
+      this.age = age;
+    }
+  }
+
+
+  @Test
+  public void PojoLeftOuterJoinTest()
+  {
+    PojoLeftOuterJoin<TestPojo1, TestPojo3> pij = new PojoLeftOuterJoin<>(2, 
TestOutClass.class, "uId", "uId");
+
+    List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();
+
+    Assert.assertEquals(2, accu.size());
+
+    accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
+    accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
+
+    accu = pij.accumulate2(accu, new TestPojo3(1, "NickJosh", 12));
+    accu = pij.accumulate2(accu, new TestPojo3(3, "NickBob", 13));
+
+    Assert.assertEquals(2, pij.getOutput(accu).size());
+
+    Object o = pij.getOutput(accu).get(1);
+    Assert.assertTrue(o instanceof TestOutClass);
+    TestOutClass testOutClass = (TestOutClass)o;
+    Assert.assertEquals(2, testOutClass.getUId());
+    Assert.assertEquals("Bob", testOutClass.getUName());
+    Assert.assertEquals(0, testOutClass.getAge());
+  }
+
+  @Test
+  public void PojoRightOuterJoinTest()
+  {
+    PojoRightOuterJoin<TestPojo1, TestPojo3> pij = new PojoRightOuterJoin<>(2, 
TestOutClass.class, "uId", "uId");
+
+    List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();
+
+    Assert.assertEquals(2, accu.size());
+
+    accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
+    accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
+
+    accu = pij.accumulate2(accu, new TestPojo3(1, "NickJosh", 12));
+    accu = pij.accumulate2(accu, new TestPojo3(3, "NickBob", 13));
+
+    Assert.assertEquals(2, pij.getOutput(accu).size());
+
+    Object o = pij.getOutput(accu).get(1);
+    Assert.assertTrue(o instanceof TestOutClass);
+    TestOutClass testOutClass = (TestOutClass)o;
+    Assert.assertEquals(3, testOutClass.getUId());
+    Assert.assertEquals(null, testOutClass.getUName());
+    Assert.assertEquals(13, testOutClass.getAge());
+  }
+
+  @Test
+  public void PojoFullOuterJoinTest()
+  {
+    PojoFullOuterJoin<TestPojo1, TestPojo3> pij = new PojoFullOuterJoin<>(2, 
TestOutClass.class, "uId", "uId");
+
+    List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();
+
+    Assert.assertEquals(2, accu.size());
+
+    accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
+    accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
+
+    accu = pij.accumulate2(accu, new TestPojo3(1, "NickJosh", 12));
+    accu = pij.accumulate2(accu, new TestPojo3(3, "NickBob", 13));
+
+    Assert.assertEquals(3, pij.getOutput(accu).size());
+
+    Object o = pij.getOutput(accu).get(1);
+    Assert.assertTrue(o instanceof TestOutClass);
+    TestOutClass testOutClass = (TestOutClass)o;
+    Assert.assertEquals(2, testOutClass.getUId());
+    Assert.assertEquals("Bob", testOutClass.getUName());
+    Assert.assertEquals(0, testOutClass.getAge());
+  }
+}

Reply via email to