Repository: apex-malhar
Updated Branches:
  refs/heads/master 23061c224 -> ef8e64ffe
APEXMALHAR-2415 Taking join on multiple columns in PojoInnerJoin Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/545576d5 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/545576d5 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/545576d5 Branch: refs/heads/master Commit: 545576d5d2ca3e393e6931c98b28a2f02d8958ad Parents: dd5341f Author: Hitesh-Scorpio <[email protected]> Authored: Fri Feb 24 13:45:58 2017 +0530 Committer: Hitesh-Scorpio <[email protected]> Committed: Mon Feb 27 12:19:09 2017 +0530 ---------------------------------------------------------------------- .../lib/window/accumulation/PojoInnerJoin.java | 39 +++--- .../window/accumulation/PojoInnerJoinTest.java | 125 ++++++++++++++++--- 2 files changed, 130 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/545576d5/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java index a5b1117..1aa55c2 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java @@ -23,8 +23,10 @@ import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.apex.malhar.lib.window.MergeAccumulation; import org.apache.commons.lang3.ClassUtils; @@ -47,6 +49,7 @@ public class PojoInnerJoin<InputT1, InputT2> private transient 
List<KeyValPair<String,PojoUtils.Getter>> gettersStream1; private transient List<KeyValPair<String,PojoUtils.Getter>> gettersStream2; private transient List<KeyValPair<String,PojoUtils.Setter>> setters; + private transient Set<String> keySetStream2; public PojoInnerJoin() { @@ -56,7 +59,7 @@ public class PojoInnerJoin<InputT1, InputT2> public PojoInnerJoin(int num, Class<?> outClass, String... keys) { - if (keys.length != 2) { + if (keys.length % 2 != 0) { throw new IllegalArgumentException("Wrong number of keys."); } @@ -170,14 +173,17 @@ public class PojoInnerJoin<InputT1, InputT2> public List<?> getOutput(List<List<Map<String, Object>>> accumulatedValue) { List<Map<String, Object>> result = new ArrayList<>(); - - // TODO: May need to revisit (use state manager). - result = getAllCombo(0, accumulatedValue, result, null); - if (setters == null) { createSetters(); + keySetStream2 = new HashSet<>(); + for (int i = 0; i < keys.length; i = i + 2) { + keySetStream2.add(keys[i + 1]); + } } + // TODO: May need to revisit (use state manager). 
+ result = getAllCombo(0, accumulatedValue, result, null); + List<Object> out = new ArrayList<>(); for (Map<String, Object> resultMap : result) { Object o; @@ -212,7 +218,7 @@ public class PojoInnerJoin<InputT1, InputT2> return result; } else { Map<String, Object> tempMap = new HashMap<>(curMap); - tempMap = joinTwoMapsWithKeys(tempMap, keys[0], map, keys[streamIndex]); + tempMap = joinTwoMapsWithKeys(tempMap, map); result = getAllCombo(streamIndex + 1, accu, result, tempMap); } } @@ -220,18 +226,21 @@ public class PojoInnerJoin<InputT1, InputT2> } } - private Map<String, Object> joinTwoMapsWithKeys(Map<String, Object> map1, String key1, Map<String, Object> map2, String key2) + private Map<String, Object> joinTwoMapsWithKeys(Map<String, Object> map1, Map<String, Object> map2) { - if (!map1.get(key1).equals(map2.get(key2))) { - return null; - } else { - for (String field : map2.keySet()) { - if (!field.equals(key2)) { - map1.put(field, map2.get(field)); - } + for (int i = 0; i < keys.length; i = i + 2) { + String key1 = keys[i]; + String key2 = keys[i + 1]; + if (!map1.get(key1).equals(map2.get(key2))) { + return null; + } + } + for (String field : map2.keySet()) { + if (!keySetStream2.contains(field)) { + map1.put(field, map2.get(field)); } - return map1; } + return map1; } @Override http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/545576d5/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java index 47c7307..fbbb5b1 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java @@ -68,20 +68,22 @@ public class 
PojoInnerJoinTest } } - public static class TestPojo2 + public static class TestPojo3 { private int uId; - private String dep; + private String uNickName; + private int age; - public TestPojo2() + public TestPojo3() { } - public TestPojo2(int id, String dep) + public TestPojo3(int id, String name, int age) { this.uId = id; - this.dep = dep; + this.uNickName = name; + this.age = age; } public int getUId() @@ -94,22 +96,81 @@ public class PojoInnerJoinTest this.uId = uId; } - public String getDep() + public String getUNickName() { - return dep; + return uNickName; } - public void setDep(String dep) + public void setUNickName(String uNickName) { - this.dep = dep; + this.uNickName = uNickName; + } + + public int getAge() + { + return age; + } + + public void setAge(int age) + { + this.age = age; } } + public static class TestOutClass { private int uId; private String uName; - private String dep; + private String uNickName; + private int age; + + public int getUId() + { + return uId; + } + + public void setUId(int uId) + { + this.uId = uId; + } + + public String getUName() + { + return uName; + } + + public void setUName(String uName) + { + this.uName = uName; + } + + public String getUNickName() + { + return uNickName; + } + + public void setUNickName(String uNickName) + { + this.uNickName = uNickName; + } + + public int getAge() + { + return age; + } + + public void setAge(int age) + { + this.age = age; + } + } + + public static class TestOutMultipleKeysClass + { + private int uId; + private String uName; + private int age; public int getUId() { @@ -131,14 +192,14 @@ public class PojoInnerJoinTest this.uName = uName; } - public String getDep() + public int getAge() { - return dep; + return age; } - public void setDep(String dep) + public void setAge(int age) { - this.dep = dep; + this.age = age; } } @@ -146,7 +207,7 @@ public class PojoInnerJoinTest @Test public void PojoInnerJoinTest() { - PojoInnerJoin<TestPojo1, TestPojo2> pij = new PojoInnerJoin<>(2, 
TestOutClass.class, "uId", "uId"); + PojoInnerJoin<TestPojo1, TestPojo3> pij = new PojoInnerJoin<>(2, TestOutClass.class, "uId", "uId"); List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue(); @@ -155,13 +216,14 @@ public class PojoInnerJoinTest accu = pij.accumulate(accu, new TestPojo1(1, "Josh")); accu = pij.accumulate(accu, new TestPojo1(2, "Bob")); - accu = pij.accumulate2(accu, new TestPojo2(1, "CS")); - accu = pij.accumulate2(accu, new TestPojo2(3, "ECE")); + accu = pij.accumulate2(accu, new TestPojo3(1, "NickJosh", 12)); + accu = pij.accumulate2(accu, new TestPojo3(3, "NickBob", 13)); Map<String, Object> result = new HashMap<>(); result.put("uId", 1); result.put("uName", "Josh"); - result.put("dep", "CS"); + result.put("uNickName", "NickJosh"); + result.put("age", 12); Assert.assertEquals(1, pij.getOutput(accu).size()); @@ -170,6 +232,31 @@ public class PojoInnerJoinTest TestOutClass testOutClass = (TestOutClass)o; Assert.assertEquals(1, testOutClass.getUId()); Assert.assertEquals("Josh", testOutClass.getUName()); - Assert.assertEquals("CS", testOutClass.getDep()); + Assert.assertEquals(12, testOutClass.getAge()); + } + + @Test + public void PojoInnerJoinTestMultipleKeys() + { + PojoInnerJoin<TestPojo1, TestPojo3> pij = new PojoInnerJoin<>(2, TestOutMultipleKeysClass.class, "uId", "uId", "uName", "uNickName"); + + List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue(); + + Assert.assertEquals(2, accu.size()); + + accu = pij.accumulate(accu, new TestPojo1(1, "Josh")); + accu = pij.accumulate(accu, new TestPojo1(2, "Bob")); + + accu = pij.accumulate2(accu, new TestPojo3(1, "Josh", 12)); + accu = pij.accumulate2(accu, new TestPojo3(3, "ECE", 13)); + + Assert.assertEquals(1, pij.getOutput(accu).size()); + + Object o = pij.getOutput(accu).get(0); + Assert.assertTrue(o instanceof TestOutMultipleKeysClass); + TestOutMultipleKeysClass testOutClass = (TestOutMultipleKeysClass)o; + Assert.assertEquals(1, testOutClass.getUId()); + 
Assert.assertEquals("Josh", testOutClass.getUName()); + Assert.assertEquals(12, testOutClass.getAge()); } }
