Repository: apex-malhar Updated Branches: refs/heads/master 8486493a0 -> dd5341f22
APEXMALHAR-2414 PojoInner Join accumulation was using java reflection Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/dd5341f2 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/dd5341f2 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/dd5341f2 Branch: refs/heads/master Commit: dd5341f222b8141b76ecbc5ea9653c70b3c78c44 Parents: 8486493 Author: Hitesh-Scorpio <[email protected]> Authored: Wed Feb 22 23:55:38 2017 +0530 Committer: Tushar R. Gosavi <[email protected]> Committed: Thu Feb 23 16:51:01 2017 +0530 ---------------------------------------------------------------------- .../lib/window/accumulation/PojoInnerJoin.java | 70 +++++++++++++------ .../window/accumulation/PojoInnerJoinTest.java | 24 +++---- .../impl/PojoInnerJoinTestApplication.java | 72 +++++++++++++++++++- 3 files changed, 132 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dd5341f2/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java index 1872d19..a5b1117 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java @@ -27,9 +27,13 @@ import java.util.List; import java.util.Map; import org.apache.apex.malhar.lib.window.MergeAccumulation; +import org.apache.commons.lang3.ClassUtils; import com.google.common.base.Throwables; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.lib.util.PojoUtils; + /** * Inner join Accumulation for Pojo Streams. * @@ -40,6 +44,9 @@ public class PojoInnerJoin<InputT1, InputT2> { protected final String[] keys; protected final Class<?> outClass; + private transient List<KeyValPair<String,PojoUtils.Getter>> gettersStream1; + private transient List<KeyValPair<String,PojoUtils.Getter>> gettersStream2; + private transient List<KeyValPair<String,PojoUtils.Setter>> setters; public PojoInnerJoin() { @@ -57,9 +64,35 @@ public class PojoInnerJoin<InputT1, InputT2> this.outClass = outClass; } + private void createSetters() + { + Field[] fields = outClass.getDeclaredFields(); + setters = new ArrayList<>(); + for (Field field : fields) { + Class outputField = ClassUtils.primitiveToWrapper(field.getType()); + String fieldName = field.getName(); + setters.add(new KeyValPair<>(fieldName,PojoUtils.createSetter(outClass,fieldName,outputField))); + } + } + + private List<KeyValPair<String,PojoUtils.Getter>> createGetters(Class<?> input) + { + Field[] fields = input.getDeclaredFields(); + List<KeyValPair<String,PojoUtils.Getter>> getters = new ArrayList<>(); + for (Field field : fields) { + Class inputField = ClassUtils.primitiveToWrapper(field.getType()); + String fieldName = field.getName(); + getters.add(new KeyValPair<>(fieldName,PojoUtils.createGetter(input, fieldName, inputField))); + } + return getters; + } + @Override public List<List<Map<String, Object>>> accumulate(List<List<Map<String, Object>>> accumulatedValue, InputT1 input) { + if (gettersStream1 == null) { + gettersStream1 = createGetters(input.getClass()); + } try { return accumulateWithIndex(0, accumulatedValue, input); } catch (NoSuchFieldException e) { @@ -70,6 +103,9 @@ public class PojoInnerJoin<InputT1, InputT2> @Override public List<List<Map<String, Object>>> accumulate2(List<List<Map<String, Object>>> accumulatedValue, InputT2 input) { + if (gettersStream2 == null) { + gettersStream2 = createGetters(input.getClass()); + } try { return accumulateWithIndex(1, accumulatedValue, input); } catch (NoSuchFieldException e) { @@ -96,27 +132,23 @@ public class PojoInnerJoin<InputT1, InputT2> input.getClass().getDeclaredField(keys[index]); List<Map<String, Object>> curList = accu.get(index); - Map map = pojoToMap(input); + Map map = pojoToMap(input,index + 1); curList.add(map); accu.set(index, curList); return accu; } - private Map<String, Object> pojoToMap(Object input) + private Map<String, Object> pojoToMap(Object input, int streamIndex) { Map<String, Object> map = new HashMap<>(); + List<KeyValPair<String,PojoUtils.Getter>> gettersStream = streamIndex == 1 ? gettersStream1 : gettersStream2; - Field[] fields = input.getClass().getDeclaredFields(); - - for (Field field : fields) { - String[] words = field.getName().split("\\."); - String fieldName = words[words.length - 1]; - field.setAccessible(true); + for (KeyValPair<String,PojoUtils.Getter> getter : gettersStream) { try { - Object value = field.get(input); - map.put(fieldName, value); - } catch (IllegalAccessException e) { + Object value = getter.getValue().get(input); + map.put(getter.getKey(), value); + } catch (Exception e) { throw Throwables.propagate(e); } } @@ -142,6 +174,10 @@ public class PojoInnerJoin<InputT1, InputT2> // TODO: May need to revisit (use state manager). result = getAllCombo(0, accumulatedValue, result, null); + if (setters == null) { + createSetters(); + } + List<Object> out = new ArrayList<>(); for (Map<String, Object> resultMap : result) { Object o; @@ -150,16 +186,8 @@ public class PojoInnerJoin<InputT1, InputT2> } catch (Throwable e) { throw Throwables.propagate(e); } - - for (Map.Entry<String, Object> entry : resultMap.entrySet()) { - Field f; - try { - f = outClass.getDeclaredField(entry.getKey()); - f.setAccessible(true); - f.set(o, entry.getValue()); - } catch (NoSuchFieldException | IllegalAccessException e) { - throw Throwables.propagate(e); - } + for (KeyValPair<String, PojoUtils.Setter> setter : setters) { + setter.getValue().set(o,resultMap.get(setter.getKey())); } out.add(o); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dd5341f2/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java index 47ce815..47c7307 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java @@ -47,22 +47,22 @@ public class PojoInnerJoinTest this.uName = name; } - public int getuId() + public int getUId() { return uId; } - public void setuId(int uId) + public void setUId(int uId) { this.uId = uId; } - public String getuName() + public String getUName() { return uName; } - public void setuName(String uName) + public void setUName(String uName) { this.uName = uName; } @@ -84,12 +84,12 @@ public class PojoInnerJoinTest this.dep = dep; } - public int getuId() + public int getUId() { return uId; } - public void setuId(int uId) + public void setUId(int uId) { this.uId = uId; } @@ -111,22 +111,22 @@ public class PojoInnerJoinTest private String uName; private String dep; - public int getuId() + public int getUId() { return uId; } - public void setuId(int uId) + public void setUId(int uId) { this.uId = uId; } - public String getuName() + public String getUName() { return uName; } - public void setuName(String uName) + public void setUName(String uName) { this.uName = uName; } @@ -168,8 +168,8 @@ public class PojoInnerJoinTest Object o = pij.getOutput(accu).get(0); Assert.assertTrue(o instanceof TestOutClass); TestOutClass testOutClass = (TestOutClass)o; - Assert.assertEquals(1, testOutClass.getuId()); - Assert.assertEquals("Josh", testOutClass.getuName()); + Assert.assertEquals(1, testOutClass.getUId()); + Assert.assertEquals("Josh", testOutClass.getUName()); Assert.assertEquals("CS", testOutClass.getDep()); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dd5341f2/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java index 809023b..3d2e82a 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java @@ -292,6 +292,76 @@ public class PojoInnerJoinTestApplication implements StreamingApplication } } + public static class OutputEvent + { + public int customerId; + public int productId; + public int productCategory; + public long timestamp; + public double amount; + public long timestamps; + + public int getCustomerId() + { + return customerId; + } + + public void setCustomerId(int customerId) + { + this.customerId = customerId; + } + + public int getProductId() + { + return productId; + } + + public void setProductId(int productId) + { + this.productId = productId; + } + + public int getProductCategory() + { + return productCategory; + } + + public void setProductCategory(int productCategory) + { + this.productCategory = productCategory; + } + + public long getTimestamp() + { + return timestamp; + } + + public void setTimestamp(long timestamp) + { + this.timestamp = timestamp; + } + + public double getAmount() + { + return amount; + } + + public void setAmount(double amount) + { + this.amount = amount; + } + + public long getTimestamps() + { + return timestamps; + } + + public void setTimestamps(long timestamp) + { + this.timestamps = timestamp; + } + } + public int getMaxProductId() { return maxProductId; @@ -375,7 +445,7 @@ public class PojoInnerJoinTestApplication implements StreamingApplication productGenerator.setSalesEvent(false); WindowedMergeOperatorImpl<POJOGenerator.SalesEvent, POJOGenerator.ProductEvent, List<Set<Object>>, List<List<Object>>> op = dag.addOperator("Merge", new WindowedMergeOperatorImpl<POJOGenerator.SalesEvent, POJOGenerator.ProductEvent, List<Set<Object>>, List<List<Object>>>()); - op.setAccumulation(new PojoInnerJoin(2, Object.class, "productId","productId")); + op.setAccumulation(new PojoInnerJoin(2, POJOGenerator.OutputEvent.class, "productId","productId")); op.setDataStorage(new InMemoryWindowedStorage<List<Set<Object>>>()); WindowedStorage.WindowedPlainStorage<WindowState> windowStateMap = new InMemoryWindowedStorage<>();
