Repository: apex-malhar Updated Branches: refs/heads/master 96d216e80 -> aeb10f33d
APEXMALHAR-2240 Implement Windowed Merge Operator for join support. Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/92bd7323 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/92bd7323 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/92bd7323 Branch: refs/heads/master Commit: 92bd732325c9d8b8d6241aea398bb62aaaec7b91 Parents: 96d216e Author: Shunxin <[email protected]> Authored: Mon Oct 10 18:47:59 2016 -0700 Committer: Shunxin <[email protected]> Committed: Mon Oct 10 19:07:24 2016 -0700 ---------------------------------------------------------------------- .../malhar/lib/window/JoinAccumulation.java | 67 ------ .../malhar/lib/window/MergeAccumulation.java | 40 ++++ .../lib/window/MergeWindowedOperator.java | 30 +++ .../malhar/lib/window/accumulation/CoGroup.java | 45 ++++ .../lib/window/accumulation/InnerJoin.java | 115 ++++++++++ .../lib/window/accumulation/PojoInnerJoin.java | 187 +++++++++++++++++ .../impl/AbstractWindowedMergeOperator.java | 123 +++++++++++ .../window/impl/AbstractWindowedOperator.java | 87 ++++---- .../impl/KeyedWindowedMergeOperatorImpl.java | 120 +++++++++++ .../window/impl/KeyedWindowedOperatorImpl.java | 134 ++++++------ .../window/impl/WindowedMergeOperatorImpl.java | 112 ++++++++++ .../lib/window/accumulation/CoGroupTest.java | 60 ++++++ .../lib/window/accumulation/InnerJoinTest.java | 57 +++++ .../window/accumulation/PojoInnerJoinTest.java | 133 ++++++++++++ ...yedWindowedMergeOperatorTestApplication.java | 208 +++++++++++++++++++ .../window/impl/WindowedMergeOperatorTest.java | 165 +++++++++++++++ .../WindowedMergeOperatorTestApplication.java | 203 ++++++++++++++++++ 17 files changed, 1719 insertions(+), 167 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/JoinAccumulation.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/JoinAccumulation.java b/library/src/main/java/org/apache/apex/malhar/lib/window/JoinAccumulation.java deleted file mode 100644 index 69240e0..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/JoinAccumulation.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window; - -import org.apache.hadoop.classification.InterfaceStability; - -/** - * This is the interface for accumulation when joining multiple streams. 
- * - * @since 3.5.0 - */ [email protected] -public interface JoinAccumulation<InputT1, InputT2, InputT3, InputT4, InputT5, AccumT, OutputT> extends Accumulation<InputT1, AccumT, OutputT> -{ - /** - * Accumulate the second input type to the accumulated value - * - * @param accumulatedValue - * @param input - * @return - */ - AccumT accumulate2(AccumT accumulatedValue, InputT2 input); - - /** - * Accumulate the third input type to the accumulated value - * - * @param accumulatedValue - * @param input - * @return - */ - AccumT accumulate3(AccumT accumulatedValue, InputT3 input); - - /** - * Accumulate the fourth input type to the accumulated value - * - * @param accumulatedValue - * @param input - * @return - */ - AccumT accumulate4(AccumT accumulatedValue, InputT4 input); - - /** - * Accumulate the fifth input type to the accumulated value - * - * @param accumulatedValue - * @param input - * @return - */ - AccumT accumulate5(AccumT accumulatedValue, InputT5 input); - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java new file mode 100644 index 0000000..71f4408 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window; + +import org.apache.hadoop.classification.InterfaceStability; + +/** + * This is the interface for accumulation when joining multiple streams. + * + * @since 3.5.0 + */ [email protected] +public interface MergeAccumulation<InputT1, InputT2, AccumT, OutputT> extends Accumulation<InputT1, AccumT, OutputT> +{ + /** + * Accumulate the second input type to the accumulated value + * + * @param accumulatedValue + * @param input + * @return + */ + AccumT accumulate2(AccumT accumulatedValue, InputT2 input); + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java new file mode 100644 index 0000000..89a70a4 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window; + +/** + * Interface for Join Windowed Operator. + */ +public interface MergeWindowedOperator<InputT1, InputT2> + extends WindowedOperator<InputT1> +{ + void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple); + + void processWatermark2(ControlTuple.Watermark watermark); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/CoGroup.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/CoGroup.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/CoGroup.java new file mode 100644 index 0000000..e22d582 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/CoGroup.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +/** + * CoGroup Join Accumulation. + */ +public class CoGroup<T> extends InnerJoin<T> +{ + public CoGroup() + { + //for kryo + } + + + @Override + public List<List<T>> getOutput(List<Set<T>> accumulatedValue) + { + List<List<T>> result = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + result.add(new ArrayList<T>(accumulatedValue.get(i))); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoin.java new file mode 100644 index 0000000..fd250a2 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoin.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.apex.malhar.lib.window.MergeAccumulation; + +/** + * Combine Join Accumulation, inner-joins tuples with same type from different streams. + */ +public class InnerJoin<T> implements MergeAccumulation<T, T, List<Set<T>>, List<List<T>>> +{ + + public InnerJoin() + { + //for kryo + } + + @Override + public List<Set<T>> accumulate(List<Set<T>> accumulatedValue, T input) + { + return accumulateWithIndex(0, accumulatedValue, input); + } + + @Override + public List<Set<T>> accumulate2(List<Set<T>> accumulatedValue, T input) + { + return accumulateWithIndex(1, accumulatedValue, input); + } + + + public List<Set<T>> accumulateWithIndex(int index, List<Set<T>> accumulatedValue, T input) + { + Set<T> accuSet = accumulatedValue.get(index); + accuSet.add(input); + accumulatedValue.set(index, accuSet); + return accumulatedValue; + } + + @Override + public List<Set<T>> defaultAccumulatedValue() + { + ArrayList<Set<T>> accu = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + accu.add(new HashSet<T>()); + } + return accu; + } + + @Override + public List<Set<T>> merge(List<Set<T>> accumulatedValue1, List<Set<T>> accumulatedValue2) + { + for (int i = 0; i < 2; i++) { + Set<T> accuSet1 = accumulatedValue1.get(i); + Set<T> accuSet2 = accumulatedValue2.get(i); + accuSet1.addAll(accuSet2); + accumulatedValue1.set(i, accuSet1); + } + return accumulatedValue1; + } + + @Override + public 
List<List<T>> getOutput(List<Set<T>> accumulatedValue) + { + List<List<T>> result = new ArrayList<>(); + + // TODO: May need to revisit (use state manager). + result = getAllCombo(accumulatedValue, result, new ArrayList<T>()); + + return result; + } + + + public List<List<T>> getAllCombo(List<Set<T>> accu, List<List<T>> result, List<T> curList) + { + if (curList.size() == 2) { + result.add(curList); + return result; + } else { + for (T item : accu.get(curList.size())) { + List<T> tempList = new ArrayList<>(curList); + tempList.add(item); + result = getAllCombo(accu, result, tempList); + } + return result; + } + } + + + @Override + public List<List<T>> getRetraction(List<List<T>> value) + { + return new ArrayList<>(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java new file mode 100644 index 0000000..4b3bc69 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import java.lang.reflect.Field; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.apex.malhar.lib.window.MergeAccumulation; + +import com.google.common.base.Throwables; + +/** + * Inner join Accumulation for Pojo Streams. + */ +public class PojoInnerJoin<InputT1, InputT2> + implements MergeAccumulation<InputT1, InputT2, List<List<Map<String, Object>>>, List<Map<String, Object>>> +{ + protected String[] keys; + + public PojoInnerJoin() + { + throw new IllegalArgumentException("Please specify number of streams that are joining."); + } + + public PojoInnerJoin(int num, String... 
keys) + { + if (keys.length != 2) { + throw new IllegalArgumentException("Wrong number of keys."); + } + + this.keys = Arrays.copyOf(keys, keys.length); + } + + @Override + public List<List<Map<String, Object>>> accumulate(List<List<Map<String, Object>>> accumulatedValue, InputT1 input) + { + try { + return accumulateWithIndex(0, accumulatedValue, input); + } catch (NoSuchFieldException e) { + throw Throwables.propagate(e); + } + } + + @Override + public List<List<Map<String, Object>>> accumulate2(List<List<Map<String, Object>>> accumulatedValue, InputT2 input) + { + try { + return accumulateWithIndex(1, accumulatedValue, input); + } catch (NoSuchFieldException e) { + throw Throwables.propagate(e); + } + } + + + @Override + public List<List<Map<String, Object>>> defaultAccumulatedValue() + { + List<List<Map<String, Object>>> accu = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + accu.add(new ArrayList<Map<String, Object>>()); + } + return accu; + } + + + private List<List<Map<String, Object>>> accumulateWithIndex(int index, List<List<Map<String, Object>>> accu, Object input) throws NoSuchFieldException + { + // TODO: If a stream never sends out any tuple during one window, a wrong key would not be detected. 
+ + input.getClass().getDeclaredField(keys[index]); + + List<Map<String, Object>> curList = accu.get(index); + Map map = pojoToMap(input); + curList.add(map); + accu.set(index, curList); + + return accu; + } + + private Map<String, Object> pojoToMap(Object input) + { + Map<String, Object> map = new HashMap<>(); + + Field[] fields = input.getClass().getDeclaredFields(); + + for (Field field : fields) { + String[] words = field.getName().split("\\."); + String fieldName = words[words.length - 1]; + field.setAccessible(true); + try { + Object value = field.get(input); + map.put(fieldName, value); + } catch (IllegalAccessException e) { + throw Throwables.propagate(e); + } + } + return map; + } + + @Override + public List<List<Map<String, Object>>> merge(List<List<Map<String, Object>>> accumulatedValue1, List<List<Map<String, Object>>> accumulatedValue2) + { + for (int i = 0; i < 2; i++) { + List<Map<String, Object>> curList = accumulatedValue1.get(i); + curList.addAll(accumulatedValue2.get(i)); + accumulatedValue1.set(i, curList); + } + return accumulatedValue1; + } + + @Override + public List<Map<String, Object>> getOutput(List<List<Map<String, Object>>> accumulatedValue) + { + List<Map<String, Object>> result = new ArrayList<>(); + + // TODO: May need to revisit (use state manager). 
+ result = getAllCombo(0, accumulatedValue, result, null); + + return result; + } + + + private List<Map<String, Object>> getAllCombo(int streamIndex, List<List<Map<String, Object>>> accu, List<Map<String, Object>> result, Map<String, Object> curMap) + { + if (streamIndex == 2) { + if (curMap != null) { + result.add(curMap); + } + return result; + } else { + for (Map<String, Object> map : accu.get(streamIndex)) { + if (streamIndex == 0) { + Map<String, Object> tempMap = new HashMap<>(map); + result = getAllCombo(streamIndex + 1, accu, result, tempMap); + } else if (curMap == null) { + return result; + } else { + Map<String, Object> tempMap = new HashMap<>(curMap); + tempMap = joinTwoMapsWithKeys(tempMap, keys[0], map, keys[streamIndex]); + result = getAllCombo(streamIndex + 1, accu, result, tempMap); + } + } + return result; + } + } + + private Map<String, Object> joinTwoMapsWithKeys(Map<String, Object> map1, String key1, Map<String, Object> map2, String key2) + { + if (!map1.get(key1).equals(map2.get(key2))) { + return null; + } else { + for (String field : map2.keySet()) { + if (!field.equals(key2)) { + map1.put(field, map2.get(field)); + } + } + return map1; + } + } + + @Override + public List<Map<String, Object>> getRetraction(List<Map<String, Object>> value) + { + return null; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java new file mode 100644 index 0000000..05a2495 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.window.ControlTuple; +import org.apache.apex.malhar.lib.window.MergeAccumulation; +import org.apache.apex.malhar.lib.window.MergeWindowedOperator; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowedStorage; + +import com.google.common.base.Function; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; + + +/** + * Abstract Windowed Merge Operator. 
+ */ +public abstract class AbstractWindowedMergeOperator<InputT1, InputT2, OutputT, DataStorageT extends WindowedStorage, + RetractionStorageT extends WindowedStorage, AccumulationT extends + MergeAccumulation> + extends AbstractWindowedOperator<InputT1, OutputT, DataStorageT, RetractionStorageT, AccumulationT> + implements MergeWindowedOperator<InputT1, InputT2> +{ + + private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedMergeOperator.class); + private Function<InputT2, Long> timestampExtractor2; + + private long latestWatermark1 = -1; // latest watermark from stream 1 + private long latestWatermark2 = -1; // latest watermark from stream 2 + + public final transient DefaultInputPort<Tuple<InputT2>> input2 = new DefaultInputPort<Tuple<InputT2>>() + { + @Override + public void process(Tuple<InputT2> tuple) + { + processTuple2(tuple); + } + }; + + // TODO: This port should be removed when Apex Core has native support for custom control tuples + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<ControlTuple> controlInput2 = new DefaultInputPort<ControlTuple>() + { + @Override + public void process(ControlTuple tuple) + { + if (tuple instanceof ControlTuple.Watermark) { + processWatermark2((ControlTuple.Watermark)tuple); + } + } + }; + + public void processTuple2(Tuple<InputT2> tuple) + { + long timestamp = extractTimestamp(tuple, timestampExtractor2); + if (isTooLate(timestamp)) { + dropTuple(tuple); + } else { + Tuple.WindowedTuple<InputT2> windowedTuple = getWindowedValueWithTimestamp(tuple, timestamp); + // do the accumulation + accumulateTuple2(windowedTuple); + processWindowState(windowedTuple); + } + } + + public void setTimestampExtractor2(Function<InputT2, Long> timestampExtractor2) + { + this.timestampExtractor2 = timestampExtractor2; + } + + + @Override + public void processWatermark(ControlTuple.Watermark watermark) + { + latestWatermark1 = watermark.getTimestamp(); + if (latestWatermark1 >= 
0 && latestWatermark2 >= 0) { + // Select the smallest timestamp of the latest watermarks as the watermark of the operator. + long minWatermark = Math.min(latestWatermark1, latestWatermark2); + if (this.watermarkTimestamp < 0 || minWatermark != this.watermarkTimestamp) { + this.watermarkTimestamp = minWatermark; + } + } + } + + @Override + public void processWatermark2(ControlTuple.Watermark watermark) + { + latestWatermark2 = watermark.getTimestamp(); + if (latestWatermark1 >= 0 && latestWatermark2 >= 0) { + long minWatermark = Math.min(latestWatermark1, latestWatermark2); + if (this.watermarkTimestamp < 0 || minWatermark != this.watermarkTimestamp) { + this.watermarkTimestamp = minWatermark; + } + } + } + + @Override + protected void processWatermarkAtEndWindow() + { + if (fixedWatermarkMillis > 0 || this.watermarkTimestamp != this.currentWatermark) { + super.processWatermarkAtEndWindow(); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java index 0ece11e..f965a01 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java @@ -79,16 +79,16 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext private Function<InputT, Long> timestampExtractor; - private long currentWatermark = -1; - private long watermarkTimestamp = -1; + protected long currentWatermark = -1; + protected long watermarkTimestamp = -1; private boolean triggerAtWatermark; - private long earlyTriggerCount; + protected long earlyTriggerCount; private long 
earlyTriggerMillis; - private long lateTriggerCount; + protected long lateTriggerCount; private long lateTriggerMillis; private long currentDerivedTimestamp = -1; private long timeIncrement; - private long fixedWatermarkMillis = -1; + protected long fixedWatermarkMillis = -1; private Map<String, Component<Context.OperatorContext>> components = new HashMap<>(); @@ -96,7 +96,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext protected RetractionStorageT retractionStorage; protected AccumulationT accumulation; - private static final transient Collection<? extends Window> GLOBAL_WINDOW_SINGLETON_SET = Collections.singleton(Window.GlobalWindow.INSTANCE); + + protected static final transient Collection<? extends Window> GLOBAL_WINDOW_SINGLETON_SET = Collections.singleton(Window.GlobalWindow.INSTANCE); + private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedOperator.class); public final transient DefaultInputPort<Tuple<InputT>> input = new DefaultInputPort<Tuple<InputT>>() @@ -135,28 +137,32 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext */ public void processTuple(Tuple<InputT> tuple) { - long timestamp = extractTimestamp(tuple); + long timestamp = extractTimestamp(tuple, timestampExtractor); if (isTooLate(timestamp)) { dropTuple(tuple); } else { Tuple.WindowedTuple<InputT> windowedTuple = getWindowedValue(tuple); // do the accumulation accumulateTuple(windowedTuple); + processWindowState(windowedTuple); + } + } - for (Window window : windowedTuple.getWindows()) { - WindowState windowState = windowStateMap.get(window); - windowState.tupleCount++; - // process any count based triggers - if (windowState.watermarkArrivalTime == -1) { - // watermark has not arrived yet, check for early count based trigger - if (earlyTriggerCount > 0 && (windowState.tupleCount % earlyTriggerCount) == 0) { - fireTrigger(window, windowState); - } - } else { - // watermark has arrived, check for 
late count based trigger - if (lateTriggerCount > 0 && (windowState.tupleCount % lateTriggerCount) == 0) { - fireTrigger(window, windowState); - } + protected void processWindowState(Tuple.WindowedTuple<? extends Object> windowedTuple) + { + for (Window window : windowedTuple.getWindows()) { + WindowState windowState = windowStateMap.get(window); + windowState.tupleCount++; + // process any count based triggers + if (windowState.watermarkArrivalTime == -1) { + // watermark has not arrived yet, check for early count based trigger + if (earlyTriggerCount > 0 && (windowState.tupleCount % earlyTriggerCount) == 0) { + fireTrigger(window, windowState); + } + } else { + // watermark has arrived, check for late count based trigger + if (lateTriggerCount > 0 && (windowState.tupleCount % lateTriggerCount) == 0) { + fireTrigger(window, windowState); } } } @@ -292,15 +298,31 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext @Override public Tuple.WindowedTuple<InputT> getWindowedValue(Tuple<InputT> input) { + long timestamp = extractTimestamp(input, timestampExtractor); + return getWindowedValueWithTimestamp(input, timestamp); + } + + public <T> Tuple.WindowedTuple<T> getWindowedValueWithTimestamp(Tuple<T> input, long timestamp) + { if (windowOption == null && input instanceof Tuple.WindowedTuple) { // inherit the windows from upstream - return (Tuple.WindowedTuple<InputT>)input; + initializeWindowStates(((Tuple.WindowedTuple<T>)input).getWindows()); + return (Tuple.WindowedTuple<T>)input; } else { - return new Tuple.WindowedTuple<>(assignWindows(input), extractTimestamp(input), input.getValue()); + return new Tuple.WindowedTuple<>(assignWindows(input, timestamp), timestamp, input.getValue()); } } - private long extractTimestamp(Tuple<InputT> tuple) + protected void initializeWindowStates(Collection<? 
extends Window> windows) + { + for (Window window : windows) { + if (!windowStateMap.containsWindow(window)) { + windowStateMap.put(window, new WindowState()); + } + } + } + + protected <T> long extractTimestamp(Tuple<T> tuple, Function<T, Long> timestampExtractor) { if (timestampExtractor == null) { if (tuple instanceof Tuple.TimestampedTuple) { @@ -313,19 +335,14 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext } } - private Collection<? extends Window> assignWindows(Tuple<InputT> inputTuple) + protected <T> Collection<? extends Window> assignWindows(Tuple<T> inputTuple, long timestamp) { if (windowOption instanceof WindowOption.GlobalWindow) { return GLOBAL_WINDOW_SINGLETON_SET; } else { - long timestamp = extractTimestamp(inputTuple); if (windowOption instanceof WindowOption.TimeWindows) { Collection<? extends Window> windows = getTimeWindowsForTimestamp(timestamp); - for (Window window : windows) { - if (!windowStateMap.containsWindow(window)) { - windowStateMap.put(window, new WindowState()); - } - } + initializeWindowStates(windows); return windows; } else if (windowOption instanceof WindowOption.SessionWindows) { return assignSessionWindows(timestamp, inputTuple); @@ -335,7 +352,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext } } - protected Collection<Window.SessionWindow> assignSessionWindows(long timestamp, Tuple<InputT> inputTuple) + protected <T> Collection<Window.SessionWindow> assignSessionWindows(long timestamp, Tuple<T> inputTuple) { throw new UnsupportedOperationException("Session window require keyed tuples"); } @@ -348,7 +365,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext * @param timestamp the timestamp * @return the windows this timestamp belongs to */ - private Collection<Window.TimeWindow> getTimeWindowsForTimestamp(long timestamp) + protected Collection<Window.TimeWindow> getTimeWindowsForTimestamp(long timestamp) { 
List<Window.TimeWindow> windows = new ArrayList<>(); if (windowOption instanceof WindowOption.TimeWindows) { @@ -382,7 +399,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext } @Override - public void dropTuple(Tuple<InputT> input) + public void dropTuple(Tuple input) { // do nothing LOG.debug("Dropping late tuple {}", input); @@ -464,7 +481,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext } } - private void processWatermarkAtEndWindow() + protected void processWatermarkAtEndWindow() { if (fixedWatermarkMillis > 0) { watermarkTimestamp = currentDerivedTimestamp - fixedWatermarkMillis; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java new file mode 100644 index 0000000..a5f17c5 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl; + +import java.util.Map; + +import org.apache.apex.malhar.lib.window.MergeAccumulation; +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.WindowedStorage; + +import com.datatorrent.lib.util.KeyValPair; + + +/** + * Keyed Windowed Merge Operator to merge two streams of keyed tuple with a key. Please use + * {@link WindowedMergeOperatorImpl} for non-keyed merging. + * + * @param <KeyT> Type of the key used to merge two streams. + * @param <InputT1> The type of the value of the keyed input tuple from first stream. + * @param <InputT2> The type of the value of the keyed input tuple from second stream. + * @param <AccumT> The type of the accumulated value in the operator state per key per window. + * @param <OutputT> The type of the value of the keyed output tuple. + */ +public class KeyedWindowedMergeOperatorImpl<KeyT, InputT1, InputT2, AccumT, OutputT> + extends AbstractWindowedMergeOperator<KeyValPair<KeyT, InputT1>, KeyValPair<KeyT, InputT2>, KeyValPair<KeyT, OutputT>, WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>, WindowedStorage.WindowedKeyedStorage<KeyT, OutputT>, MergeAccumulation<InputT1, InputT2, AccumT, OutputT>> +{ + // TODO: Add session window support. 
+ + private abstract class AccumFunction<T> + { + abstract AccumT accumulate(AccumT accum, T value); + } + + private <T> void accumulateTupleHelper(Tuple.WindowedTuple<KeyValPair<KeyT, T>> tuple, AccumFunction<T> accumFn) + { + final KeyValPair<KeyT, T> kvData = tuple.getValue(); + KeyT key = kvData.getKey(); + for (Window window : tuple.getWindows()) { + // process each window + AccumT accum = dataStorage.get(window, key); + if (accum == null) { + accum = accumulation.defaultAccumulatedValue(); + } + dataStorage.put(window, key, accumFn.accumulate(accum, kvData.getValue())); + } + } + + @Override + public void accumulateTuple(Tuple.WindowedTuple<KeyValPair<KeyT, InputT1>> tuple) + { + accumulateTupleHelper(tuple, new AccumFunction<InputT1>() + { + @Override + AccumT accumulate(AccumT accum, InputT1 value) + { + return accumulation.accumulate(accum, value); + } + }); + } + + @Override + public void accumulateTuple2(Tuple.WindowedTuple<KeyValPair<KeyT, InputT2>> tuple) + { + accumulateTupleHelper(tuple, new AccumFunction<InputT2>() + { + @Override + AccumT accumulate(AccumT accum, InputT2 value) + { + return accumulation.accumulate2(accum, value); + } + }); + } + + @Override + public void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes) + { + for (Map.Entry<KeyT, AccumT> entry : dataStorage.entries(window)) { + OutputT outputVal = accumulation.getOutput(entry.getValue()); + if (fireOnlyUpdatedPanes) { + OutputT oldValue = retractionStorage.get(window, entry.getKey()); + if (oldValue != null && oldValue.equals(outputVal)) { + continue; + } + } + output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), outputVal))); + if (retractionStorage != null) { + retractionStorage.put(window, entry.getKey(), outputVal); + } + } + } + + @Override + public void fireRetractionTrigger(Window window, boolean firingOnlyUpdatedPanes) + { + if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) { + throw 
new UnsupportedOperationException(); + } + for (Map.Entry<KeyT, OutputT> entry : retractionStorage.entries(window)) { + output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), accumulation.getRetraction(entry.getValue())))); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java index 6fab7de..a33133b 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java @@ -52,81 +52,85 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT> { @Override - protected Collection<Window.SessionWindow> assignSessionWindows(long timestamp, Tuple<KeyValPair<KeyT, InputValT>> inputTuple) + protected <T> Collection<Window.SessionWindow> assignSessionWindows(long timestamp, Tuple<T> inputTuple) { - KeyT key = inputTuple.getValue().getKey(); - WindowOption.SessionWindows sessionWindowOption = (WindowOption.SessionWindows)windowOption; - SessionWindowedStorage<KeyT, AccumT> sessionStorage = (SessionWindowedStorage<KeyT, AccumT>)dataStorage; - Collection<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> sessionEntries = sessionStorage.getSessionEntries(key, timestamp, sessionWindowOption.getMinGap().getMillis()); - Window.SessionWindow<KeyT> sessionWindowToAssign; - switch (sessionEntries.size()) { - case 0: { - // There are no existing windows within the minimum gap. 
Create a new session window - Window.SessionWindow<KeyT> sessionWindow = new Window.SessionWindow<>(key, timestamp, 1); - windowStateMap.put(sessionWindow, new WindowState()); - sessionWindowToAssign = sessionWindow; - break; - } - case 1: { - // There is already one existing window within the minimum gap. See whether we need to extend the time of that window - Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry = sessionEntries.iterator().next(); - Window.SessionWindow<KeyT> sessionWindow = sessionWindowEntry.getKey(); - if (sessionWindow.getBeginTimestamp() <= timestamp && timestamp < sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis()) { - // The session window already covers the event + if (!(inputTuple.getValue() instanceof KeyValPair)) { + throw new UnsupportedOperationException("Session window require keyed tuples"); + } else { + KeyT key = ((KeyValPair<KeyT, ?>)inputTuple.getValue()).getKey(); + WindowOption.SessionWindows sessionWindowOption = (WindowOption.SessionWindows)windowOption; + SessionWindowedStorage<KeyT, AccumT> sessionStorage = (SessionWindowedStorage<KeyT, AccumT>)dataStorage; + Collection<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> sessionEntries = sessionStorage.getSessionEntries(key, timestamp, sessionWindowOption.getMinGap().getMillis()); + Window.SessionWindow<KeyT> sessionWindowToAssign; + switch (sessionEntries.size()) { + case 0: { + // There are no existing windows within the minimum gap. Create a new session window + Window.SessionWindow<KeyT> sessionWindow = new Window.SessionWindow<>(key, timestamp, 1); + windowStateMap.put(sessionWindow, new WindowState()); sessionWindowToAssign = sessionWindow; - } else { - // The session window does not cover the event but is within the min gap + break; + } + case 1: { + // There is already one existing window within the minimum gap. 
See whether we need to extend the time of that window + Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry = sessionEntries.iterator().next(); + Window.SessionWindow<KeyT> sessionWindow = sessionWindowEntry.getKey(); + if (sessionWindow.getBeginTimestamp() <= timestamp && timestamp < sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis()) { + // The session window already covers the event + sessionWindowToAssign = sessionWindow; + } else { + // The session window does not cover the event but is within the min gap + if (triggerOption != null && + triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) { + // fire a retraction trigger because the session window will be enlarged + fireRetractionTrigger(sessionWindow, false); + } + // create a new session window that covers the timestamp + long newBeginTimestamp = Math.min(sessionWindow.getBeginTimestamp(), timestamp); + long newEndTimestamp = Math.max(sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis(), timestamp + 1); + Window.SessionWindow<KeyT> newSessionWindow = + new Window.SessionWindow<>(key, newBeginTimestamp, newEndTimestamp - newBeginTimestamp); + windowStateMap.remove(sessionWindow); + sessionStorage.migrateWindow(sessionWindow, newSessionWindow); + windowStateMap.put(newSessionWindow, new WindowState()); + sessionWindowToAssign = newSessionWindow; + } + break; + } + case 2: { + // There are two windows that fall within the minimum gap of the timestamp. 
We need to merge the two windows + Iterator<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> iterator = sessionEntries.iterator(); + Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry1 = iterator.next(); + Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry2 = iterator.next(); + Window.SessionWindow<KeyT> sessionWindow1 = sessionWindowEntry1.getKey(); + Window.SessionWindow<KeyT> sessionWindow2 = sessionWindowEntry2.getKey(); + AccumT sessionData1 = sessionWindowEntry1.getValue(); + AccumT sessionData2 = sessionWindowEntry2.getValue(); if (triggerOption != null && triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) { - // fire a retraction trigger because the session window will be enlarged - fireRetractionTrigger(sessionWindow, false); + // fire a retraction trigger because the two session windows will be merged to a new window + fireRetractionTrigger(sessionWindow1, false); + fireRetractionTrigger(sessionWindow2, false); } - // create a new session window that covers the timestamp - long newBeginTimestamp = Math.min(sessionWindow.getBeginTimestamp(), timestamp); - long newEndTimestamp = Math.max(sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis(), timestamp + 1); - Window.SessionWindow<KeyT> newSessionWindow = - new Window.SessionWindow<>(key, newBeginTimestamp, newEndTimestamp - newBeginTimestamp); - windowStateMap.remove(sessionWindow); - sessionStorage.migrateWindow(sessionWindow, newSessionWindow); + long newBeginTimestamp = Math.min(sessionWindow1.getBeginTimestamp(), sessionWindow2.getBeginTimestamp()); + long newEndTimestamp = Math.max(sessionWindow1.getBeginTimestamp() + sessionWindow1.getDurationMillis(), + sessionWindow2.getBeginTimestamp() + sessionWindow2.getDurationMillis()); + + Window.SessionWindow<KeyT> newSessionWindow = new Window.SessionWindow<>(key, newBeginTimestamp, newEndTimestamp - newBeginTimestamp); + AccumT newSessionData = 
accumulation.merge(sessionData1, sessionData2); + sessionStorage.remove(sessionWindow1); + sessionStorage.remove(sessionWindow2); + sessionStorage.put(newSessionWindow, key, newSessionData); + windowStateMap.remove(sessionWindow1); + windowStateMap.remove(sessionWindow2); windowStateMap.put(newSessionWindow, new WindowState()); sessionWindowToAssign = newSessionWindow; + break; } - break; - } - case 2: { - // There are two windows that fall within the minimum gap of the timestamp. We need to merge the two windows - Iterator<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> iterator = sessionEntries.iterator(); - Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry1 = iterator.next(); - Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry2 = iterator.next(); - Window.SessionWindow<KeyT> sessionWindow1 = sessionWindowEntry1.getKey(); - Window.SessionWindow<KeyT> sessionWindow2 = sessionWindowEntry2.getKey(); - AccumT sessionData1 = sessionWindowEntry1.getValue(); - AccumT sessionData2 = sessionWindowEntry2.getValue(); - if (triggerOption != null && - triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) { - // fire a retraction trigger because the two session windows will be merged to a new window - fireRetractionTrigger(sessionWindow1, false); - fireRetractionTrigger(sessionWindow2, false); - } - long newBeginTimestamp = Math.min(sessionWindow1.getBeginTimestamp(), sessionWindow2.getBeginTimestamp()); - long newEndTimestamp = Math.max(sessionWindow1.getBeginTimestamp() + sessionWindow1.getDurationMillis(), - sessionWindow2.getBeginTimestamp() + sessionWindow2.getDurationMillis()); - - Window.SessionWindow<KeyT> newSessionWindow = new Window.SessionWindow<>(key, newBeginTimestamp, newEndTimestamp - newBeginTimestamp); - AccumT newSessionData = accumulation.merge(sessionData1, sessionData2); - sessionStorage.remove(sessionWindow1); - sessionStorage.remove(sessionWindow2); - 
sessionStorage.put(newSessionWindow, key, newSessionData); - windowStateMap.remove(sessionWindow1); - windowStateMap.remove(sessionWindow2); - windowStateMap.put(newSessionWindow, new WindowState()); - sessionWindowToAssign = newSessionWindow; - break; + default: + throw new IllegalStateException("There are more than two sessions matching one timestamp"); } - default: - throw new IllegalStateException("There are more than two sessions matching one timestamp"); + return Collections.<Window.SessionWindow>singleton(sessionWindowToAssign); } - return Collections.<Window.SessionWindow>singleton(sessionWindowToAssign); } @Override http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java new file mode 100644 index 0000000..38eeff0 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl; + +import org.apache.apex.malhar.lib.window.MergeAccumulation; +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.WindowedStorage; + +/** + * Windowed Merge Operator to merge two streams together. It aggregates tuples from two + * input streams, performs a merge operation based on its merge accumulation, and outputs one + * result stream downstream. + * + * @param <InputT1> The type of input tuple from the first stream. + * @param <InputT2> The type of input tuple from the second stream. + * @param <AccumT> The type of the accumulated value in the operator state per window. + * @param <OutputT> The type of output tuple. + */ +public class WindowedMergeOperatorImpl<InputT1, InputT2, AccumT, OutputT> + extends AbstractWindowedMergeOperator<InputT1, InputT2, OutputT, WindowedStorage.WindowedPlainStorage<AccumT>, WindowedStorage.WindowedPlainStorage<OutputT>, MergeAccumulation<InputT1, InputT2, AccumT, OutputT>> +{ + private abstract class AccumFunction<T> + { + abstract AccumT accumulate(AccumT accum, T value); + } + + private <T> void accumulateTupleHelper(Tuple.WindowedTuple<T> tuple, AccumFunction<T> accumFn) + { + for (Window window : tuple.getWindows()) { + // process each window + AccumT accum = dataStorage.get(window); + if (accum == null) { + accum = accumulation.defaultAccumulatedValue(); + } + dataStorage.put(window, accumFn.accumulate(accum, tuple.getValue())); + } + } + + @Override + public void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple) + { + accumulateTupleHelper(tuple, new AccumFunction<InputT2>() + { + @Override + AccumT accumulate(AccumT accum, InputT2 value) + { + return accumulation.accumulate2(accum, value); + } + }); + } + + @Override + 
public void accumulateTuple(Tuple.WindowedTuple<InputT1> tuple) + { + accumulateTupleHelper(tuple, new AccumFunction<InputT1>() + { + @Override + AccumT accumulate(AccumT accum, InputT1 value) + { + return accumulation.accumulate(accum, value); + } + }); + } + + @Override + public void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes) + { + AccumT accumulatedValue = dataStorage.get(window); + OutputT outputValue = accumulation.getOutput(accumulatedValue); + + if (fireOnlyUpdatedPanes) { + OutputT oldValue = retractionStorage.get(window); + if (oldValue != null && oldValue.equals(outputValue)) { + return; + } + } + output.emit(new Tuple.WindowedTuple<>(window, outputValue)); + if (retractionStorage != null) { + retractionStorage.put(window, outputValue); + } + } + + @Override + public void fireRetractionTrigger(Window window, boolean firingOnlyUpdatedPanes) + { + if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) { + throw new UnsupportedOperationException(); + } + OutputT oldValue = retractionStorage.get(window); + if (oldValue != null) { + output.emit(new Tuple.WindowedTuple<>(window, accumulation.getRetraction(oldValue))); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/CoGroupTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/CoGroupTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/CoGroupTest.java new file mode 100644 index 0000000..d988081 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/CoGroupTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import java.util.List; +import java.util.Set; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for {@link CoGroup}. + */ +public class CoGroupTest +{ + + @Test + public void CoGroupTest() + { + CoGroup<Long> cg = new CoGroup<Long>(); + List<Set<Long>> accu = cg.defaultAccumulatedValue(); + + Assert.assertEquals(2, accu.size()); + for (int i = 0; i < 2; i++) { + Assert.assertEquals(0, accu.get(i).size()); + } + + for (long i = 1; i <= 3; i++) { + accu = cg.accumulate(accu, i); + accu = cg.accumulate2(accu, i * 2); + } + + for (int i = 0; i < 2; i++) { + Assert.assertEquals(3, accu.get(i).size()); + } + + Assert.assertEquals(2, cg.getOutput(accu).size()); + for (int i = 0; i < 2; i++) { + Assert.assertEquals(3, cg.getOutput(accu).get(i).size()); + } + Assert.assertTrue(1 == cg.getOutput(accu).get(0).get(0)); + Assert.assertTrue(4 == cg.getOutput(accu).get(1).get(1)); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoinTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoinTest.java
b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoinTest.java new file mode 100644 index 0000000..1a379a4 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoinTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import java.util.List; +import java.util.Set; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for {@link InnerJoin}. 
+ */ +public class InnerJoinTest +{ + @Test + public void CombineTest() + { + InnerJoin<Long> cb = new InnerJoin<Long>(); + List<Set<Long>> accu = cb.defaultAccumulatedValue(); + + Assert.assertEquals(2, accu.size()); + for (int i = 0; i < 2; i++) { + Assert.assertEquals(0, accu.get(i).size()); + } + + for (long i = 1; i <= 3; i++) { + accu = cb.accumulate(accu, i); + accu = cb.accumulate2(accu, i * 2); + } + + for (int i = 0; i < 2; i++) { + Assert.assertEquals(3, accu.get(i).size()); + } + + Assert.assertEquals(9, cb.getOutput(accu).size()); + for (int i = 0; i < 9; i++) { + Assert.assertEquals(2, cb.getOutput(accu).get(i).size()); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java new file mode 100644 index 0000000..63690d1 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for {@link PojoInnerJoin}. + */ +public class PojoInnerJoinTest +{ + + public static class TestPojo1 + { + private int uId; + private String uName; + + public TestPojo1() + { + + } + + public TestPojo1(int id, String name) + { + this.uId = id; + this.uName = name; + } + + public int getuId() + { + return uId; + } + + public void setuId(int uId) + { + this.uId = uId; + } + + public String getuName() + { + return uName; + } + + public void setuName(String uName) + { + this.uName = uName; + } + } + + public static class TestPojo2 + { + private int uId; + private String dep; + + public TestPojo2() + { + + } + + public TestPojo2(int id, String dep) + { + this.uId = id; + this.dep = dep; + } + + public int getuId() + { + return uId; + } + + public void setuId(int uId) + { + this.uId = uId; + } + + public String getDep() + { + return dep; + } + + public void setDep(String dep) + { + this.dep = dep; + } + } + + + @Test + public void PojoInnerJoinTest() + { + PojoInnerJoin<TestPojo1, TestPojo2> pij = new PojoInnerJoin<>(2, "uId", "uId"); + + List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue(); + + Assert.assertEquals(2, accu.size()); + + accu = pij.accumulate(accu, new TestPojo1(1, "Josh")); + accu = pij.accumulate(accu, new TestPojo1(2, "Bob")); + + accu = pij.accumulate2(accu, new TestPojo2(1, "CS")); + accu = pij.accumulate2(accu, new TestPojo2(3, "ECE")); + + Map<String, Object> result = new HashMap<>(); + result.put("uId", 1); + result.put("uName", "Josh"); + result.put("dep", "CS"); + + Assert.assertEquals(1, pij.getOutput(accu).size()); + Assert.assertEquals(result, pij.getOutput(accu).get(0)); + } + +} 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/test/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorTestApplication.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorTestApplication.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorTestApplication.java new file mode 100644 index 0000000..15e3d4e --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorTestApplication.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.window.impl; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.joda.time.Duration; + +import org.apache.apex.malhar.lib.window.ControlTuple; +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.lib.window.WindowState; +import org.apache.apex.malhar.lib.window.WindowedStorage; +import org.apache.apex.malhar.lib.window.accumulation.InnerJoin; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.util.KeyValPair; + +/** + * Example application using {@link KeyedWindowedMergeOperatorImpl}. Generators send streams of key-value pairs of + * <{@link String}, {@link Integer}>; the merge operator combines the two streams based on the key of the tuple.
+ */
+public class KeyedWindowedMergeOperatorTestApplication implements StreamingApplication
+{
+  // Window states registered by assignTestWindow() and handed to the merge operator in populateDAG().
+  // NOTE(review): this only has an effect if the merge operator ends up using this very instance (operators may be
+  // serialized before deployment), and the generators and the merge operator may touch it from different threads;
+  // confirm both before relying on it.
+  private static final WindowedStorage.WindowedPlainStorage<WindowState> windowStateMap = new InMemoryWindowedStorage<>();
+  private static final long windowDuration = 1000;
+  private static final String[] keys = new String[]{"A", "B", "C", "D", "E"};
+
+  /**
+   * Returns the {@code windowDuration}-long time window containing {@code timestamp}, registering a new
+   * {@code WindowState} for it in {@code windowStateMap} the first time that window is seen.
+   *
+   * @param timestamp time in milliseconds (callers pass {@code System.currentTimeMillis()})
+   * @return the time window whose begin timestamp is {@code timestamp} rounded down to a multiple of
+   *         {@code windowDuration}
+   */
+  public static Window.TimeWindow assignTestWindow(long timestamp)
+  {
+    long beginTimestamp = timestamp - timestamp % windowDuration;
+    Window.TimeWindow window = new Window.TimeWindow(beginTimestamp, windowDuration);
+    if (!windowStateMap.containsWindow(window)) {
+      windowStateMap.put(window, new WindowState());
+    }
+    return window;
+  }
+
+  /**
+   * Emits 20 key/value tuples (values 1..20, keys cycling through {@code keys}), one every 400 ms starting 800 ms
+   * after setup, and afterwards a fixed watermark in every streaming window.
+   */
+  public static class NumGen1 extends BaseOperator implements InputOperator
+  {
+    private int i;
+    private long watermarkTime;
+    private long startingTime;
+
+    public final transient DefaultOutputPort<Tuple.WindowedTuple<KeyValPair<String, Integer>>> output = new DefaultOutputPort<>();
+    public final transient DefaultOutputPort<ControlTuple> watermarkDefaultOutputPort = new DefaultOutputPort<>();
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      startingTime = System.currentTimeMillis();
+      watermarkTime = System.currentTimeMillis() + 10000;
+      i = 1;
+    }
+
+    /**
+     * Emits at most one tuple per call, and only once that tuple's scheduled time has been reached.
+     * The engine calls this method repeatedly within a streaming window, so it must return promptly; spinning here
+     * until every tuple is out would block the operator thread (and its window boundaries) for the whole run.
+     */
+    @Override
+    public void emitTuples()
+    {
+      if (i <= 20 && System.currentTimeMillis() - startingTime >= (i + 1) * 400) {
+        output.emit(new Tuple.WindowedTuple<KeyValPair<String, Integer>>(assignTestWindow(System.currentTimeMillis()), new KeyValPair<String, Integer>(keys[i % 5], i)));
+        i++;
+      }
+    }
+
+    /**
+     * Emits the watermark only after all 20 tuples have been sent. {@code watermarkTime} is 10 s after setup, later
+     * than the scheduled time of every tuple (the last is due at 8.4 s); a watermark announces that no earlier data
+     * follows, so sending it while tuples are still pending could get them treated as late downstream.
+     */
+    @Override
+    public void endWindow()
+    {
+      if (i > 20) {
+        watermarkDefaultOutputPort.emit(new WatermarkImpl(watermarkTime));
+      }
+    }
+  }
+
+  /**
+   * Same schedule and keys as {@link NumGen1}, but emits the values 10, 20, ..., 200.
+   */
+  public static class NumGen2 extends BaseOperator implements InputOperator
+  {
+    private int i;
+    private long watermarkTime;
+    private long startingTime;
+
+    public final transient DefaultOutputPort<Tuple.WindowedTuple<KeyValPair<String, Integer>>> output = new DefaultOutputPort<>();
+    public final transient DefaultOutputPort<ControlTuple> watermarkDefaultOutputPort = new DefaultOutputPort<>();
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      startingTime = System.currentTimeMillis();
+      watermarkTime = System.currentTimeMillis() + 10000;
+      i = 1;
+    }
+
+    /**
+     * Emits at most one tuple per call, and only once that tuple's scheduled time has been reached, so the method
+     * returns promptly instead of blocking the operator thread.
+     */
+    @Override
+    public void emitTuples()
+    {
+      if (i <= 20 && System.currentTimeMillis() - startingTime >= (i + 1) * 400) {
+        output.emit(new Tuple.WindowedTuple<KeyValPair<String, Integer>>(assignTestWindow(System.currentTimeMillis()), new KeyValPair<String, Integer>(keys[i % 5], 10 * i)));
+        i++;
+      }
+    }
+
+    /**
+     * Emits the watermark (10 s after setup) only after all 20 tuples have been sent.
+     */
+    @Override
+    public void endWindow()
+    {
+      if (i > 20) {
+        watermarkDefaultOutputPort.emit(new WatermarkImpl(watermarkTime));
+      }
+    }
+  }
+
+  /**
+   * Records the latest merged value per key in the static {@code result} map and forwards each tuple unchanged.
+   */
+  public static class Collector extends BaseOperator
+  {
+    public static Map<String, List<List<Integer>>> result = new HashMap<>();
+
+    public final transient DefaultOutputPort<Tuple.WindowedTuple<KeyValPair<String, List<List<Integer>>>>> output = new DefaultOutputPort<>();
+
+    public final transient DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<List<Integer>>>>> input = new DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<List<Integer>>>>>()
+    {
+      @Override
+      public void process(Tuple.WindowedTuple<KeyValPair<String, List<List<Integer>>>> tuple)
+      {
+        result.put(tuple.getValue().getKey(), tuple.getValue().getValue());
+        output.emit(tuple);
+      }
+    };
+  }
+
+  /**
+   * Wires both generators into the keyed merge operator's data and control inputs and routes the merged output
+   * through {@link Collector} to the console.
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    KeyedWindowedMergeOperatorImpl<String, Integer, Integer, List<Set<Integer>>, List<List<Integer>>> op
+        = dag.addOperator("Merge", new KeyedWindowedMergeOperatorImpl<String, Integer, Integer, List<Set<Integer>>, List<List<Integer>>>());
+
+    //op.setAccumulation(new CoGroup<Integer>());
+    op.setAccumulation(new InnerJoin<Integer>());
+
+    op.setDataStorage(new InMemoryWindowedKeyedStorage<String, List<Set<Integer>>>());
+    op.setRetractionStorage(new InMemoryWindowedKeyedStorage<String, List<List<Integer>>>());
+    op.setWindowStateStorage(windowStateMap);
+
+    // Can select one of the following window options, or don't select any of them.
+    op.setWindowOption(new WindowOption.GlobalWindow());
+    //op.setWindowOption(new WindowOption.TimeWindows(Duration.millis(4000)));
+
+    op.setTriggerOption(new TriggerOption().withEarlyFiringsAtEvery(1).accumulatingFiredPanes());
+    op.setAllowedLateness(Duration.millis(500));
+
+    NumGen1 numGen1 = dag.addOperator("numGen1", new NumGen1());
+    NumGen2 numGen2 = dag.addOperator("numGen2", new NumGen2());
+
+    Collector collector = dag.addOperator("collector", new Collector());
+    ConsoleOutputOperator con = dag.addOperator("console", new ConsoleOutputOperator());
+
+    dag.addStream("num1", numGen1.output, op.input);
+    dag.addStream("num2", numGen2.output, op.input2);
+    dag.addStream("wm1", numGen1.watermarkDefaultOutputPort, op.controlInput);
+    dag.addStream("wm2", numGen2.watermarkDefaultOutputPort, op.controlInput2);
+
+    dag.addStream("MergedResult", op.output, collector.input);
+    dag.addStream("output", collector.output, con.input);
+  }
+
+  /**
+   * Runs the application in local mode for 20 seconds.
+   */
+  public static void main(String[] args) throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    lma.prepareDAG(new KeyedWindowedMergeOperatorTestApplication(), conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.run(20000);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java new file mode 100644 index 0000000..7dc09d0 --- /dev/null +++
b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java @@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.accumulation.CoGroup;
+
+import com.google.common.base.Function;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Unit tests for the windowed merge operators ({@link WindowedMergeOperatorImpl} and
+ * {@link KeyedWindowedMergeOperatorImpl}): timestamp extraction, per-input accumulation and watermark merging.
+ */
+public class WindowedMergeOperatorTest
+{
+  /**
+   * With an extractor, the timestamp is the extractor applied to the tuple value; with a {@code null} extractor,
+   * the timestamp carried by a {@code Tuple.TimestampedTuple} is returned.
+   */
+  @Test
+  public void extractTimestampTest()
+  {
+    // NOTE(review): raw type; the parameterized return type of createDefaultWindowedMergeOperator() could be kept.
+    WindowedMergeOperatorImpl op = createDefaultWindowedMergeOperator();
+    Function<Integer, Long> timestampExtractor = new Function<Integer, Long>()
+    {
+      @Override
+      public Long apply(Integer input)
+      {
+        return (input * 10L);
+      }
+    };
+
+    Assert.assertEquals(1000L, op.extractTimestamp(new Tuple.PlainTuple<Integer>(100), timestampExtractor));
+    Assert.assertEquals(2000L, op.extractTimestamp(new Tuple.PlainTuple<Integer>(200), timestampExtractor));
+    Assert.assertEquals(200L, op.extractTimestamp(new Tuple.TimestampedTuple<Integer>(200L, 10), null));
+  }
+
+
+  /**
+   * Tuples from the first input accumulate at index 0 of the global window's data and tuples from the second input
+   * at index 1; the CoGroup output for the window then has two entries.
+   */
+  @Test
+  public void windowedMergeOperatorMergeTest()
+  {
+    WindowedMergeOperatorImpl<Integer, Integer, List<Set<Integer>>, List<List<Integer>>> op = createDefaultWindowedMergeOperator();
+    Window global = Window.GlobalWindow.INSTANCE;
+    op.setDataStorage(new InMemoryWindowedStorage<List<Set<Integer>>>());
+    op.setWindowOption(new WindowOption.GlobalWindow());
+    op.initializeWindowStates(AbstractWindowedOperator.GLOBAL_WINDOW_SINGLETON_SET);
+
+    op.processTuple(new Tuple.WindowedTuple<Integer>(global, 100));
+    Assert.assertEquals(1, op.dataStorage.get(global).get(0).size());
+    op.processTuple2(new Tuple.WindowedTuple<Integer>(global, 200));
+    Assert.assertEquals(1, op.dataStorage.get(global).get(1).size());
+    op.processTuple(new Tuple.WindowedTuple<Integer>(global, 300));
+    Assert.assertEquals(2, op.dataStorage.get(global).get(0).size());
+    Assert.assertEquals(2, op.accumulation.getOutput(op.dataStorage.get(global)).size());
+  }
+
+  /**
+   * Keyed variant: accumulation happens per key, with the first input at index 0 and the second input at index 1
+   * of each key's data, and a tuple for one key does not affect another key's data.
+   */
+  @Test
+  public void keyedWindowedMergeOperatorMergeTest()
+  {
+    KeyedWindowedMergeOperatorImpl<String, Integer, Integer, List<Set<Integer>>, List<List<Integer>>> op
+        = createDefaultKeyedWindowedMergeOperator();
+    Window global = Window.GlobalWindow.INSTANCE;
+    op.setDataStorage(new InMemoryWindowedKeyedStorage<String, List<Set<Integer>>>());
+    op.setWindowOption(new WindowOption.GlobalWindow());
+    op.initializeWindowStates(AbstractWindowedOperator.GLOBAL_WINDOW_SINGLETON_SET);
+
+    op.processTuple(new Tuple.WindowedTuple<KeyValPair<String, Integer>>(global, new KeyValPair<String, Integer>("A", 100)));
+    Assert.assertEquals(1, op.dataStorage.get(global, "A").get(0).size());
+    Assert.assertTrue(op.dataStorage.get(global, "A").get(0).contains(100));
+
+    op.processTuple2(new Tuple.WindowedTuple<KeyValPair<String, Integer>>(global, new KeyValPair<String, Integer>("A", 200)));
+    Assert.assertEquals(1, op.dataStorage.get(global, "A").get(1).size());
+    Assert.assertTrue(op.dataStorage.get(global, "A").get(1).contains(200));
+
+    op.processTuple2(new Tuple.WindowedTuple<KeyValPair<String, Integer>>(global, new KeyValPair<String, Integer>("B", 300)));
+    Assert.assertEquals(1, op.dataStorage.get(global, "A").get(1).size());
+    Assert.assertEquals(1, op.dataStorage.get(global, "B").get(1).size());
+    Assert.assertTrue(op.dataStorage.get(global, "B").get(1).contains(300));
+
+    Assert.assertEquals(2, op.accumulation.getOutput(op.dataStorage.get(global, "A")).size());
+  }
+
+  /**
+   * Checks how the merge operator combines the watermarks arriving on its two control inputs into its own
+   * watermark and how many watermarks it sends to {@code controlOutput}.
+   */
+  @Test
+  public void windowedMergeOperatorWatermarkTest()
+  {
+    // NOTE(review): raw type; the parameterized return type of createDefaultWindowedMergeOperator() could be kept.
+    WindowedMergeOperatorImpl op = createDefaultWindowedMergeOperator();
+    CollectorTestSink<WatermarkImpl> sink = new CollectorTestSink<>();
+    op.controlOutput.setSink(sink);
+
+    // No watermark is generated until the Merge operator has seen a watermark from every input stream.
+    op.controlInput.process(new WatermarkImpl(1000000));
+    op.endWindow();
+    Assert.assertEquals(-1, op.currentWatermark);
+    Assert.assertEquals(0, sink.collectedTuples.size());
+
+    // Once both input streams have sent watermarks, the Merge operator generates a watermark (the minimum of the
+    // latest input watermarks) and sends it downstream.
+    op.controlInput2.process(new WatermarkImpl(200000));
+    op.endWindow();
+    Assert.assertEquals(200000, op.currentWatermark);
+    Assert.assertEquals(1, sink.collectedTuples.size());
+
+    // If the minimum of the latest input watermarks changes, the Merge operator also generates a new watermark.
+    op.controlInput2.process(new WatermarkImpl(2100000));
+    op.endWindow();
+    Assert.assertEquals(1000000, op.currentWatermark);
+    Assert.assertEquals(2, sink.collectedTuples.size());
+
+    // The first input advances, so the minimum of the latest input watermarks becomes 1100000.
+    // NOTE(review): currentWatermark is asserted to change *before* endWindow(); only the count of emitted
+    // watermarks is checked after endWindow(). Confirm whether the watermark is meant to change only at the window
+    // boundary.
+    op.controlInput.process(new WatermarkImpl(1100000));
+    Assert.assertEquals(1100000, op.currentWatermark);
+    op.endWindow();
+    Assert.assertEquals(3, sink.collectedTuples.size());
+
+    // If an upstream sends a watermark but the minimum of the latest input watermarks doesn't change, the Merge
+    // operator does not generate a new watermark, so nothing more is sent downstream.
+    op.controlInput.process(new WatermarkImpl(1100000));
+    op.endWindow();
+    Assert.assertEquals(1100000, op.currentWatermark);
+    Assert.assertEquals(3, sink.collectedTuples.size());
+  }
+
+  /**
+   * Builds an unkeyed merge operator with in-memory data, retraction and window-state storage and a CoGroup
+   * accumulation.
+   */
+  private WindowedMergeOperatorImpl<Integer, Integer, List<Set<Integer>>, List<List<Integer>>> createDefaultWindowedMergeOperator()
+  {
+    WindowedMergeOperatorImpl<Integer, Integer, List<Set<Integer>>, List<List<Integer>>> windowedMergeOperator = new WindowedMergeOperatorImpl<>();
+    windowedMergeOperator.setDataStorage(new InMemoryWindowedStorage<List<Set<Integer>>>());
+    windowedMergeOperator.setRetractionStorage(new InMemoryWindowedStorage<List<List<Integer>>>());
+    windowedMergeOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
+    windowedMergeOperator.setAccumulation(new CoGroup<Integer>());
+    return windowedMergeOperator;
+  }
+
+  /**
+   * Builds a keyed merge operator with in-memory keyed data/retraction storage, in-memory window-state storage and
+   * a CoGroup accumulation.
+   */
+  private KeyedWindowedMergeOperatorImpl<String, Integer, Integer, List<Set<Integer>>, List<List<Integer>>> createDefaultKeyedWindowedMergeOperator()
+  {
+    KeyedWindowedMergeOperatorImpl<String, Integer, Integer, List<Set<Integer>>, List<List<Integer>>> windowedMergeOperator = new KeyedWindowedMergeOperatorImpl<>();
+    windowedMergeOperator.setDataStorage(new InMemoryWindowedKeyedStorage<String, List<Set<Integer>>>());
+    windowedMergeOperator.setRetractionStorage(new InMemoryWindowedKeyedStorage<String, List<List<Integer>>>());
+    windowedMergeOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
+    windowedMergeOperator.setAccumulation(new CoGroup<Integer>());
+    return windowedMergeOperator;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTestApplication.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTestApplication.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTestApplication.java new file mode 100644 index 0000000..5d80388 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTestApplication.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.window.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.joda.time.Duration; + +import org.apache.apex.malhar.lib.window.ControlTuple; +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.lib.window.WindowState; +import org.apache.apex.malhar.lib.window.WindowedStorage; +import org.apache.apex.malhar.lib.window.accumulation.CoGroup; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; + +/** + * Example application showing usage of the Windowed Merge Operator. Two generators send out two streams of integers; + * the Merge operator co-groups all incoming tuples and sends the output to a collector, which forwards it to the console. + * Users can choose different window options and see how the application behaves.
+ */
+public class WindowedMergeOperatorTestApplication implements StreamingApplication
+{
+  // Window states registered by assignTestWindow() and handed to the merge operator in populateDAG().
+  // NOTE(review): this only has an effect if the merge operator ends up using this very instance (operators may be
+  // serialized before deployment), and the generators and the merge operator may touch it from different threads;
+  // confirm both before relying on it.
+  private static final WindowedStorage.WindowedPlainStorage<WindowState> windowStateMap = new InMemoryWindowedStorage<>();
+  private static final long windowDuration = 1000;
+
+  /**
+   * Returns the {@code windowDuration}-long time window containing {@code timestamp}, registering a new
+   * {@code WindowState} for it in {@code windowStateMap} the first time that window is seen.
+   *
+   * @param timestamp time in milliseconds (callers pass {@code System.currentTimeMillis()})
+   * @return the time window whose begin timestamp is {@code timestamp} rounded down to a multiple of
+   *         {@code windowDuration}
+   */
+  public static Window.TimeWindow assignTestWindow(long timestamp)
+  {
+    long beginTimestamp = timestamp - timestamp % windowDuration;
+    Window.TimeWindow window = new Window.TimeWindow(beginTimestamp, windowDuration);
+    if (!windowStateMap.containsWindow(window)) {
+      windowStateMap.put(window, new WindowState());
+    }
+    return window;
+  }
+
+  /**
+   * Emits the integers 1..20, one every 400 ms starting 800 ms after setup, and afterwards a fixed watermark in
+   * every streaming window.
+   */
+  public static class NumGen1 extends BaseOperator implements InputOperator
+  {
+    private int i;
+    private long watermarkTime;
+    private long startingTime;
+
+    public final transient DefaultOutputPort<Tuple.WindowedTuple<Integer>> output = new DefaultOutputPort<>();
+    public final transient DefaultOutputPort<ControlTuple> watermarkDefaultOutputPort = new DefaultOutputPort<>();
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      startingTime = System.currentTimeMillis();
+      watermarkTime = System.currentTimeMillis() + 10000;
+      i = 1;
+    }
+
+    /**
+     * Emits at most one tuple per call, and only once that tuple's scheduled time has been reached.
+     * The engine calls this method repeatedly within a streaming window, so it must return promptly; spinning here
+     * until every tuple is out would block the operator thread (and its window boundaries) for the whole run.
+     */
+    @Override
+    public void emitTuples()
+    {
+      if (i <= 20 && System.currentTimeMillis() - startingTime >= (i + 1) * 400) {
+        output.emit(new Tuple.WindowedTuple<Integer>(assignTestWindow(System.currentTimeMillis()), i));
+        i++;
+      }
+    }
+
+    /**
+     * Emits the watermark only after all 20 tuples have been sent. {@code watermarkTime} is 10 s after setup, later
+     * than the scheduled time of every tuple (the last is due at 8.4 s); a watermark announces that no earlier data
+     * follows, so sending it while tuples are still pending could get them treated as late downstream.
+     */
+    @Override
+    public void endWindow()
+    {
+      if (i > 20) {
+        watermarkDefaultOutputPort.emit(new WatermarkImpl(watermarkTime));
+      }
+    }
+  }
+
+  /**
+   * Same schedule as {@link NumGen1}, but emits the integers 10, 20, ..., 200.
+   */
+  public static class NumGen2 extends BaseOperator implements InputOperator
+  {
+    private int i;
+    private long watermarkTime;
+    private long startingTime;
+
+    public final transient DefaultOutputPort<Tuple.WindowedTuple<Integer>> output = new DefaultOutputPort<>();
+    public final transient DefaultOutputPort<ControlTuple> watermarkDefaultOutputPort = new DefaultOutputPort<>();
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      startingTime = System.currentTimeMillis();
+      watermarkTime = System.currentTimeMillis() + 10000;
+      i = 1;
+    }
+
+    /**
+     * Emits at most one tuple per call, and only once that tuple's scheduled time has been reached, so the method
+     * returns promptly instead of blocking the operator thread.
+     */
+    @Override
+    public void emitTuples()
+    {
+      if (i <= 20 && System.currentTimeMillis() - startingTime >= (i + 1) * 400) {
+        output.emit(new Tuple.WindowedTuple<Integer>(assignTestWindow(System.currentTimeMillis()), 10 * i));
+        i++;
+      }
+    }
+
+    /**
+     * Emits the watermark (10 s after setup) only after all 20 tuples have been sent.
+     */
+    @Override
+    public void endWindow()
+    {
+      if (i > 20) {
+        watermarkDefaultOutputPort.emit(new WatermarkImpl(watermarkTime));
+      }
+    }
+  }
+
+  /**
+   * Appends every merged value to the static {@code result} list and forwards each tuple unchanged.
+   */
+  public static class Collector extends BaseOperator
+  {
+    public static List<List<List<Integer>>> result = new ArrayList<>();
+
+    public final transient DefaultOutputPort<Tuple<List<List<Integer>>>> output = new DefaultOutputPort<>();
+
+    public final transient DefaultInputPort<Tuple<List<List<Integer>>>> input = new DefaultInputPort<Tuple<List<List<Integer>>>>()
+    {
+      @Override
+      public void process(Tuple<List<List<Integer>>> tuple)
+      {
+        result.add(tuple.getValue());
+        output.emit(tuple);
+      }
+    };
+  }
+
+  /**
+   * Wires both generators into the merge operator's data and control inputs and routes the merged output through
+   * {@link Collector} to the console.
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    WindowedMergeOperatorImpl<Integer, Integer, List<Set<Integer>>, List<List<Integer>>> op
+        = dag.addOperator("Merge", new WindowedMergeOperatorImpl<Integer, Integer, List<Set<Integer>>, List<List<Integer>>>());
+    op.setAccumulation(new CoGroup<Integer>());
+    op.setDataStorage(new InMemoryWindowedStorage<List<Set<Integer>>>());
+    op.setRetractionStorage(new InMemoryWindowedStorage<List<List<Integer>>>());
+    op.setWindowStateStorage(windowStateMap);
+
+    // Can select one of the following window options, or don't select any of them.
+    //op.setWindowOption(new WindowOption.GlobalWindow());
+    op.setWindowOption(new WindowOption.TimeWindows(Duration.millis(2000)));
+
+    op.setTriggerOption(new TriggerOption().withEarlyFiringsAtEvery(1).accumulatingFiredPanes());
+    op.setAllowedLateness(Duration.millis(500));
+
+    NumGen1 numGen1 = dag.addOperator("numGen1", new NumGen1());
+    NumGen2 numGen2 = dag.addOperator("numGen2", new NumGen2());
+
+    Collector collector = dag.addOperator("collector", new Collector());
+    ConsoleOutputOperator con = dag.addOperator("console", new ConsoleOutputOperator());
+
+    dag.addStream("num1", numGen1.output, op.input);
+    dag.addStream("num2", numGen2.output, op.input2);
+    dag.addStream("wm1", numGen1.watermarkDefaultOutputPort, op.controlInput);
+    dag.addStream("wm2", numGen2.watermarkDefaultOutputPort, op.controlInput2);
+
+    dag.addStream("MergedResult", op.output, collector.input);
+    dag.addStream("output", collector.output, con.input);
+  }
+
+  /**
+   * Runs the application in local mode for 20 seconds.
+   */
+  public static void main(String[] args) throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    lma.prepareDAG(new WindowedMergeOperatorTestApplication(), conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.run(20000);
+  }
+}
