Repository: apex-malhar
Updated Branches:
  refs/heads/master bc464ab55 -> 22c65c4c0

APEXMALHAR-2409 PojoInnerJoin accumulation to emit a POJO

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/22c65c4c
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/22c65c4c
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/22c65c4c

Branch: refs/heads/master
Commit: 22c65c4c0c402d39e6a50ae77ba2f04b9bc657b1
Parents: bc464ab
Author: Chinmay Kolhatkar <[email protected]>
Authored: Fri Feb 17 11:35:30 2017 +0530
Committer: Chinmay Kolhatkar <[email protected]>
Committed: Fri Feb 17 13:37:33 2017 +0530

----------------------------------------------------------------------
 .../lib/window/accumulation/PojoInnerJoin.java  | 39 +++++++++++++---
 .../window/accumulation/PojoInnerJoinTest.java  | 48 ++++++++++++++++++--
 .../impl/PojoInnerJoinTestApplication.java      |  2 +-
 3 files changed, 78 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/22c65c4c/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
index 9c9733e..1872d19 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
@@ -36,22 +36,25 @@ import com.google.common.base.Throwables;
  * @since 3.6.0
  */
 public class PojoInnerJoin<InputT1, InputT2>
-    implements MergeAccumulation<InputT1, InputT2, List<List<Map<String, Object>>>, List<Map<String, Object>>>
+    implements MergeAccumulation<InputT1, InputT2, List<List<Map<String, Object>>>, List<?>>
 {
-  protected String[] keys;
+  protected final String[] keys;
+  protected final Class<?> outClass;
 
   public PojoInnerJoin()
   {
-    // for kyro
+    keys = new String[]{};
+    outClass = null;
   }
 
-  public PojoInnerJoin(int num, String... keys)
+  public PojoInnerJoin(int num, Class<?> outClass, String... keys)
   {
     if (keys.length != 2) {
       throw new IllegalArgumentException("Wrong number of keys.");
     }
 
     this.keys = Arrays.copyOf(keys, keys.length);
+    this.outClass = outClass;
   }
 
   @Override
@@ -132,14 +135,36 @@ public class PojoInnerJoin<InputT1, InputT2>
   }
 
   @Override
-  public List<Map<String, Object>> getOutput(List<List<Map<String, Object>>> accumulatedValue)
+  public List<?> getOutput(List<List<Map<String, Object>>> accumulatedValue)
   {
     List<Map<String, Object>> result = new ArrayList<>();
 
     // TODO: May need to revisit (use state manager).
     result = getAllCombo(0, accumulatedValue, result, null);
 
-    return result;
+    List<Object> out = new ArrayList<>();
+    for (Map<String, Object> resultMap : result) {
+      Object o;
+      try {
+        o = outClass.newInstance();
+      } catch (Throwable e) {
+        throw Throwables.propagate(e);
+      }
+
+      for (Map.Entry<String, Object> entry : resultMap.entrySet()) {
+        Field f;
+        try {
+          f = outClass.getDeclaredField(entry.getKey());
+          f.setAccessible(true);
+          f.set(o, entry.getValue());
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+          throw Throwables.propagate(e);
+        }
+      }
+      out.add(o);
+    }
+
+    return out;
   }
 
 
@@ -182,7 +207,7 @@ public class PojoInnerJoin<InputT1, InputT2>
   }
 
   @Override
-  public List<Map<String, Object>> getRetraction(List<Map<String, Object>> value)
+  public List<?> getRetraction(List<?> value)
   {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/22c65c4c/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
index 63690d1..47ce815 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
@@ -105,11 +105,48 @@ public class PojoInnerJoinTest
     }
   }
 
+  public static class TestOutClass
+  {
+    private int uId;
+    private String uName;
+    private String dep;
+
+    public int getuId()
+    {
+      return uId;
+    }
+
+    public void setuId(int uId)
+    {
+      this.uId = uId;
+    }
+
+    public String getuName()
+    {
+      return uName;
+    }
+
+    public void setuName(String uName)
+    {
+      this.uName = uName;
+    }
+
+    public String getDep()
+    {
+      return dep;
+    }
+
+    public void setDep(String dep)
+    {
+      this.dep = dep;
+    }
+  }
+
 
   @Test
   public void PojoInnerJoinTest()
   {
-    PojoInnerJoin<TestPojo1, TestPojo2> pij = new PojoInnerJoin<>(2, "uId", "uId");
+    PojoInnerJoin<TestPojo1, TestPojo2> pij = new PojoInnerJoin<>(2, TestOutClass.class, "uId", "uId");
 
     List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();
 
@@ -127,7 +164,12 @@ public class PojoInnerJoinTest
     result.put("dep", "CS");
 
     Assert.assertEquals(1, pij.getOutput(accu).size());
-    Assert.assertEquals(result, pij.getOutput(accu).get(0));
-  }
+    Object o = pij.getOutput(accu).get(0);
+    Assert.assertTrue(o instanceof TestOutClass);
+    TestOutClass testOutClass = (TestOutClass)o;
+    Assert.assertEquals(1, testOutClass.getuId());
+    Assert.assertEquals("Josh", testOutClass.getuName());
+    Assert.assertEquals("CS", testOutClass.getDep());
 
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/22c65c4c/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java
index 969e0fb..809023b 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java
@@ -375,7 +375,7 @@ public class PojoInnerJoinTestApplication implements StreamingApplication
     productGenerator.setSalesEvent(false);
 
     WindowedMergeOperatorImpl<POJOGenerator.SalesEvent, POJOGenerator.ProductEvent, List<Set<Object>>, List<List<Object>>> op = dag.addOperator("Merge", new WindowedMergeOperatorImpl<POJOGenerator.SalesEvent, POJOGenerator.ProductEvent, List<Set<Object>>, List<List<Object>>>());
-    op.setAccumulation(new PojoInnerJoin(2,"productId","productId"));
+    op.setAccumulation(new PojoInnerJoin(2, Object.class, "productId","productId"));
     op.setDataStorage(new InMemoryWindowedStorage<List<Set<Object>>>());
     WindowedStorage.WindowedPlainStorage<WindowState> windowStateMap = new InMemoryWindowedStorage<>();
 
