http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
new file mode 100644
index 0000000..1bee15d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This is an implementation of the WindowedOperator. If your operation is key based, please use {@link KeyedWindowedOperatorImpl}.
+ *
+ * @param <InputT> The type of the value of the input tuple
+ * @param <AccumT> The type of the accumulated value in the operator state per window
+ * @param <OutputT> The type of the value of the output tuple
+ */
+@InterfaceStability.Evolving
+public class WindowedOperatorImpl<InputT, AccumT, OutputT>
+    extends AbstractWindowedOperator<InputT, OutputT, WindowedStorage<AccumT>, WindowedStorage<OutputT>, Accumulation<InputT, AccumT, OutputT>>
+{
+  /**
+   * Folds the tuple's value into the accumulated state of every window the tuple belongs to.
+   * A window with no stored state starts from {@code accumulation.defaultAccumulatedValue()}.
+   *
+   * @param tuple the windowed tuple to accumulate
+   */
+  @Override
+  public void accumulateTuple(Tuple.WindowedTuple<InputT> tuple)
+  {
+    for (Window window : tuple.getWindows()) {
+      // process each window
+      AccumT accum = dataStorage.get(window);
+      if (accum == null) {
+        accum = accumulation.defaultAccumulatedValue();
+      }
+      dataStorage.put(window, accumulation.accumulate(accum, tuple.getValue()));
+    }
+  }
+
+  /**
+   * Emits the current output value of the given window and, when a retraction storage is
+   * configured, records that value as the last one emitted for the window.
+   *
+   * @param window the window to fire
+   * @param fireOnlyUpdatedPanes if true, nothing is emitted when the output value equals the
+   *        value last recorded for the window in the retraction storage
+   */
+  @Override
+  public void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes)
+  {
+    AccumT accumulatedValue = dataStorage.get(window);
+    OutputT outputValue = accumulation.getOutput(accumulatedValue);
+    if (fireOnlyUpdatedPanes) {
+      // NOTE(review): retractionStorage is dereferenced here without the null check used below; presumably validate() guarantees it is set for this option - confirm
+      OutputT oldValue = retractionStorage.get(window);
+      if (oldValue != null && oldValue.equals(outputValue)) {
+        return;
+      }
+    }
+    output.emit(new Tuple.WindowedTuple<>(window, outputValue));
+    if (retractionStorage != null) {
+      retractionStorage.put(window, outputValue);
+    }
+  }
+
+  /**
+   * Emits the retraction of the value last recorded for the given window, if any.
+   *
+   * @param window the window whose previously recorded output is retracted
+   * @throws UnsupportedOperationException if the accumulation mode is not
+   *         ACCUMULATING_AND_RETRACTING
+   */
+  @Override
+  public void fireRetractionTrigger(Window window)
+  {
+    if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
+      throw new UnsupportedOperationException();
+    }
+    OutputT oldValue = retractionStorage.get(window);
+    if (oldValue != null) {
+      output.emit(new Tuple.WindowedTuple<>(window, accumulation.getRetraction(oldValue)));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/test/java/org/apache/apex/malhar/lib/window/SumAccumulation.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/SumAccumulation.java b/library/src/test/java/org/apache/apex/malhar/lib/window/SumAccumulation.java
new file mode 100644
index 0000000..6a4981b
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/SumAccumulation.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import org.apache.commons.lang3.mutable.MutableLong;
+
+/**
+ * Test {@link Accumulation} that keeps a running sum of {@code Long} inputs in a {@link MutableLong}.
+ */
+public class SumAccumulation implements Accumulation<Long, MutableLong, Long>
+{
+  /** Creates a fresh accumulator holding zero. */
+  @Override
+  public MutableLong defaultAccumulatedValue()
+  {
+    return new MutableLong(0L);
+  }
+
+  /** Adds {@code input} to the accumulator in place and returns that same accumulator. */
+  @Override
+  public MutableLong accumulate(MutableLong accumulatedValue, Long input)
+  {
+    accumulatedValue.add(input.longValue());
+    return accumulatedValue;
+  }
+
+  /** Adds the second accumulator's value into the first in place and returns the first. */
+  @Override
+  public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2)
+  {
+    accumulatedValue1.add(accumulatedValue2.longValue());
+    return accumulatedValue1;
+  }
+
+  /** Returns the accumulated sum as a boxed {@code Long}. */
+  @Override
+  public Long getOutput(MutableLong accumulatedValue)
+  {
+    return accumulatedValue.getValue();
+  }
+
+  /** Returns the negation of a previously emitted sum. */
+  @Override
+  public Long getRetraction(Long value)
+  {
+    return Long.valueOf(-value.longValue());
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
new file mode 100644
index 0000000..f8f9d8a
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
@@ -0,0 +1,550 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.ValidationException;
+
+import org.joda.time.Duration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
+import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
+import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
+import org.apache.apex.malhar.lib.window.impl.WatermarkImpl;
+import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
+import org.apache.commons.lang3.mutable.MutableLong;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Sink;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Unit tests for WindowedOperator
+ */
+public class WindowedOperatorTest
+{
+  private void verifyValidationFailure(WindowedOperatorImpl windowedOperator, String message)
+  { // Passes only when validate() throws ValidationException; otherwise fails, quoting the expected reason.
+    try {
+      windowedOperator.validate();
+      Assert.fail("Should fail validation because " + message);
+    } catch (ValidationException ex) {
+      return;
+    }
+  }
+  /** Builds a WindowedOperatorImpl with in-memory data, retraction and window-state storage and a SumAccumulation. */
+  private WindowedOperatorImpl<Long, MutableLong, Long> createDefaultWindowedOperator()
+  {
+    WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = new WindowedOperatorImpl<>();
+    windowedOperator.setDataStorage(new InMemoryWindowedStorage<MutableLong>());
+    windowedOperator.setRetractionStorage(new InMemoryWindowedStorage<Long>());
+    windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
+    windowedOperator.setAccumulation(new SumAccumulation());
+    return windowedOperator;
+  }
+  /** Builds a KeyedWindowedOperatorImpl with in-memory keyed data/retraction storage, window-state storage and a SumAccumulation. */
+  private KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> createDefaultKeyedWindowedOperator()
+  {
+    KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = new KeyedWindowedOperatorImpl<>();
+    windowedOperator.setDataStorage(new InMemoryWindowedKeyedStorage<String, MutableLong>());
+    windowedOperator.setRetractionStorage(new InMemoryWindowedKeyedStorage<String, Long>());
+    windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
+    windowedOperator.setAccumulation(new SumAccumulation());
+    return windowedOperator;
+  }
+  /** Configures an operator step by step and checks which configurations validate() accepts and which it rejects. */
+  @Test
+  public void testValidation() throws Exception
+  {
+    WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = new WindowedOperatorImpl<>();
+    verifyValidationFailure(windowedOperator, "nothing is configured");
+    windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
+    verifyValidationFailure(windowedOperator, "data storage is not set");
+    windowedOperator.setDataStorage(new InMemoryWindowedStorage<MutableLong>());
+    verifyValidationFailure(windowedOperator, "accumulation is not set");
+    windowedOperator.setAccumulation(new SumAccumulation());
+    windowedOperator.validate();
+    windowedOperator.setTriggerOption(new TriggerOption().accumulatingAndRetractingFiredPanes());
+    verifyValidationFailure(windowedOperator, "retracting storage is not set for ACCUMULATING_AND_RETRACTING");
+    windowedOperator.setRetractionStorage(new InMemoryWindowedStorage<Long>());
+    windowedOperator.validate();
+    windowedOperator.setTriggerOption(new TriggerOption().discardingFiredPanes().firingOnlyUpdatedPanes());
+    verifyValidationFailure(windowedOperator, "DISCARDING is not valid for option firingOnlyUpdatedPanes");
+    windowedOperator.setTriggerOption(new TriggerOption().accumulatingFiredPanes().firingOnlyUpdatedPanes());
+    windowedOperator.setRetractionStorage(null);
+    verifyValidationFailure(windowedOperator, "retracting storage is not set for option firingOnlyUpdatedPanes");
+  }
+  /** Checks that a late tuple is still accumulated within the allowed lateness and that the window is gone from storage once it is too late. */
+  @Test
+  public void testWatermarkAndAllowedLateness()
+  {
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
+        new Attribute.AttributeMap.DefaultAttributeMap());
+    WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
+    CollectorTestSink controlSink = new CollectorTestSink();
+
+    windowedOperator.controlOutput.setSink(controlSink);
+
+    windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
+    windowedOperator.setAllowedLateness(Duration.millis(1000));
+    // Replace the default storages with local references so the test can inspect their contents directly.
+    WindowedStorage<MutableLong> dataStorage = new InMemoryWindowedStorage<>();
+    WindowedStorage<WindowState> windowStateStorage = new InMemoryWindowedStorage<>();
+
+    windowedOperator.setDataStorage(dataStorage);
+    windowedOperator.setWindowStateStorage(windowStateStorage);
+
+    windowedOperator.setup(context);
+    windowedOperator.beginWindow(1);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L));
+    Assert.assertEquals("There should be exactly one window in the storage", 1, dataStorage.size());
+    Assert.assertEquals("There should be exactly one window in the storage", 1, windowStateStorage.size());
+
+    Map.Entry<Window, WindowState> entry = windowStateStorage.entrySet().iterator().next();
+    Window window = entry.getKey();
+    WindowState windowState = entry.getValue();
+    Assert.assertEquals(-1, windowState.watermarkArrivalTime);
+    Assert.assertEquals(2L, dataStorage.get(window).longValue());
+
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L));
+    Assert.assertEquals(5L, dataStorage.get(window).longValue());
+
+    windowedOperator.processWatermark(new WatermarkImpl(1200));
+    windowedOperator.endWindow();
+    Assert.assertTrue(windowState.watermarkArrivalTime > 0);
+    Assert.assertEquals("We should get one watermark tuple", 1, controlSink.getCount(false));
+
+    windowedOperator.beginWindow(2);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(900L, 4L));
+    Assert.assertEquals("Late but not too late", 9L, dataStorage.get(window).longValue());
+    windowedOperator.processWatermark(new WatermarkImpl(3000));
+    windowedOperator.endWindow();
+    Assert.assertEquals("We should get two watermark tuples", 2, controlSink.getCount(false));
+    windowedOperator.beginWindow(3);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(120L, 5L)); // this tuple should be dropped
+    Assert.assertEquals("The window should be dropped because it's too late", 0, dataStorage.size());
+    Assert.assertEquals("The window should be dropped because it's too late", 0, windowStateStorage.size());
+    windowedOperator.endWindow();
+  }
+  /** Drives a 1000 ms early-firing trigger through five beginWindow/endWindow cycles and checks the output for the given mode. */
+  private void testTrigger(TriggerOption.AccumulationMode accumulationMode)
+  {
+    WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
+    TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000));
+    switch (accumulationMode) {
+      case ACCUMULATING:
+        triggerOption.accumulatingFiredPanes();
+        break;
+      case ACCUMULATING_AND_RETRACTING:
+        triggerOption.accumulatingAndRetractingFiredPanes();
+        break;
+      case DISCARDING:
+        triggerOption.discardingFiredPanes();
+        break;
+      default:
+        throw new RuntimeException("Unknown accumulation mode: " + accumulationMode);
+    }
+    windowedOperator.setTriggerOption(triggerOption);
+    windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
+    CollectorTestSink sink = new CollectorTestSink();
+    windowedOperator.output.setSink(sink);
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
+        new Attribute.AttributeMap.DefaultAttributeMap());
+    windowedOperator.setup(context);
+    windowedOperator.beginWindow(1);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L));
+    windowedOperator.endWindow();
+    Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+    windowedOperator.beginWindow(2);
+    windowedOperator.endWindow();
+    Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+    windowedOperator.beginWindow(3);
+    windowedOperator.endWindow();
+    Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
+    Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+    sink.collectedTuples.clear();
+    windowedOperator.beginWindow(4);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(400L, 4L));
+    windowedOperator.endWindow();
+    Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+    windowedOperator.beginWindow(5);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, 5L));
+    windowedOperator.endWindow();
+    switch (accumulationMode) {
+      case ACCUMULATING:
+        Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
+        Assert.assertEquals(14L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+        break;
+      case DISCARDING:
+        Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
+        Assert.assertEquals(9L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+        break;
+      case ACCUMULATING_AND_RETRACTING:
+        Assert.assertEquals("There should be exactly two tuples for the time trigger", 2, sink.collectedTuples.size());
+        Assert.assertEquals(-5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+        Assert.assertEquals(14L, ((Tuple<Long>)sink.collectedTuples.get(1)).getValue().longValue());
+        break;
+      default:
+        throw new RuntimeException("Unknown accumulation mode: " + accumulationMode);
+    }
+  }
+  /** Runs testTrigger in DISCARDING mode. */
+  @Test
+  public void testTriggerWithDiscardingMode()
+  {
+    testTrigger(TriggerOption.AccumulationMode.DISCARDING);
+  }
+  /** Runs testTrigger in ACCUMULATING mode. */
+  @Test
+  public void testTriggerWithAccumulatingMode()
+  {
+    testTrigger(TriggerOption.AccumulationMode.ACCUMULATING);
+  }
+  /** Runs testTrigger in ACCUMULATING_AND_RETRACTING mode. */
+  @Test
+  public void testTriggerWithAccumulatingAndRetractingMode()
+  {
+    testTrigger(TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING);
+  }
+  /** Checks that, in ACCUMULATING mode, firingOnlyUpdatedPanes suppresses a time trigger when no new tuple arrived since the last firing. */
+  @Test
+  public void testTriggerWithAccumulatingModeFiringOnlyUpdatedPanes()
+  {
+    for (boolean firingOnlyUpdatedPanes : new boolean[]{true, false}) {
+      WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
+      TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000))
+          .accumulatingFiredPanes();
+      if (firingOnlyUpdatedPanes) {
+        triggerOption.firingOnlyUpdatedPanes();
+      }
+      windowedOperator.setTriggerOption(triggerOption);
+      windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
+      CollectorTestSink sink = new CollectorTestSink();
+      windowedOperator.output.setSink(sink);
+      OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
+          new Attribute.AttributeMap.DefaultAttributeMap());
+      windowedOperator.setup(context);
+      windowedOperator.beginWindow(1);
+      windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L));
+      windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L));
+      windowedOperator.endWindow();
+      Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+      windowedOperator.beginWindow(2);
+      windowedOperator.endWindow();
+      Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+      windowedOperator.beginWindow(3);
+      windowedOperator.endWindow();
+      Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
+      Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+      sink.collectedTuples.clear();
+      windowedOperator.beginWindow(4);
+      windowedOperator.endWindow();
+      Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+      windowedOperator.beginWindow(5);
+      windowedOperator.endWindow();
+      if (firingOnlyUpdatedPanes) {
+        Assert.assertTrue("There should not be any trigger since no panes have been updated", sink.collectedTuples.isEmpty());
+      } else {
+        Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
+        Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+      }
+    }
+  }
+  /** Checks that, with the GlobalWindow option, a tuple is assigned exactly Window.GLOBAL_WINDOW. */
+  @Test
+  public void testGlobalWindowAssignment()
+  {
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
+        new Attribute.AttributeMap.DefaultAttributeMap());
+    WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
+    windowedOperator.setWindowOption(new WindowOption.GlobalWindow());
+    windowedOperator.setup(context);
+    Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1100L, 2L));
+    List<Window> windows = windowedValue.getWindows();
+    Assert.assertEquals(1, windows.size());
+    Assert.assertEquals(Window.GLOBAL_WINDOW, windows.get(0));
+  }
+  /** Checks that, with 1000 ms time windows, a tuple at timestamp 1100 is assigned the single window beginning at 1000. */
+  @Test
+  public void testTimeWindowAssignment()
+  {
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
+        new Attribute.AttributeMap.DefaultAttributeMap());
+    WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
+    windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
+    windowedOperator.setup(context);
+    Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1100L, 2L));
+    List<Window> windows = windowedValue.getWindows();
+    Assert.assertEquals(1, windows.size());
+    Assert.assertEquals(1000, windows.get(0).getBeginTimestamp());
+    Assert.assertEquals(1000, windows.get(0).getDurationMillis());
+  }
+  /** Checks that, with 1000 ms windows sliding by 200 ms, a tuple at timestamp 1600 is assigned five windows beginning at 800 through 1600. */
+  @Test
+  public void testSlidingWindowAssignment()
+  {
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
+        new Attribute.AttributeMap.DefaultAttributeMap());
+    WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
+    windowedOperator.setWindowOption(new WindowOption.SlidingTimeWindows(Duration.millis(1000), Duration.millis(200)));
+    windowedOperator.setup(context);
+    Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1600L, 2L));
+    List<Window> windows = windowedValue.getWindows();
+    Window[] winArray = windows.toArray(new Window[]{});
+    Arrays.sort(winArray, Window.DEFAULT_COMPARATOR);
+    Assert.assertEquals(5, winArray.length);
+    Assert.assertEquals(800, winArray[0].getBeginTimestamp());
+    Assert.assertEquals(1000, winArray[0].getDurationMillis());
+    Assert.assertEquals(1000, winArray[1].getBeginTimestamp());
+    Assert.assertEquals(1000, winArray[1].getDurationMillis());
+    Assert.assertEquals(1200, winArray[2].getBeginTimestamp());
+    Assert.assertEquals(1000, winArray[2].getDurationMillis());
+    Assert.assertEquals(1400, winArray[3].getBeginTimestamp());
+    Assert.assertEquals(1000, winArray[3].getDurationMillis());
+    Assert.assertEquals(1600, winArray[4].getBeginTimestamp());
+    Assert.assertEquals(1000, winArray[4].getDurationMillis());
+  }
+  /** Exercises session windows for one key: creating a session, extending it, opening a separate one, and merging two with retractions. */
+  @Test
+  public void testSessionWindows()
+  {
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
+        new Attribute.AttributeMap.DefaultAttributeMap());
+    KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator();
+    windowedOperator.setWindowOption(new WindowOption.SessionWindows(Duration.millis(2000)));
+    windowedOperator.setTriggerOption(new TriggerOption().withEarlyFiringsAtEvery(1).accumulatingAndRetractingFiredPanes().firingOnlyUpdatedPanes());
+    CollectorTestSink<Tuple<KeyValPair<String, Long>>> sink = new CollectorTestSink();
+    windowedOperator.output.setSink((Sink<Object>)(Sink)sink);
+    windowedOperator.setup(context);
+    windowedOperator.beginWindow(1);
+    Tuple<KeyValPair<String, Long>> tuple = new Tuple.TimestampedTuple<>(1100L, new KeyValPair<>("a", 2L));
+    windowedOperator.processTuple(tuple);
+
+    Assert.assertEquals(1, sink.getCount(false));
+    Tuple.WindowedTuple<KeyValPair<String, Long>> out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0);
+    Assert.assertEquals(1, out.getWindows().size());
+    Window.SessionWindow<String> window1 = (Window.SessionWindow<String>)out.getWindows().get(0);
+    Assert.assertEquals(1100L, window1.getBeginTimestamp());
+    Assert.assertEquals(1, window1.getDurationMillis());
+    Assert.assertEquals("a", window1.getKey());
+    Assert.assertEquals("a", out.getValue().getKey());
+    Assert.assertEquals(2L, out.getValue().getValue().longValue());
+    sink.clear();
+
+    // extending an existing session window
+    tuple = new Tuple.TimestampedTuple<>(2000L, new KeyValPair<>("a", 3L));
+    windowedOperator.processTuple(tuple);
+    Assert.assertEquals(2, sink.getCount(false));
+
+    // retraction trigger
+    out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0);
+    Assert.assertEquals(1, out.getWindows().size());
+    Assert.assertEquals(window1, out.getWindows().get(0));
+    Assert.assertEquals("a", out.getValue().getKey());
+    Assert.assertEquals(-2L, out.getValue().getValue().longValue());
+
+    // normal trigger
+    out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(1);
+    Window.SessionWindow<String> window2 = (Window.SessionWindow<String>)out.getWindows().get(0);
+
+    Assert.assertEquals(1100L, window2.getBeginTimestamp());
+    Assert.assertEquals(901, window2.getDurationMillis());
+    Assert.assertEquals("a", out.getValue().getKey());
+    Assert.assertEquals(5L, out.getValue().getValue().longValue());
+    sink.clear();
+
+    // a separate session window
+    tuple = new Tuple.TimestampedTuple<>(5000L, new KeyValPair<>("a", 4L));
+    windowedOperator.processTuple(tuple);
+    Assert.assertEquals(1, sink.getCount(false));
+    out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0);
+    Assert.assertEquals(1, out.getWindows().size());
+    Window.SessionWindow<String> window3 = (Window.SessionWindow<String>)out.getWindows().get(0);
+    Assert.assertEquals(5000L, window3.getBeginTimestamp());
+    Assert.assertEquals(1, window3.getDurationMillis());
+    Assert.assertEquals("a", out.getValue().getKey());
+    Assert.assertEquals(4L, out.getValue().getValue().longValue());
+    sink.clear();
+
+    // session window merging
+    tuple = new Tuple.TimestampedTuple<>(3500L, new KeyValPair<>("a", 3L));
+    windowedOperator.processTuple(tuple);
+
+    Assert.assertEquals(3, sink.getCount(false));
+
+    // retraction of the two old windows
+    out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0);
+    Assert.assertEquals(1, out.getWindows().size());
+    Assert.assertEquals(window2, out.getWindows().get(0));
+    Assert.assertEquals("a", out.getValue().getKey());
+    Assert.assertEquals(-5L, out.getValue().getValue().longValue());
+    out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(1);
+    Assert.assertEquals(1, out.getWindows().size());
+    Assert.assertEquals(window3, out.getWindows().get(0));
+    Assert.assertEquals("a", out.getValue().getKey());
+    Assert.assertEquals(-4L, out.getValue().getValue().longValue());
+
+    // normal trigger
+    out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(2);
+    Assert.assertEquals(1, out.getWindows().size());
+    Window.SessionWindow<String> window4 = (Window.SessionWindow<String>)out.getWindows().get(0);
+    Assert.assertEquals(1100L, window4.getBeginTimestamp());
+    Assert.assertEquals(3901, window4.getDurationMillis());
+    Assert.assertEquals("a", out.getValue().getKey());
+    Assert.assertEquals(12L, out.getValue().getValue().longValue());
+
+    windowedOperator.endWindow();
+  }
+  /** Checks that values falling in the same time window are summed separately per key. */
+  @Test
+  public void testKeyedAccumulation()
+  {
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
+        new Attribute.AttributeMap.DefaultAttributeMap());
+    KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator();
+    windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
+    WindowedKeyedStorage<String, MutableLong> dataStorage = new InMemoryWindowedKeyedStorage<>();
+    windowedOperator.setDataStorage(dataStorage);
+    windowedOperator.setup(context);
+    windowedOperator.beginWindow(1);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, new KeyValPair<>("a", 2L)));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, new KeyValPair<>("a", 3L)));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, new KeyValPair<>("b", 4L)));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(150L, new KeyValPair<>("b", 5L)));
+    windowedOperator.endWindow();
+    Assert.assertEquals(1, dataStorage.size());
+    Assert.assertEquals(5L, dataStorage.get(new Window.TimeWindow(0, 1000), "a").longValue());
+    Assert.assertEquals(9L, dataStorage.get(new Window.TimeWindow(0, 1000), "b").longValue());
+  }
+  /** Keyed variant of the trigger scenario: two keys in one time window, checked per key for the given accumulation mode. */
+  private void testKeyedTrigger(TriggerOption.AccumulationMode accumulationMode)
+  {
+    KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator();
+    TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000));
+    switch (accumulationMode) {
+      case ACCUMULATING:
+        triggerOption.accumulatingFiredPanes();
+        break;
+      case ACCUMULATING_AND_RETRACTING:
+        triggerOption.accumulatingAndRetractingFiredPanes();
+        break;
+      case DISCARDING:
+        triggerOption.discardingFiredPanes();
+        break;
+      default:
+        throw new RuntimeException("Unknown accumulation mode: " + accumulationMode);
+    }
+    windowedOperator.setTriggerOption(triggerOption);
+    windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
+    CollectorTestSink<Tuple<KeyValPair<String, Long>>> sink = new CollectorTestSink();
+    windowedOperator.output.setSink((Sink<Object>)(Sink)sink);
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
+        new Attribute.AttributeMap.DefaultAttributeMap());
+    windowedOperator.setup(context);
+    windowedOperator.beginWindow(1);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, new KeyValPair<>("a", 2L)));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, new KeyValPair<>("b", 3L)));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(400L, new KeyValPair<>("b", 5L)));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, new KeyValPair<>("a", 4L)));
+    windowedOperator.endWindow();
+    Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+    windowedOperator.beginWindow(2);
+    windowedOperator.endWindow();
+    Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+    windowedOperator.beginWindow(3);
+    windowedOperator.endWindow();
+    Assert.assertEquals("There should be exactly two tuple for the time trigger", 2, sink.collectedTuples.size());
+    {
+      Map<String, Long> map = new HashMap<>();
+      for (Tuple<KeyValPair<String, Long>> tuple : sink.collectedTuples) {
+        map.put(tuple.getValue().getKey(), tuple.getValue().getValue());
+      }
+      Assert.assertEquals(6L, map.get("a").longValue());
+      Assert.assertEquals(8L, map.get("b").longValue());
+    }
+    sink.collectedTuples.clear();
+    windowedOperator.beginWindow(4);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(400L, new KeyValPair<>("a", 8L)));
+    windowedOperator.endWindow();
+    Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+    windowedOperator.beginWindow(5);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, new KeyValPair<>("b", 9L)));
+    windowedOperator.endWindow();
+    Map<String, Long> map = new HashMap<>();
+    switch (accumulationMode) {
+      case ACCUMULATING:
+        Assert.assertEquals("There should be exactly two tuples for the time trigger", 2, sink.collectedTuples.size());
+        for (Tuple<KeyValPair<String, Long>> tuple : sink.collectedTuples) {
+          map.put(tuple.getValue().getKey(), tuple.getValue().getValue());
+        }
+        Assert.assertEquals(14L, map.get("a").longValue());
+        Assert.assertEquals(17L, map.get("b").longValue());
+        break;
+      case DISCARDING:
+        Assert.assertEquals("There should be exactly two tuples for the time trigger", 2, sink.collectedTuples.size());
+        for (Tuple<KeyValPair<String, Long>> tuple : sink.collectedTuples) {
+          map.put(tuple.getValue().getKey(), tuple.getValue().getValue());
+        }
+        Assert.assertEquals(8L, map.get("a").longValue());
+        Assert.assertEquals(9L, map.get("b").longValue());
+        break;
+      case ACCUMULATING_AND_RETRACTING:
+        Assert.assertEquals("There should be exactly four tuples for the time trigger", 4, sink.collectedTuples.size());
+        for (Tuple<KeyValPair<String, Long>> tuple : sink.collectedTuples) {
+          String key = tuple.getValue().getKey();
+          long value = tuple.getValue().getValue();
+          map.put(value < 0 ? "R" + key : key, value); // negative values are retractions; file them under an "R"-prefixed key
+        }
+        Assert.assertEquals(-6L, map.get("Ra").longValue());
+        Assert.assertEquals(-8L, map.get("Rb").longValue());
+        Assert.assertEquals(14L, map.get("a").longValue());
+        Assert.assertEquals(17L, map.get("b").longValue());
+        break;
+      default:
+        throw new RuntimeException("Unknown accumulation mode: " + accumulationMode);
+    }
+  }
+  /** Runs testKeyedTrigger in DISCARDING mode. */
+  @Test
+  public void testKeyedTriggerWithDiscardingMode()
+  {
+    testKeyedTrigger(TriggerOption.AccumulationMode.DISCARDING);
+  }
+  /** Runs testKeyedTrigger in ACCUMULATING mode. */
+  @Test
+  public void testKeyedTriggerWithAccumulatingMode()
+  {
+    testKeyedTrigger(TriggerOption.AccumulationMode.ACCUMULATING);
+  }
+  /** Runs testKeyedTrigger in ACCUMULATING_AND_RETRACTING mode. */
+  @Test
+  public void testKeyedTriggerWithAccumulatingAndRetractingMode()
+  {
+    testKeyedTrigger(TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/test/java/org/apache/apex/malhar/lib/window/sample/pi/Application.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/sample/pi/Application.java b/library/src/test/java/org/apache/apex/malhar/lib/window/sample/pi/Application.java
new file mode 100644
index 0000000..f70ac64
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/sample/pi/Application.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.sample.pi; + +import org.joda.time.Duration; + +import org.apache.apex.malhar.lib.window.Accumulation; +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.lib.window.WindowState; +import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage; +import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl; +import org.apache.commons.lang3.mutable.MutableLong; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; + +/** + * This is an example of using the WindowedOperator concepts to calculate the value of pi. 
+ */ +public class Application implements StreamingApplication +{ + public static class RandomNumberPairGenerator extends BaseOperator implements InputOperator + { + public final transient DefaultOutputPort<Tuple<MutablePair<Double, Double>>> output = new DefaultOutputPort<>(); + + @Override + public void emitTuples() + { + Tuple.PlainTuple<MutablePair<Double, Double>> tuple = new Tuple.PlainTuple<>(new MutablePair<>(Math.random(), Math.random())); + this.output.emit(tuple); + } + } + + public static class PiAccumulation implements Accumulation<MutablePair<Double, Double>, MutablePair<MutableLong, MutableLong>, Double> + { + @Override + public MutablePair<MutableLong, MutableLong> defaultAccumulatedValue() + { + return new MutablePair<>(new MutableLong(0), new MutableLong(0)); + } + + @Override + public MutablePair<MutableLong, MutableLong> accumulate(MutablePair<MutableLong, MutableLong> accumulatedValue, MutablePair<Double, Double> input) + { + if (input.getLeft() * input.getLeft() + input.getRight() * input.getRight() < 1) { + accumulatedValue.getLeft().increment(); + } + accumulatedValue.getRight().increment(); + return accumulatedValue; + } + + @Override + public MutablePair<MutableLong, MutableLong> merge(MutablePair<MutableLong, MutableLong> accumulatedValue1, MutablePair<MutableLong, MutableLong> accumulatedValue2) + { + accumulatedValue1.getLeft().add(accumulatedValue2.getLeft()); + accumulatedValue1.getRight().add(accumulatedValue2.getRight()); + return accumulatedValue1; + } + + @Override + public Double getOutput(MutablePair<MutableLong, MutableLong> accumulatedValue) + { + return accumulatedValue.getRight().longValue() == 0 ? 
0.0 : (((double)accumulatedValue.getLeft().longValue()) * 4 / accumulatedValue.getRight().longValue()); + } + + @Override + public Double getRetraction(Double value) + { + return -value; + } + } + + @Override + public void populateDAG(DAG dag, Configuration configuration) + { + RandomNumberPairGenerator inputOperator = new RandomNumberPairGenerator(); + WindowedOperatorImpl<MutablePair<Double, Double>, MutablePair<MutableLong, MutableLong>, Double> windowedOperator = new WindowedOperatorImpl<>(); + Accumulation<MutablePair<Double, Double>, MutablePair<MutableLong, MutableLong>, Double> piAccumulation = new PiAccumulation(); + + windowedOperator.setAccumulation(piAccumulation); + windowedOperator.setDataStorage(new InMemoryWindowedStorage<MutablePair<MutableLong, MutableLong>>()); + windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>()); + windowedOperator.setWindowOption(new WindowOption.GlobalWindow()); + windowedOperator.setTriggerOption(TriggerOption.AtWatermark().withEarlyFiringsAtEvery(Duration.millis(1000)).accumulatingFiredPanes()); + + ConsoleOutputOperator outputOperator = new ConsoleOutputOperator(); + dag.addOperator("inputOperator", inputOperator); + dag.addOperator("windowedOperator", windowedOperator); + dag.addOperator("outputOperator", outputOperator); + dag.addStream("input_windowed", inputOperator.output, windowedOperator.input); + dag.addStream("windowed_output", windowedOperator.output, outputOperator.input); + } + + public static void main(String[] args) throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + lma.prepareDAG(new Application(), conf); + LocalMode.Controller lc = lma.getController(); + lc.run(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/test/java/org/apache/apex/malhar/lib/window/sample/wordcount/Application.java ---------------------------------------------------------------------- diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/window/sample/wordcount/Application.java b/library/src/test/java/org/apache/apex/malhar/lib/window/sample/wordcount/Application.java new file mode 100644 index 0000000..83dc277 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/sample/wordcount/Application.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.window.sample.wordcount; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.Map; + +import org.joda.time.Duration; + +import org.apache.apex.malhar.lib.window.Accumulation; +import org.apache.apex.malhar.lib.window.ControlTuple; +import org.apache.apex.malhar.lib.window.SumAccumulation; +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.lib.window.WindowState; +import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage; +import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage; +import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl; +import org.apache.apex.malhar.lib.window.impl.WatermarkImpl; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.mutable.MutableLong; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Throwables; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.util.KeyValPair; + +/** + * This is an example of using the WindowedOperator concepts to do streaming word count. 
+ */ +public class Application implements StreamingApplication +{ + public static class WordGenerator extends BaseOperator implements InputOperator + { + public final transient DefaultOutputPort<Tuple<KeyValPair<String, Long>>> output = new DefaultOutputPort<>(); + public final transient DefaultOutputPort<ControlTuple> controlOutput = new DefaultOutputPort<>(); + + private transient BufferedReader reader; + + @Override + public void setup(Context.OperatorContext context) + { + initReader(); + } + + private void initReader() + { + try { + InputStream resourceStream = this.getClass().getResourceAsStream("/wordcount.txt"); + reader = new BufferedReader(new InputStreamReader(resourceStream)); + } catch (Exception ex) { + throw Throwables.propagate(ex); + } + } + + @Override + public void teardown() + { + IOUtils.closeQuietly(reader); + } + + @Override + public void emitTuples() + { + try { + String line = reader.readLine(); + if (line == null) { + reader.close(); + initReader(); + } else { + // simulate late data + long timestamp = System.currentTimeMillis() - (long)(Math.random() * 30000); + Map<String, Long> countMap = new HashMap<>(); + for (String str : line.split("[\\p{Punct}\\s]+")) { + countMap.put(StringUtils.lowerCase(str), (countMap.containsKey(str)) ? 
countMap.get(str) + 1 : 1); + } + for (Map.Entry<String, Long> entry : countMap.entrySet()) { + String word = entry.getKey(); + long count = entry.getValue(); + Tuple.TimestampedTuple<KeyValPair<String, Long>> tuple = new Tuple.TimestampedTuple<>(timestamp, new KeyValPair<>(word, count)); + this.output.emit(tuple); + } + } + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + @Override + public void endWindow() + { + this.controlOutput.emit(new WatermarkImpl(System.currentTimeMillis() - 15000)); + } + } + + @Override + public void populateDAG(DAG dag, Configuration configuration) + { + WordGenerator inputOperator = new WordGenerator(); + KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = new KeyedWindowedOperatorImpl<>(); + Accumulation<Long, MutableLong, Long> sum = new SumAccumulation(); + + windowedOperator.setAccumulation(sum); + windowedOperator.setDataStorage(new InMemoryWindowedKeyedStorage<String, MutableLong>()); + windowedOperator.setRetractionStorage(new InMemoryWindowedKeyedStorage<String, Long>()); + windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>()); + windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.standardMinutes(1))); + windowedOperator.setTriggerOption(TriggerOption.AtWatermark().withEarlyFiringsAtEvery(Duration.millis(1000)).accumulatingAndRetractingFiredPanes()); + //windowedOperator.setAllowedLateness(Duration.millis(14000)); + + ConsoleOutputOperator outputOperator = new ConsoleOutputOperator(); + dag.addOperator("inputOperator", inputOperator); + dag.addOperator("windowedOperator", windowedOperator); + dag.addOperator("outputOperator", outputOperator); + dag.addStream("input_windowed", inputOperator.output, windowedOperator.input); + dag.addStream("windowed_output", windowedOperator.output, outputOperator.input); + } + + public static void main(String[] args) throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration 
conf = new Configuration(false); + lma.prepareDAG(new Application(), conf); + LocalMode.Controller lc = lma.getController(); + lc.run(); + } +}
