Repository: apex-malhar
Updated Branches:
  refs/heads/master bc464ab55 -> 22c65c4c0


APEXMALHAR-2409 PojoInnerJoin accumulation to emit a POJO


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/22c65c4c
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/22c65c4c
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/22c65c4c

Branch: refs/heads/master
Commit: 22c65c4c0c402d39e6a50ae77ba2f04b9bc657b1
Parents: bc464ab
Author: Chinmay Kolhatkar <[email protected]>
Authored: Fri Feb 17 11:35:30 2017 +0530
Committer: Chinmay Kolhatkar <[email protected]>
Committed: Fri Feb 17 13:37:33 2017 +0530

----------------------------------------------------------------------
 .../lib/window/accumulation/PojoInnerJoin.java  | 39 +++++++++++++---
 .../window/accumulation/PojoInnerJoinTest.java  | 48 ++++++++++++++++++--
 .../impl/PojoInnerJoinTestApplication.java      |  2 +-
 3 files changed, 78 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/22c65c4c/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
index 9c9733e..1872d19 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
@@ -36,22 +36,25 @@ import com.google.common.base.Throwables;
  * @since 3.6.0
  */
 public class PojoInnerJoin<InputT1, InputT2>
-    implements MergeAccumulation<InputT1, InputT2, List<List<Map<String, Object>>>, List<Map<String, Object>>>
+    implements MergeAccumulation<InputT1, InputT2, List<List<Map<String, Object>>>, List<?>>
 {
-  protected String[] keys;
+  protected final String[] keys;
+  protected final Class<?> outClass;
 
   public PojoInnerJoin()
   {
-    // for kyro
+    keys = new String[]{};
+    outClass = null;
   }
 
-  public PojoInnerJoin(int num, String... keys)
+  public PojoInnerJoin(int num, Class<?> outClass, String... keys)
   {
     if (keys.length != 2) {
       throw new IllegalArgumentException("Wrong number of keys.");
     }
 
     this.keys = Arrays.copyOf(keys, keys.length);
+    this.outClass = outClass;
   }
 
   @Override
@@ -132,14 +135,36 @@ public class PojoInnerJoin<InputT1, InputT2>
   }
 
   @Override
-  public List<Map<String, Object>> getOutput(List<List<Map<String, Object>>> accumulatedValue)
+  public List<?> getOutput(List<List<Map<String, Object>>> accumulatedValue)
   {
     List<Map<String, Object>> result = new ArrayList<>();
 
     // TODO: May need to revisit (use state manager).
     result = getAllCombo(0, accumulatedValue, result, null);
 
-    return result;
+    List<Object> out = new ArrayList<>();
+    for (Map<String, Object> resultMap : result) {
+      Object o;
+      try {
+        o = outClass.newInstance();
+      } catch (Throwable e) {
+        throw Throwables.propagate(e);
+      }
+
+      for (Map.Entry<String, Object> entry : resultMap.entrySet()) {
+        Field f;
+        try {
+          f = outClass.getDeclaredField(entry.getKey());
+          f.setAccessible(true);
+          f.set(o, entry.getValue());
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+          throw Throwables.propagate(e);
+        }
+      }
+      out.add(o);
+    }
+
+    return out;
   }
 
 
@@ -182,7 +207,7 @@ public class PojoInnerJoin<InputT1, InputT2>
   }
 
   @Override
-  public List<Map<String, Object>> getRetraction(List<Map<String, Object>> value)
+  public List<?> getRetraction(List<?> value)
   {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/22c65c4c/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
index 63690d1..47ce815 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
@@ -105,11 +105,48 @@ public class PojoInnerJoinTest
     }
   }
 
+  public static class TestOutClass
+  {
+    private int uId;
+    private String uName;
+    private String dep;
+
+    public int getuId()
+    {
+      return uId;
+    }
+
+    public void setuId(int uId)
+    {
+      this.uId = uId;
+    }
+
+    public String getuName()
+    {
+      return uName;
+    }
+
+    public void setuName(String uName)
+    {
+      this.uName = uName;
+    }
+
+    public String getDep()
+    {
+      return dep;
+    }
+
+    public void setDep(String dep)
+    {
+      this.dep = dep;
+    }
+  }
+
 
   @Test
   public void PojoInnerJoinTest()
   {
-    PojoInnerJoin<TestPojo1, TestPojo2> pij = new PojoInnerJoin<>(2, "uId", "uId");
+    PojoInnerJoin<TestPojo1, TestPojo2> pij = new PojoInnerJoin<>(2, TestOutClass.class, "uId", "uId");
 
     List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();
 
@@ -127,7 +164,12 @@ public class PojoInnerJoinTest
     result.put("dep", "CS");
 
     Assert.assertEquals(1, pij.getOutput(accu).size());
-    Assert.assertEquals(result, pij.getOutput(accu).get(0));
-  }
 
+    Object o = pij.getOutput(accu).get(0);
+    Assert.assertTrue(o instanceof TestOutClass);
+    TestOutClass testOutClass = (TestOutClass)o;
+    Assert.assertEquals(1, testOutClass.getuId());
+    Assert.assertEquals("Josh", testOutClass.getuName());
+    Assert.assertEquals("CS", testOutClass.getDep());
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/22c65c4c/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java
index 969e0fb..809023b 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java
@@ -375,7 +375,7 @@ public class PojoInnerJoinTestApplication implements StreamingApplication
     productGenerator.setSalesEvent(false);
     WindowedMergeOperatorImpl<POJOGenerator.SalesEvent, POJOGenerator.ProductEvent, List<Set<Object>>, List<List<Object>>> op
         = dag.addOperator("Merge", new WindowedMergeOperatorImpl<POJOGenerator.SalesEvent, POJOGenerator.ProductEvent, List<Set<Object>>, List<List<Object>>>());
-    op.setAccumulation(new PojoInnerJoin(2,"productId","productId"));
+    op.setAccumulation(new PojoInnerJoin(2, Object.class, "productId","productId"));
     op.setDataStorage(new InMemoryWindowedStorage<List<Set<Object>>>());
 
     WindowedStorage.WindowedPlainStorage<WindowState> windowStateMap = new InMemoryWindowedStorage<>();

Reply via email to