Introduced WindowedMergeOperatorFeatures classes to solve the problem of code duplication
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/aeb10f33 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/aeb10f33 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/aeb10f33 Branch: refs/heads/master Commit: aeb10f33d54b6b661cb4f776a4cad0e41d5375c3 Parents: 92bd732 Author: David Yan <[email protected]> Authored: Mon Sep 26 16:47:11 2016 -0700 Committer: Shunxin <[email protected]> Committed: Mon Oct 10 19:07:25 2016 -0700 ---------------------------------------------------------------------- .../malhar/lib/window/MergeAccumulation.java | 2 +- .../lib/window/MergeWindowedOperator.java | 12 +- .../impl/AbstractWindowedMergeOperator.java | 123 ------------ .../window/impl/AbstractWindowedOperator.java | 34 +++- .../impl/KeyedWindowedMergeOperatorImpl.java | 105 +++++------ .../impl/WindowedMergeOperatorFeatures.java | 185 +++++++++++++++++++ .../window/impl/WindowedMergeOperatorImpl.java | 106 ++++++----- .../window/impl/WindowedMergeOperatorTest.java | 2 +- 8 files changed, 320 insertions(+), 249 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java index 71f4408..53cfd40 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java @@ -21,7 +21,7 @@ package org.apache.apex.malhar.lib.window; import org.apache.hadoop.classification.InterfaceStability; /** - * This is the interface for accumulation when joining multiple streams. + * This is the interface for accumulation when joining two streams. * * @since 3.5.0 */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java index 89a70a4..1561caa 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java @@ -19,12 +19,22 @@ package org.apache.apex.malhar.lib.window; /** - * Interface for Join Windowed Operator. + * Interface for Merge Windowed Operator. */ public interface MergeWindowedOperator<InputT1, InputT2> extends WindowedOperator<InputT1> { + /** + * The method to accumulate the data tuple from the 2nd input stream + * + * @param tuple the data tuple + */ void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple); + /** + * The method to process the watermark tuple from the 2nd input stream + * + * @param watermark the watermark tuple + */ void processWatermark2(ControlTuple.Watermark watermark); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java deleted file mode 100644 index 05a2495..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.apex.malhar.lib.window.ControlTuple; -import org.apache.apex.malhar.lib.window.MergeAccumulation; -import org.apache.apex.malhar.lib.window.MergeWindowedOperator; -import org.apache.apex.malhar.lib.window.Tuple; -import org.apache.apex.malhar.lib.window.WindowedStorage; - -import com.google.common.base.Function; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.annotation.InputPortFieldAnnotation; - - -/** - * Abstract Windowed Merge Operator. - */ -public abstract class AbstractWindowedMergeOperator<InputT1, InputT2, OutputT, DataStorageT extends WindowedStorage, - RetractionStorageT extends WindowedStorage, AccumulationT extends - MergeAccumulation> - extends AbstractWindowedOperator<InputT1, OutputT, DataStorageT, RetractionStorageT, AccumulationT> - implements MergeWindowedOperator<InputT1, InputT2> -{ - - private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedMergeOperator.class); - private Function<InputT2, Long> timestampExtractor2; - - private long latestWatermark1 = -1; // latest watermark from stream 1 - private long latestWatermark2 = -1; // latest watermark from stream 2 - - public final transient DefaultInputPort<Tuple<InputT2>> input2 = new DefaultInputPort<Tuple<InputT2>>() - { - @Override - public void process(Tuple<InputT2> tuple) - { - processTuple2(tuple); - } - }; - - // TODO: This port should be removed when Apex Core has native support for custom control tuples - @InputPortFieldAnnotation(optional = true) - public final transient DefaultInputPort<ControlTuple> controlInput2 = new DefaultInputPort<ControlTuple>() - { - @Override - public void process(ControlTuple tuple) - { - if (tuple instanceof ControlTuple.Watermark) { - processWatermark2((ControlTuple.Watermark)tuple); - } - } - }; - - public void processTuple2(Tuple<InputT2> tuple) - { - long timestamp = extractTimestamp(tuple, timestampExtractor2); - if (isTooLate(timestamp)) { - dropTuple(tuple); - } else { - Tuple.WindowedTuple<InputT2> windowedTuple = getWindowedValueWithTimestamp(tuple, timestamp); - // do the accumulation - accumulateTuple2(windowedTuple); - processWindowState(windowedTuple); - } - } - - public void setTimestampExtractor2(Function<InputT2, Long> timestampExtractor2) - { - this.timestampExtractor2 = timestampExtractor2; - } - - - @Override - public void processWatermark(ControlTuple.Watermark watermark) - { - latestWatermark1 = watermark.getTimestamp(); - if (latestWatermark1 >= 0 && latestWatermark2 >= 0) { - // Select the smallest timestamp of the latest watermarks as the watermark of the operator. - long minWatermark = Math.min(latestWatermark1, latestWatermark2); - if (this.watermarkTimestamp < 0 || minWatermark != this.watermarkTimestamp) { - this.watermarkTimestamp = minWatermark; - } - } - } - - @Override - public void processWatermark2(ControlTuple.Watermark watermark) - { - latestWatermark2 = watermark.getTimestamp(); - if (latestWatermark1 >= 0 && latestWatermark2 >= 0) { - long minWatermark = Math.min(latestWatermark1, latestWatermark2); - if (this.watermarkTimestamp < 0 || minWatermark != this.watermarkTimestamp) { - this.watermarkTimestamp = minWatermark; - } - } - } - - @Override - protected void processWatermarkAtEndWindow() - { - if (fixedWatermarkMillis > 0 || this.watermarkTimestamp != this.currentWatermark) { - super.processWatermarkAtEndWindow(); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java index f965a01..e8ff622 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java @@ -79,8 +79,8 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext private Function<InputT, Long> timestampExtractor; + protected long nextWatermark = -1; protected long currentWatermark = -1; - protected long watermarkTimestamp = -1; private boolean triggerAtWatermark; protected long earlyTriggerCount; private long earlyTriggerMillis; @@ -141,7 +141,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext if (isTooLate(timestamp)) { dropTuple(tuple); } else { - Tuple.WindowedTuple<InputT> windowedTuple = getWindowedValue(tuple); + Tuple.WindowedTuple<InputT> windowedTuple = getWindowedValueWithTimestamp(tuple, timestamp); // do the accumulation accumulateTuple(windowedTuple); processWindowState(windowedTuple); @@ -256,6 +256,11 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext this.timestampExtractor = timestampExtractor; } + public void setNextWatermark(long timestamp) + { + this.nextWatermark = timestamp; + } + /** * Sets the fixed watermark with respect to the processing time derived from the Apex window ID. This is useful if we * don't have watermark tuples from upstream. However, using this means whether a tuple is considered late totally @@ -408,7 +413,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext @Override public void processWatermark(ControlTuple.Watermark watermark) { - this.watermarkTimestamp = watermark.getTimestamp(); + this.nextWatermark = watermark.getTimestamp(); } @Override @@ -460,7 +465,6 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext } else { currentDerivedTimestamp += timeIncrement; } - watermarkTimestamp = -1; } /** @@ -484,18 +488,17 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext protected void processWatermarkAtEndWindow() { if (fixedWatermarkMillis > 0) { - watermarkTimestamp = currentDerivedTimestamp - fixedWatermarkMillis; + nextWatermark = currentDerivedTimestamp - fixedWatermarkMillis; } - if (watermarkTimestamp > 0) { - this.currentWatermark = watermarkTimestamp; + if (nextWatermark > 0 && currentWatermark < nextWatermark) { - long horizon = watermarkTimestamp - allowedLatenessMillis; + long horizon = nextWatermark - allowedLatenessMillis; for (Iterator<Map.Entry<Window, WindowState>> it = windowStateMap.entries().iterator(); it.hasNext(); ) { Map.Entry<Window, WindowState> entry = it.next(); Window window = entry.getKey(); WindowState windowState = entry.getValue(); - if (window.getBeginTimestamp() + window.getDurationMillis() < watermarkTimestamp) { + if (window.getBeginTimestamp() + window.getDurationMillis() < nextWatermark) { // watermark has not arrived for this window before, marking this window late if (windowState.watermarkArrivalTime == -1) { windowState.watermarkArrivalTime = currentDerivedTimestamp; @@ -514,7 +517,8 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext } } } - controlOutput.emit(new WatermarkImpl(watermarkTimestamp)); + controlOutput.emit(new WatermarkImpl(nextWatermark)); + this.currentWatermark = nextWatermark; } } @@ -552,6 +556,16 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext } } + DataStorageT getDataStorage() + { + return dataStorage; + } + + AccumulationT getAccumulation() + { + return accumulation; + } + /** * This method fires the normal trigger for the given window. * http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java index a5f17c5..3714d6d 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java @@ -18,14 +18,14 @@ */ package org.apache.apex.malhar.lib.window.impl; -import java.util.Map; - -import org.apache.apex.malhar.lib.window.MergeAccumulation; -import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.ControlTuple; +import org.apache.apex.malhar.lib.window.MergeWindowedOperator; import org.apache.apex.malhar.lib.window.Tuple; -import org.apache.apex.malhar.lib.window.Window; -import org.apache.apex.malhar.lib.window.WindowedStorage; +import com.google.common.base.Function; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; import com.datatorrent.lib.util.KeyValPair; @@ -40,81 +40,68 @@ import com.datatorrent.lib.util.KeyValPair; * @param <OutputT> The type of the value of the keyed output tuple. */ public class KeyedWindowedMergeOperatorImpl<KeyT, InputT1, InputT2, AccumT, OutputT> - extends AbstractWindowedMergeOperator<KeyValPair<KeyT, InputT1>, KeyValPair<KeyT, InputT2>, KeyValPair<KeyT, OutputT>, WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>, WindowedStorage.WindowedKeyedStorage<KeyT, OutputT>, MergeAccumulation<InputT1, InputT2, AccumT, OutputT>> + extends KeyedWindowedOperatorImpl<KeyT, InputT1, AccumT, OutputT> + implements MergeWindowedOperator<KeyValPair<KeyT, InputT1>, KeyValPair<KeyT, InputT2>> { - // TODO: Add session window support. + private Function<KeyValPair<KeyT, InputT2>, Long> timestampExtractor2; + + private WindowedMergeOperatorFeatures.Keyed joinFeatures = new WindowedMergeOperatorFeatures.Keyed(this); - private abstract class AccumFunction<T> + public final transient DefaultInputPort<Tuple<KeyValPair<KeyT, InputT2>>> input2 = new DefaultInputPort<Tuple<KeyValPair<KeyT, InputT2>>>() { - abstract AccumT accumulate(AccumT accum, T value); - } + @Override + public void process(Tuple<KeyValPair<KeyT, InputT2>> tuple) + { + processTuple2(tuple); + } + }; - private <T> void accumulateTupleHelper(Tuple.WindowedTuple<KeyValPair<KeyT, T>> tuple, AccumFunction<T> accumFn) + // TODO: This port should be removed when Apex Core has native support for custom control tuples + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<ControlTuple> controlInput2 = new DefaultInputPort<ControlTuple>() { - final KeyValPair<KeyT, T> kvData = tuple.getValue(); - KeyT key = kvData.getKey(); - for (Window window : tuple.getWindows()) { - // process each window - AccumT accum = dataStorage.get(window, key); - if (accum == null) { - accum = accumulation.defaultAccumulatedValue(); + @Override + public void process(ControlTuple tuple) + { + if (tuple instanceof ControlTuple.Watermark) { + processWatermark2((ControlTuple.Watermark)tuple); } - dataStorage.put(window, key, accumFn.accumulate(accum, kvData.getValue())); } + }; + + public void setTimestampExtractor2(Function<KeyValPair<KeyT, InputT2>, Long> timestampExtractor) + { + this.timestampExtractor2 = timestampExtractor; } - @Override - public void accumulateTuple(Tuple.WindowedTuple<KeyValPair<KeyT, InputT1>> tuple) + public void processTuple2(Tuple<KeyValPair<KeyT, InputT2>> tuple) { - accumulateTupleHelper(tuple, new AccumFunction<InputT1>() - { - @Override - AccumT accumulate(AccumT accum, InputT1 value) - { - return accumulation.accumulate(accum, value); - } - }); + long timestamp = extractTimestamp(tuple, this.timestampExtractor2); + if (isTooLate(timestamp)) { + dropTuple(tuple); + } else { + Tuple.WindowedTuple<KeyValPair<KeyT, InputT2>> windowedTuple = getWindowedValueWithTimestamp(tuple, timestamp); + // do the accumulation + accumulateTuple2(windowedTuple); + processWindowState(windowedTuple); + } } @Override public void accumulateTuple2(Tuple.WindowedTuple<KeyValPair<KeyT, InputT2>> tuple) { - accumulateTupleHelper(tuple, new AccumFunction<InputT2>() - { - @Override - AccumT accumulate(AccumT accum, InputT2 value) - { - return accumulation.accumulate2(accum, value); - } - }); + joinFeatures.accumulateTuple2(tuple); } @Override - public void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes) + public void processWatermark(ControlTuple.Watermark watermark) { - for (Map.Entry<KeyT, AccumT> entry : dataStorage.entries(window)) { - OutputT outputVal = accumulation.getOutput(entry.getValue()); - if (fireOnlyUpdatedPanes) { - OutputT oldValue = retractionStorage.get(window, entry.getKey()); - if (oldValue != null && oldValue.equals(outputVal)) { - continue; - } - } - output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), outputVal))); - if (retractionStorage != null) { - retractionStorage.put(window, entry.getKey(), outputVal); - } - } + joinFeatures.processWatermark1(watermark); } @Override - public void fireRetractionTrigger(Window window, boolean firingOnlyUpdatedPanes) + public void processWatermark2(ControlTuple.Watermark watermark) { - if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) { - throw new UnsupportedOperationException(); - } - for (Map.Entry<KeyT, OutputT> entry : retractionStorage.entries(window)) { - output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), accumulation.getRetraction(entry.getValue())))); - } + joinFeatures.processWatermark2(watermark); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorFeatures.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorFeatures.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorFeatures.java new file mode 100644 index 0000000..3fceb06 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorFeatures.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl; + +import org.apache.apex.malhar.lib.window.ControlTuple; +import org.apache.apex.malhar.lib.window.MergeAccumulation; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.WindowedStorage; + +import com.datatorrent.lib.util.KeyValPair; + +/** + * This class provides the features in a MergeWindowedOperator and is intended to be used only + * by the implementation of such operator + */ +abstract class WindowedMergeOperatorFeatures<InputT1, InputT2, AccumT, AccumulationT extends MergeAccumulation, DataStorageT extends WindowedStorage> +{ + protected AbstractWindowedOperator<InputT1, ?, DataStorageT, ?, AccumulationT> operator; + + protected long latestWatermark1 = -1; // latest watermark from stream 1 + protected long latestWatermark2 = -1; // latest watermark from stream 2 + + protected abstract class AccumFunction<T> + { + abstract AccumT accumulate(AccumT accum, T value); + } + + protected WindowedMergeOperatorFeatures() + { + // for kryo + } + + WindowedMergeOperatorFeatures(AbstractWindowedOperator<InputT1, ?, DataStorageT, ?, AccumulationT> operator) + { + this.operator = operator; + } + + abstract void accumulateTuple1(Tuple.WindowedTuple<InputT1> tuple); + + abstract void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple); + + void processWatermark1(ControlTuple.Watermark watermark) + { + latestWatermark1 = watermark.getTimestamp(); + // Select the smallest timestamp of the latest watermarks as the watermark of the operator. + long minWatermark = Math.min(latestWatermark1, latestWatermark2); + operator.setNextWatermark(minWatermark); + } + + void processWatermark2(ControlTuple.Watermark watermark) + { + latestWatermark2 = watermark.getTimestamp(); + long minWatermark = Math.min(latestWatermark1, latestWatermark2); + operator.setNextWatermark(minWatermark); + } + + /** + * The merge features for plain (non-keyed) operator + */ + static class Plain<InputT1, InputT2, AccumT, AccumulationT extends MergeAccumulation<InputT1, InputT2, AccumT, ?>, DataStorageT extends WindowedStorage.WindowedPlainStorage<AccumT>> + extends WindowedMergeOperatorFeatures<InputT1, InputT2, AccumT, AccumulationT, DataStorageT> + { + private Plain() + { + // for kryo + } + + Plain(AbstractWindowedOperator<InputT1, ?, DataStorageT, ?, AccumulationT> operator) + { + super(operator); + } + + private <T> void accumulateTupleHelper(Tuple.WindowedTuple<T> tuple, AccumFunction<T> accumFn) + { + for (Window window : tuple.getWindows()) { + // process each window + AccumT accum = operator.getDataStorage().get(window); + if (accum == null) { + accum = operator.getAccumulation().defaultAccumulatedValue(); + } + operator.getDataStorage().put(window, accumFn.accumulate(accum, tuple.getValue())); + } + } + + @Override + void accumulateTuple1(Tuple.WindowedTuple<InputT1> tuple) + { + accumulateTupleHelper(tuple, new AccumFunction<InputT1>() + { + @Override + AccumT accumulate(AccumT accum, InputT1 value) + { + return operator.getAccumulation().accumulate(accum, value); + } + }); + } + + @Override + void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple) + { + accumulateTupleHelper(tuple, new AccumFunction<InputT2>() + { + @Override + AccumT accumulate(AccumT accum, InputT2 value) + { + return operator.getAccumulation().accumulate2(accum, value); + } + }); + } + } + + /** + * The merge features for keyed operator + */ + static class Keyed<KeyT, InputT1, InputT2, AccumT, AccumulationT extends MergeAccumulation<InputT1, InputT2, AccumT, ?>, DataStorageT extends WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>> + extends WindowedMergeOperatorFeatures<KeyValPair<KeyT, InputT1>, KeyValPair<KeyT, InputT2>, AccumT, AccumulationT, DataStorageT> + { + private Keyed() + { + // for kryo + } + + Keyed(AbstractWindowedOperator<KeyValPair<KeyT, InputT1>, ?, DataStorageT, ?, AccumulationT> operator) + { + super(operator); + } + + private <T> void accumulateTupleHelper(Tuple.WindowedTuple<KeyValPair<KeyT, T>> tuple, AccumFunction<T> accumFn) + { + final KeyValPair<KeyT, T> kvData = tuple.getValue(); + KeyT key = kvData.getKey(); + for (Window window : tuple.getWindows()) { + // process each window + AccumT accum = operator.getDataStorage().get(window, key); + if (accum == null) { + accum = operator.getAccumulation().defaultAccumulatedValue(); + } + operator.getDataStorage().put(window, key, accumFn.accumulate(accum, kvData.getValue())); + } + } + + @Override + void accumulateTuple1(Tuple.WindowedTuple<KeyValPair<KeyT, InputT1>> tuple) + { + accumulateTupleHelper(tuple, new AccumFunction<InputT1>() + { + @Override + AccumT accumulate(AccumT accum, InputT1 value) + { + return operator.getAccumulation().accumulate(accum, value); + } + }); + } + + @Override + void accumulateTuple2(Tuple.WindowedTuple<KeyValPair<KeyT, InputT2>> tuple) + { + accumulateTupleHelper(tuple, new AccumFunction<InputT2>() + { + @Override + AccumT accumulate(AccumT accum, InputT2 value) + { + return operator.getAccumulation().accumulate2(accum, value); + } + }); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java index 38eeff0..0f8a762 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java @@ -18,11 +18,14 @@ */ package org.apache.apex.malhar.lib.window.impl; -import org.apache.apex.malhar.lib.window.MergeAccumulation; -import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.ControlTuple; +import org.apache.apex.malhar.lib.window.MergeWindowedOperator; import org.apache.apex.malhar.lib.window.Tuple; -import org.apache.apex.malhar.lib.window.Window; -import org.apache.apex.malhar.lib.window.WindowedStorage; + +import com.google.common.base.Function; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; /** * Windowed Merge Operator to merge two streams together. It aggregates tuple from two @@ -35,78 +38,73 @@ import org.apache.apex.malhar.lib.window.WindowedStorage; * @param <OutputT> The type of output tuple. */ public class WindowedMergeOperatorImpl<InputT1, InputT2, AccumT, OutputT> - extends AbstractWindowedMergeOperator<InputT1, InputT2, OutputT, WindowedStorage.WindowedPlainStorage<AccumT>, WindowedStorage.WindowedPlainStorage<OutputT>, MergeAccumulation<InputT1, InputT2, AccumT, OutputT>> + extends WindowedOperatorImpl<InputT1, AccumT, OutputT> implements MergeWindowedOperator<InputT1, InputT2> { - private abstract class AccumFunction<T> + private Function<InputT2, Long> timestampExtractor2; + + private WindowedMergeOperatorFeatures.Plain joinFeatures = new WindowedMergeOperatorFeatures.Plain(this); + + public final transient DefaultInputPort<Tuple<InputT2>> input2 = new DefaultInputPort<Tuple<InputT2>>() { - abstract AccumT accumulate(AccumT accum, T value); - } + @Override + public void process(Tuple<InputT2> tuple) + { + processTuple2(tuple); + } + }; - private <T> void accumulateTupleHelper(Tuple.WindowedTuple<T> tuple, AccumFunction<T> accumFn) + // TODO: This port should be removed when Apex Core has native support for custom control tuples + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<ControlTuple> controlInput2 = new DefaultInputPort<ControlTuple>() { - for (Window window : tuple.getWindows()) { - // process each window - AccumT accum = dataStorage.get(window); - if (accum == null) { - accum = accumulation.defaultAccumulatedValue(); + @Override + public void process(ControlTuple tuple) + { + if (tuple instanceof ControlTuple.Watermark) { + processWatermark2((ControlTuple.Watermark)tuple); } - dataStorage.put(window, accumFn.accumulate(accum, tuple.getValue())); } + }; + + public void setTimestampExtractor2(Function<InputT2, Long> timestampExtractor) + { + this.timestampExtractor2 = timestampExtractor; } - @Override - public void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple) + public void processTuple2(Tuple<InputT2> tuple) { - accumulateTupleHelper(tuple, new AccumFunction<InputT2>() - { - @Override - AccumT accumulate(AccumT accum, InputT2 value) - { - return accumulation.accumulate2(accum, value); - } - }); + long timestamp = extractTimestamp(tuple, this.timestampExtractor2); + if (isTooLate(timestamp)) { + dropTuple(tuple); + } else { + Tuple.WindowedTuple<InputT2> windowedTuple = getWindowedValueWithTimestamp(tuple, timestamp); + // do the accumulation + accumulateTuple2(windowedTuple); + processWindowState(windowedTuple); + } } @Override public void accumulateTuple(Tuple.WindowedTuple<InputT1> tuple) { - accumulateTupleHelper(tuple, new AccumFunction<InputT1>() - { - @Override - AccumT accumulate(AccumT accum, InputT1 value) - { - return accumulation.accumulate(accum, value); - } - }); + joinFeatures.accumulateTuple1(tuple); } @Override - public void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes) + public void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple) { - AccumT accumulatedValue = dataStorage.get(window); - OutputT outputValue = accumulation.getOutput(accumulatedValue); + joinFeatures.accumulateTuple2(tuple); + } - if (fireOnlyUpdatedPanes) { - OutputT oldValue = retractionStorage.get(window); - if (oldValue != null && oldValue.equals(outputValue)) { - return; - } - } - output.emit(new Tuple.WindowedTuple<>(window, outputValue)); - if (retractionStorage != null) { - retractionStorage.put(window, outputValue); - } + @Override + public void processWatermark(ControlTuple.Watermark watermark) + { + joinFeatures.processWatermark1(watermark); } @Override - public void fireRetractionTrigger(Window window, boolean firingOnlyUpdatedPanes) + public void processWatermark2(ControlTuple.Watermark watermark) { - if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) { - throw new UnsupportedOperationException(); - } - OutputT oldValue = retractionStorage.get(window); - if (oldValue != null) { - output.emit(new Tuple.WindowedTuple<>(window, accumulation.getRetraction(oldValue))); - } + joinFeatures.processWatermark2(watermark); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java index 7dc09d0..8c37d57 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java @@ -130,8 +130,8 @@ public class WindowedMergeOperatorTest // Current watermark of Merge operator could only change during endWindow() event. op.controlInput.process(new WatermarkImpl(1100000)); - Assert.assertEquals(1100000, op.currentWatermark); op.endWindow(); + Assert.assertEquals(1100000, op.currentWatermark); Assert.assertEquals(3, sink.collectedTuples.size()); // If the upstreams sent a watermark but the minimum of the latest input watermarks doesn't change, the Merge
