Repository: apex-malhar Updated Branches: refs/heads/master 570ecaeb7 -> cb1ef764c
APEXMALHAR-2430 Optimizing Join accumulation Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/02e10754 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/02e10754 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/02e10754 Branch: refs/heads/master Commit: 02e107547ed924caad8945971383aa081e0ec483 Parents: 6dcd821 Author: Hitesh-Scorpio <[email protected]> Authored: Thu Mar 9 14:57:36 2017 +0530 Committer: Hitesh-Scorpio <[email protected]> Committed: Fri Mar 10 13:56:38 2017 +0530 ---------------------------------------------------------------------- .../window/accumulation/AbstractPojoJoin.java | 188 +++++++++---------- .../window/accumulation/PojoFullOuterJoin.java | 69 +++---- .../lib/window/accumulation/PojoInnerJoin.java | 15 +- .../window/accumulation/PojoLeftOuterJoin.java | 30 ++- .../window/accumulation/PojoRightOuterJoin.java | 30 ++- .../window/accumulation/PojoInnerJoinTest.java | 8 +- .../window/accumulation/PojoOuterJoinTest.java | 101 ++++++---- 7 files changed, 241 insertions(+), 200 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/02e10754/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java index 1634cb9..a0b3fb3 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java @@ -20,17 +20,18 @@ package org.apache.apex.malhar.lib.window.accumulation; import java.lang.reflect.Field; import java.util.ArrayList; +import 
java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.apex.malhar.lib.window.MergeAccumulation; import org.apache.commons.lang3.ClassUtils; import org.apache.hadoop.classification.InterfaceStability; import com.google.common.base.Throwables; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; import com.datatorrent.lib.util.PojoUtils; @@ -40,15 +41,13 @@ import com.datatorrent.lib.util.PojoUtils; */ @InterfaceStability.Evolving public abstract class AbstractPojoJoin<InputT1, InputT2> - implements MergeAccumulation<InputT1, InputT2, List<List<Map<String, Object>>>, List<?>> + implements MergeAccumulation<InputT1, InputT2, List<Multimap<List<Object>, Object>>, List<?>> { protected String[] keys; protected Class<?> outClass; private transient Map<String,PojoUtils.Getter> gettersStream1; private transient Map<String,PojoUtils.Getter> gettersStream2; private transient Map<String,PojoUtils.Setter> setters; - protected transient Set<String> keySetStream2; - protected transient Set<String> keySetStream1; protected transient String[] leftKeys; protected transient String[] rightKeys; @@ -94,7 +93,7 @@ public abstract class AbstractPojoJoin<InputT1, InputT2> } @Override - public List<List<Map<String, Object>>> accumulate(List<List<Map<String, Object>>> accumulatedValue, InputT1 input) + public List<Multimap<List<Object>, Object>> accumulate(List<Multimap<List<Object>, Object>> accumulatedValue, InputT1 input) { if (gettersStream1 == null) { gettersStream1 = createGetters(input.getClass()); @@ -107,7 +106,7 @@ public abstract class AbstractPojoJoin<InputT1, InputT2> } @Override - public List<List<Map<String, Object>>> accumulate2(List<List<Map<String, Object>>> accumulatedValue, InputT2 input) + public List<Multimap<List<Object>, Object>> accumulate2(List<Multimap<List<Object>, Object>> accumulatedValue, InputT2 input) { if 
(gettersStream2 == null) { gettersStream2 = createGetters(input.getClass()); @@ -121,136 +120,135 @@ public abstract class AbstractPojoJoin<InputT1, InputT2> @Override - public List<List<Map<String, Object>>> defaultAccumulatedValue() + public List<Multimap<List<Object>, Object>> defaultAccumulatedValue() { - List<List<Map<String, Object>>> accu = new ArrayList<>(); + List<Multimap<List<Object>, Object>> accu = new ArrayList<>(); for (int i = 0; i < 2; i++) { - accu.add(new ArrayList<Map<String, Object>>()); + Multimap<List<Object>, Object> mMap = ArrayListMultimap.create(); + accu.add(mMap); } return accu; } - - private List<List<Map<String, Object>>> accumulateWithIndex(int index, List<List<Map<String, Object>>> accu, Object input) throws NoSuchFieldException + private List<Multimap<List<Object>, Object>> accumulateWithIndex(int index, + List<Multimap<List<Object>, Object>> accu, Object input) throws NoSuchFieldException { // TODO: If a stream never sends out any tuple during one window, a wrong key would not be detected. - - List<Map<String, Object>> curList = accu.get(index); - Map map = pojoToMap(input,index + 1); - curList.add(map); - accu.set(index, curList); - + Multimap<List<Object>, Object> curMap = accu.get(index); + List<Object> key = getKeyForMultiMap(input,index); + curMap.put(key,input); return accu; } - private Map<String, Object> pojoToMap(Object input, int streamIndex) + private List<Object> getKeyForMultiMap(Object input, int index) { - Map<String, Object> map = new HashMap<>(); - Map<String,PojoUtils.Getter> gettersStream = streamIndex == 1 ? gettersStream1 : gettersStream2; - - for (Map.Entry<String, PojoUtils.Getter> getter : gettersStream.entrySet()) { - try { - Object value = getter.getValue().get(input); - map.put(getter.getKey(), value); - } catch (Exception e) { - throw Throwables.propagate(e); - } + List<Object> key = new ArrayList<>(); + String[] reqKeys = index == 0 ? 
leftKeys : rightKeys; + Map<String,PojoUtils.Getter> gettersStream = index == 0 ? gettersStream1 : gettersStream2; + for (String lkey : reqKeys ) { + key.add(gettersStream.get(lkey).get(input)); } - return map; + return key; } @Override - public List<List<Map<String, Object>>> merge(List<List<Map<String, Object>>> accumulatedValue1, List<List<Map<String, Object>>> accumulatedValue2) + public List<Multimap<List<Object>, Object>> merge(List<Multimap<List<Object>, Object>> accumulatedValue1, List<Multimap<List<Object>, Object>> accumulatedValue2) { for (int i = 0; i < 2; i++) { - List<Map<String, Object>> curList = accumulatedValue1.get(i); - curList.addAll(accumulatedValue2.get(i)); - accumulatedValue1.set(i, curList); + Multimap<List<Object>, Object> mMap1 = accumulatedValue1.get(i); + Multimap<List<Object>, Object> mMap2 = accumulatedValue2.get(i); + for (Map.Entry<List<Object>, Object> entry : mMap2.entries()) { + mMap1.put(entry.getKey(),entry.getValue()); + } } return accumulatedValue1; } @Override - public List<?> getOutput(List<List<Map<String, Object>>> accumulatedValue) + public List<?> getOutput(List<Multimap<List<Object>, Object>> accumulatedValue) { if (setters == null) { createSetters(); - keySetStream2 = new HashSet<>(); - keySetStream1 = new HashSet<>(); - int lIndex = getLeftStreamIndex(); - for (int i = 0; i < leftKeys.length; i++) { - keySetStream1.add(lIndex == 0 ? leftKeys[i] : rightKeys[i]); - keySetStream2.add(lIndex == 1 ? leftKeys[i] : rightKeys[i]); - } } - // TODO: May need to revisit (use state manager). 
- List<Map<String, Object>> result = getAllCombo(0, accumulatedValue, null); + return getAllCombo(accumulatedValue); + } - List<Object> out = new ArrayList<>(); - for (Map<String, Object> resultMap : result) { - Object o; - try { - o = outClass.newInstance(); - } catch (Throwable e) { - throw Throwables.propagate(e); - } - for (Map.Entry<String, PojoUtils.Setter> setter : setters.entrySet()) { - if (resultMap.get(setter.getKey()) != null) { - setter.getValue().set(o, resultMap.get(setter.getKey())); - } + protected void setObjectForResult(Map<String,PojoUtils.Getter> stream, Object input, Object output) + { + for (Map.Entry<String, PojoUtils.Getter> getter : stream.entrySet()) { + if (setters.containsKey(getter.getKey())) { + setters.get(getter.getKey()).set(output, getter.getValue().get(input)); } - out.add(o); } - return out; } - - public List<Map<String, Object>> getAllCombo(int streamIndex, List<List<Map<String, Object>>> accu, Map<String, Object> curMap) + /** + * This function takes the required join on the 2 input streams for matching keys + * and allows the derived classes to implement the logic in case of non matching keys. + * + * It is designed such that for outer joins it will always assume that it is + * a left outer join and hence it considers right stream as left in case of + * right outer join keeping the code and logic the same. + * + * For each key in the left stream a corresponding key is searched in the right stream + * if a match is found then all the objects with that key are added to Output list, + * also that key is removed from right stream as it will no longer be required in any join + * scenario. If a match is not found then it relies on derived class implementation to handle it. 
+ * + * @param accu which is the main accumulation data structure + * @return List of objects got after joining the streams + */ + private List<Object> getAllCombo(List<Multimap<List<Object>, Object>> accu) { - List<Map<String, Object>> result = new ArrayList<>(); + List<Object> result = new ArrayList<>(); int leftStreamIndex = getLeftStreamIndex(); - List<Map<String, Object>> leftStream = accu.get(leftStreamIndex); - List<Map<String, Object>> rightStream = accu.get((leftStreamIndex + 1) % 2); - for (Map<String, Object> lMap : leftStream) { - boolean gotMatch = false; - for (Map<String, Object> rMap : rightStream) { - Map<String, Object> tempMap = joinTwoMapsWithKeys(lMap, rMap); - if (tempMap != null) { - result.add(tempMap); - gotMatch = true; + Multimap<List<Object>, Object> leftStream = accu.get(leftStreamIndex); + Multimap<List<Object>, Object> rightStream = ArrayListMultimap.create(accu.get((leftStreamIndex + 1) % 2)); + Map<String,PojoUtils.Getter> leftGettersStream = leftStreamIndex == 0 ? gettersStream1 : gettersStream2; + Map<String,PojoUtils.Getter> rightGettersStream = leftStreamIndex == 1 ? 
gettersStream1 : gettersStream2; + for (List lMap : leftStream.keySet()) { + Collection<Object> left = leftStream.get(lMap); + if (rightStream.containsKey(lMap)) { + Collection<Object> right = rightStream.get(lMap); + Object o; + try { + o = outClass.newInstance(); + } catch (Throwable e) { + throw Throwables.propagate(e); } - } - if (!gotMatch) { - addNonMatchingResult(result, lMap, rightStream.get(0).keySet()); + for (Object lObj:left) { + for (Object rObj:right) { + setObjectForResult(leftGettersStream, lObj,o); + setObjectForResult(rightGettersStream, rObj,o); + } + result.add(o); + } + rightStream.removeAll(lMap); + } else { + addNonMatchingResult(left, leftGettersStream, result); } } + addNonMatchingRightStream(rightStream, rightGettersStream, result); return result; } - public abstract void addNonMatchingResult(List<Map<String, Object>> result, Map<String, Object> requiredMap, Set nullFields); - - public abstract int getLeftStreamIndex(); - - - public Map<String, Object> joinTwoMapsWithKeys(Map<String, Object> map1, Map<String, Object> map2) - { - int lIndex = getLeftStreamIndex(); - for (int i = 0; i < leftKeys.length; i++) { - String key1 = lIndex == 0 ? leftKeys[i] : rightKeys[i]; - String key2 = lIndex == 1 ? leftKeys[i] : rightKeys[i]; - if (!map1.get(key1).equals(map2.get(key2))) { - return null; - } - } - for (String field : map2.keySet()) { - if (!keySetStream2.contains(field)) { - map1.put(field, map2.get(field)); - } - } - return map1; - } + /** + * This function defines the strategy to be used when no matching key is found. + */ + protected abstract void addNonMatchingResult(Collection<Object> left, Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result); + + /** + * This function defines the strategy to be used when the join is interested to add POJO's + * from right stream when no matching key is found. 
+ */ + protected abstract void addNonMatchingRightStream(Multimap<List<Object>, Object> rightStream, Map<String,PojoUtils.Getter> rightGettersStream, List<Object> result); + + /** + * This function lets the join decide which is the left stream and which is the right stream. + */ + protected abstract int getLeftStreamIndex(); @Override public List<?> getRetraction(List<?> value) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/02e10754/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java index 61b37f3..edee827 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java @@ -18,14 +18,17 @@ */ package org.apache.apex.malhar.lib.window.accumulation; -import java.util.ArrayList; -import java.util.HashSet; +import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.hadoop.classification.InterfaceStability; +import com.google.common.base.Throwables; +import com.google.common.collect.Multimap; + +import com.datatorrent.lib.util.PojoUtils; + /** * Full outer join Accumulation for Pojo Streams. 
* @@ -44,61 +47,33 @@ public class PojoFullOuterJoin<InputT1, InputT2> super(outClass,leftKeys,rightKeys); } - @Override - public void addNonMatchingResult(List result, Map requiredMap, Set nullFields) + public void addNonMatchingResult(Collection<Object> left, Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result) { - for (Object field : nullFields) { - if (!keySetStream2.contains(field)) { - requiredMap.put(field.toString(), null); + for (Object lObj:left) { + Object o; + try { + o = outClass.newInstance(); + } catch (Throwable e) { + throw Throwables.propagate(e); } + setObjectForResult(leftGettersStream, lObj, o); + result.add(o); } - result.add(requiredMap); } @Override - public int getLeftStreamIndex() + public void addNonMatchingRightStream(Multimap<List<Object>, Object> rightStream, + Map<String,PojoUtils.Getter> rightGettersStream, List<Object> result) { - return 0; + for (Object key : rightStream.keySet()) { + addNonMatchingResult(rightStream.get((List)key), rightGettersStream, result); + } } @Override - public List<Map<String, Object>> getAllCombo(int streamIndex, List accu, Map curMap) + public int getLeftStreamIndex() { - List<Map<String, Object>> result = new ArrayList<>(); - int leftStreamIndex = getLeftStreamIndex(); - List<Map<String, Object>> leftStream = (List<Map<String, Object>>)accu.get(leftStreamIndex); - List<Map<String, Object>> rightStream = (List<Map<String, Object>>)accu.get((leftStreamIndex + 1) % 2); - Set<Map<String,Object>> unMatchedRightStream = new HashSet<>(); - - for (Map<String, Object> rMap : rightStream) { - unMatchedRightStream.add(rMap); - } - for (Map<String, Object> lMap : leftStream) { - boolean gotMatch = false; - for (Map<String, Object> rMap : rightStream) { - Map<String, Object> tempMap = joinTwoMapsWithKeys(lMap, rMap); - if (tempMap != null) { - result.add(tempMap); - gotMatch = true; - if (unMatchedRightStream.contains(rMap)) { - unMatchedRightStream.remove(rMap); - } - } - } - if (!gotMatch) { - 
addNonMatchingResult(result, lMap, rightStream.get(0).keySet()); - } - } - - for (Map<String, Object> rMap : unMatchedRightStream) { - for (Object field : leftStream.get(0).keySet()) { - if (!keySetStream1.contains(field)) { - rMap.put(field.toString(), null); - } - } - result.add(rMap); - } - return result; + return 1; } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/02e10754/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java index 3654e6a..a6421fa 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java @@ -18,9 +18,13 @@ */ package org.apache.apex.malhar.lib.window.accumulation; +import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Set; + +import com.google.common.collect.Multimap; + +import com.datatorrent.lib.util.PojoUtils; /** * Inner join Accumulation for Pojo Streams. 
@@ -56,7 +60,14 @@ public class PojoInnerJoin<InputT1, InputT2> } @Override - public void addNonMatchingResult(List result, Map requiredMap, Set nullFields) + public void addNonMatchingResult(Collection<Object> left, Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result) + { + return; + } + + @Override + public void addNonMatchingRightStream(Multimap<List<Object>, Object> rightStream, + Map<String,PojoUtils.Getter> rightGettersStream, List<Object> result) { return; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/02e10754/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java index c6e899c..4317e30 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java @@ -18,12 +18,17 @@ */ package org.apache.apex.malhar.lib.window.accumulation; +import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.hadoop.classification.InterfaceStability; +import com.google.common.base.Throwables; +import com.google.common.collect.Multimap; + +import com.datatorrent.lib.util.PojoUtils; + /** * Left Outer join Accumulation for Pojo Streams. 
* @@ -42,16 +47,26 @@ public class PojoLeftOuterJoin<InputT1, InputT2> super(outClass,leftKeys,rightKeys); } - @Override - public void addNonMatchingResult(List result, Map requiredMap, Set nullFields) + public void addNonMatchingResult(Collection<Object> left, Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result) { - for (Object field : nullFields) { - if (!keySetStream2.contains(field)) { - requiredMap.put(field.toString(), null); + for (Object lObj:left) { + Object o; + try { + o = outClass.newInstance(); + } catch (Throwable e) { + throw Throwables.propagate(e); } + setObjectForResult(leftGettersStream, lObj, o); + result.add(o); } - result.add(requiredMap); + } + + @Override + public void addNonMatchingRightStream(Multimap<List<Object>, Object> rightStream, + Map<String,PojoUtils.Getter> rightGettersStream, List<Object> result) + { + return; } @Override @@ -59,5 +74,4 @@ public class PojoLeftOuterJoin<InputT1, InputT2> { return 0; } - } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/02e10754/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java index b87d4bd..2d30346 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java @@ -18,12 +18,17 @@ */ package org.apache.apex.malhar.lib.window.accumulation; +import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.hadoop.classification.InterfaceStability; +import com.google.common.base.Throwables; +import com.google.common.collect.Multimap; + +import com.datatorrent.lib.util.PojoUtils; 
+ /** * Right outer join Accumulation for Pojo Streams. * @@ -42,16 +47,26 @@ public class PojoRightOuterJoin<InputT1, InputT2> super(outClass,leftKeys,rightKeys); } - @Override - public void addNonMatchingResult(List result, Map requiredMap, Set nullFields) + public void addNonMatchingResult(Collection<Object> left, Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result) { - for (Object field : nullFields) { - if (!keySetStream1.contains(field)) { - requiredMap.put(field.toString(), null); + for (Object lObj:left) { + Object o; + try { + o = outClass.newInstance(); + } catch (Throwable e) { + throw Throwables.propagate(e); } + setObjectForResult(leftGettersStream, lObj, o); + result.add(o); } - result.add(requiredMap); + } + + @Override + public void addNonMatchingRightStream(Multimap<List<Object>, Object> rightStream, + Map<String,PojoUtils.Getter> rightGettersStream, List<Object> result) + { + return; } @Override @@ -59,5 +74,4 @@ public class PojoRightOuterJoin<InputT1, InputT2> { return 1; } - } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/02e10754/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java index 377a684..483ffdd 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java @@ -25,6 +25,8 @@ import java.util.Map; import org.junit.Assert; import org.junit.Test; +import com.google.common.collect.Multimap; + /** * Test for {@link PojoInnerJoin}. 
*/ @@ -209,7 +211,7 @@ public class PojoInnerJoinTest { PojoInnerJoin<TestPojo1, TestPojo3> pij = new PojoInnerJoin<>(2, TestOutClass.class, "uId", "uId"); - List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue(); + List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue(); Assert.assertEquals(2, accu.size()); @@ -240,7 +242,7 @@ public class PojoInnerJoinTest { PojoInnerJoin<TestPojo1, TestPojo3> pij = new PojoInnerJoin<>(2, TestOutMultipleKeysClass.class, "uId", "uId", "uName", "uNickName"); - List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue(); + List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue(); Assert.assertEquals(2, accu.size()); @@ -267,7 +269,7 @@ public class PojoInnerJoinTest String[] rightKeys = {"uId", "uNickName"}; PojoInnerJoin<TestPojo1, TestPojo3> pij = new PojoInnerJoin<>(TestOutMultipleKeysClass.class, leftKeys, rightKeys); - List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue(); + List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue(); Assert.assertEquals(2, accu.size()); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/02e10754/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java index e3f5bb7..fd9d29b 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java @@ -18,12 +18,15 @@ */ package org.apache.apex.malhar.lib.window.accumulation; +import java.util.HashSet; import java.util.List; -import java.util.Map; +import java.util.Set; import org.junit.Assert; import org.junit.Test; +import 
com.google.common.collect.Multimap; + /** * Test for POJO outer join accumulations */ @@ -209,25 +212,42 @@ public class PojoOuterJoinTest String[] leftKeys = {"uId"}; String[] rightKeys = {"uId"}; PojoLeftOuterJoin<TestPojo1, TestPojo3> pij = new PojoLeftOuterJoin<>(TestOutClass.class, leftKeys, rightKeys); - - List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue(); - + List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue(); Assert.assertEquals(2, accu.size()); - accu = pij.accumulate(accu, new TestPojo1(1, "Josh")); accu = pij.accumulate(accu, new TestPojo1(2, "Bob")); - accu = pij.accumulate2(accu, new TestPojo3(1, "NickJosh", 12)); accu = pij.accumulate2(accu, new TestPojo3(3, "NickBob", 13)); - Assert.assertEquals(2, pij.getOutput(accu).size()); - - Object o = pij.getOutput(accu).get(1); + List result = pij.getOutput(accu); + Assert.assertEquals(2, result.size()); + Object o = result.get(0); Assert.assertTrue(o instanceof TestOutClass); TestOutClass testOutClass = (TestOutClass)o; - Assert.assertEquals(2, testOutClass.getUId()); - Assert.assertEquals("Bob", testOutClass.getUName()); - Assert.assertEquals(0, testOutClass.getAge()); + int uId = testOutClass.getUId(); + if (uId == 1) { + checkNameAge("Josh",12,testOutClass); + o = result.get(1); + Assert.assertTrue(o instanceof TestOutClass); + testOutClass = (TestOutClass)o; + uId = testOutClass.getUId(); + Assert.assertEquals(2, testOutClass.getUId()); + checkNameAge("Bob",0,testOutClass); + } else if (uId == 2) { + checkNameAge("Bob",0,testOutClass); + o = result.get(1); + Assert.assertTrue(o instanceof TestOutClass); + testOutClass = (TestOutClass)o; + uId = testOutClass.getUId(); + Assert.assertEquals(1, testOutClass.getUId()); + checkNameAge("Josh",12,testOutClass); + } + } + + public void checkNameAge(String name, int age, TestOutClass testOutClass) + { + Assert.assertEquals(name, testOutClass.getUName()); + Assert.assertEquals(age, testOutClass.getAge()); } @Test @@ 
-236,26 +256,36 @@ public class PojoOuterJoinTest String[] leftKeys = {"uId"}; String[] rightKeys = {"uId"}; PojoRightOuterJoin<TestPojo1, TestPojo3> pij = new PojoRightOuterJoin<>(TestOutClass.class, leftKeys, rightKeys); - - - List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue(); - + List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue(); Assert.assertEquals(2, accu.size()); - accu = pij.accumulate(accu, new TestPojo1(1, "Josh")); accu = pij.accumulate(accu, new TestPojo1(2, "Bob")); - accu = pij.accumulate2(accu, new TestPojo3(1, "NickJosh", 12)); accu = pij.accumulate2(accu, new TestPojo3(3, "NickBob", 13)); - Assert.assertEquals(2, pij.getOutput(accu).size()); - - Object o = pij.getOutput(accu).get(1); + List result = pij.getOutput(accu); + Assert.assertEquals(2, result.size()); + Object o = result.get(0); Assert.assertTrue(o instanceof TestOutClass); TestOutClass testOutClass = (TestOutClass)o; - Assert.assertEquals(3, testOutClass.getUId()); - Assert.assertEquals(null, testOutClass.getUName()); - Assert.assertEquals(13, testOutClass.getAge()); + int uId = testOutClass.getUId(); + if (uId == 1) { + checkNameAge("Josh",12,testOutClass); + o = result.get(1); + Assert.assertTrue(o instanceof TestOutClass); + testOutClass = (TestOutClass)o; + uId = testOutClass.getUId(); + Assert.assertEquals(3, testOutClass.getUId()); + checkNameAge(null,13,testOutClass); + } else if (uId == 3) { + checkNameAge(null,13,testOutClass); + o = result.get(1); + Assert.assertTrue(o instanceof TestOutClass); + testOutClass = (TestOutClass)o; + uId = testOutClass.getUId(); + Assert.assertEquals(1, testOutClass.getUId()); + checkNameAge("Josh",12,testOutClass); + } } @Test @@ -264,25 +294,22 @@ public class PojoOuterJoinTest String[] leftKeys = {"uId"}; String[] rightKeys = {"uId"}; PojoFullOuterJoin<TestPojo1, TestPojo3> pij = new PojoFullOuterJoin<>(TestOutClass.class, leftKeys, rightKeys); - - - List<List<Map<String, Object>>> accu = 
pij.defaultAccumulatedValue(); - + List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue(); Assert.assertEquals(2, accu.size()); - accu = pij.accumulate(accu, new TestPojo1(1, "Josh")); accu = pij.accumulate(accu, new TestPojo1(2, "Bob")); - accu = pij.accumulate2(accu, new TestPojo3(1, "NickJosh", 12)); accu = pij.accumulate2(accu, new TestPojo3(3, "NickBob", 13)); Assert.assertEquals(3, pij.getOutput(accu).size()); - - Object o = pij.getOutput(accu).get(1); - Assert.assertTrue(o instanceof TestOutClass); - TestOutClass testOutClass = (TestOutClass)o; - Assert.assertEquals(2, testOutClass.getUId()); - Assert.assertEquals("Bob", testOutClass.getUName()); - Assert.assertEquals(0, testOutClass.getAge()); + Set<Integer> checkMap = new HashSet<>(); + for ( int i = 0; i < 3; i++ ) { + Object o = pij.getOutput(accu).get(i); + Assert.assertTrue(o instanceof TestOutClass); + TestOutClass testOutClass = (TestOutClass)o; + int uId = testOutClass.getUId(); + checkMap.add(uId); + } + Assert.assertEquals(3,checkMap.size()); } }
