Repository: apex-malhar Updated Branches: refs/heads/master fc8b674e3 -> 4b36bf3e5
APEXMALHAR-2417 Adding Pojo Outer join accumulation Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/4b36bf3e Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/4b36bf3e Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/4b36bf3e Branch: refs/heads/master Commit: 4b36bf3e59ecea40e7f3cc1a70636ca263c5f21f Parents: fc8b674 Author: Hitesh-Scorpio <[email protected]> Authored: Tue Feb 28 11:50:55 2017 +0530 Committer: Hitesh-Scorpio <[email protected]> Committed: Thu Mar 2 15:31:56 2017 +0530 ---------------------------------------------------------------------- .../window/accumulation/AbstractPojoJoin.java | 255 +++++++++++++++++ .../window/accumulation/PojoFullOuterJoin.java | 103 +++++++ .../lib/window/accumulation/PojoInnerJoin.java | 210 +------------- .../window/accumulation/PojoLeftOuterJoin.java | 63 +++++ .../window/accumulation/PojoRightOuterJoin.java | 63 +++++ .../window/accumulation/PojoOuterJoinTest.java | 280 +++++++++++++++++++ 6 files changed, 771 insertions(+), 203 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4b36bf3e/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java new file mode 100644 index 0000000..354dd90 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.malhar.lib.window.accumulation;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.apex.malhar.lib.window.MergeAccumulation;
import org.apache.commons.lang3.ClassUtils;
import org.apache.hadoop.classification.InterfaceStability;

import com.google.common.base.Throwables;

import com.datatorrent.lib.util.PojoUtils;

/**
 * Join Accumulation for Pojo Streams.
 *
 * <p>The accumulated value is a list of exactly two lists: index 0 collects the tuples of the
 * first stream, index 1 the tuples of the second stream. Each tuple is stored as a map from
 * declared field name to field value.
 *
 * <p>{@code keys} holds the join columns as consecutive pairs: {@code keys[2n]} names a field of
 * the first stream and {@code keys[2n + 1]} names the field of the second stream it is compared
 * with. Subclasses select the join flavour through {@link #getLeftStreamIndex()} and
 * {@link #addNonMatchingResult(List, Map, Set)}.
 */
