Repository: apex-malhar
Updated Branches:
  refs/heads/master 570ecaeb7 -> cb1ef764c


APEXMALHAR-2430 Optimizing Join accumulation


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/02e10754
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/02e10754
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/02e10754

Branch: refs/heads/master
Commit: 02e107547ed924caad8945971383aa081e0ec483
Parents: 6dcd821
Author: Hitesh-Scorpio <[email protected]>
Authored: Thu Mar 9 14:57:36 2017 +0530
Committer: Hitesh-Scorpio <[email protected]>
Committed: Fri Mar 10 13:56:38 2017 +0530

----------------------------------------------------------------------
 .../window/accumulation/AbstractPojoJoin.java   | 188 +++++++++----------
 .../window/accumulation/PojoFullOuterJoin.java  |  69 +++----
 .../lib/window/accumulation/PojoInnerJoin.java  |  15 +-
 .../window/accumulation/PojoLeftOuterJoin.java  |  30 ++-
 .../window/accumulation/PojoRightOuterJoin.java |  30 ++-
 .../window/accumulation/PojoInnerJoinTest.java  |   8 +-
 .../window/accumulation/PojoOuterJoinTest.java  | 101 ++++++----
 7 files changed, 241 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/02e10754/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
index 1634cb9..a0b3fb3 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
@@ -20,17 +20,18 @@ package org.apache.apex.malhar.lib.window.accumulation;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.apex.malhar.lib.window.MergeAccumulation;
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.google.common.base.Throwables;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
 
 import com.datatorrent.lib.util.PojoUtils;
 
@@ -40,15 +41,13 @@ import com.datatorrent.lib.util.PojoUtils;
  */
 @InterfaceStability.Evolving
 public abstract class AbstractPojoJoin<InputT1, InputT2>
-    implements MergeAccumulation<InputT1, InputT2, List<List<Map<String, 
Object>>>, List<?>>
+    implements MergeAccumulation<InputT1, InputT2, List<Multimap<List<Object>, 
Object>>, List<?>>
 {
   protected String[] keys;
   protected Class<?> outClass;
   private transient Map<String,PojoUtils.Getter> gettersStream1;
   private transient Map<String,PojoUtils.Getter> gettersStream2;
   private transient Map<String,PojoUtils.Setter> setters;
-  protected transient Set<String> keySetStream2;
-  protected transient Set<String> keySetStream1;
   protected transient String[] leftKeys;
   protected transient String[] rightKeys;
 
@@ -94,7 +93,7 @@ public abstract class AbstractPojoJoin<InputT1, InputT2>
   }
 
   @Override
