http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
new file mode 100644
index 0000000..1bee15d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This is an implementation of the WindowedOperator. If your operation is key based, please use {@link KeyedWindowedOperatorImpl}.
+ *
+ * @param <InputT> The type of the value of the input tuple
+ * @param <AccumT> The type of the accumulated value in the operator state per window
+ * @param <OutputT> The type of the value of the output tuple
+ */
+@InterfaceStability.Evolving
+public class WindowedOperatorImpl<InputT, AccumT, OutputT>
+    extends AbstractWindowedOperator<InputT, OutputT, WindowedStorage<AccumT>, WindowedStorage<OutputT>, Accumulation<InputT, AccumT, OutputT>>
+{
+  @Override
+  public void accumulateTuple(Tuple.WindowedTuple<InputT> tuple)
+  {
+    for (Window window : tuple.getWindows()) {
+      // process each window
+      AccumT accum = dataStorage.get(window);
+      if (accum == null) {
+        accum = accumulation.defaultAccumulatedValue();
+      }
+      dataStorage.put(window, accumulation.accumulate(accum, tuple.getValue()));
+    }
+  }
+
+  @Override
+  public void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes)
+  {
+    AccumT accumulatedValue = dataStorage.get(window);
+    OutputT outputValue = accumulation.getOutput(accumulatedValue);
+    if (fireOnlyUpdatedPanes) {
+      OutputT oldValue = retractionStorage.get(window);
+      if (oldValue != null && oldValue.equals(outputValue)) {
+        return;
+      }
+    }
+    output.emit(new Tuple.WindowedTuple<>(window, outputValue));
+    if (retractionStorage != null) {
+      retractionStorage.put(window, outputValue);
+    }
+  }
+
+  @Override
+  public void fireRetractionTrigger(Window window)
+  {
+    if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
+      throw new UnsupportedOperationException();
+    }
+    OutputT oldValue = retractionStorage.get(window);
+    if (oldValue != null) {
+      output.emit(new Tuple.WindowedTuple<>(window, accumulation.getRetraction(oldValue)));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/test/java/org/apache/apex/malhar/lib/window/SumAccumulation.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/SumAccumulation.java b/library/src/test/java/org/apache/apex/malhar/lib/window/SumAccumulation.java
new file mode 100644
index 0000000..6a4981b
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/SumAccumulation.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import org.apache.commons.lang3.mutable.MutableLong;
+
+/**
+ * Accumulation that does a simple sum of longs
+ */
+public class SumAccumulation implements Accumulation<Long, MutableLong, Long>
+{
+  @Override
+  public MutableLong defaultAccumulatedValue()
+  {
+    return new MutableLong(0);
+  }
+
+  @Override
+  public MutableLong accumulate(MutableLong accumulatedValue, Long input)
+  {
+    accumulatedValue.add(input);
+    return accumulatedValue;
+  }
+
+  @Override
+  public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2)
+  {
+    accumulatedValue1.add(accumulatedValue2);
+    return accumulatedValue1;
+  }
+
+  @Override
+  public Long getOutput(MutableLong accumulatedValue)
+  {
+    return accumulatedValue.getValue();
+  }
+
+  @Override
+  public Long getRetraction(Long value)
+  {
+    return -value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
new file mode 100644
index 0000000..f8f9d8a
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
@@ -0,0 +1,550 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.ValidationException;
+
+import org.joda.time.Duration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
+import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
+import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
+import org.apache.apex.malhar.lib.window.impl.WatermarkImpl;
+import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
+import org.apache.commons.lang3.mutable.MutableLong;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Sink;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Unit tests for WindowedOperator
+ */
+public class WindowedOperatorTest
+{
+  private void verifyValidationFailure(WindowedOperatorImpl windowedOperator, String message)
+  {
+    try {
+      windowedOperator.validate();
+      Assert.fail("Should fail validation because " + message);
+    } catch (ValidationException ex) {
+      return;
+    }
+  }
+
+  private WindowedOperatorImpl<Long, MutableLong, Long> createDefaultWindowedOperator()
+  {
+    WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = new WindowedOperatorImpl<>();
+    windowedOperator.setDataStorage(new InMemoryWindowedStorage<MutableLong>());
+    windowedOperator.setRetractionStorage(new InMemoryWindowedStorage<Long>());
+    windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
+    windowedOperator.setAccumulation(new SumAccumulation());
+    return windowedOperator;
+  }
+
+  private KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> createDefaultKeyedWindowedOperator()
+  {
+    KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = new KeyedWindowedOperatorImpl<>();
+    windowedOperator.setDataStorage(new InMemoryWindowedKeyedStorage<String, MutableLong>());
+    windowedOperator.setRetractionStorage(new InMemoryWindowedKeyedStorage<String, Long>());
+    windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
+    windowedOperator.setAccumulation(new SumAccumulation());
+    return windowedOperator;
+  }
+
+  @Test
+  public void testValidation() throws Exception
+  {
+    WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = new WindowedOperatorImpl<>();
+    verifyValidationFailure(windowedOperator, "nothing is configured");
+    windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
+    verifyValidationFailure(windowedOperator, "data storage is not set");
+    windowedOperator.setDataStorage(new InMemoryWindowedStorage<MutableLong>());
+    verifyValidationFailure(windowedOperator, "accumulation is not set");
+    windowedOperator.setAccumulation(new SumAccumulation());
+    windowedOperator.validate();
+    windowedOperator.setTriggerOption(new TriggerOption().accumulatingAndRetractingFiredPanes());
+    verifyValidationFailure(windowedOperator, "retracting storage is not set for ACCUMULATING_AND_RETRACTING");
+    windowedOperator.setRetractionStorage(new InMemoryWindowedStorage<Long>());
+    windowedOperator.validate();
+    windowedOperator.setTriggerOption(new TriggerOption().discardingFiredPanes().firingOnlyUpdatedPanes());
+    verifyValidationFailure(windowedOperator, "DISCARDING is not valid for option firingOnlyUpdatedPanes");
+    windowedOperator.setTriggerOption(new TriggerOption().accumulatingFiredPanes().firingOnlyUpdatedPanes());
+    windowedOperator.setRetractionStorage(null);
+    verifyValidationFailure(windowedOperator, "retracting storage is not set for option firingOnlyUpdatedPanes");
+  }
+
+  @Test
+  public void testWatermarkAndAllowedLateness()
+  {
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
+        new Attribute.AttributeMap.DefaultAttributeMap());
+    WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
+    CollectorTestSink controlSink = new CollectorTestSink();
+
+    windowedOperator.controlOutput.setSink(controlSink);
+
+    windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
+    windowedOperator.setAllowedLateness(Duration.millis(1000));
+
+    WindowedStorage<MutableLong> dataStorage = new InMemoryWindowedStorage<>();
+    WindowedStorage<WindowState> windowStateStorage = new InMemoryWindowedStorage<>();
+
+    windowedOperator.setDataStorage(dataStorage);
+    windowedOperator.setWindowStateStorage(windowStateStorage);
+
+    windowedOperator.setup(context);
+    windowedOperator.beginWindow(1);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L));
+    Assert.assertEquals("There should be exactly one window in the storage", 1, dataStorage.size());
+    Assert.assertEquals("There should be exactly one window in the storage", 1, windowStateStorage.size());
+
+    Map.Entry<Window, WindowState> entry = windowStateStorage.entrySet().iterator().next();
+    Window window = entry.getKey();
+    WindowState windowState = entry.getValue();
+    Assert.assertEquals(-1, windowState.watermarkArrivalTime);
+    Assert.assertEquals(2L, dataStorage.get(window).longValue());
+
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L));
+    Assert.assertEquals(5L, dataStorage.get(window).longValue());
+
+    windowedOperator.processWatermark(new WatermarkImpl(1200));
+    windowedOperator.endWindow();
+    Assert.assertTrue(windowState.watermarkArrivalTime > 0);
+    Assert.assertEquals("We should get one watermark tuple", 1, controlSink.getCount(false));
+
+    windowedOperator.beginWindow(2);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(900L, 4L));
+    Assert.assertEquals("Late but not too late", 9L, dataStorage.get(window).longValue());
+    windowedOperator.processWatermark(new WatermarkImpl(3000));
+    windowedOperator.endWindow();
+    Assert.assertEquals("We should get two watermark tuples", 2, controlSink.getCount(false));
+    windowedOperator.beginWindow(3);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(120L, 5L)); // this tuple should be dropped
+    Assert.assertEquals("The window should be dropped because it's too late", 0, dataStorage.size());
+    Assert.assertEquals("The window should be dropped because it's too late", 0, windowStateStorage.size());
+    windowedOperator.endWindow();
+  }
+
+  private void testTrigger(TriggerOption.AccumulationMode accumulationMode)
+  {
+    WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
+    TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000));
+    switch (accumulationMode) {
+      case ACCUMULATING:
+        triggerOption.accumulatingFiredPanes();
+        break;
+      case ACCUMULATING_AND_RETRACTING:
+        triggerOption.accumulatingAndRetractingFiredPanes();
+        break;
+      case DISCARDING:
+        triggerOption.discardingFiredPanes();
+        break;
+      default:
+        throw new RuntimeException("Unknown accumulation mode: " + accumulationMode);
+    }
+    windowedOperator.setTriggerOption(triggerOption);
+    windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
+    CollectorTestSink sink = new CollectorTestSink();
+    windowedOperator.output.setSink(sink);
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
+        new Attribute.AttributeMap.DefaultAttributeMap());
+    windowedOperator.setup(context);
+    windowedOperator.beginWindow(1);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L));
+    windowedOperator.endWindow();
+    Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+    windowedOperator.beginWindow(2);
+    windowedOperator.endWindow();
+    Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+    windowedOperator.beginWindow(3);
+    windowedOperator.endWindow();
+    Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
+    Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+    sink.collectedTuples.clear();
+    windowedOperator.beginWindow(4);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(400L, 4L));
+    windowedOperator.endWindow();
+    Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+    windowedOperator.beginWindow(5);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, 5L));
+    windowedOperator.endWindow();
+    switch (accumulationMode) {
+      case ACCUMULATING:
+        Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
+        Assert.assertEquals(14L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+        break;
+      case DISCARDING:
+        Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
+        Assert.assertEquals(9L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+        break;
+      case ACCUMULATING_AND_RETRACTING:
+        Assert.assertEquals("There should be exactly two tuples for the time trigger", 2, sink.collectedTuples.size());
+        Assert.assertEquals(-5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+        Assert.assertEquals(14L, ((Tuple<Long>)sink.collectedTuples.get(1)).getValue().longValue());
+        break;
+      default:
+        throw new RuntimeException("Unknown accumulation mode: " + accumulationMode);
+    }
+  }
+
+  @Test
+  public void testTriggerWithDiscardingMode()
+  {
+    testTrigger(TriggerOption.AccumulationMode.DISCARDING);
+  }
+
+  @Test
+  public void testTriggerWithAccumulatingMode()
+  {
+    testTrigger(TriggerOption.AccumulationMode.ACCUMULATING);
+  }
+
+  @Test
+  public void testTriggerWithAccumulatingAndRetractingMode()
+  {
+    testTrigger(TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING);
+  }
+
+  @Test
+  public void testTriggerWithAccumulatingModeFiringOnlyUpdatedPanes()
+  {
+    for (boolean firingOnlyUpdatedPanes : new boolean[]{true, false}) {
+      WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
+      TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000))
+          .accumulatingFiredPanes();
+      if (firingOnlyUpdatedPanes) {
+        triggerOption.firingOnlyUpdatedPanes();
+      }
+      windowedOperator.setTriggerOption(triggerOption);
+      windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
+      CollectorTestSink sink = new CollectorTestSink();
+      windowedOperator.output.setSink(sink);
+      OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
+          new Attribute.AttributeMap.DefaultAttributeMap());
+      windowedOperator.setup(context);
+      windowedOperator.beginWindow(1);
+      windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L));
+      windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L));
+      windowedOperator.endWindow();
+      Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+      windowedOperator.beginWindow(2);
+      windowedOperator.endWindow();
+      Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+      windowedOperator.beginWindow(3);
+      windowedOperator.endWindow();
+      Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
+      Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+      sink.collectedTuples.clear();
+      windowedOperator.beginWindow(4);
+      windowedOperator.endWindow();
+      Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+      windowedOperator.beginWindow(5);
+      windowedOperator.endWindow();
+      if (firingOnlyUpdatedPanes) {
+        Assert.assertTrue("There should not be any trigger since no panes have been updated", sink.collectedTuples.isEmpty());
+      } else {
+        Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
+        Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+      }
+    }
+  }
+
+  @Test
+  public void testGlobalWindowAssignment()
+  {
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
+        new Attribute.AttributeMap.DefaultAttributeMap());
+    WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
+    windowedOperator.setWindowOption(new WindowOption.GlobalWindow());
+    windowedOperator.setup(context);
+    Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1100L, 2L));
+    List<Window> windows = windowedValue.getWindows();
+    Assert.assertEquals(1, windows.size());
+    Assert.assertEquals(Window.GLOBAL_WINDOW, windows.get(0));
+  }
+
+  @Test
+  public void testTimeWindowAssignment()
+  {
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
+        new Attribute.AttributeMap.DefaultAttributeMap());
+    WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
+    windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
+    windowedOperator.setup(context);
+    Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1100L, 2L));
+    List<Window> windows = windowedValue.getWindows();
+    Assert.assertEquals(1, windows.size());
+    Assert.assertEquals(1000, windows.get(0).getBeginTimestamp());
+    Assert.assertEquals(1000, windows.get(0).getDurationMillis());
+  }
+
+  @Test
+  public void testSlidingWindowAssignment()
+  {
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
+        new Attribute.AttributeMap.DefaultAttributeMap());
+    WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
+    windowedOperator.setWindowOption(new WindowOption.SlidingTimeWindows(Duration.millis(1000), Duration.millis(200)));
+    windowedOperator.setup(context);
+    Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1600L, 2L));
+    List<Window> windows = windowedValue.getWindows();
+    Window[] winArray = windows.toArray(new Window[]{});
+    Arrays.sort(winArray, Window.DEFAULT_COMPARATOR);
+    Assert.assertEquals(5, winArray.length);
+    Assert.assertEquals(800, winArray[0].getBeginTimestamp());
+    Assert.assertEquals(1000, winArray[0].getDurationMillis());
+    Assert.assertEquals(1000, winArray[1].getBeginTimestamp());
+    Assert.assertEquals(1000, winArray[1].getDurationMillis());
+    Assert.assertEquals(1200, winArray[2].getBeginTimestamp());
+    Assert.assertEquals(1000, winArray[2].getDurationMillis());
+    Assert.assertEquals(1400, winArray[3].getBeginTimestamp());
+    Assert.assertEquals(1000, winArray[3].getDurationMillis());
+    Assert.assertEquals(1600, winArray[4].getBeginTimestamp());
+    Assert.assertEquals(1000, winArray[4].getDurationMillis());
+  }
+
+  @Test
+  public void testSessionWindows()
+  {
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
+        new Attribute.AttributeMap.DefaultAttributeMap());
+    KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator();
+    windowedOperator.setWindowOption(new WindowOption.SessionWindows(Duration.millis(2000)));
+    windowedOperator.setTriggerOption(new TriggerOption().withEarlyFiringsAtEvery(1).accumulatingAndRetractingFiredPanes().firingOnlyUpdatedPanes());
+    CollectorTestSink<Tuple<KeyValPair<String, Long>>> sink = new CollectorTestSink();
+    windowedOperator.output.setSink((Sink<Object>)(Sink)sink);
+    windowedOperator.setup(context);
+    windowedOperator.beginWindow(1);
+    Tuple<KeyValPair<String, Long>> tuple = new Tuple.TimestampedTuple<>(1100L, new KeyValPair<>("a", 2L));
+    windowedOperator.processTuple(tuple);
+
+    Assert.assertEquals(1, sink.getCount(false));
+    Tuple.WindowedTuple<KeyValPair<String, Long>> out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0);
+    Assert.assertEquals(1, out.getWindows().size());
+    Window.SessionWindow<String> window1 = (Window.SessionWindow<String>)out.getWindows().get(0);
+    Assert.assertEquals(1100L, window1.getBeginTimestamp());
+    Assert.assertEquals(1, window1.getDurationMillis());
+    Assert.assertEquals("a", window1.getKey());
+    Assert.assertEquals("a", out.getValue().getKey());
+    Assert.assertEquals(2L, out.getValue().getValue().longValue());
+    sink.clear();
+
+    // extending an existing session window
+    tuple = new Tuple.TimestampedTuple<>(2000L, new KeyValPair<>("a", 3L));
+    windowedOperator.processTuple(tuple);
+    Assert.assertEquals(2, sink.getCount(false));
+
+    // retraction trigger
+    out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0);
+    Assert.assertEquals(1, out.getWindows().size());
+    Assert.assertEquals(window1, out.getWindows().get(0));
+    Assert.assertEquals("a", out.getValue().getKey());
+    Assert.assertEquals(-2L, out.getValue().getValue().longValue());
+
+    // normal trigger
+    out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(1);
+    Window.SessionWindow<String> window2 = (Window.SessionWindow<String>)out.getWindows().get(0);
+
+    Assert.assertEquals(1100L, window2.getBeginTimestamp());
+    Assert.assertEquals(901, window2.getDurationMillis());
+    Assert.assertEquals("a", out.getValue().getKey());
+    Assert.assertEquals(5L, out.getValue().getValue().longValue());
+    sink.clear();
+
+    // a separate session window
+    tuple = new Tuple.TimestampedTuple<>(5000L, new KeyValPair<>("a", 4L));
+    windowedOperator.processTuple(tuple);
+    Assert.assertEquals(1, sink.getCount(false));
+    out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0);
+    Assert.assertEquals(1, out.getWindows().size());
+    Window.SessionWindow<String> window3 = (Window.SessionWindow<String>)out.getWindows().get(0);
+    Assert.assertEquals(5000L, window3.getBeginTimestamp());
+    Assert.assertEquals(1, window3.getDurationMillis());
+    Assert.assertEquals("a", out.getValue().getKey());
+    Assert.assertEquals(4L, out.getValue().getValue().longValue());
+    sink.clear();
+
+    // session window merging
+    tuple = new Tuple.TimestampedTuple<>(3500L, new KeyValPair<>("a", 3L));
+    windowedOperator.processTuple(tuple);
+
+    Assert.assertEquals(3, sink.getCount(false));
+
+    // retraction of the two old windows
+    out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0);
+    Assert.assertEquals(1, out.getWindows().size());
+    Assert.assertEquals(window2, out.getWindows().get(0));
+    Assert.assertEquals("a", out.getValue().getKey());
+    Assert.assertEquals(-5L, out.getValue().getValue().longValue());
+    out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(1);
+    Assert.assertEquals(1, out.getWindows().size());
+    Assert.assertEquals(window3, out.getWindows().get(0));
+    Assert.assertEquals("a", out.getValue().getKey());
+    Assert.assertEquals(-4L, out.getValue().getValue().longValue());
+
+    // normal trigger
+    out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(2);
+    Assert.assertEquals(1, out.getWindows().size());
+    Window.SessionWindow<String> window4 = (Window.SessionWindow<String>)out.getWindows().get(0);
+    Assert.assertEquals(1100L, window4.getBeginTimestamp());
+    Assert.assertEquals(3901, window4.getDurationMillis());
+    Assert.assertEquals("a", out.getValue().getKey());
+    Assert.assertEquals(12L, out.getValue().getValue().longValue());
+
+    windowedOperator.endWindow();
+  }
+
+  @Test
+  public void testKeyedAccumulation()
+  {
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
+        new Attribute.AttributeMap.DefaultAttributeMap());
+    KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator();
+    windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
+    WindowedKeyedStorage<String, MutableLong> dataStorage = new InMemoryWindowedKeyedStorage<>();
+    windowedOperator.setDataStorage(dataStorage);
+    windowedOperator.setup(context);
+    windowedOperator.beginWindow(1);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, new KeyValPair<>("a", 2L)));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, new KeyValPair<>("a", 3L)));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, new KeyValPair<>("b", 4L)));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(150L, new KeyValPair<>("b", 5L)));
+    windowedOperator.endWindow();
+    Assert.assertEquals(1, dataStorage.size());
+    Assert.assertEquals(5L, dataStorage.get(new Window.TimeWindow(0, 1000), "a").longValue());
+    Assert.assertEquals(9L, dataStorage.get(new Window.TimeWindow(0, 1000), "b").longValue());
+  }
+
+  private void testKeyedTrigger(TriggerOption.AccumulationMode accumulationMode)
+  {
+    KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator();
+    TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000));
+    switch (accumulationMode) {
+      case ACCUMULATING:
+        triggerOption.accumulatingFiredPanes();
+        break;
+      case ACCUMULATING_AND_RETRACTING:
+        triggerOption.accumulatingAndRetractingFiredPanes();
+        break;
+      case DISCARDING:
+        triggerOption.discardingFiredPanes();
+        break;
+      default:
+        throw new RuntimeException("Unknown accumulation mode: " + accumulationMode);
+    }
+    windowedOperator.setTriggerOption(triggerOption);
+    windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
+    CollectorTestSink<Tuple<KeyValPair<String, Long>>> sink = new CollectorTestSink();
+    windowedOperator.output.setSink((Sink<Object>)(Sink)sink);
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
+        new Attribute.AttributeMap.DefaultAttributeMap());
+    windowedOperator.setup(context);
+    windowedOperator.beginWindow(1);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, new KeyValPair<>("a", 2L)));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, new KeyValPair<>("b", 3L)));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(400L, new KeyValPair<>("b", 5L)));
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, new KeyValPair<>("a", 4L)));
+    windowedOperator.endWindow();
+    Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+    windowedOperator.beginWindow(2);
+    windowedOperator.endWindow();
+    Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+    windowedOperator.beginWindow(3);
+    windowedOperator.endWindow();
+    Assert.assertEquals("There should be exactly two tuple for the time trigger", 2, sink.collectedTuples.size());
+    {
+      Map<String, Long> map = new HashMap<>();
+      for (Tuple<KeyValPair<String, Long>> tuple : sink.collectedTuples) {
+        map.put(tuple.getValue().getKey(), tuple.getValue().getValue());
+      }
+      Assert.assertEquals(6L, map.get("a").longValue());
+      Assert.assertEquals(8L, map.get("b").longValue());
+    }
+    sink.collectedTuples.clear();
+    windowedOperator.beginWindow(4);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(400L, new KeyValPair<>("a", 8L)));
+    windowedOperator.endWindow();
+    Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
+    windowedOperator.beginWindow(5);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, new KeyValPair<>("b", 9L)));
+    windowedOperator.endWindow();
+    Map<String, Long> map = new HashMap<>();
+    switch (accumulationMode) {
+      case ACCUMULATING:
+        Assert.assertEquals("There should be exactly two tuples for the time trigger", 2, sink.collectedTuples.size());
+        for (Tuple<KeyValPair<String, Long>> tuple : sink.collectedTuples) {
+          map.put(tuple.getValue().getKey(), tuple.getValue().getValue());
+        }
+        Assert.assertEquals(14L, map.get("a").longValue());
+        Assert.assertEquals(17L, map.get("b").longValue());
+        break;
+      case DISCARDING:
+        Assert.assertEquals("There should be exactly two tuples for the time trigger", 2, sink.collectedTuples.size());
+        for (Tuple<KeyValPair<String, Long>> tuple : sink.collectedTuples) {
+          map.put(tuple.getValue().getKey(), tuple.getValue().getValue());
+        }
+        Assert.assertEquals(8L, map.get("a").longValue());
+        Assert.assertEquals(9L, map.get("b").longValue());
+        break;
+      case ACCUMULATING_AND_RETRACTING:
+        Assert.assertEquals("There should be exactly four tuples for the time trigger", 4, sink.collectedTuples.size());
+        for (Tuple<KeyValPair<String, Long>> tuple : sink.collectedTuples) {
+          String key = tuple.getValue().getKey();
+          long value = tuple.getValue().getValue();
+          map.put(value < 0 ? "R" + key : key, value);
+        }
+        Assert.assertEquals(-6L, map.get("Ra").longValue());
+        Assert.assertEquals(-8L, map.get("Rb").longValue());
+        Assert.assertEquals(14L, map.get("a").longValue());
+        Assert.assertEquals(17L, map.get("b").longValue());
+        break;
+      default:
+        throw new RuntimeException("Unknown accumulation mode: " + accumulationMode);
+    }
+  }
+
+  @Test
+  public void testKeyedTriggerWithDiscardingMode()
+  {
+    testKeyedTrigger(TriggerOption.AccumulationMode.DISCARDING);
+  }
+
+  @Test
+  public void testKeyedTriggerWithAccumulatingMode()
+  {
+    testKeyedTrigger(TriggerOption.AccumulationMode.ACCUMULATING);
+  }
+
+  @Test
+  public void testKeyedTriggerWithAccumulatingAndRetractingMode()
+  {
+    testKeyedTrigger(TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/test/java/org/apache/apex/malhar/lib/window/sample/pi/Application.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/sample/pi/Application.java b/library/src/test/java/org/apache/apex/malhar/lib/window/sample/pi/Application.java
new file mode 100644
index 0000000..f70ac64
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/sample/pi/Application.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.sample.pi;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
+import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+/**
+ * This is an example of using the WindowedOperator concepts to calculate the 
value of pi.
+ */
+public class Application implements StreamingApplication
+{
+  public static class RandomNumberPairGenerator extends BaseOperator 
implements InputOperator
+  {
+    public final transient DefaultOutputPort<Tuple<MutablePair<Double, 
Double>>> output = new DefaultOutputPort<>();
+
+    @Override
+    public void emitTuples()
+    {
+      Tuple.PlainTuple<MutablePair<Double, Double>> tuple = new 
Tuple.PlainTuple<>(new MutablePair<>(Math.random(), Math.random()));
+      this.output.emit(tuple);
+    }
+  }
+
+  public static class PiAccumulation implements 
Accumulation<MutablePair<Double, Double>, MutablePair<MutableLong, 
MutableLong>, Double>
+  {
+    @Override
+    public MutablePair<MutableLong, MutableLong> defaultAccumulatedValue()
+    {
+      return new MutablePair<>(new MutableLong(0), new MutableLong(0));
+    }
+
+    @Override
+    public MutablePair<MutableLong, MutableLong> 
accumulate(MutablePair<MutableLong, MutableLong> accumulatedValue, 
MutablePair<Double, Double> input)
+    {
+      if (input.getLeft() * input.getLeft() + input.getRight() * 
input.getRight() < 1) {
+        accumulatedValue.getLeft().increment();
+      }
+      accumulatedValue.getRight().increment();
+      return accumulatedValue;
+    }
+
+    @Override
+    public MutablePair<MutableLong, MutableLong> 
merge(MutablePair<MutableLong, MutableLong> accumulatedValue1, 
MutablePair<MutableLong, MutableLong> accumulatedValue2)
+    {
+      accumulatedValue1.getLeft().add(accumulatedValue2.getLeft());
+      accumulatedValue1.getRight().add(accumulatedValue2.getRight());
+      return accumulatedValue1;
+    }
+
+    @Override
+    public Double getOutput(MutablePair<MutableLong, MutableLong> 
accumulatedValue)
+    {
+      return accumulatedValue.getRight().longValue() == 0 ? 0.0 : 
(((double)accumulatedValue.getLeft().longValue()) * 4 / 
accumulatedValue.getRight().longValue());
+    }
+
+    @Override
+    public Double getRetraction(Double value)
+    {
+      return -value;
+    }
+  }
+
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    RandomNumberPairGenerator inputOperator = new RandomNumberPairGenerator();
+    WindowedOperatorImpl<MutablePair<Double, Double>, MutablePair<MutableLong, 
MutableLong>, Double> windowedOperator = new WindowedOperatorImpl<>();
+    Accumulation<MutablePair<Double, Double>, MutablePair<MutableLong, 
MutableLong>, Double> piAccumulation = new PiAccumulation();
+
+    windowedOperator.setAccumulation(piAccumulation);
+    windowedOperator.setDataStorage(new 
InMemoryWindowedStorage<MutablePair<MutableLong, MutableLong>>());
+    windowedOperator.setWindowStateStorage(new 
InMemoryWindowedStorage<WindowState>());
+    windowedOperator.setWindowOption(new WindowOption.GlobalWindow());
+    
windowedOperator.setTriggerOption(TriggerOption.AtWatermark().withEarlyFiringsAtEvery(Duration.millis(1000)).accumulatingFiredPanes());
+
+    ConsoleOutputOperator outputOperator = new ConsoleOutputOperator();
+    dag.addOperator("inputOperator", inputOperator);
+    dag.addOperator("windowedOperator", windowedOperator);
+    dag.addOperator("outputOperator", outputOperator);
+    dag.addStream("input_windowed", inputOperator.output, 
windowedOperator.input);
+    dag.addStream("windowed_output", windowedOperator.output, 
outputOperator.input);
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    lma.prepareDAG(new Application(), conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7a77274a/library/src/test/java/org/apache/apex/malhar/lib/window/sample/wordcount/Application.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/window/sample/wordcount/Application.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/window/sample/wordcount/Application.java
new file mode 100644
index 0000000..83dc277
--- /dev/null
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/window/sample/wordcount/Application.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.sample.wordcount;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.SumAccumulation;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
+import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
+import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
+import org.apache.apex.malhar.lib.window.impl.WatermarkImpl;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This is an example of using the WindowedOperator concepts to do streaming 
word count.
+ */
+public class Application implements StreamingApplication
+{
+  public static class WordGenerator extends BaseOperator implements 
InputOperator
+  {
+    public final transient DefaultOutputPort<Tuple<KeyValPair<String, Long>>> 
output = new DefaultOutputPort<>();
+    public final transient DefaultOutputPort<ControlTuple> controlOutput = new 
DefaultOutputPort<>();
+
+    private transient BufferedReader reader;
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      initReader();
+    }
+
+    private void initReader()
+    {
+      try {
+        InputStream resourceStream = 
this.getClass().getResourceAsStream("/wordcount.txt");
+        reader = new BufferedReader(new InputStreamReader(resourceStream));
+      } catch (Exception ex) {
+        throw Throwables.propagate(ex);
+      }
+    }
+
+    @Override
+    public void teardown()
+    {
+      IOUtils.closeQuietly(reader);
+    }
+
+    @Override
+    public void emitTuples()
+    {
+      try {
+        String line = reader.readLine();
+        if (line == null) {
+          reader.close();
+          initReader();
+        } else {
+          // simulate late data
+          long timestamp = System.currentTimeMillis() - (long)(Math.random() * 
30000);
+          Map<String, Long> countMap = new HashMap<>();
+          for (String str : line.split("[\\p{Punct}\\s]+")) {
+            countMap.put(StringUtils.lowerCase(str), 
(countMap.containsKey(str)) ? countMap.get(str) + 1 : 1);
+          }
+          for (Map.Entry<String, Long> entry : countMap.entrySet()) {
+            String word = entry.getKey();
+            long count = entry.getValue();
+            Tuple.TimestampedTuple<KeyValPair<String, Long>> tuple = new 
Tuple.TimestampedTuple<>(timestamp, new KeyValPair<>(word, count));
+            this.output.emit(tuple);
+          }
+        }
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+    @Override
+    public void endWindow()
+    {
+      this.controlOutput.emit(new WatermarkImpl(System.currentTimeMillis() - 
15000));
+    }
+  }
+
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    WordGenerator inputOperator = new WordGenerator();
+    KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> 
windowedOperator = new KeyedWindowedOperatorImpl<>();
+    Accumulation<Long, MutableLong, Long> sum = new SumAccumulation();
+
+    windowedOperator.setAccumulation(sum);
+    windowedOperator.setDataStorage(new InMemoryWindowedKeyedStorage<String, 
MutableLong>());
+    windowedOperator.setRetractionStorage(new 
InMemoryWindowedKeyedStorage<String, Long>());
+    windowedOperator.setWindowStateStorage(new 
InMemoryWindowedStorage<WindowState>());
+    windowedOperator.setWindowOption(new 
WindowOption.TimeWindows(Duration.standardMinutes(1)));
+    
windowedOperator.setTriggerOption(TriggerOption.AtWatermark().withEarlyFiringsAtEvery(Duration.millis(1000)).accumulatingAndRetractingFiredPanes());
+    //windowedOperator.setAllowedLateness(Duration.millis(14000));
+
+    ConsoleOutputOperator outputOperator = new ConsoleOutputOperator();
+    dag.addOperator("inputOperator", inputOperator);
+    dag.addOperator("windowedOperator", windowedOperator);
+    dag.addOperator("outputOperator", outputOperator);
+    dag.addStream("input_windowed", inputOperator.output, 
windowedOperator.input);
+    dag.addStream("windowed_output", windowedOperator.output, 
outputOperator.input);
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    lma.prepareDAG(new Application(), conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.run();
+  }
+}

Reply via email to