// NOTE(review): the annotation was mangled to "[email protected]" by e-mail obfuscation in the
// archived message; restored as Evolving - confirm against the repository.
@InterfaceStability.Evolving
public abstract class AbstractPojoJoin<InputT1, InputT2>
    implements MergeAccumulation<InputT1, InputT2, List<List<Map<String, Object>>>, List<?>>
{
  protected final String[] keys;
  protected final Class<?> outClass;
  private transient Map<String,PojoUtils.Getter> gettersStream1;
  private transient Map<String,PojoUtils.Getter> gettersStream2;
  private transient Map<String,PojoUtils.Setter> setters;
  protected transient Set<String> keySetStream2;
  protected transient Set<String> keySetStream1;

  /**
   * Creates an instance with no join keys and no output class.
   */
  public AbstractPojoJoin()
  {
    keys = new String[]{};
    outClass = null;
  }

  /**
   * @param outClass class instantiated (through its no-arg constructor) for every output row
   * @param keys     join columns as (stream 1 field, stream 2 field) pairs
   * @throws IllegalArgumentException if an odd number of keys is given
   */
  public AbstractPojoJoin(Class<?> outClass, String... keys)
  {
    if (keys.length % 2 != 0) {
      throw new IllegalArgumentException("Wrong number of keys.");
    }

    this.keys = Arrays.copyOf(keys, keys.length);
    this.outClass = outClass;
  }

  /**
   * Tells whether a declared field carries tuple data. Static fields (constants such as
   * serialVersionUID) and compiler/tool generated synthetic fields are not part of a tuple.
   */
  private static boolean isDataField(Field field)
  {
    return !field.isSynthetic() && !Modifier.isStatic(field.getModifiers());
  }

  /**
   * Builds one setter per data field declared by {@link #outClass}.
   */
  private void createSetters()
  {
    setters = new HashMap<>();
    for (Field field : outClass.getDeclaredFields()) {
      if (!isDataField(field)) {
        continue;
      }
      Class outputField = ClassUtils.primitiveToWrapper(field.getType());
      String fieldName = field.getName();
      setters.put(fieldName, PojoUtils.createSetter(outClass, fieldName, outputField));
    }
  }

  /**
   * Builds one getter per data field declared by the given input class.
   */
  private Map<String,PojoUtils.Getter> createGetters(Class<?> input)
  {
    Map<String,PojoUtils.Getter> getters = new HashMap<>();
    for (Field field : input.getDeclaredFields()) {
      if (!isDataField(field)) {
        continue;
      }
      Class inputField = ClassUtils.primitiveToWrapper(field.getType());
      String fieldName = field.getName();
      getters.put(fieldName, PojoUtils.createGetter(input, fieldName, inputField));
    }
    return getters;
  }

  /**
   * Adds a tuple of the first stream to the accumulation.
   */
  @Override
  public List<List<Map<String, Object>>> accumulate(List<List<Map<String, Object>>> accumulatedValue, InputT1 input)
  {
    if (gettersStream1 == null) {
      gettersStream1 = createGetters(input.getClass());
    }
    return accumulateWithIndex(0, accumulatedValue, input);
  }

  /**
   * Adds a tuple of the second stream to the accumulation.
   */
  @Override
  public List<List<Map<String, Object>>> accumulate2(List<List<Map<String, Object>>> accumulatedValue, InputT2 input)
  {
    if (gettersStream2 == null) {
      gettersStream2 = createGetters(input.getClass());
    }
    return accumulateWithIndex(1, accumulatedValue, input);
  }

  /**
   * @return a list containing two empty tuple lists, one per input stream
   */
  @Override
  public List<List<Map<String, Object>>> defaultAccumulatedValue()
  {
    List<List<Map<String, Object>>> accu = new ArrayList<>();
    for (int i = 0; i < 2; i++) {
      accu.add(new ArrayList<Map<String, Object>>());
    }
    return accu;
  }

  /**
   * Converts the tuple to a map and appends it to the list of the given stream (0 or 1).
   */
  private List<List<Map<String, Object>>> accumulateWithIndex(int index, List<List<Map<String, Object>>> accu, Object input)
  {
    // TODO: If a stream never sends out any tuple during one window, a wrong key would not be detected.
    accu.get(index).add(pojoToMap(input, index + 1));
    return accu;
  }

  /**
   * Reads every data field of the tuple through the getters of the given stream (1 or 2).
   */
  private Map<String, Object> pojoToMap(Object input, int streamIndex)
  {
    Map<String,PojoUtils.Getter> gettersStream = streamIndex == 1 ? gettersStream1 : gettersStream2;
    Map<String, Object> map = new HashMap<>();

    for (Map.Entry<String, PojoUtils.Getter> getter : gettersStream.entrySet()) {
      try {
        map.put(getter.getKey(), getter.getValue().get(input));
      } catch (Exception e) {
        throw Throwables.propagate(e);
      }
    }
    return map;
  }

  /**
   * Appends the tuples of the second accumulation to the first one, stream by stream, and
   * returns the first accumulation.
   */
  @Override
  public List<List<Map<String, Object>>> merge(List<List<Map<String, Object>>> accumulatedValue1, List<List<Map<String, Object>>> accumulatedValue2)
  {
    for (int i = 0; i < 2; i++) {
      accumulatedValue1.get(i).addAll(accumulatedValue2.get(i));
    }
    return accumulatedValue1;
  }

  /**
   * Joins the accumulated tuples and converts every joined row to an instance of
   * {@link #outClass}. Only non-null row values are written, so fields without a value keep the
   * default of a freshly constructed output object.
   */
  @Override
  public List<?> getOutput(List<List<Map<String, Object>>> accumulatedValue)
  {
    if (setters == null) {
      // transient state: (re)built lazily on first use
      createSetters();
      keySetStream2 = new HashSet<>();
      keySetStream1 = new HashSet<>();
      for (int i = 0; i < keys.length; i = i + 2) {
        keySetStream1.add(keys[i]);
        keySetStream2.add(keys[i + 1]);
      }
    }

    // TODO: May need to revisit (use state manager).
    List<Map<String, Object>> result = getAllCombo(0, accumulatedValue, null);

    List<Object> out = new ArrayList<>(result.size());
    for (Map<String, Object> resultMap : result) {
      Object o;
      try {
        o = outClass.newInstance();
      } catch (Throwable e) {
        throw Throwables.propagate(e);
      }
      for (Map.Entry<String, PojoUtils.Setter> setter : setters.entrySet()) {
        Object value = resultMap.get(setter.getKey());
        if (value != null) {
          setter.getValue().set(o, value);
        }
      }
      out.add(o);
    }

    return out;
  }

  /**
   * Produces the joined rows. Every tuple of the "left" stream (see
   * {@link #getLeftStreamIndex()}) is compared with every tuple of the other stream; a left tuple
   * without any match is handed to {@link #addNonMatchingResult(List, Map, Set)}.
   *
   * <p>The accumulated tuples are never modified: joined rows are new maps and unmatched tuples
   * are passed on as copies, so the method can be invoked repeatedly on the same accumulation.
   *
   * @param streamIndex unused, kept for compatibility
   * @param accu        accumulated tuples of both streams
   * @param curMap      unused, kept for compatibility
   * @return joined rows as field name to value maps
   */
  public List<Map<String, Object>> getAllCombo(int streamIndex, List<List<Map<String, Object>>> accu, Map<String, Object> curMap)
  {
    List<Map<String, Object>> result = new ArrayList<>();
    int leftStreamIndex = getLeftStreamIndex();
    List<Map<String, Object>> leftStream = accu.get(leftStreamIndex);
    List<Map<String, Object>> rightStream = accu.get((leftStreamIndex + 1) % 2);

    // The other stream may not have delivered any tuple; its field names are unknown then.
    Set<String> rightFields = Collections.<String>emptySet();
    if (!rightStream.isEmpty()) {
      rightFields = rightStream.get(0).keySet();
    }

    for (Map<String, Object> lMap : leftStream) {
      boolean gotMatch = false;
      for (Map<String, Object> rMap : rightStream) {
        Map<String, Object> tempMap = joinTwoMapsWithKeys(lMap, rMap);
        if (tempMap != null) {
          result.add(tempMap);
          gotMatch = true;
        }
      }
      if (!gotMatch) {
        // copy: implementations are free to pad the map they receive
        Map<String, Object> unmatched = new HashMap<>(lMap);
        addNonMatchingResult(result, unmatched, rightFields);
      }
    }
    return result;
  }

  /**
   * Called for every tuple of the left stream that has no join partner.
   *
   * @param result      row list the implementation may append to
   * @param requiredMap copy of the unmatched tuple
   * @param nullFields  field names of the other stream, empty if that stream has no tuples
   */
  public abstract void addNonMatchingResult(List<Map<String, Object>> result, Map<String, Object> requiredMap, Set nullFields);

  /**
   * @return 0 if the first stream drives the join, 1 if the second stream does
   */
  public abstract int getLeftStreamIndex();

  /**
   * Joins a tuple of the left stream with a tuple of the other stream.
   *
   * <p>The key pairs are applied according to {@link #getLeftStreamIndex()}, so that each map is
   * always probed with the key names of the stream it came from. A null key value never matches.
   *
   * @param map1 tuple of the left stream
   * @param map2 tuple of the other stream
   * @return a new map with all fields of {@code map1} plus the non-key fields of {@code map2},
   *         or null if the tuples do not agree on every key pair
   * @throws IllegalArgumentException if a key does not name a field of its stream
   */
  public Map<String, Object> joinTwoMapsWithKeys(Map<String, Object> map1, Map<String, Object> map2)
  {
    boolean firstStreamIsLeft = getLeftStreamIndex() == 0;

    for (int i = 0; i < keys.length; i = i + 2) {
      String key1 = firstStreamIsLeft ? keys[i] : keys[i + 1];
      String key2 = firstStreamIsLeft ? keys[i + 1] : keys[i];
      if (!map1.containsKey(key1) || !map2.containsKey(key2)) {
        throw new IllegalArgumentException("Join key pair (" + keys[i] + ", " + keys[i + 1]
            + ") does not name a field of both streams.");
      }
      Object value1 = map1.get(key1);
      if (value1 == null || !value1.equals(map2.get(key2))) {
        return null;
      }
    }

    // key columns of the other stream are represented by the left stream's key columns
    Set<String> otherStreamKeys = firstStreamIsLeft ? keySetStream2 : keySetStream1;
    Map<String, Object> joined = new HashMap<>(map1);
    for (Map.Entry<String, Object> field : map2.entrySet()) {
      if (!otherStreamKeys.contains(field.getKey())) {
        joined.put(field.getKey(), field.getValue());
      }
    }
    return joined;
  }

  /**
   * Retraction is not implemented.
   *
   * @return always null
   */
  @Override
  public List<?> getRetraction(List<?> value)
  {
    return null;
  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4b36bf3e/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java new file mode 100644 index 0000000..8ad0467 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java @@ -0,0 +1,103 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.malhar.lib.window.accumulation;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.classification.InterfaceStability;

/**
 * Full outer join Accumulation for Pojo Streams.
 *
 * <p>Emits every joined pair, every tuple of the first stream without a partner and every tuple
 * of the second stream without a partner.
 */
// NOTE(review): the annotation was mangled to "[email protected]" by e-mail obfuscation in the
// archived message; restored as Evolving - confirm against the repository.
@InterfaceStability.Evolving
public class PojoFullOuterJoin<InputT1, InputT2>
    extends AbstractPojoJoin<InputT1, InputT2>
{
  public PojoFullOuterJoin()
  {
    super();
  }

  /**
   * @param num      not used by this class
   * @param outClass class of the output rows
   * @param keys     join columns as (stream 1 field, stream 2 field) pairs
   */
  public PojoFullOuterJoin(int num, Class<?> outClass, String... keys)
  {
    super(outClass,keys);
  }

  /**
   * Adds an unmatched tuple of the first stream to the result, padded with null for every
   * non-key field of the second stream. A value the tuple already carries under the same field
   * name is kept.
   */
  @Override
  public void addNonMatchingResult(List result, Map requiredMap, Set nullFields)
  {
    for (Object field : nullFields) {
      String name = field.toString();
      if (!keySetStream2.contains(field) && !requiredMap.containsKey(name)) {
        requiredMap.put(name, null);
      }
    }
    result.add(requiredMap);
  }

  @Override
  public int getLeftStreamIndex()
  {
    return 0;
  }

  /**
   * Produces the rows of the full outer join: joined pairs and unmatched first-stream tuples in
   * first-stream order, followed by the unmatched second-stream tuples in arrival order.
   *
   * <p>Works on copies only, so the accumulated tuples are left untouched, and copes with either
   * stream being empty.
   *
   * @param streamIndex unused
   * @param accu        accumulated tuples of both streams
   * @param curMap      unused
   */
  @Override
  public List<Map<String, Object>> getAllCombo(int streamIndex, List accu, Map curMap)
  {
    List<Map<String, Object>> result = new ArrayList<>();
    int leftStreamIndex = getLeftStreamIndex();
    List<Map<String, Object>> leftStream = (List<Map<String, Object>>)accu.get(leftStreamIndex);
    List<Map<String, Object>> rightStream = (List<Map<String, Object>>)accu.get((leftStreamIndex + 1) % 2);

    // Field names are snapshotted up front; an empty stream contributes no names.
    Set<String> leftFields = Collections.<String>emptySet();
    if (!leftStream.isEmpty()) {
      leftFields = new HashSet<String>(leftStream.get(0).keySet());
    }
    Set<String> rightFields = Collections.<String>emptySet();
    if (!rightStream.isEmpty()) {
      rightFields = new HashSet<String>(rightStream.get(0).keySet());
    }

    // Tracked by position, not by map equality: equal tuples must not collapse into one.
    boolean[] rightMatched = new boolean[rightStream.size()];

    for (Map<String, Object> lMap : leftStream) {
      boolean gotMatch = false;
      for (int r = 0; r < rightStream.size(); r++) {
        Map<String, Object> leftCopy = new HashMap<String, Object>(lMap);
        Map<String, Object> tempMap = joinTwoMapsWithKeys(leftCopy, rightStream.get(r));
        if (tempMap != null) {
          result.add(tempMap);
          gotMatch = true;
          rightMatched[r] = true;
        }
      }
      if (!gotMatch) {
        Map<String, Object> unmatchedLeft = new HashMap<String, Object>(lMap);
        addNonMatchingResult(result, unmatchedLeft, rightFields);
      }
    }

    for (int r = 0; r < rightStream.size(); r++) {
      if (rightMatched[r]) {
        continue;
      }
      Map<String, Object> unmatchedRight = new HashMap<String, Object>(rightStream.get(r));
      for (String field : leftFields) {
        // never overwrite a value the tuple itself provides
        if (!keySetStream1.contains(field) && !unmatchedRight.containsKey(field)) {
          unmatchedRight.put(field, null);
        }
      }
      result.add(unmatchedRight);
    }
    return result;
  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4b36bf3e/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java index 1aa55c2..ceb17dd 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java @@ -18,234 +18,38 @@ */
package org.apache.apex.malhar.lib.window.accumulation;

import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Inner join Accumulation for Pojo Streams.
 *
 * <p>All accumulation and join logic is inherited from {@link AbstractPojoJoin}; this class only
 * declares that the first stream drives the join and that tuples without a partner are dropped.
 *
 * @since 3.6.0
 */
public class PojoInnerJoin<InputT1, InputT2>
    extends AbstractPojoJoin<InputT1, InputT2>
{
  public PojoInnerJoin()
  {
    super();
  }

  /**
   * @param num      not used by this class
   * @param outClass class of the output rows
   * @param keys     join columns as (stream 1 field, stream 2 field) pairs
   */
  public PojoInnerJoin(int num, Class<?> outClass, String... keys)
  {
    super(outClass,keys);
  }

  /**
   * Intentionally empty: an inner join emits nothing for a tuple without a partner.
   */
  @Override
  public void addNonMatchingResult(List result, Map requiredMap, Set nullFields)
  {
    // unmatched tuples are not part of an inner join
  }

  @Override
  public int getLeftStreamIndex()
  {
    return 0;
  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4b36bf3e/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java 
---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java new file mode 100644 index 0000000..0ee3e00 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java @@ -0,0 +1,63 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.malhar.lib.window.accumulation;

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.classification.InterfaceStability;

/**
 * Left Outer join Accumulation for Pojo Streams.
 *
 * <p>The first stream drives the join; each of its tuples is emitted even when the second stream
 * holds no partner for it.
 */
// NOTE(review): the annotation was mangled to "[email protected]" by e-mail obfuscation in the
// archived message; restored as Evolving - confirm against the repository.
@InterfaceStability.Evolving
public class PojoLeftOuterJoin<InputT1, InputT2>
    extends AbstractPojoJoin<InputT1, InputT2>
{
  public PojoLeftOuterJoin()
  {
    super();
  }

  /**
   * @param num      not used by this class
   * @param outClass class of the output rows
   * @param keys     join columns as (stream 1 field, stream 2 field) pairs
   */
  public PojoLeftOuterJoin(int num, Class<?> outClass, String... keys)
  {
    super(outClass,keys);
  }

  /**
   * Adds an unmatched tuple of the first stream to the result, padded with null for every
   * non-key field of the second stream.
   *
   * <p>A field name both streams share is not padded: the unmatched tuple keeps its own value
   * instead of having it replaced by null.
   *
   * @param result      row list to append to
   * @param requiredMap the unmatched tuple
   * @param nullFields  field names of the second stream
   */
  @Override
  public void addNonMatchingResult(List result, Map requiredMap, Set nullFields)
  {
    for (Object field : nullFields) {
      String name = field.toString();
      if (!keySetStream2.contains(field) && !requiredMap.containsKey(name)) {
        requiredMap.put(name, null);
      }
    }
    result.add(requiredMap);
  }

  @Override
  public int getLeftStreamIndex()
  {
    return 0;
  }

}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4b36bf3e/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java new file mode 100644 index 0000000..60b0252 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
package org.apache.apex.malhar.lib.window.accumulation;

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.classification.InterfaceStability;

/**
 * Right outer join Accumulation for Pojo Streams.
 *
 * <p>The second stream drives the join; each of its tuples is emitted even when the first stream
 * holds no partner for it.
 */
// NOTE(review): the annotation was mangled to "[email protected]" by e-mail obfuscation in the
// archived message; restored as Evolving - confirm against the repository.
@InterfaceStability.Evolving
public class PojoRightOuterJoin<InputT1, InputT2>
    extends AbstractPojoJoin<InputT1, InputT2>
{
  public PojoRightOuterJoin()
  {
    super();
  }

  /**
   * @param num      not used by this class
   * @param outClass class of the output rows
   * @param keys     join columns as (stream 1 field, stream 2 field) pairs
   */
  public PojoRightOuterJoin(int num, Class<?> outClass, String... keys)
  {
    super(outClass,keys);
  }

  /**
   * Adds an unmatched tuple of the second stream to the result, padded with null for every
   * non-key field of the first stream.
   *
   * <p>A field name both streams share is not padded: the unmatched tuple keeps its own value
   * instead of having it replaced by null.
   *
   * @param result      row list to append to
   * @param requiredMap the unmatched tuple
   * @param nullFields  field names of the first stream
   */
  @Override
  public void addNonMatchingResult(List result, Map requiredMap, Set nullFields)
  {
    for (Object field : nullFields) {
      String name = field.toString();
      if (!keySetStream1.contains(field) && !requiredMap.containsKey(name)) {
        requiredMap.put(name, null);
      }
    }
    result.add(requiredMap);
  }

  @Override
  public int getLeftStreamIndex()
  {
    return 1;
  }

}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4b36bf3e/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java new file mode 100644 index 0000000..aaa7de3 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java @@ -0,0 +1,280 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.malhar.lib.window.accumulation;

import java.util.List;
import java.util.Map;

import org.junit.Assert;
import org.junit.Test;

/**
 * Test for POJO outer join accumulations
 */
public class PojoOuterJoinTest
{

  /** Tuple type of the first stream: user id and name. */
  public static class TestPojo1
  {
    private int uId;
    private String uName;

    public TestPojo1()
    {

    }

    public TestPojo1(int id, String name)
    {
      uId = id;
      uName = name;
    }

    public int getUId()
    {
      return uId;
    }

    public void setUId(int uId)
    {
      this.uId = uId;
    }

    public String getUName()
    {
      return uName;
    }

    public void setUName(String uName)
    {
      this.uName = uName;
    }
  }

  /** Tuple type of the second stream: user id, nick name and age. */
  public static class TestPojo3
  {
    private int uId;
    private String uNickName;
    private int age;

    public TestPojo3()
    {

    }

    public TestPojo3(int id, String name, int age)
    {
      uId = id;
      uNickName = name;
      this.age = age;
    }

    public int getUId()
    {
      return uId;
    }

    public void setUId(int uId)
    {
      this.uId = uId;
    }

    public String getUNickName()
    {
      return uNickName;
    }

    public void setUNickName(String uNickName)
    {
      this.uNickName = uNickName;
    }

    public int getAge()
    {
      return age;
    }

    public void setAge(int age)
    {
      this.age = age;
    }
  }


  /** Output row type carrying the fields of both streams. */
  public static class TestOutClass
  {
    private int uId;
    private String uName;
    private String uNickName;
    private int age;

    public int getUId()
    {
      return uId;
    }

    public void setUId(int uId)
    {
      this.uId = uId;
    }

    public String getUName()
    {
      return uName;
    }

    public void setUName(String uName)
    {
      this.uName = uName;
    }

    public String getUNickName()
    {
      return uNickName;
    }

    public void setUNickName(String uNickName)
    {
      this.uNickName = uNickName;
    }

    public int getAge()
    {
      return age;
    }

    public void setAge(int age)
    {
      this.age = age;
    }
  }

  /** Output row type with id, name and age; not referenced by the tests below. */
  public static class TestOutMultipleKeysClass
  {
    private int uId;
    private String uName;
    private int age;

    public int getUId()
    {
      return uId;
    }

    public void setUId(int uId)
    {
      this.uId = uId;
    }

    public String getUName()
    {
      return uName;
    }

    public void setUName(String uName)
    {
      this.uName = uName;
    }

    public int getAge()
    {
      return age;
    }

    public void setAge(int age)
    {
      this.age = age;
    }
  }

  /**
   * Feeds the shared fixture into the given join: users 1 and 2 on the first stream, users 1 and
   * 3 on the second stream. Also checks that a fresh accumulation has two stream slots.
   */
  private static List<List<Map<String, Object>>> accumulateSampleTuples(AbstractPojoJoin<TestPojo1, TestPojo3> join)
  {
    List<List<Map<String, Object>>> accu = join.defaultAccumulatedValue();

    Assert.assertEquals(2, accu.size());

    accu = join.accumulate(accu, new TestPojo1(1, "Josh"));
    accu = join.accumulate(accu, new TestPojo1(2, "Bob"));

    accu = join.accumulate2(accu, new TestPojo3(1, "NickJosh", 12));
    accu = join.accumulate2(accu, new TestPojo3(3, "NickBob", 13));
    return accu;
  }

  @Test
  public void PojoLeftOuterJoinTest()
  {
    PojoLeftOuterJoin<TestPojo1, TestPojo3> join = new PojoLeftOuterJoin<>(2, TestOutClass.class, "uId", "uId");
    List<List<Map<String, Object>>> accu = accumulateSampleTuples(join);

    Assert.assertEquals(2, join.getOutput(accu).size());

    // second row: user 2 has no partner on the second stream
    Object row = join.getOutput(accu).get(1);
    Assert.assertTrue(row instanceof TestOutClass);
    TestOutClass unmatched = (TestOutClass)row;
    Assert.assertEquals(2, unmatched.getUId());
    Assert.assertEquals("Bob", unmatched.getUName());
    Assert.assertEquals(0, unmatched.getAge());
  }

  @Test
  public void PojoRightOuterJoinTest()
  {
    PojoRightOuterJoin<TestPojo1, TestPojo3> join = new PojoRightOuterJoin<>(2, TestOutClass.class, "uId", "uId");
    List<List<Map<String, Object>>> accu = accumulateSampleTuples(join);

    Assert.assertEquals(2, join.getOutput(accu).size());

    // second row: user 3 has no partner on the first stream
    Object row = join.getOutput(accu).get(1);
    Assert.assertTrue(row instanceof TestOutClass);
    TestOutClass unmatched = (TestOutClass)row;
    Assert.assertEquals(3, unmatched.getUId());
    Assert.assertNull(unmatched.getUName());
    Assert.assertEquals(13, unmatched.getAge());
  }

  @Test
  public void PojoFullOuterJoinTest()
  {
    PojoFullOuterJoin<TestPojo1, TestPojo3> join = new PojoFullOuterJoin<>(2, TestOutClass.class, "uId", "uId");
    List<List<Map<String, Object>>> accu = accumulateSampleTuples(join);

    Assert.assertEquals(3, join.getOutput(accu).size());

    // second row: user 2 has no partner on the second stream
    Object row = join.getOutput(accu).get(1);
    Assert.assertTrue(row instanceof TestOutClass);
    TestOutClass unmatched = (TestOutClass)row;
    Assert.assertEquals(2, unmatched.getUId());
    Assert.assertEquals("Bob", unmatched.getUName());
    Assert.assertEquals(0, unmatched.getAge());
  }
}