-  public List<List<Map<String, Object>>> accumulate(List<List<Map<String, 
Object>>> accumulatedValue, InputT1 input)
+  public List<Multimap<List<Object>, Object>> 
accumulate(List<Multimap<List<Object>, Object>> accumulatedValue, InputT1 input)
   {
     if (gettersStream1 == null) {
       gettersStream1 = createGetters(input.getClass());
@@ -107,7 +106,7 @@ public abstract class AbstractPojoJoin<InputT1, InputT2>
   }
 
   @Override
-  public List<List<Map<String, Object>>> accumulate2(List<List<Map<String, 
Object>>> accumulatedValue, InputT2 input)
+  public List<Multimap<List<Object>, Object>> 
accumulate2(List<Multimap<List<Object>, Object>> accumulatedValue, InputT2 
input)
   {
     if (gettersStream2 == null) {
       gettersStream2 = createGetters(input.getClass());
@@ -121,136 +120,135 @@ public abstract class AbstractPojoJoin<InputT1, InputT2>
 
 
   @Override
-  public List<List<Map<String, Object>>> defaultAccumulatedValue()
+  public List<Multimap<List<Object>, Object>> defaultAccumulatedValue()
   {
-    List<List<Map<String, Object>>> accu = new ArrayList<>();
+    List<Multimap<List<Object>, Object>> accu = new ArrayList<>();
     for (int i = 0; i < 2; i++) {
-      accu.add(new ArrayList<Map<String, Object>>());
+      Multimap<List<Object>, Object> mMap = ArrayListMultimap.create();
+      accu.add(mMap);
     }
     return accu;
   }
 
-
-  private List<List<Map<String, Object>>> accumulateWithIndex(int index, 
List<List<Map<String, Object>>> accu, Object input) throws NoSuchFieldException
+  private List<Multimap<List<Object>, Object>>  accumulateWithIndex(int index,
+      List<Multimap<List<Object>, Object>> accu, Object input) throws 
NoSuchFieldException
   {
     // TODO: If a stream never sends out any tuple during one window, a wrong 
key would not be detected.
-
-    List<Map<String, Object>> curList = accu.get(index);
-    Map map = pojoToMap(input,index + 1);
-    curList.add(map);
-    accu.set(index, curList);
-
+    Multimap<List<Object>, Object> curMap = accu.get(index);
+    List<Object> key = getKeyForMultiMap(input,index);
+    curMap.put(key,input);
     return accu;
   }
 
-  private Map<String, Object> pojoToMap(Object input, int streamIndex)
+  private List<Object> getKeyForMultiMap(Object input, int index)
   {
-    Map<String, Object> map = new HashMap<>();
-    Map<String,PojoUtils.Getter> gettersStream = streamIndex == 1 ? 
gettersStream1 : gettersStream2;
-
-    for (Map.Entry<String, PojoUtils.Getter> getter : 
gettersStream.entrySet()) {
-      try {
-        Object value = getter.getValue().get(input);
-        map.put(getter.getKey(), value);
-      } catch (Exception e) {
-        throw Throwables.propagate(e);
-      }
+    List<Object> key = new ArrayList<>();
+    String[] reqKeys = index == 0 ? leftKeys : rightKeys;
+    Map<String,PojoUtils.Getter> gettersStream = index == 0 ? gettersStream1 : 
gettersStream2;
+    for (String lkey : reqKeys ) {
+      key.add(gettersStream.get(lkey).get(input));
     }
-    return map;
+    return key;
   }
 
   @Override
-  public List<List<Map<String, Object>>> merge(List<List<Map<String, Object>>> 
accumulatedValue1, List<List<Map<String, Object>>> accumulatedValue2)
+  public List<Multimap<List<Object>, Object>> 
merge(List<Multimap<List<Object>, Object>> accumulatedValue1, 
List<Multimap<List<Object>, Object>> accumulatedValue2)
   {
     for (int i = 0; i < 2; i++) {
-      List<Map<String, Object>> curList = accumulatedValue1.get(i);
-      curList.addAll(accumulatedValue2.get(i));
-      accumulatedValue1.set(i, curList);
+      Multimap<List<Object>, Object> mMap1 = accumulatedValue1.get(i);
+      Multimap<List<Object>, Object> mMap2 = accumulatedValue2.get(i);
+      for (Map.Entry<List<Object>, Object> entry : mMap2.entries()) {
+        mMap1.put(entry.getKey(),entry.getValue());
+      }
     }
     return accumulatedValue1;
   }
 
   @Override
-  public List<?> getOutput(List<List<Map<String, Object>>> accumulatedValue)
+  public List<?> getOutput(List<Multimap<List<Object>, Object>> 
accumulatedValue)
   {
     if (setters == null) {
       createSetters();
-      keySetStream2 = new HashSet<>();
-      keySetStream1 = new HashSet<>();
-      int lIndex = getLeftStreamIndex();
-      for (int i = 0; i < leftKeys.length; i++) {
-        keySetStream1.add(lIndex == 0 ? leftKeys[i] : rightKeys[i]);
-        keySetStream2.add(lIndex == 1 ? leftKeys[i] : rightKeys[i]);
-      }
     }
-
     // TODO: May need to revisit (use state manager).
-    List<Map<String, Object>> result = getAllCombo(0, accumulatedValue, null);
+    return getAllCombo(accumulatedValue);
+  }
 
-    List<Object> out = new ArrayList<>();
-    for (Map<String, Object> resultMap : result) {
-      Object o;
-      try {
-        o = outClass.newInstance();
-      } catch (Throwable e) {
-        throw Throwables.propagate(e);
-      }
-      for (Map.Entry<String, PojoUtils.Setter> setter : setters.entrySet()) {
-        if (resultMap.get(setter.getKey()) != null) {
-          setter.getValue().set(o, resultMap.get(setter.getKey()));
-        }
+  protected void setObjectForResult(Map<String,PojoUtils.Getter> stream, 
Object input, Object output)
+  {
+    for (Map.Entry<String, PojoUtils.Getter> getter : stream.entrySet()) {
+      if (setters.containsKey(getter.getKey())) {
+        setters.get(getter.getKey()).set(output, getter.getValue().get(input));
       }
-      out.add(o);
     }
 
-    return out;
   }
 
-
-  public List<Map<String, Object>> getAllCombo(int streamIndex, 
List<List<Map<String, Object>>> accu, Map<String, Object> curMap)
+  /**
+   * This function performs the required join on the 2 input streams for matching 
keys
+   * and allows the derived classes to implement the logic in case of non-matching 
keys.
+   *
+   * It is designed such that for outer joins it will always assume that it is
+   * a left outer join and hence it considers right stream as left in case of
+   * right outer join keeping the code and logic the same.
+   *
+   * For each key in the left stream a corresponding key is searched in the 
right stream.
+   * If a match is found then all the objects with that key are added to the 
output list,
+   * and that key is removed from the right stream as it will no longer be 
required in any join
+   * scenario. If a match is not found then it relies on the derived class 
implementation to handle it.
+   *
+   * @param accu which is the main accumulation data structure
+   * @return List of objects got after joining the streams
+   */
+  private List<Object> getAllCombo(List<Multimap<List<Object>, Object>> accu)
   {
-    List<Map<String, Object>> result = new ArrayList<>();
+    List<Object> result = new ArrayList<>();
     int leftStreamIndex = getLeftStreamIndex();
-    List<Map<String, Object>> leftStream = accu.get(leftStreamIndex);
-    List<Map<String, Object>> rightStream = accu.get((leftStreamIndex + 1) % 
2);
-    for (Map<String, Object> lMap : leftStream) {
-      boolean gotMatch = false;
-      for (Map<String, Object> rMap : rightStream) {
-        Map<String, Object> tempMap = joinTwoMapsWithKeys(lMap, rMap);
-        if (tempMap != null) {
-          result.add(tempMap);
-          gotMatch = true;
+    Multimap<List<Object>, Object> leftStream = accu.get(leftStreamIndex);
+    Multimap<List<Object>, Object> rightStream = 
ArrayListMultimap.create(accu.get((leftStreamIndex + 1) % 2));
+    Map<String,PojoUtils.Getter> leftGettersStream = leftStreamIndex == 0 ? 
gettersStream1 : gettersStream2;
+    Map<String,PojoUtils.Getter> rightGettersStream = leftStreamIndex == 1 ? 
gettersStream1 : gettersStream2;
+    for (List lMap : leftStream.keySet()) {
+      Collection<Object> left = leftStream.get(lMap);
+      if (rightStream.containsKey(lMap)) {
+        Collection<Object> right = rightStream.get(lMap);
+        Object o;
+        try {
+          o = outClass.newInstance();
+        } catch (Throwable e) {
+          throw Throwables.propagate(e);
         }
-      }
-      if (!gotMatch) {
-        addNonMatchingResult(result, lMap, rightStream.get(0).keySet());
+        for (Object lObj:left) {
+          for (Object rObj:right) {
+            setObjectForResult(leftGettersStream, lObj,o);
+            setObjectForResult(rightGettersStream, rObj,o);
+          }
+          result.add(o);
+        }
+        rightStream.removeAll(lMap);
+      } else {
+        addNonMatchingResult(left, leftGettersStream, result);
       }
     }
+    addNonMatchingRightStream(rightStream, rightGettersStream, result);
     return result;
   }
 
-  public abstract void addNonMatchingResult(List<Map<String, Object>> result, 
Map<String, Object> requiredMap, Set nullFields);
-
-  public abstract int getLeftStreamIndex();
-
-
-  public Map<String, Object> joinTwoMapsWithKeys(Map<String, Object> map1, 
Map<String, Object> map2)
-  {
-    int lIndex = getLeftStreamIndex();
-    for (int i = 0; i < leftKeys.length; i++) {
-      String key1 = lIndex == 0 ? leftKeys[i] : rightKeys[i];
-      String key2 = lIndex == 1 ? leftKeys[i] : rightKeys[i];
-      if (!map1.get(key1).equals(map2.get(key2))) {
-        return null;
-      }
-    }
-    for (String field : map2.keySet()) {
-      if (!keySetStream2.contains(field)) {
-        map1.put(field, map2.get(field));
-      }
-    }
-    return map1;
-  }
+  /**
+   * This function defines the strategy to be used when no matching key is 
found.
+   */
+  protected abstract void addNonMatchingResult(Collection<Object> left, 
Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result);
+
+  /**
+   * This function defines the strategy to be used when the join needs 
to add POJOs
+   * from the right stream for which no matching key is found.
+   */
+  protected abstract void addNonMatchingRightStream(Multimap<List<Object>, 
Object> rightStream, Map<String,PojoUtils.Getter> rightGettersStream, 
List<Object> result);
+
+  /**
+   * This function lets the join decide which is the left stream and which is 
the right stream.
+   */
+  protected abstract int getLeftStreamIndex();
 
   @Override
   public List<?> getRetraction(List<?> value)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/02e10754/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
index 61b37f3..edee827 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
@@ -18,14 +18,17 @@
  */
 package org.apache.apex.malhar.lib.window.accumulation;
 
-import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceStability;
 
+import com.google.common.base.Throwables;
+import com.google.common.collect.Multimap;
+
+import com.datatorrent.lib.util.PojoUtils;
+
 /**
  * Full outer join Accumulation for Pojo Streams.
  *
@@ -44,61 +47,33 @@ public class PojoFullOuterJoin<InputT1, InputT2>
     super(outClass,leftKeys,rightKeys);
   }
 
-
   @Override
-  public void addNonMatchingResult(List result, Map requiredMap, Set 
nullFields)
+  public void addNonMatchingResult(Collection<Object> left, 
Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result)
   {
-    for (Object field : nullFields) {
-      if (!keySetStream2.contains(field)) {
-        requiredMap.put(field.toString(), null);
+    for (Object lObj:left) {
+      Object o;
+      try {
+        o = outClass.newInstance();
+      } catch (Throwable e) {
+        throw Throwables.propagate(e);
       }
+      setObjectForResult(leftGettersStream, lObj, o);
+      result.add(o);
     }
-    result.add(requiredMap);
   }
 
   @Override
-  public int getLeftStreamIndex()
+  public void addNonMatchingRightStream(Multimap<List<Object>, Object> 
rightStream,
+      Map<String,PojoUtils.Getter> rightGettersStream, List<Object> result)
   {
-    return 0;
+    for (Object key : rightStream.keySet()) {
+      addNonMatchingResult(rightStream.get((List)key), rightGettersStream, 
result);
+    }
   }
 
   @Override
-  public List<Map<String, Object>> getAllCombo(int streamIndex, List accu, Map 
curMap)
+  public int getLeftStreamIndex()
   {
-    List<Map<String, Object>> result = new ArrayList<>();
-    int leftStreamIndex = getLeftStreamIndex();
-    List<Map<String, Object>> leftStream = (List<Map<String, 
Object>>)accu.get(leftStreamIndex);
-    List<Map<String, Object>> rightStream = (List<Map<String, 
Object>>)accu.get((leftStreamIndex + 1) % 2);
-    Set<Map<String,Object>> unMatchedRightStream = new HashSet<>();
-
-    for (Map<String, Object> rMap : rightStream) {
-      unMatchedRightStream.add(rMap);
-    }
-    for (Map<String, Object> lMap : leftStream) {
-      boolean gotMatch = false;
-      for (Map<String, Object> rMap : rightStream) {
-        Map<String, Object> tempMap = joinTwoMapsWithKeys(lMap, rMap);
-        if (tempMap != null) {
-          result.add(tempMap);
-          gotMatch = true;
-          if (unMatchedRightStream.contains(rMap)) {
-            unMatchedRightStream.remove(rMap);
-          }
-        }
-      }
-      if (!gotMatch) {
-        addNonMatchingResult(result, lMap, rightStream.get(0).keySet());
-      }
-    }
-
-    for (Map<String, Object> rMap : unMatchedRightStream) {
-      for (Object field : leftStream.get(0).keySet()) {
-        if (!keySetStream1.contains(field)) {
-          rMap.put(field.toString(), null);
-        }
-      }
-      result.add(rMap);
-    }
-    return result;
+    return 1;
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/02e10754/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
index 3654e6a..a6421fa 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
@@ -18,9 +18,13 @@
  */
 package org.apache.apex.malhar.lib.window.accumulation;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+
+import com.google.common.collect.Multimap;
+
+import com.datatorrent.lib.util.PojoUtils;
 
 /**
  * Inner join Accumulation for Pojo Streams.
@@ -56,7 +60,14 @@ public class PojoInnerJoin<InputT1, InputT2>
   }
 
   @Override
-  public void addNonMatchingResult(List result, Map requiredMap, Set 
nullFields)
+  public void addNonMatchingResult(Collection<Object> left, 
Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result)
+  {
+    return;
+  }
+
+  @Override
+  public void addNonMatchingRightStream(Multimap<List<Object>, Object> 
rightStream,
+      Map<String,PojoUtils.Getter> rightGettersStream, List<Object> result)
   {
     return;
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/02e10754/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
index c6e899c..4317e30 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
@@ -18,12 +18,17 @@
  */
 package org.apache.apex.malhar.lib.window.accumulation;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceStability;
 
+import com.google.common.base.Throwables;
+import com.google.common.collect.Multimap;
+
+import com.datatorrent.lib.util.PojoUtils;
+
 /**
  * Left Outer join Accumulation for Pojo Streams.
  *
@@ -42,16 +47,26 @@ public class PojoLeftOuterJoin<InputT1, InputT2>
     super(outClass,leftKeys,rightKeys);
   }
 
-
   @Override
-  public void addNonMatchingResult(List result, Map requiredMap, Set 
nullFields)
+  public void addNonMatchingResult(Collection<Object> left, 
Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result)
   {
-    for (Object field : nullFields) {
-      if (!keySetStream2.contains(field)) {
-        requiredMap.put(field.toString(), null);
+    for (Object lObj:left) {
+      Object o;
+      try {
+        o = outClass.newInstance();
+      } catch (Throwable e) {
+        throw Throwables.propagate(e);
       }
+      setObjectForResult(leftGettersStream, lObj, o);
+      result.add(o);
     }
-    result.add(requiredMap);
+  }
+
+  @Override
+  public void addNonMatchingRightStream(Multimap<List<Object>, Object> 
rightStream,
+      Map<String,PojoUtils.Getter> rightGettersStream, List<Object> result)
+  {
+    return;
   }
 
   @Override
@@ -59,5 +74,4 @@ public class PojoLeftOuterJoin<InputT1, InputT2>
   {
     return 0;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/02e10754/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
index b87d4bd..2d30346 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
@@ -18,12 +18,17 @@
  */
 package org.apache.apex.malhar.lib.window.accumulation;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceStability;
 
+import com.google.common.base.Throwables;
+import com.google.common.collect.Multimap;
+
+import com.datatorrent.lib.util.PojoUtils;
+
 /**
  * Right outer join Accumulation for Pojo Streams.
  *
@@ -42,16 +47,26 @@ public class PojoRightOuterJoin<InputT1, InputT2>
     super(outClass,leftKeys,rightKeys);
   }
 
-
   @Override
-  public void addNonMatchingResult(List result, Map requiredMap, Set 
nullFields)
+  public void addNonMatchingResult(Collection<Object> left, 
Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result)
   {
-    for (Object field : nullFields) {
-      if (!keySetStream1.contains(field)) {
-        requiredMap.put(field.toString(), null);
+    for (Object lObj:left) {
+      Object o;
+      try {
+        o = outClass.newInstance();
+      } catch (Throwable e) {
+        throw Throwables.propagate(e);
       }
+      setObjectForResult(leftGettersStream, lObj, o);
+      result.add(o);
     }
-    result.add(requiredMap);
+  }
+
+  @Override
+  public void addNonMatchingRightStream(Multimap<List<Object>, Object> 
rightStream,
+      Map<String,PojoUtils.Getter> rightGettersStream, List<Object> result)
+  {
+    return;
   }
 
   @Override
@@ -59,5 +74,4 @@ public class PojoRightOuterJoin<InputT1, InputT2>
   {
     return 1;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/02e10754/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
index 377a684..483ffdd 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
@@ -25,6 +25,8 @@ import java.util.Map;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.collect.Multimap;
+
 /**
  * Test for {@link PojoInnerJoin}.
  */
@@ -209,7 +211,7 @@ public class PojoInnerJoinTest
   {
     PojoInnerJoin<TestPojo1, TestPojo3> pij = new PojoInnerJoin<>(2, 
TestOutClass.class, "uId", "uId");
 
-    List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();
+    List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue();
 
     Assert.assertEquals(2, accu.size());
 
@@ -240,7 +242,7 @@ public class PojoInnerJoinTest
   {
     PojoInnerJoin<TestPojo1, TestPojo3> pij = new PojoInnerJoin<>(2, 
TestOutMultipleKeysClass.class, "uId", "uId", "uName", "uNickName");
 
-    List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();
+    List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue();
 
     Assert.assertEquals(2, accu.size());
 
@@ -267,7 +269,7 @@ public class PojoInnerJoinTest
     String[] rightKeys = {"uId", "uNickName"};
     PojoInnerJoin<TestPojo1, TestPojo3> pij = new 
PojoInnerJoin<>(TestOutMultipleKeysClass.class, leftKeys, rightKeys);
 
-    List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();
+    List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue();
 
     Assert.assertEquals(2, accu.size());
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/02e10754/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
index e3f5bb7..fd9d29b 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
@@ -18,12 +18,15 @@
  */
 package org.apache.apex.malhar.lib.window.accumulation;
 
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
+import java.util.Set;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.collect.Multimap;
+
 /**
  * Test for POJO outer join accumulations
  */
@@ -209,25 +212,42 @@ public class PojoOuterJoinTest
     String[] leftKeys = {"uId"};
     String[] rightKeys = {"uId"};
     PojoLeftOuterJoin<TestPojo1, TestPojo3> pij = new 
PojoLeftOuterJoin<>(TestOutClass.class, leftKeys, rightKeys);
-
-    List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();
-
+    List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue();
     Assert.assertEquals(2, accu.size());
-
     accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
     accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
-
     accu = pij.accumulate2(accu, new TestPojo3(1, "NickJosh", 12));
     accu = pij.accumulate2(accu, new TestPojo3(3, "NickBob", 13));
 
-    Assert.assertEquals(2, pij.getOutput(accu).size());
-
-    Object o = pij.getOutput(accu).get(1);
+    List result = pij.getOutput(accu);
+    Assert.assertEquals(2, result.size());
+    Object o = result.get(0);
     Assert.assertTrue(o instanceof TestOutClass);
     TestOutClass testOutClass = (TestOutClass)o;
-    Assert.assertEquals(2, testOutClass.getUId());
-    Assert.assertEquals("Bob", testOutClass.getUName());
-    Assert.assertEquals(0, testOutClass.getAge());
+    int uId = testOutClass.getUId();
+    if (uId == 1) {
+      checkNameAge("Josh",12,testOutClass);
+      o = result.get(1);
+      Assert.assertTrue(o instanceof TestOutClass);
+      testOutClass = (TestOutClass)o;
+      uId = testOutClass.getUId();
+      Assert.assertEquals(2, testOutClass.getUId());
+      checkNameAge("Bob",0,testOutClass);
+    } else if (uId == 2) {
+      checkNameAge("Bob",0,testOutClass);
+      o = result.get(1);
+      Assert.assertTrue(o instanceof TestOutClass);
+      testOutClass = (TestOutClass)o;
+      uId = testOutClass.getUId();
+      Assert.assertEquals(1, testOutClass.getUId());
+      checkNameAge("Josh",12,testOutClass);
+    }
+  }
+
+  public void checkNameAge(String name, int age, TestOutClass testOutClass)
+  {
+    Assert.assertEquals(name, testOutClass.getUName());
+    Assert.assertEquals(age, testOutClass.getAge());
   }
 
   @Test
@@ -236,26 +256,36 @@ public class PojoOuterJoinTest
     String[] leftKeys = {"uId"};
     String[] rightKeys = {"uId"};
     PojoRightOuterJoin<TestPojo1, TestPojo3> pij = new 
PojoRightOuterJoin<>(TestOutClass.class, leftKeys, rightKeys);
-
-
-    List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();
-
+    List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue();
     Assert.assertEquals(2, accu.size());
-
     accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
     accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
-
     accu = pij.accumulate2(accu, new TestPojo3(1, "NickJosh", 12));
     accu = pij.accumulate2(accu, new TestPojo3(3, "NickBob", 13));
 
-    Assert.assertEquals(2, pij.getOutput(accu).size());
-
-    Object o = pij.getOutput(accu).get(1);
+    List result = pij.getOutput(accu);
+    Assert.assertEquals(2, result.size());
+    Object o = result.get(0);
     Assert.assertTrue(o instanceof TestOutClass);
     TestOutClass testOutClass = (TestOutClass)o;
-    Assert.assertEquals(3, testOutClass.getUId());
-    Assert.assertEquals(null, testOutClass.getUName());
-    Assert.assertEquals(13, testOutClass.getAge());
+    int uId = testOutClass.getUId();
+    if (uId == 1) {
+      checkNameAge("Josh",12,testOutClass);
+      o = result.get(1);
+      Assert.assertTrue(o instanceof TestOutClass);
+      testOutClass = (TestOutClass)o;
+      uId = testOutClass.getUId();
+      Assert.assertEquals(3, testOutClass.getUId());
+      checkNameAge(null,13,testOutClass);
+    } else if (uId == 3) {
+      checkNameAge(null,13,testOutClass);
+      o = result.get(1);
+      Assert.assertTrue(o instanceof TestOutClass);
+      testOutClass = (TestOutClass)o;
+      uId = testOutClass.getUId();
+      Assert.assertEquals(1, testOutClass.getUId());
+      checkNameAge("Josh",12,testOutClass);
+    }
   }
 
   @Test
@@ -264,25 +294,22 @@ public class PojoOuterJoinTest
     String[] leftKeys = {"uId"};
     String[] rightKeys = {"uId"};
     PojoFullOuterJoin<TestPojo1, TestPojo3> pij = new 
PojoFullOuterJoin<>(TestOutClass.class, leftKeys, rightKeys);
-
-
-    List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();
-
+    List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue();
     Assert.assertEquals(2, accu.size());
-
     accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
     accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
-
     accu = pij.accumulate2(accu, new TestPojo3(1, "NickJosh", 12));
     accu = pij.accumulate2(accu, new TestPojo3(3, "NickBob", 13));
 
     Assert.assertEquals(3, pij.getOutput(accu).size());
-
-    Object o = pij.getOutput(accu).get(1);
-    Assert.assertTrue(o instanceof TestOutClass);
-    TestOutClass testOutClass = (TestOutClass)o;
-    Assert.assertEquals(2, testOutClass.getUId());
-    Assert.assertEquals("Bob", testOutClass.getUName());
-    Assert.assertEquals(0, testOutClass.getAge());
+    Set<Integer> checkMap = new HashSet<>();
+    for ( int i = 0; i < 3; i++ ) {
+      Object o = pij.getOutput(accu).get(i);
+      Assert.assertTrue(o instanceof TestOutClass);
+      TestOutClass testOutClass = (TestOutClass)o;
+      int uId = testOutClass.getUId();
+      checkMap.add(uId);
+    }
+    Assert.assertEquals(3,checkMap.size());
   }
 }

Reply via email to