Repository: apex-malhar Updated Branches: refs/heads/master 88295aed8 -> f22b269a9
APEXMALHAR-2354 Support for heuristic watermark Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/1290c726 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/1290c726 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/1290c726 Branch: refs/heads/master Commit: 1290c72649cd925fa795fd1234b29499ecb87189 Parents: cf896b0 Author: bhupeshchawda <[email protected]> Authored: Mon Jan 16 18:23:00 2017 +0530 Committer: bhupeshchawda <[email protected]> Committed: Tue Jan 24 18:08:56 2017 +0530 ---------------------------------------------------------------------- .../lib/window/ImplicitWatermarkGenerator.java | 49 ++++++++++++ .../window/impl/AbstractWindowedOperator.java | 27 ++++++- .../impl/FixedDiffEventTimeWatermarkGen.java | 83 ++++++++++++++++++++ .../FixedDiffProcessingTimeWatermarkGen.java | 76 ++++++++++++++++++ .../malhar/lib/window/WindowedOperatorTest.java | 34 ++++++++ 5 files changed, 267 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1290c726/library/src/main/java/org/apache/apex/malhar/lib/window/ImplicitWatermarkGenerator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/ImplicitWatermarkGenerator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/ImplicitWatermarkGenerator.java new file mode 100644 index 0000000..c7e91a1 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/ImplicitWatermarkGenerator.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Component; + +/** + * Interface for generators for implicit watermarks. + * An operator which does not want to rely on explicit watermarks (generated from upstream) + * can use implementations of this interface to get implicit watermarks. + */ +@InterfaceStability.Evolving +public interface ImplicitWatermarkGenerator extends Component +{ + /** + * Called on arrival of every tuple.
+ * Implementations would update the state of the watermark generator in order to keep their state updated + * @param t the incoming windowed tuple + * @param currentProcessingTime the current notion of processing time + * (usually the system time generated based on the window id) + */ + void processTupleForWatermark(Tuple.WindowedTuple t, long currentProcessingTime); + + /** + * Returns the current watermark tuple as per the generator's state + * @param currentProcessingTime the current notion of processing time + * (usually the system time generated based on the window id) + * @return the latest watermark tuple created based on the implementation + */ + ControlTuple.Watermark getWatermarkTuple(long currentProcessingTime); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1290c726/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java index 22e8525..b755eb4 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java @@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory; import org.apache.apex.malhar.lib.state.spillable.WindowListener; import org.apache.apex.malhar.lib.window.Accumulation; import org.apache.apex.malhar.lib.window.ControlTuple; +import org.apache.apex.malhar.lib.window.ImplicitWatermarkGenerator; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.Window; @@ -92,6 +93,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext protected long fixedWatermarkMillis = -1; private transient long 
streamingWindowId; private transient TreeMap<Long, Long> streamingWindowToLatenessHorizon = new TreeMap<>(); + private ImplicitWatermarkGenerator implicitWatermarkGenerator; private Map<String, Component<Context.OperatorContext>> components = new HashMap<>(); @@ -148,6 +150,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext // do the accumulation accumulateTuple(windowedTuple); processWindowState(windowedTuple); + if (implicitWatermarkGenerator != null) { + implicitWatermarkGenerator.processTupleForWatermark(windowedTuple, currentDerivedTimestamp); + } } } @@ -264,6 +269,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext this.nextWatermark = timestamp; } + /** * Sets the fixed watermark with respect to the processing time derived from the Apex window ID. This is useful if we * don't have watermark tuples from upstream. However, using this means whether a tuple is considered late totally @@ -276,6 +282,11 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext this.fixedWatermarkMillis = millis; } + public void setImplicitWatermarkGenerator(ImplicitWatermarkGenerator implicitWatermarkGenerator) + { + this.implicitWatermarkGenerator = implicitWatermarkGenerator; + } + public void validate() throws ValidationException { if (accumulation == null) { @@ -429,6 +440,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext if (retractionStorage != null) { retractionStorage.setup(context); } + if (implicitWatermarkGenerator != null) { + implicitWatermarkGenerator.setup(context); + } for (Component component : components.values()) { component.setup(context); } @@ -445,6 +459,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext if (retractionStorage != null) { retractionStorage.teardown(); } + if (implicitWatermarkGenerator != null) { + implicitWatermarkGenerator.teardown(); + } for (Component component : 
components.values()) { component.teardown(); } @@ -490,9 +507,15 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext protected void processWatermarkAtEndWindow() { - if (fixedWatermarkMillis > 0) { - nextWatermark = currentDerivedTimestamp - fixedWatermarkMillis; + long implicitWatermark = -1; + if (implicitWatermarkGenerator != null) { + implicitWatermark = implicitWatermarkGenerator + .getWatermarkTuple(currentDerivedTimestamp).getTimestamp(); } + if (implicitWatermark > nextWatermark) { + nextWatermark = implicitWatermark; + } + if (nextWatermark > 0 && currentWatermark < nextWatermark) { long horizon = nextWatermark - allowedLatenessMillis; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1290c726/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffEventTimeWatermarkGen.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffEventTimeWatermarkGen.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffEventTimeWatermarkGen.java new file mode 100644 index 0000000..1e61a29 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffEventTimeWatermarkGen.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl; + +import javax.annotation.Nonnegative; + +import org.apache.apex.malhar.lib.window.ControlTuple; +import org.apache.apex.malhar.lib.window.ImplicitWatermarkGenerator; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Context; + +/** + * An {@link ImplicitWatermarkGenerator} implementation for generating watermarks + * based on event time. + * + * Generates a watermark tuple with a fixed difference from the latest event time. + */ +@InterfaceStability.Evolving +public class FixedDiffEventTimeWatermarkGen implements ImplicitWatermarkGenerator +{ + @Nonnegative + private long fixedDifference; + private long maxEventTime = -1; + + public FixedDiffEventTimeWatermarkGen(long fixedDifference) + { + this.fixedDifference = fixedDifference; + } + + /** + * {@inheritDoc} + */ + @Override + public void processTupleForWatermark(Tuple.WindowedTuple t, long currentProcessingTime) + { + long eventTime = t.getTimestamp(); + if (maxEventTime < eventTime) { + maxEventTime = eventTime; + } + } + + /** + * {@inheritDoc} + */ + @Override + public ControlTuple.Watermark getWatermarkTuple(long currentProcessingTime) + { + return new WatermarkImpl(maxEventTime - fixedDifference); + } + + @Override + public void setup(Context context) + { + } + + @Override + public void teardown() + { + } + + public void setFixedDifference(long fixedDifference) + { + this.fixedDifference = fixedDifference; + } +} 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1290c726/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffProcessingTimeWatermarkGen.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffProcessingTimeWatermarkGen.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffProcessingTimeWatermarkGen.java new file mode 100644 index 0000000..a4d9e1a --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffProcessingTimeWatermarkGen.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl; + +import org.apache.apex.malhar.lib.window.ControlTuple; +import org.apache.apex.malhar.lib.window.ImplicitWatermarkGenerator; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Context; + +/** + * An {@link ImplicitWatermarkGenerator} implementation for generating watermarks + * based on processing time.
+ * + * Generates a watermark tuple with a fixed difference from the current processing time. + */ +@InterfaceStability.Evolving +public class FixedDiffProcessingTimeWatermarkGen implements ImplicitWatermarkGenerator +{ + long fixedWatermarkMillis = -1; + + public FixedDiffProcessingTimeWatermarkGen(long fixedWatermarkMillis) + { + this.fixedWatermarkMillis = fixedWatermarkMillis; + } + + /** + * {@inheritDoc} + */ + @Override + public void processTupleForWatermark(Tuple.WindowedTuple t, long currentProcessingTime) + { + // do nothing + } + + /** + * {@inheritDoc} + */ + @Override + public ControlTuple.Watermark getWatermarkTuple(long currentProcessingTime) + { + return new WatermarkImpl(currentProcessingTime - fixedWatermarkMillis); + } + + @Override + public void setup(Context context) + { + } + + @Override + public void teardown() + { + } + + public void setFixedWatermarkMillis(long fixedWatermarkMillis) + { + this.fixedWatermarkMillis = fixedWatermarkMillis; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1290c726/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java index 512626e..39f4b6e 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java @@ -36,6 +36,7 @@ import org.junit.runners.Parameterized; import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl; import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils; +import org.apache.apex.malhar.lib.window.impl.FixedDiffEventTimeWatermarkGen; import org.apache.apex.malhar.lib.window.impl.InMemorySessionWindowedStorage; import
org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage; import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage; @@ -218,6 +219,39 @@ public class WindowedOperatorTest windowedOperator.teardown(); } + @Test + public void testImplicitWatermarks() + { + WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator(); + CollectorTestSink controlSink = new CollectorTestSink(); + + windowedOperator.controlOutput.setSink(controlSink); + + windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000))); + windowedOperator.setAllowedLateness(Duration.millis(1000)); + windowedOperator.setImplicitWatermarkGenerator(new FixedDiffEventTimeWatermarkGen(100)); + + windowedOperator.setup(testMeta.operatorContext); + + windowedOperator.beginWindow(1); + windowedOperator.endWindow(); + Assert.assertEquals("We should get no watermark tuple", 0, controlSink.getCount(false)); + + windowedOperator.beginWindow(2); + windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, 2L)); + windowedOperator.endWindow(); + Assert.assertEquals("We should get one watermark tuple", 1, controlSink.getCount(false)); + Assert.assertEquals("Check Watermark value", + ((ControlTuple.Watermark)controlSink.collectedTuples.get(0)).getTimestamp(), BASE); + + windowedOperator.beginWindow(3); + windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 900L, 4L)); + windowedOperator.endWindow(); + Assert.assertEquals("We should get two watermark tuples", 2, controlSink.getCount(false)); + Assert.assertEquals("Check Watermark value", + ((ControlTuple.Watermark)controlSink.collectedTuples.get(1)).getTimestamp(), BASE + 800); + } + private void testTrigger(TriggerOption.AccumulationMode accumulationMode) { WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
