Repository: apex-malhar Updated Branches: refs/heads/master 032c6672f -> 6dcd82120
APEXMALHAR-2429 Ambiguity in passing key parameter to Join accumulation Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/6dcd8212 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/6dcd8212 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/6dcd8212 Branch: refs/heads/master Commit: 6dcd821208ce33cf5a0fd9472e882e17cf524731 Parents: 032c667 Author: Hitesh-Scorpio <[email protected]> Authored: Mon Mar 6 16:10:45 2017 +0530 Committer: Hitesh-Scorpio <[email protected]> Committed: Wed Mar 8 11:02:43 2017 +0530 ---------------------------------------------------------------------- .../window/accumulation/AbstractPojoJoin.java | 35 +++++++++++--------- .../window/accumulation/PojoFullOuterJoin.java | 5 +-- .../lib/window/accumulation/PojoInnerJoin.java | 16 ++++++++- .../window/accumulation/PojoLeftOuterJoin.java | 6 ++-- .../window/accumulation/PojoRightOuterJoin.java | 6 ++-- .../window/accumulation/PojoInnerJoinTest.java | 27 +++++++++++++++ .../window/accumulation/PojoOuterJoinTest.java | 14 ++++++-- 7 files changed, 82 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6dcd8212/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java index 354dd90..1634cb9 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java @@ -20,7 +20,6 @@ package org.apache.apex.malhar.lib.window.accumulation; import java.lang.reflect.Field; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -43,30 +42,34 @@ import com.datatorrent.lib.util.PojoUtils; public abstract class AbstractPojoJoin<InputT1, InputT2> implements MergeAccumulation<InputT1, InputT2, List<List<Map<String, Object>>>, List<?>> { - protected final String[] keys; - protected final Class<?> outClass; + protected String[] keys; + protected Class<?> outClass; private transient Map<String,PojoUtils.Getter> gettersStream1; private transient Map<String,PojoUtils.Getter> gettersStream2; private transient Map<String,PojoUtils.Setter> setters; protected transient Set<String> keySetStream2; protected transient Set<String> keySetStream1; + protected transient String[] leftKeys; + protected transient String[] rightKeys; public AbstractPojoJoin() { - keys = new String[]{}; + leftKeys = new String[]{}; + rightKeys = new String[]{}; outClass = null; } - public AbstractPojoJoin(Class<?> outClass, String... keys) + public AbstractPojoJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys) { - if (keys.length % 2 != 0) { - throw new IllegalArgumentException("Wrong number of keys."); + if (leftKeys.length != rightKeys.length) { + throw new IllegalArgumentException("Number of keys in left stream should match in right stream"); } - - this.keys = Arrays.copyOf(keys, keys.length); + this.leftKeys = leftKeys; + this.rightKeys = rightKeys; this.outClass = outClass; } + private void createSetters() { Field[] fields = outClass.getDeclaredFields(); @@ -174,9 +177,10 @@ public abstract class AbstractPojoJoin<InputT1, InputT2> createSetters(); keySetStream2 = new HashSet<>(); keySetStream1 = new HashSet<>(); - for (int i = 0; i < keys.length; i = i + 2) { - keySetStream1.add(keys[i]); - keySetStream2.add(keys[i + 1]); + int lIndex = getLeftStreamIndex(); + for (int i = 0; i < leftKeys.length; i++) { + keySetStream1.add(lIndex == 0 ? leftKeys[i] : rightKeys[i]); + keySetStream2.add(lIndex == 1 ? leftKeys[i] : rightKeys[i]); } } @@ -232,9 +236,10 @@ public abstract class AbstractPojoJoin<InputT1, InputT2> public Map<String, Object> joinTwoMapsWithKeys(Map<String, Object> map1, Map<String, Object> map2) { - for (int i = 0; i < keys.length; i = i + 2) { - String key1 = keys[i]; - String key2 = keys[i + 1]; + int lIndex = getLeftStreamIndex(); + for (int i = 0; i < leftKeys.length; i++) { + String key1 = lIndex == 0 ? leftKeys[i] : rightKeys[i]; + String key2 = lIndex == 1 ? leftKeys[i] : rightKeys[i]; if (!map1.get(key1).equals(map2.get(key2))) { return null; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6dcd8212/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java index 8ad0467..61b37f3 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java @@ -39,11 +39,12 @@ public class PojoFullOuterJoin<InputT1, InputT2> super(); } - public PojoFullOuterJoin(int num, Class<?> outClass, String... keys) + public PojoFullOuterJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys) { - super(outClass,keys); + super(outClass,leftKeys,rightKeys); } + @Override public void addNonMatchingResult(List result, Map requiredMap, Set nullFields) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6dcd8212/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java index ceb17dd..3654e6a 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java @@ -35,10 +35,24 @@ public class PojoInnerJoin<InputT1, InputT2> super(); } + @Deprecated public PojoInnerJoin(int num, Class<?> outClass, String... keys) { - super(outClass,keys); + this.outClass = outClass; + if (keys.length % 2 != 0) { + throw new IllegalArgumentException("Wrong number of keys."); + } + this.leftKeys = new String[keys.length / 2]; + this.rightKeys = new String[keys.length / 2]; + for (int i = 0,j = 0; i < keys.length; i = i + 2, j++) { + this.leftKeys[j] = keys[i]; + this.rightKeys[j] = keys[i + 1]; + } + } + public PojoInnerJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys) + { + super(outClass,leftKeys,rightKeys); } @Override http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6dcd8212/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java index 0ee3e00..c6e899c 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java @@ -37,12 +37,12 @@ public class PojoLeftOuterJoin<InputT1, InputT2> super(); } - public PojoLeftOuterJoin(int num, Class<?> outClass, String... keys) + public PojoLeftOuterJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys) { - super(outClass,keys); - + super(outClass,leftKeys,rightKeys); } + @Override public void addNonMatchingResult(List result, Map requiredMap, Set nullFields) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6dcd8212/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java index 60b0252..b87d4bd 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java @@ -37,12 +37,12 @@ public class PojoRightOuterJoin<InputT1, InputT2> super(); } - public PojoRightOuterJoin(int num, Class<?> outClass, String... keys) + public PojoRightOuterJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys) { - super(outClass,keys); - + super(outClass,leftKeys,rightKeys); } + @Override public void addNonMatchingResult(List result, Map requiredMap, Set nullFields) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6dcd8212/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java index fbbb5b1..377a684 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java @@ -259,4 +259,31 @@ public class PojoInnerJoinTest Assert.assertEquals("Josh", testOutClass.getUName()); Assert.assertEquals(12, testOutClass.getAge()); } + + @Test + public void PojoInnerJoinTestSeparateLeftAndRightKeys() + { + String[] leftKeys = {"uId", "uName"}; + String[] rightKeys = {"uId", "uNickName"}; + PojoInnerJoin<TestPojo1, TestPojo3> pij = new PojoInnerJoin<>(TestOutMultipleKeysClass.class, leftKeys, rightKeys); + + List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue(); + + Assert.assertEquals(2, accu.size()); + + accu = pij.accumulate(accu, new TestPojo1(1, "Josh")); + accu = pij.accumulate(accu, new TestPojo1(2, "Bob")); + + accu = pij.accumulate2(accu, new TestPojo3(1, "Josh", 12)); + accu = pij.accumulate2(accu, new TestPojo3(3, "ECE", 13)); + + Assert.assertEquals(1, pij.getOutput(accu).size()); + + Object o = pij.getOutput(accu).get(0); + Assert.assertTrue(o instanceof TestOutMultipleKeysClass); + TestOutMultipleKeysClass testOutClass = (TestOutMultipleKeysClass)o; + Assert.assertEquals(1, testOutClass.getUId()); + Assert.assertEquals("Josh", testOutClass.getUName()); + Assert.assertEquals(12, testOutClass.getAge()); + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6dcd8212/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java index aaa7de3..e3f5bb7 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java @@ -206,7 +206,9 @@ public class PojoOuterJoinTest @Test public void PojoLeftOuterJoinTest() { - PojoLeftOuterJoin<TestPojo1, TestPojo3> pij = new PojoLeftOuterJoin<>(2, TestOutClass.class, "uId", "uId"); + String[] leftKeys = {"uId"}; + String[] rightKeys = {"uId"}; + PojoLeftOuterJoin<TestPojo1, TestPojo3> pij = new PojoLeftOuterJoin<>(TestOutClass.class, leftKeys, rightKeys); List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue(); @@ -231,7 +233,10 @@ public class PojoOuterJoinTest @Test public void PojoRightOuterJoinTest() { - PojoRightOuterJoin<TestPojo1, TestPojo3> pij = new PojoRightOuterJoin<>(2, TestOutClass.class, "uId", "uId"); + String[] leftKeys = {"uId"}; + String[] rightKeys = {"uId"}; + PojoRightOuterJoin<TestPojo1, TestPojo3> pij = new PojoRightOuterJoin<>(TestOutClass.class, leftKeys, rightKeys); + List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue(); @@ -256,7 +261,10 @@ public class PojoOuterJoinTest @Test public void PojoFullOuterJoinTest() { - PojoFullOuterJoin<TestPojo1, TestPojo3> pij = new PojoFullOuterJoin<>(2, TestOutClass.class, "uId", "uId"); + String[] leftKeys = {"uId"}; + String[] rightKeys = {"uId"}; + PojoFullOuterJoin<TestPojo1, TestPojo3> pij = new PojoFullOuterJoin<>(TestOutClass.class, leftKeys, rightKeys); + List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();
