Repository: apex-malhar
Updated Branches:
  refs/heads/master 23061c224 -> ef8e64ffe


APEXMALHAR-2415 Taking join on multiple columns in PojoInnerJoin


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/545576d5
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/545576d5
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/545576d5

Branch: refs/heads/master
Commit: 545576d5d2ca3e393e6931c98b28a2f02d8958ad
Parents: dd5341f
Author: Hitesh-Scorpio <[email protected]>
Authored: Fri Feb 24 13:45:58 2017 +0530
Committer: Hitesh-Scorpio <[email protected]>
Committed: Mon Feb 27 12:19:09 2017 +0530

----------------------------------------------------------------------
 .../lib/window/accumulation/PojoInnerJoin.java  |  39 +++---
 .../window/accumulation/PojoInnerJoinTest.java  | 125 ++++++++++++++++---
 2 files changed, 130 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/545576d5/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
index a5b1117..1aa55c2 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
@@ -23,8 +23,10 @@ import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.apex.malhar.lib.window.MergeAccumulation;
 import org.apache.commons.lang3.ClassUtils;
@@ -47,6 +49,7 @@ public class PojoInnerJoin<InputT1, InputT2>
   private transient List<KeyValPair<String,PojoUtils.Getter>>  gettersStream1;
   private transient List<KeyValPair<String,PojoUtils.Getter>>  gettersStream2;
   private transient List<KeyValPair<String,PojoUtils.Setter>>  setters;
