Repository: apex-malhar Updated Branches: refs/heads/master 6a31088c0 -> e29b7c6de
APEXMALHAR-2400 removing dependency of output fieldnames to input field names Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/e29b7c6d Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/e29b7c6d Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/e29b7c6d Branch: refs/heads/master Commit: e29b7c6de8187cb1f2209a495dd0caac97c4dd61 Parents: 6a31088 Author: Hitesh-Scorpio <[email protected]> Authored: Tue Mar 14 15:56:48 2017 +0530 Committer: Hitesh-Scorpio <[email protected]> Committed: Tue Mar 21 15:30:08 2017 +0530 ---------------------------------------------------------------------- .../window/accumulation/AbstractPojoJoin.java | 44 ++++++- .../window/accumulation/PojoFullOuterJoin.java | 37 +++++- .../lib/window/accumulation/PojoInnerJoin.java | 6 + .../window/accumulation/PojoLeftOuterJoin.java | 16 ++- .../window/accumulation/PojoRightOuterJoin.java | 16 ++- .../window/accumulation/PojoInnerJoinTest.java | 32 +++++ .../window/accumulation/PojoOuterJoinTest.java | 126 +++++++++++++++++++ 7 files changed, 269 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e29b7c6d/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java index a0b3fb3..8fe7df3 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java @@ -33,8 +33,11 @@ import com.google.common.base.Throwables; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; +import com.datatorrent.lib.util.KeyValPair; import com.datatorrent.lib.util.PojoUtils; +import static org.apache.apex.malhar.lib.window.accumulation.AbstractPojoJoin.STREAM.LEFT; + /** * Join Accumulation for Pojo Streams. * @@ -47,9 +50,14 @@ public abstract class AbstractPojoJoin<InputT1, InputT2> protected Class<?> outClass; private transient Map<String,PojoUtils.Getter> gettersStream1; private transient Map<String,PojoUtils.Getter> gettersStream2; - private transient Map<String,PojoUtils.Setter> setters; + protected transient Map<String,PojoUtils.Setter> setters; + protected transient Map<String, KeyValPair<STREAM, String>> outputToInputMap; protected transient String[] leftKeys; protected transient String[] rightKeys; + public enum STREAM + { + LEFT, RIGHT + } public AbstractPojoJoin() { @@ -58,6 +66,10 @@ public abstract class AbstractPojoJoin<InputT1, InputT2> outClass = null; } + /** + * This constructor will be used when the user wants to include all the fields of Output POJO + * and the field names of output POJO match the field names of POJO coming on input streams. + */ public AbstractPojoJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys) { if (leftKeys.length != rightKeys.length) { @@ -68,6 +80,16 @@ public abstract class AbstractPojoJoin<InputT1, InputT2> this.outClass = outClass; } + /** + * This constructor will be used when the user wants to include some specific + * fields of the output POJO and/or wants to have a mapping of the fields of output + * POJO to POJO coming on input streams. + */ + public AbstractPojoJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys, Map<String, KeyValPair<STREAM, String>> outputToInputMap) + { + this(outClass,leftKeys,rightKeys); + this.outputToInputMap = outputToInputMap; + } private void createSetters() { @@ -220,8 +242,24 @@ public abstract class AbstractPojoJoin<InputT1, InputT2> } for (Object lObj:left) { for (Object rObj:right) { - setObjectForResult(leftGettersStream, lObj,o); - setObjectForResult(rightGettersStream, rObj,o); + if (outputToInputMap != null) { + for (Map.Entry<String, KeyValPair<STREAM,String>> entry : outputToInputMap.entrySet()) { + KeyValPair<STREAM,String> kv = entry.getValue(); + Object reqObject; + Map<String,PojoUtils.Getter> reqStream; + if (kv.getKey() == LEFT) { + reqObject = leftStreamIndex == 0 ? lObj : rObj; + reqStream = leftStreamIndex == 0 ? leftGettersStream : rightGettersStream; + } else { + reqObject = leftStreamIndex == 0 ? rObj : lObj; + reqStream = leftStreamIndex == 0 ? rightGettersStream : leftGettersStream; + } + setters.get(entry.getKey()).set(o,reqStream.get(entry.getValue().getValue()).get(reqObject)); + } + } else { + setObjectForResult(leftGettersStream, lObj, o); + setObjectForResult(rightGettersStream, rObj, o); + } } result.add(o); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e29b7c6d/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java index edee827..c74ded3 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability; import com.google.common.base.Throwables; import com.google.common.collect.Multimap; +import com.datatorrent.lib.util.KeyValPair; import com.datatorrent.lib.util.PojoUtils; /** @@ -47,6 +48,11 @@ public class PojoFullOuterJoin<InputT1, InputT2> super(outClass,leftKeys,rightKeys); } + public PojoFullOuterJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys, Map<String, KeyValPair<STREAM, String>> outputToInputMap) + { + super(outClass,leftKeys,rightKeys, outputToInputMap); + } + @Override public void addNonMatchingResult(Collection<Object> left, Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result) { @@ -57,7 +63,15 @@ public class PojoFullOuterJoin<InputT1, InputT2> } catch (Throwable e) { throw Throwables.propagate(e); } - setObjectForResult(leftGettersStream, lObj, o); + if (outputToInputMap != null) { + for (Map.Entry<String, KeyValPair<STREAM,String>> entry : outputToInputMap.entrySet()) { + if (entry.getValue().getKey() == STREAM.LEFT) { + setters.get(entry.getKey()).set(o, leftGettersStream.get(entry.getValue().getValue()).get(lObj)); + } + } + } else { + setObjectForResult(leftGettersStream, lObj, o); + } result.add(o); } } @@ -67,13 +81,30 @@ public class PojoFullOuterJoin<InputT1, InputT2> Map<String,PojoUtils.Getter> rightGettersStream, List<Object> result) { for (Object key : rightStream.keySet()) { - addNonMatchingResult(rightStream.get((List)key), rightGettersStream, result); + if (outputToInputMap == null) { + addNonMatchingResult(rightStream.get((List)key), rightGettersStream, result); + } else { + for (Object obj: rightStream.get((List)key)) { + Object o; + try { + o = outClass.newInstance(); + } catch (Throwable e) { + throw Throwables.propagate(e); + } + for (Map.Entry<String, KeyValPair<STREAM,String>> entry : outputToInputMap.entrySet()) { + if (entry.getValue().getKey() == STREAM.RIGHT) { + setters.get(entry.getKey()).set(o, rightGettersStream.get(entry.getValue().getValue()).get(obj)); + } + } + result.add(o); + } + } } } @Override public int getLeftStreamIndex() { - return 1; + return 0; } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e29b7c6d/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java index a6421fa..1dcc7e5 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java @@ -24,6 +24,7 @@ import java.util.Map; import com.google.common.collect.Multimap; +import com.datatorrent.lib.util.KeyValPair; import com.datatorrent.lib.util.PojoUtils; /** @@ -59,6 +60,11 @@ public class PojoInnerJoin<InputT1, InputT2> super(outClass,leftKeys,rightKeys); } + public PojoInnerJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys, Map<String, KeyValPair<STREAM, String>> outputToInputMap) + { + super(outClass,leftKeys,rightKeys, outputToInputMap); + } + @Override public void addNonMatchingResult(Collection<Object> left, Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e29b7c6d/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java index 4317e30..5405ca5 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability; import com.google.common.base.Throwables; import com.google.common.collect.Multimap; +import com.datatorrent.lib.util.KeyValPair; import com.datatorrent.lib.util.PojoUtils; /** @@ -47,6 +48,11 @@ public class PojoLeftOuterJoin<InputT1, InputT2> super(outClass,leftKeys,rightKeys); } + public PojoLeftOuterJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys, Map<String, KeyValPair<STREAM, String>> outputToInputMap) + { + super(outClass,leftKeys,rightKeys, outputToInputMap); + } + @Override public void addNonMatchingResult(Collection<Object> left, Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result) { @@ -57,7 +63,15 @@ public class PojoLeftOuterJoin<InputT1, InputT2> } catch (Throwable e) { throw Throwables.propagate(e); } - setObjectForResult(leftGettersStream, lObj, o); + if (outputToInputMap != null) { + for (Map.Entry<String, KeyValPair<STREAM,String>> entry : outputToInputMap.entrySet()) { + if (entry.getValue().getKey() == STREAM.LEFT) { + setters.get(entry.getKey()).set(o, leftGettersStream.get(entry.getValue().getValue()).get(lObj)); + } + } + } else { + setObjectForResult(leftGettersStream, lObj, o); + } result.add(o); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e29b7c6d/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java index 2d30346..9d22229 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability; import com.google.common.base.Throwables; import com.google.common.collect.Multimap; +import com.datatorrent.lib.util.KeyValPair; import com.datatorrent.lib.util.PojoUtils; /** @@ -47,6 +48,11 @@ public class PojoRightOuterJoin<InputT1, InputT2> super(outClass,leftKeys,rightKeys); } + public PojoRightOuterJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys, Map<String, KeyValPair<STREAM, String>> outputToInputMap) + { + super(outClass,leftKeys,rightKeys, outputToInputMap); + } + @Override public void addNonMatchingResult(Collection<Object> left, Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result) { @@ -57,7 +63,15 @@ public class PojoRightOuterJoin<InputT1, InputT2> } catch (Throwable e) { throw Throwables.propagate(e); } - setObjectForResult(leftGettersStream, lObj, o); + if (outputToInputMap != null) { + for (Map.Entry<String, KeyValPair<STREAM,String>> entry : outputToInputMap.entrySet()) { + if (entry.getValue().getKey() == STREAM.RIGHT) { + setters.get(entry.getKey()).set(o, leftGettersStream.get(entry.getValue().getValue()).get(lObj)); + } + } + } else { + setObjectForResult(leftGettersStream, lObj, o); + } result.add(o); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e29b7c6d/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java index 483ffdd..0861ca6 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java @@ -27,6 +27,8 @@ import org.junit.Test; import com.google.common.collect.Multimap; +import com.datatorrent.lib.util.KeyValPair; + /** * Test for {@link PojoInnerJoin}. */ @@ -288,4 +290,34 @@ public class PojoInnerJoinTest Assert.assertEquals("Josh", testOutClass.getUName()); Assert.assertEquals(12, testOutClass.getAge()); } + + @Test + public void PojoInnerJoinTestWithMap() + { + String[] leftKeys = {"uId", "uName"}; + String[] rightKeys = {"uId", "uNickName"}; + Map<String,KeyValPair<AbstractPojoJoin.STREAM, String>> outputInputMap = new HashMap<>(); + outputInputMap.put("uId",new KeyValPair<>(AbstractPojoJoin.STREAM.LEFT,"uId")); + outputInputMap.put("age",new KeyValPair<>(AbstractPojoJoin.STREAM.RIGHT,"age")); + PojoInnerJoin<TestPojo1, TestPojo3> pij = new PojoInnerJoin<>(TestOutMultipleKeysClass.class, leftKeys, rightKeys, outputInputMap); + + List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue(); + + Assert.assertEquals(2, accu.size()); + + accu = pij.accumulate(accu, new TestPojo1(1, "Josh")); + accu = pij.accumulate(accu, new TestPojo1(2, "Bob")); + + accu = pij.accumulate2(accu, new TestPojo3(1, "Josh", 12)); + accu = pij.accumulate2(accu, new TestPojo3(3, "ECE", 13)); + + Assert.assertEquals(1, pij.getOutput(accu).size()); + + Object o = pij.getOutput(accu).get(0); + Assert.assertTrue(o instanceof TestOutMultipleKeysClass); + TestOutMultipleKeysClass testOutClass = (TestOutMultipleKeysClass)o; + Assert.assertEquals(1, testOutClass.getUId()); + Assert.assertEquals(null, testOutClass.getUName()); + Assert.assertEquals(12, testOutClass.getAge()); + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e29b7c6d/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java index fd9d29b..06d1f2d 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java @@ -18,8 +18,10 @@ */ package org.apache.apex.malhar.lib.window.accumulation; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import org.junit.Assert; @@ -27,6 +29,8 @@ import org.junit.Test; import com.google.common.collect.Multimap; +import com.datatorrent.lib.util.KeyValPair; + /** * Test for POJO outer join accumulations */ @@ -312,4 +316,126 @@ public class PojoOuterJoinTest } Assert.assertEquals(3,checkMap.size()); } + + @Test + public void PojoLeftOuterJoinTestWithMap() + { + String[] leftKeys = {"uId", "uName"}; + String[] rightKeys = {"uId", "uNickName"}; + Map<String,KeyValPair<AbstractPojoJoin.STREAM, String>> outputInputMap = new HashMap<>(); + outputInputMap.put("uId",new KeyValPair<>(AbstractPojoJoin.STREAM.LEFT,"uId")); + outputInputMap.put("age",new KeyValPair<>(AbstractPojoJoin.STREAM.RIGHT,"age")); + PojoLeftOuterJoin<TestPojo1, TestPojo3> pij = new PojoLeftOuterJoin<>(TestOutClass.class, leftKeys, rightKeys, outputInputMap); + + List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue(); + + Assert.assertEquals(2, accu.size()); + + accu = pij.accumulate(accu, new TestPojo1(1, "Josh")); + accu = pij.accumulate(accu, new TestPojo1(2, "Bob")); + + accu = pij.accumulate2(accu, new TestPojo3(1, "Josh", 12)); + accu = pij.accumulate2(accu, new TestPojo3(3, "ECE", 13)); + + List result = pij.getOutput(accu); + Assert.assertEquals(2, result.size()); + Object o = result.get(0); + Assert.assertTrue(o instanceof TestOutClass); + TestOutClass testOutClass = (TestOutClass)o; + int uId = testOutClass.getUId(); + if (uId == 1) { + checkNameAge(null,12,testOutClass); + o = result.get(1); + Assert.assertTrue(o instanceof TestOutClass); + testOutClass = (TestOutClass)o; + uId = testOutClass.getUId(); + Assert.assertEquals(2, uId); + checkNameAge(null,0,testOutClass); + } else if (uId == 2) { + checkNameAge(null,0,testOutClass); + o = result.get(1); + Assert.assertTrue(o instanceof TestOutClass); + testOutClass = (TestOutClass)o; + uId = testOutClass.getUId(); + Assert.assertEquals(1, uId); + checkNameAge(null,12,testOutClass); + } + } + + @Test + public void PojoRightOuterJoinTestWithMap() + { + String[] leftKeys = {"uId", "uName"}; + String[] rightKeys = {"uId", "uNickName"}; + Map<String,KeyValPair<AbstractPojoJoin.STREAM, String>> outputInputMap = new HashMap<>(); + outputInputMap.put("uId",new KeyValPair<>(AbstractPojoJoin.STREAM.LEFT,"uId")); + outputInputMap.put("age",new KeyValPair<>(AbstractPojoJoin.STREAM.RIGHT,"age")); + PojoRightOuterJoin<TestPojo1, TestPojo3> pij = new PojoRightOuterJoin<>(TestOutClass.class, leftKeys, rightKeys, outputInputMap); + + List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue(); + + Assert.assertEquals(2, accu.size()); + + accu = pij.accumulate(accu, new TestPojo1(1, "Josh")); + accu = pij.accumulate(accu, new TestPojo1(2, "Bob")); + + accu = pij.accumulate2(accu, new TestPojo3(1, "Josh", 12)); + accu = pij.accumulate2(accu, new TestPojo3(3, "Bob", 13)); + + List result = pij.getOutput(accu); + Assert.assertEquals(2, result.size()); + Object o = result.get(0); + Assert.assertTrue(o instanceof TestOutClass); + TestOutClass testOutClass = (TestOutClass)o; + int uId = testOutClass.getUId(); + if (uId == 1) { + checkNameAge(null,12,testOutClass); + o = result.get(1); + Assert.assertTrue(o instanceof TestOutClass); + testOutClass = (TestOutClass)o; + uId = testOutClass.getUId(); + Assert.assertEquals(0, uId); + checkNameAge(null,13,testOutClass); + } else if (uId == 0) { + checkNameAge(null,13,testOutClass); + o = result.get(1); + Assert.assertTrue(o instanceof TestOutClass); + testOutClass = (TestOutClass)o; + uId = testOutClass.getUId(); + Assert.assertEquals(1, uId); + checkNameAge(null,12,testOutClass); + } + } + + @Test + public void PojoFullOuterJoinTestWithMap() + { + String[] leftKeys = {"uId", "uName"}; + String[] rightKeys = {"uId", "uNickName"}; + Map<String,KeyValPair<AbstractPojoJoin.STREAM, String>> outputInputMap = new HashMap<>(); + outputInputMap.put("uId",new KeyValPair<>(AbstractPojoJoin.STREAM.LEFT,"uId")); + outputInputMap.put("age",new KeyValPair<>(AbstractPojoJoin.STREAM.RIGHT,"age")); + PojoFullOuterJoin<TestPojo1, TestPojo3> pij = new PojoFullOuterJoin<>(TestOutClass.class, leftKeys, rightKeys, outputInputMap); + + List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue(); + + Assert.assertEquals(2, accu.size()); + + accu = pij.accumulate(accu, new TestPojo1(1, "Josh")); + accu = pij.accumulate(accu, new TestPojo1(2, "Bob")); + + accu = pij.accumulate2(accu, new TestPojo3(1, "Josh", 12)); + accu = pij.accumulate2(accu, new TestPojo3(3, "Bob", 13)); + + Assert.assertEquals(3, pij.getOutput(accu).size()); + Set<Integer> checkMap = new HashSet<>(); + for ( int i = 0; i < 3; i++ ) { + Object o = pij.getOutput(accu).get(i); + Assert.assertTrue(o instanceof TestOutClass); + TestOutClass testOutClass = (TestOutClass)o; + int uId = testOutClass.getUId(); + checkMap.add(uId); + } + Assert.assertEquals(3,checkMap.size()); + } }