+  private transient Set<String> keySetStream2;
 
   public PojoInnerJoin()
   {
@@ -56,7 +59,7 @@ public class PojoInnerJoin<InputT1, InputT2>
 
   public PojoInnerJoin(int num, Class<?> outClass, String... keys)
   {
-    if (keys.length != 2) {
+    if (keys.length % 2 != 0) {
       throw new IllegalArgumentException("Wrong number of keys.");
     }
 
@@ -170,14 +173,17 @@ public class PojoInnerJoin<InputT1, InputT2>
   public List<?> getOutput(List<List<Map<String, Object>>> accumulatedValue)
   {
     List<Map<String, Object>> result = new ArrayList<>();
-
-    // TODO: May need to revisit (use state manager).
-    result = getAllCombo(0, accumulatedValue, result, null);
-
     if (setters == null) {
       createSetters();
+      keySetStream2 = new HashSet<>();
+      for (int i = 0; i < keys.length; i = i + 2) {
+        keySetStream2.add(keys[i + 1]);
+      }
     }
 
+    // TODO: May need to revisit (use state manager).
+    result = getAllCombo(0, accumulatedValue, result, null);
+
     List<Object> out = new ArrayList<>();
     for (Map<String, Object> resultMap : result) {
       Object o;
@@ -212,7 +218,7 @@ public class PojoInnerJoin<InputT1, InputT2>
           return result;
         } else {
           Map<String, Object> tempMap = new HashMap<>(curMap);
-          tempMap = joinTwoMapsWithKeys(tempMap, keys[0], map, 
keys[streamIndex]);
+          tempMap = joinTwoMapsWithKeys(tempMap, map);
           result = getAllCombo(streamIndex + 1, accu, result, tempMap);
         }
       }
@@ -220,18 +226,21 @@ public class PojoInnerJoin<InputT1, InputT2>
     }
   }
 
-  private Map<String, Object> joinTwoMapsWithKeys(Map<String, Object> map1, 
String key1, Map<String, Object> map2, String key2)
+  private Map<String, Object> joinTwoMapsWithKeys(Map<String, Object> map1, 
Map<String, Object> map2)
   {
-    if (!map1.get(key1).equals(map2.get(key2))) {
-      return null;
-    } else {
-      for (String field : map2.keySet()) {
-        if (!field.equals(key2)) {
-          map1.put(field, map2.get(field));
-        }
+    for (int i = 0; i < keys.length; i = i + 2) {
+      String key1 = keys[i];
+      String key2 = keys[i + 1];
+      if (!map1.get(key1).equals(map2.get(key2))) {
+        return null;
+      }
+    }
+    for (String field : map2.keySet()) {
+      if (!keySetStream2.contains(field)) {
+        map1.put(field, map2.get(field));
       }
-      return map1;
     }
+    return map1;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/545576d5/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
index 47c7307..fbbb5b1 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
@@ -68,20 +68,22 @@ public class PojoInnerJoinTest
     }
   }
 
-  public static class TestPojo2
+  public static class TestPojo3
   {
     private int uId;
-    private String dep;
+    private String uNickName;
+    private int age;
 
-    public TestPojo2()
+    public TestPojo3()
     {
 
     }
 
-    public TestPojo2(int id, String dep)
+    public TestPojo3(int id, String name, int age)
     {
       this.uId = id;
-      this.dep = dep;
+      this.uNickName = name;
+      this.age = age;
     }
 
     public int getUId()
@@ -94,22 +96,81 @@ public class PojoInnerJoinTest
       this.uId = uId;
     }
 
-    public String getDep()
+    public String getUNickName()
     {
-      return dep;
+      return uNickName;
     }
 
-    public void setDep(String dep)
+    public void setUNickName(String uNickName)
     {
-      this.dep = dep;
+      this.uNickName = uNickName;
+    }
+
+    public int getAge()
+    {
+      return age;
+    }
+
+    public void setAge(int age)
+    {
+      this.age = age;
     }
   }
 
+
   public static class TestOutClass
   {
     private int uId;
     private String uName;
-    private String dep;
+    private String uNickName;
+    private int age;
+
+    public int getUId()
+    {
+      return uId;
+    }
+
+    public void setUId(int uId)
+    {
+      this.uId = uId;
+    }
+
+    public String getUName()
+    {
+      return uName;
+    }
+
+    public void setUName(String uName)
+    {
+      this.uName = uName;
+    }
+
+    public String getUNickName()
+    {
+      return uNickName;
+    }
+
+    public void setUNickName(String uNickName)
+    {
+      this.uNickName = uNickName;
+    }
+
+    public int getAge()
+    {
+      return age;
+    }
+
+    public void setAge(int age)
+    {
+      this.age = age;
+    }
+  }
+
+  public static class TestOutMultipleKeysClass
+  {
+    private int uId;
+    private String uName;
+    private int age;
 
     public int getUId()
     {
@@ -131,14 +192,14 @@ public class PojoInnerJoinTest
       this.uName = uName;
     }
 
-    public String getDep()
+    public int getAge()
     {
-      return dep;
+      return age;
     }
 
-    public void setDep(String dep)
+    public void setAge(int age)
     {
-      this.dep = dep;
+      this.age = age;
     }
   }
 
@@ -146,7 +207,7 @@ public class PojoInnerJoinTest
   @Test
   public void PojoInnerJoinTest()
   {
-    PojoInnerJoin<TestPojo1, TestPojo2> pij = new PojoInnerJoin<>(2, 
TestOutClass.class, "uId", "uId");
+    PojoInnerJoin<TestPojo1, TestPojo3> pij = new PojoInnerJoin<>(2, 
TestOutClass.class, "uId", "uId");
 
     List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();
 
@@ -155,13 +216,14 @@ public class PojoInnerJoinTest
     accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
     accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
 
-    accu = pij.accumulate2(accu, new TestPojo2(1, "CS"));
-    accu = pij.accumulate2(accu, new TestPojo2(3, "ECE"));
+    accu = pij.accumulate2(accu, new TestPojo3(1, "NickJosh", 12));
+    accu = pij.accumulate2(accu, new TestPojo3(3, "NickBob", 13));
 
     Map<String, Object> result = new HashMap<>();
     result.put("uId", 1);
     result.put("uName", "Josh");
-    result.put("dep", "CS");
+    result.put("uNickName", "NickJosh");
+    result.put("age", 12);
 
     Assert.assertEquals(1, pij.getOutput(accu).size());
 
@@ -170,6 +232,31 @@ public class PojoInnerJoinTest
     TestOutClass testOutClass = (TestOutClass)o;
     Assert.assertEquals(1, testOutClass.getUId());
     Assert.assertEquals("Josh", testOutClass.getUName());
-    Assert.assertEquals("CS", testOutClass.getDep());
+    Assert.assertEquals(12, testOutClass.getAge());
+  }
+
+  @Test
+  public void PojoInnerJoinTestMultipleKeys()
+  {
+    PojoInnerJoin<TestPojo1, TestPojo3> pij = new PojoInnerJoin<>(2, 
TestOutMultipleKeysClass.class, "uId", "uId", "uName", "uNickName");
+
+    List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();
+
+    Assert.assertEquals(2, accu.size());
+
+    accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
+    accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
+
+    accu = pij.accumulate2(accu, new TestPojo3(1, "Josh", 12));
+    accu = pij.accumulate2(accu, new TestPojo3(3, "ECE", 13));
+
+    Assert.assertEquals(1, pij.getOutput(accu).size());
+
+    Object o = pij.getOutput(accu).get(0);
+    Assert.assertTrue(o instanceof TestOutMultipleKeysClass);
+    TestOutMultipleKeysClass testOutClass = (TestOutMultipleKeysClass)o;
+    Assert.assertEquals(1, testOutClass.getUId());
+    Assert.assertEquals("Josh", testOutClass.getUName());
+    Assert.assertEquals(12, testOutClass.getAge());
   }
 }

Reply via email to