[FLINK-4329] [streaming api] Fix Streaming File Source Timestamps/Watermarks 
Handling

This closes #2546


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ff451be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ff451be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ff451be

Branch: refs/heads/master
Commit: 8ff451bec58e9f5800eb77c74c1d7457b776cc94
Parents: c62776f
Author: kl0u <[email protected]>
Authored: Thu Aug 25 17:38:49 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Oct 5 19:36:13 2016 +0200

----------------------------------------------------------------------
 .../state/RocksDBAsyncSnapshotTest.java         |  11 +-
 .../ContinuousFileMonitoringFunctionITCase.java |  17 +-
 .../hdfstests/ContinuousFileMonitoringTest.java | 209 ++++++++++++--
 .../fs/bucketing/BucketingSinkTest.java         |   4 +-
 .../source/ContinuousFileReaderOperator.java    |  96 ++++---
 .../streaming/api/operators/StreamSource.java   | 275 +-----------------
 .../api/operators/StreamSourceContexts.java     | 284 +++++++++++++++++++
 .../runtime/tasks/AsyncExceptionHandler.java    |   8 +-
 .../tasks/DefaultTimeServiceProvider.java       |  11 +-
 .../runtime/tasks/OneInputStreamTask.java       |   2 +-
 .../streaming/runtime/tasks/StreamTask.java     |  54 +---
 .../runtime/tasks/TwoInputStreamTask.java       |   2 +-
 .../operators/StreamSourceOperatorTest.java     |  17 +-
 .../runtime/operators/TimeProviderTest.java     |  79 ++++--
 ...AlignedProcessingTimeWindowOperatorTest.java |  34 ++-
 ...AlignedProcessingTimeWindowOperatorTest.java |  36 ++-
 .../runtime/tasks/StreamMockEnvironment.java    |   8 +-
 .../KeyedOneInputStreamOperatorTestHarness.java |   4 +-
 .../util/OneInputStreamOperatorTestHarness.java |  23 +-
 19 files changed, 694 insertions(+), 480 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index bccbabc..2ebd84a 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -86,7 +86,7 @@ public class RocksDBAsyncSnapshotTest {
        }
 
        /**
-        * This ensures that asynchronous state handles are actually 
materialized asynchonously.
+        * This ensures that asynchronous state handles are actually 
materialized asynchronously.
         *
         * <p>We use latches to block at various stages and see if the code 
still continues through
         * the parts that are not asynchronous. If the checkpoint is not done 
asynchronously the
@@ -168,7 +168,6 @@ public class RocksDBAsyncSnapshotTest {
                                while (!field.getBoolean(task)) {
                                        Thread.sleep(10);
                                }
-
                        }
                }
 
@@ -189,7 +188,9 @@ public class RocksDBAsyncSnapshotTest {
                Assert.assertTrue(threadPool.awaitTermination(60_000, 
TimeUnit.MILLISECONDS));
 
                testHarness.waitForTaskCompletion();
-               task.checkTimerException();
+               if (mockEnv.wasFailedExternally()) {
+                       Assert.fail("Unexpected exception during execution.");
+               }
        }
 
        /**
@@ -261,8 +262,10 @@ public class RocksDBAsyncSnapshotTest {
                        threadPool.shutdown();
                        Assert.assertTrue(threadPool.awaitTermination(60_000, 
TimeUnit.MILLISECONDS));
                        testHarness.waitForTaskCompletion();
-                       task.checkTimerException();
 
+                       if (mockEnv.wasFailedExternally()) {
+                               throw new AsynchronousException(new 
InterruptedException("Exception was thrown as expected."));
+                       }
                        Assert.fail("Operation completed. Cancel failed.");
                } catch (Exception expected) {
                        AsynchronousException asynchronousException = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
index 663345c..079bf04 100644
--- 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
+++ 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
@@ -120,7 +120,7 @@ public class ContinuousFileMonitoringFunctionITCase extends 
StreamingProgramTest
 
                try {
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-                       env.setParallelism(1);
+                       env.setParallelism(4);
 
                        
format.setFilesFilter(FilePathFilter.createDefaultFilter());
                        ContinuousFileMonitoringFunction<String> 
monitoringFunction =
@@ -130,7 +130,7 @@ public class ContinuousFileMonitoringFunctionITCase extends 
StreamingProgramTest
 
                        TypeInformation<String> typeInfo = 
TypeExtractor.getInputFormatTypes(format);
                        ContinuousFileReaderOperator<String, ?> reader = new 
ContinuousFileReaderOperator<>(format);
-                       TestingSinkFunction sink = new 
TestingSinkFunction(monitoringFunction);
+                       TestingSinkFunction sink = new TestingSinkFunction();
 
                        DataStream<FileInputSplit> splits = 
env.addSource(monitoringFunction);
                        splits.transform("FileSplitReader", typeInfo, 
reader).addSink(sink).setParallelism(1);
@@ -161,16 +161,10 @@ public class ContinuousFileMonitoringFunctionITCase 
extends StreamingProgramTest
 
        private static class TestingSinkFunction extends 
RichSinkFunction<String> {
 
-               private final ContinuousFileMonitoringFunction src;
-
                private int elementCounter = 0;
                private Map<Integer, Integer> elementCounters = new HashMap<>();
                private Map<Integer, List<String>> collectedContent = new 
HashMap<>();
 
-               TestingSinkFunction(ContinuousFileMonitoringFunction 
monitoringFunction) {
-                       this.src = monitoringFunction;
-               }
-
                @Override
                public void open(Configuration parameters) throws Exception {
                        // this sink can only work with DOP 1
@@ -200,13 +194,6 @@ public class ContinuousFileMonitoringFunctionITCase 
extends StreamingProgramTest
                                Assert.assertEquals(cntntStr.toString(), 
expectedContents.get(fileIdx));
                        }
                        expectedContents.clear();
-
-                       src.cancel();
-                       try {
-                               src.close();
-                       } catch (Exception e) {
-                               e.printStackTrace();
-                       }
                }
 
                private int getLineNo(String line) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
index 8a700f5..36b5c5e 100644
--- 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -27,12 +27,14 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.api.common.io.FilePathFilter;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
 import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileUtil;
@@ -51,7 +53,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
 
 public class ContinuousFileMonitoringTest {
@@ -106,6 +107,155 @@ public class ContinuousFileMonitoringTest {
        //                                              TESTS
 
        @Test
+       public void testFileReadingOperatorWithIngestionTime() throws Exception 
{
+               Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+               Map<Integer, String> expectedFileContents = new HashMap<>();
+
+               for(int i = 0; i < NO_OF_FILES; i++) {
+                       Tuple2<org.apache.hadoop.fs.Path, String> file = 
fillWithData(hdfsURI, "file", i, "This is test line.");
+                       filesCreated.add(file.f0);
+                       expectedFileContents.put(i, file.f1);
+               }
+
+               TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+               TypeInformation<String> typeInfo = 
TypeExtractor.getInputFormatTypes(format);
+
+               final long watermarkInterval = 10;
+               ExecutionConfig executionConfig = new ExecutionConfig();
+               executionConfig.setAutoWatermarkInterval(watermarkInterval);
+
+               ContinuousFileReaderOperator<String, ?> reader = new 
ContinuousFileReaderOperator<>(format);
+               reader.setOutputType(typeInfo, executionConfig);
+
+               final TestTimeServiceProvider timeServiceProvider = new 
TestTimeServiceProvider();
+               final OneInputStreamOperatorTestHarness<FileInputSplit, String> 
tester =
+                       new OneInputStreamOperatorTestHarness<>(reader, 
executionConfig, timeServiceProvider);
+               tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               tester.open();
+
+               Assert.assertEquals(TimeCharacteristic.IngestionTime, 
tester.getTimeCharacteristic());
+
+               // test that watermarks are correctly emitted
+
+               timeServiceProvider.setCurrentTime(201);
+               timeServiceProvider.setCurrentTime(301);
+               timeServiceProvider.setCurrentTime(401);
+               timeServiceProvider.setCurrentTime(501);
+
+               int i = 0;
+               for(Object line: tester.getOutput()) {
+                       if (!(line instanceof Watermark)) {
+                               Assert.fail("Only watermarks are expected here 
");
+                       }
+                       Watermark w = (Watermark) line;
+                       Assert.assertEquals(200 + (i * 100), w.getTimestamp());
+                       i++;
+               }
+
+               // clear the output to get the elements only and the final 
watermark
+               tester.getOutput().clear();
+               Assert.assertEquals(0, tester.getOutput().size());
+
+               // create the necessary splits for the test
+               FileInputSplit[] splits = format.createInputSplits(
+                       
reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+               // and feed them to the operator
+               Map<Integer, List<String>> actualFileContents = new HashMap<>();
+
+               long lastSeenWatermark = Long.MIN_VALUE;
+               int lineCounter = 0;    // counter for the lines read from the 
splits
+               int watermarkCounter = 0;
+
+               for(FileInputSplit split: splits) {
+
+                       // set the next "current processing time".
+                       long nextTimestamp = 
timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
+                       timeServiceProvider.setCurrentTime(nextTimestamp);
+
+                       // send the next split to be read and wait until it is 
fully read.
+                       tester.processElement(new StreamRecord<>(split));
+                       synchronized (tester.getCheckpointLock()) {
+                               while (tester.getOutput().isEmpty() || 
tester.getOutput().size() != (LINES_PER_FILE + 1)) {
+                                       tester.getCheckpointLock().wait(10);
+                               }
+                       }
+
+                       // verify that the results are the expected ones
+                       for(Object line: tester.getOutput()) {
+                               if (line instanceof StreamRecord) {
+                                       StreamRecord<String> element = 
(StreamRecord<String>) line;
+                                       lineCounter++;
+
+                                       Assert.assertEquals(nextTimestamp, 
element.getTimestamp());
+
+                                       int fileIdx = 
Character.getNumericValue(element.getValue().charAt(0));
+                                       List<String> content = 
actualFileContents.get(fileIdx);
+                                       if (content == null) {
+                                               content = new ArrayList<>();
+                                               actualFileContents.put(fileIdx, 
content);
+                                       }
+                                       content.add(element.getValue() + "\n");
+                               } else if (line instanceof Watermark) {
+                                       long watermark = ((Watermark) 
line).getTimestamp();
+
+                                       Assert.assertEquals(nextTimestamp - 
(nextTimestamp % watermarkInterval), watermark);
+                                       Assert.assertTrue(watermark > 
lastSeenWatermark);
+                                       watermarkCounter++;
+
+                                       lastSeenWatermark = watermark;
+                               } else {
+                                       Assert.fail("Unknown element in the 
list.");
+                               }
+                       }
+
+                       // clean the output to be ready for the next split
+                       tester.getOutput().clear();
+               }
+
+               // now we are processing one split after the other,
+               // so all the elements must be here by now.
+               Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, lineCounter);
+
+               // because we expect one watermark per split.
+               Assert.assertEquals(NO_OF_FILES, watermarkCounter);
+
+               // then close the reader gracefully so that the Long.MAX 
watermark is emitted
+               synchronized (tester.getCheckpointLock()) {
+                       tester.close();
+               }
+
+               for(org.apache.hadoop.fs.Path file: filesCreated) {
+                       hdfs.delete(file, false);
+               }
+
+               // check if the last element is the LongMax watermark (by now 
this must be the only element)
+               Assert.assertEquals(1, tester.getOutput().size());
+               Assert.assertTrue(tester.getOutput().peek() instanceof 
Watermark);
+               Assert.assertEquals(Long.MAX_VALUE, ((Watermark) 
tester.getOutput().poll()).getTimestamp());
+
+               // check if the elements are the expected ones.
+               Assert.assertEquals(expectedFileContents.size(), 
actualFileContents.size());
+               for (Integer fileIdx: expectedFileContents.keySet()) {
+                       Assert.assertTrue("file" + fileIdx + " not found", 
actualFileContents.keySet().contains(fileIdx));
+
+                       List<String> cntnt = actualFileContents.get(fileIdx);
+                       Collections.sort(cntnt, new Comparator<String>() {
+                               @Override
+                               public int compare(String o1, String o2) {
+                                       return getLineNo(o1) - getLineNo(o2);
+                               }
+                       });
+
+                       StringBuilder cntntStr = new StringBuilder();
+                       for (String line: cntnt) {
+                               cntntStr.append(line);
+                       }
+                       Assert.assertEquals(expectedFileContents.get(fileIdx), 
cntntStr.toString());
+               }
+       }
+
+       @Test
        public void testFileReadingOperator() throws Exception {
                Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
                Map<Integer, String> expectedFileContents = new HashMap<>();
@@ -119,10 +269,11 @@ public class ContinuousFileMonitoringTest {
                TypeInformation<String> typeInfo = 
TypeExtractor.getInputFormatTypes(format);
 
                ContinuousFileReaderOperator<String, ?> reader = new 
ContinuousFileReaderOperator<>(format);
+               reader.setOutputType(typeInfo, new ExecutionConfig());
+
                OneInputStreamOperatorTestHarness<FileInputSplit, String> 
tester =
                        new OneInputStreamOperatorTestHarness<>(reader);
-
-               reader.setOutputType(typeInfo, new ExecutionConfig());
+               tester.setTimeCharacteristic(TimeCharacteristic.EventTime);
                tester.open();
 
                // create the necessary splits for the test
@@ -134,38 +285,38 @@ public class ContinuousFileMonitoringTest {
                        tester.processElement(new StreamRecord<>(split));
                }
 
-               // then close the reader gracefully
+               // then close the reader gracefully (and wait to finish reading)
                synchronized (tester.getCheckpointLock()) {
                        tester.close();
                }
 
-               /*
-               * Given that the reader is multithreaded, the test finishes 
before the reader thread finishes
-               * reading. This results in files being deleted by the test 
before being read, thus throwing an exception.
-               * In addition, even if file deletion happens at the end, the 
results are not ready for testing.
-               * To face this, we wait until all the output is collected or 
until the waiting time exceeds 1000 ms, or 1s.
-               */
+               // the lines received must be the elements in the files +1 for 
the longMax watermark
+               // we are in event time, which emits no watermarks, so the last 
watermark will mark the
+               // end of the input stream.
 
-               long start = System.currentTimeMillis();
-               Queue<Object> output;
-               do {
-                       output = tester.getOutput();
-                       Thread.sleep(50);
-               } while ((output == null || output.size() != NO_OF_FILES * 
LINES_PER_FILE) && (System.currentTimeMillis() - start) < 1000);
+               Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE + 1, 
tester.getOutput().size());
 
                Map<Integer, List<String>> actualFileContents = new HashMap<>();
+               Object lastElement = null;
                for(Object line: tester.getOutput()) {
-                       StreamRecord<String> element = (StreamRecord<String>) 
line;
-
-                       int fileIdx = 
Character.getNumericValue(element.getValue().charAt(0));
-                       List<String> content = actualFileContents.get(fileIdx);
-                       if(content == null) {
-                               content = new ArrayList<>();
-                               actualFileContents.put(fileIdx, content);
+                       lastElement = line;
+                       if (line instanceof StreamRecord) {
+                               StreamRecord<String> element = 
(StreamRecord<String>) line;
+
+                               int fileIdx = 
Character.getNumericValue(element.getValue().charAt(0));
+                               List<String> content = 
actualFileContents.get(fileIdx);
+                               if (content == null) {
+                                       content = new ArrayList<>();
+                                       actualFileContents.put(fileIdx, 
content);
+                               }
+                               content.add(element.getValue() + "\n");
                        }
-                       content.add(element.getValue() +"\n");
                }
 
+               // check if the last element is the LongMax watermark
+               Assert.assertTrue(lastElement instanceof Watermark);
+               Assert.assertEquals(Long.MAX_VALUE, ((Watermark) 
lastElement).getTimestamp());
+
                Assert.assertEquals(expectedFileContents.size(), 
actualFileContents.size());
                for (Integer fileIdx: expectedFileContents.keySet()) {
                        Assert.assertTrue("file" + fileIdx + " not found", 
actualFileContents.keySet().contains(fileIdx));
@@ -224,7 +375,7 @@ public class ContinuousFileMonitoringTest {
                monitoringFunction.open(new Configuration());
                monitoringFunction.run(new 
TestingSourceContext(monitoringFunction, uniqFilesFound));
 
-               Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
+               Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
                for(int i = 0; i < NO_OF_FILES; i++) {
                        org.apache.hadoop.fs.Path file = new 
org.apache.hadoop.fs.Path(hdfsURI + "/file" + i);
                        
Assert.assertTrue(uniqFilesFound.contains(file.toString()));
@@ -268,8 +419,8 @@ public class ContinuousFileMonitoringTest {
                t.interrupt();
                fc.join();
 
-               Assert.assertTrue(fc.getFilesCreated().size() == NO_OF_FILES);
-               Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
+               Assert.assertEquals(NO_OF_FILES, fc.getFilesCreated().size());
+               Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
 
                Set<org.apache.hadoop.fs.Path> filesCreated = 
fc.getFilesCreated();
                Set<String> fileNamesCreated = new HashSet<>();
@@ -316,7 +467,7 @@ public class ContinuousFileMonitoringTest {
                // wait until all the files are created
                fc.join();
 
-               Assert.assertTrue(filesCreated.size() == NO_OF_FILES);
+               Assert.assertEquals(NO_OF_FILES, filesCreated.size());
 
                Set<String> fileNamesCreated = new HashSet<>();
                for (org.apache.hadoop.fs.Path path: fc.getFilesCreated()) {
@@ -337,7 +488,7 @@ public class ContinuousFileMonitoringTest {
 
        private int getLineNo(String line) {
                String[] tkns = line.split("\\s");
-               Assert.assertTrue(tkns.length == 6);
+               Assert.assertEquals(6, tkns.length);
                return Integer.parseInt(tkns[tkns.length - 1]);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index e274fdd..ac1e3f0 100644
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -70,7 +70,7 @@ public class BucketingSinkTest {
        private static org.apache.hadoop.fs.FileSystem dfs;
        private static String hdfsURI;
 
-       private OneInputStreamOperatorTestHarness<String, Object> 
createTestSink(File dataDir, TimeServiceProvider clock) {
+       private OneInputStreamOperatorTestHarness<String, Object> 
createTestSink(File dataDir, TestTimeServiceProvider clock) {
                BucketingSink<String> sink = new 
BucketingSink<String>(dataDir.getAbsolutePath())
                        .setBucketer(new Bucketer<String>() {
                                private static final long serialVersionUID = 1L;
@@ -91,7 +91,7 @@ public class BucketingSinkTest {
        }
 
        private <T> OneInputStreamOperatorTestHarness<T, Object> 
createTestSink(BucketingSink<T> sink,
-                                                                               
                                                                        
TimeServiceProvider clock) {
+                                                                               
                                                                        
TestTimeServiceProvider clock) {
                return new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(sink), new ExecutionConfig(), clock);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 838bee6..35e72a7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -29,14 +29,13 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.StreamSourceContexts;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,20 +43,24 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * This is the operator that reads the {@link FileInputSplit FileInputSplits} 
received from
- * the preceding {@link ContinuousFileMonitoringFunction}. This operator will 
receive just the split descriptors
- * and then read and emit records. This may lead to increased backpressure. To 
avoid this, we have another
- * thread ({@link SplitReader}) actually reading the splits and emitting the 
elements, which is separate from
- * the thread forwarding the checkpoint barriers. The two threads sync on the 
{@link StreamTask#getCheckpointLock()}
- * so that the checkpoints reflect the current state.
+ * the preceding {@link ContinuousFileMonitoringFunction}. This operator can 
have parallelism
+ * greater than 1, contrary to the {@link ContinuousFileMonitoringFunction} 
which has
+ * a parallelism of 1.
+ * <p/>
+ * This operator will receive the split descriptors, put them in a queue, and 
have another
+ * thread read the actual data from the split. This architecture allows the 
separation of the
+ * reading thread from the one emitting the checkpoint barriers, thus 
removing any potential
+ * back-pressure.
  */
 @Internal
 public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends 
AbstractStreamOperator<OUT>
@@ -67,16 +70,16 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
 
        private static final Logger LOG = 
LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
 
+       /** A value that serves as a kill-pill to stop the reading thread when 
no more splits remain. */
        private static final FileInputSplit EOS = new FileInputSplit(-1, null, 
-1, -1, null);
 
-       private transient SplitReader<S, OUT> reader;
-       private transient TimestampedCollector<OUT> collector;
-
        private FileInputFormat<OUT> format;
        private TypeSerializer<OUT> serializer;
 
        private transient Object checkpointLock;
 
+       private transient SplitReader<S, OUT> reader;
+       private transient SourceFunction.SourceContext<OUT> readerContext;
        private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;
 
        public ContinuousFileReaderOperator(FileInputFormat<OUT> format) {
@@ -92,25 +95,22 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
        public void open() throws Exception {
                super.open();
 
-               if (this.serializer == null) {
-                       throw new IllegalStateException("The serializer has not 
been set. " +
-                               "Probably the setOutputType() was not called 
and this should not have happened. " +
-                               "Please report it.");
-               }
+               checkState(this.reader == null, "The reader is already 
initialized.");
+               checkState(this.serializer != null, "The serializer has not 
been set. " +
+                       "Probably the setOutputType() was not called. Please 
report it.");
 
                this.format.setRuntimeContext(getRuntimeContext());
                this.format.configure(new Configuration());
-
-               this.collector = new TimestampedCollector<>(output);
                this.checkpointLock = getContainingTask().getCheckpointLock();
 
-               Preconditions.checkState(reader == null, "The reader is already 
initialized.");
+               // set the reader context based on the time characteristic
+               final TimeCharacteristic timeCharacteristic = 
getOperatorConfig().getTimeCharacteristic();
+               final long watermarkInterval = 
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
+               this.readerContext = StreamSourceContexts.getSourceContext(
+                       timeCharacteristic, getTimerService(), checkpointLock, 
output, watermarkInterval);
 
-               this.reader = new SplitReader<>(format, serializer, collector, 
checkpointLock, readerState);
-
-               // the readerState is needed for the initialization of the 
reader
-               // when recovering from a failure. So after the initialization,
-               // we can set it to null.
+               // and initialize the split reading thread
+               this.reader = new SplitReader<>(format, serializer, 
readerContext, checkpointLock, readerState);
                this.readerState = null;
                this.reader.start();
        }
@@ -122,7 +122,7 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
 
        @Override
        public void processWatermark(Watermark mark) throws Exception {
-               output.emitWatermark(mark);
+               // we do nothing because we emit our own watermarks if needed.
        }
 
        @Override
@@ -156,7 +156,8 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                        }
                }
                reader = null;
-               collector = null;
+               readerContext = null;
+               readerState = null;
                format = null;
                serializer = null;
        }
@@ -177,7 +178,16 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                        // called by the StreamTask while having it.
                        checkpointLock.wait();
                }
-               collector.close();
+
+               // finally if we are closed normally and we are operating on
+               // event or ingestion time, emit the max watermark indicating
+               // the end of the stream, like a normal source would do.
+
+               if (readerContext != null) {
+                       readerContext.emitWatermark(Watermark.MAX_WATERMARK);
+                       readerContext.close();
+               }
+               output.close();
        }
 
        private class SplitReader<S extends Serializable, OT> extends Thread {
@@ -188,7 +198,7 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                private final TypeSerializer<OT> serializer;
 
                private final Object checkpointLock;
-               private final TimestampedCollector<OT> collector;
+               private final SourceFunction.SourceContext<OT> readerContext;
 
                private final Queue<FileInputSplit> pendingSplits;
 
@@ -200,16 +210,16 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
 
                private SplitReader(FileInputFormat<OT> format,
                                        TypeSerializer<OT> serializer,
-                                       TimestampedCollector<OT> collector,
+                                       SourceFunction.SourceContext<OT> 
readerContext,
                                        Object checkpointLock,
                                        Tuple3<List<FileInputSplit>, 
FileInputSplit, S> restoredState) {
 
                        this.format = checkNotNull(format, "Unspecified 
FileInputFormat.");
                        this.serializer = checkNotNull(serializer, "Unspecified 
Serializer.");
+                       this.readerContext = checkNotNull(readerContext, 
"Unspecified Reader Context.");
+                       this.checkpointLock = checkNotNull(checkpointLock, 
"Unspecified checkpoint lock.");
 
-                       this.pendingSplits = new LinkedList<>();
-                       this.collector = collector;
-                       this.checkpointLock = checkpointLock;
+                       this.pendingSplits = new ArrayDeque<>();
                        this.isRunning = true;
 
                        // this is the case where a task recovers from a 
previous failed attempt
@@ -219,7 +229,6 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                                S formatState = restoredState.f2;
 
                                for (FileInputSplit split : pending) {
-                                       
Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split 
entry to read: " + split + ".");
                                        pendingSplits.add(split);
                                }
 
@@ -229,9 +238,8 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                }
 
                private void addSplit(FileInputSplit split) {
-                       Preconditions.checkNotNull(split);
+                       checkNotNull(split, "Cannot insert a null value in the 
pending splits queue.");
                        synchronized (checkpointLock) {
-                               
Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split 
entry to read: " + split + ".");
                                this.pendingSplits.add(split);
                        }
                }
@@ -267,7 +275,7 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                                                                
checkpointableFormat.reopen(currentSplit, restoredFormatState);
                                                        } else {
                                                                // this is the 
case of a non-checkpointable input format that will reprocess the last split.
-                                                               
LOG.info("Format " + this.format.getClass().getName() + " used is not 
checkpointable.");
+                                                               
LOG.info("Format " + this.format.getClass().getName() + " does not support 
checkpointing.");
                                                                
format.open(currentSplit);
                                                        }
                                                        // reset the restored 
state to null for the next iteration
@@ -299,7 +307,7 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                                                        synchronized 
(checkpointLock) {
                                                                nextElement = 
format.nextRecord(nextElement);
                                                                if (nextElement 
!= null) {
-                                                                       
collector.collect(nextElement);
+                                                                       
readerContext.collect(nextElement);
                                                                } else {
                                                                        break;
                                                                }
@@ -318,10 +326,7 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                                }
 
                        } catch (Throwable e) {
-                               if (isRunning) {
-                                       LOG.error("Caught exception processing 
split: ", currentSplit);
-                               }
-                               getContainingTask().failExternally(e);
+                               
getContainingTask().handleAsyncException("Caught exception when processing 
split: " + currentSplit, e);
                        } finally {
                                synchronized (checkpointLock) {
                                        LOG.info("Reader terminated, and 
exiting...");
@@ -358,7 +363,7 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                                                        restoredFormatState;
                                        return new Tuple3<>(snapshot, 
currentSplit, formatState);
                                } else {
-                                       LOG.info("The format used is not 
checkpointable. The current input split will be restarted upon recovery.");
+                                       LOG.info("The format does not support 
checkpointing. The current input split will be re-read from start upon 
recovery.");
                                        return new Tuple3<>(snapshot, 
currentSplit, null);
                                }
                        } else {
@@ -404,7 +409,7 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                FileInputSplit currSplit = (FileInputSplit) ois.readObject();
 
                // read the pending splits list
-               List<FileInputSplit> pendingSplits = new LinkedList<>();
+               List<FileInputSplit> pendingSplits = new ArrayList<>();
                int noOfSplits = ois.readInt();
                for (int i = 0; i < noOfSplits; i++) {
                        FileInputSplit split = (FileInputSplit) 
ois.readObject();
@@ -416,8 +421,7 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                S formatState = (S) ois.readObject();
 
                // set the whole reader state for the open() to find.
-               Preconditions.checkState(this.readerState == null,
-                       "The reader state has already been initialized.");
+               checkState(this.readerState == null, "The reader state has 
already been initialized.");
 
                this.readerState = new Tuple3<>(pendingSplits, currSplit, 
formatState);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 22987ab..1409ae4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -21,11 +21,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
-
-import java.util.concurrent.ScheduledFuture;
 
 /**
  * {@link StreamOperator} for streaming sources.
@@ -57,26 +53,11 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
        
        public void run(final Object lockingObject, final 
Output<StreamRecord<OUT>> collector) throws Exception {
                final TimeCharacteristic timeCharacteristic = 
getOperatorConfig().getTimeCharacteristic();
-               final SourceFunction.SourceContext<OUT> ctx;
-               
-               switch (timeCharacteristic) {
-                       case EventTime:
-                               ctx = new ManualWatermarkContext<>(this, 
lockingObject, collector);
-                               break;
-                       case IngestionTime:
-                               ctx = new AutomaticWatermarkContext<>(this, 
lockingObject, collector,
-                                               
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());
-                               break;
-                       case ProcessingTime:
-                               ctx = new NonTimestampContext<>(this, 
lockingObject, collector);
-                               break;
-                       default:
-                               throw new 
Exception(String.valueOf(timeCharacteristic));
-               }
+               final long watermarkInterval = 
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
+
+               this.ctx = StreamSourceContexts.getSourceContext(
+                       timeCharacteristic, getTimerService(), lockingObject, 
collector, watermarkInterval);
 
-               // copy to a field to give the 'cancel()' method access
-               this.ctx = ctx;
-               
                try {
                        userFunction.run(ctx);
 
@@ -122,252 +103,4 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
        protected boolean isCanceledOrStopped() {
                return canceledOrStopped;
        }
-
-       /**
-        * Checks whether any asynchronous thread (checkpoint trigger, timer, 
watermark generator, ...)
-        * has caused an exception. If one of these threads caused an 
exception, this method will
-        * throw that exception.
-        */
-       void checkAsyncException() {
-               getContainingTask().checkTimerException();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Source contexts for various stream time characteristics
-       // 
------------------------------------------------------------------------
-       
-       /**
-        * A source context that attached {@code -1} as a timestamp to all 
records, and that
-        * does not forward watermarks.
-        */
-       public static class NonTimestampContext<T> implements 
SourceFunction.SourceContext<T> {
-
-               private final StreamSource<?, ?> owner;
-               private final Object lockingObject;
-               private final Output<StreamRecord<T>> output;
-               private final StreamRecord<T> reuse;
-
-               public NonTimestampContext(StreamSource<?, ?> owner, Object 
lockingObject, Output<StreamRecord<T>> output) {
-                       this.owner = owner;
-                       this.lockingObject = lockingObject;
-                       this.output = output;
-                       this.reuse = new StreamRecord<T>(null);
-               }
-
-               @Override
-               public void collect(T element) {
-                       owner.checkAsyncException();
-                       synchronized (lockingObject) {
-                               output.collect(reuse.replace(element));
-                       }
-               }
-
-               @Override
-               public void collectWithTimestamp(T element, long timestamp) {
-                       // ignore the timestamp
-                       collect(element);
-               }
-
-               @Override
-               public void emitWatermark(Watermark mark) {
-                       owner.checkAsyncException();
-                       // do nothing else
-               }
-
-               @Override
-               public Object getCheckpointLock() {
-                       return lockingObject;
-               }
-
-               @Override
-               public void close() {}
-       }
-       
-       /**
-        * {@link SourceFunction.SourceContext} to be used for sources with 
automatic timestamps
-        * and watermark emission.
-        */
-       public static class AutomaticWatermarkContext<T> implements 
SourceFunction.SourceContext<T> {
-
-               private final StreamSource<?, ?> owner;
-               private final TimeServiceProvider timeService;
-               private final Object lockingObject;
-               private final Output<StreamRecord<T>> output;
-               private final StreamRecord<T> reuse;
-               
-               private final ScheduledFuture<?> watermarkTimer;
-               private final long watermarkInterval;
-
-               private volatile long nextWatermarkTime;
-
-               public AutomaticWatermarkContext(
-                               final StreamSource<?, ?> owner,
-                               final Object lockingObjectParam,
-                               final Output<StreamRecord<T>> outputParam,
-                               final long watermarkInterval) {
-                       
-                       if (watermarkInterval < 1L) {
-                               throw new IllegalArgumentException("The 
watermark interval cannot be smaller than one.");
-                       }
-
-                       this.owner = owner;
-                       this.timeService = owner.getTimerService();
-                       this.lockingObject = lockingObjectParam;
-                       this.output = outputParam;
-                       this.watermarkInterval = watermarkInterval;
-                       this.reuse = new StreamRecord<T>(null);
-
-                       long now = this.timeService.getCurrentProcessingTime();
-                       this.watermarkTimer = 
this.timeService.registerTimer(now + watermarkInterval,
-                               new WatermarkEmittingTask(this.timeService, 
lockingObjectParam, outputParam));
-               }
-
-               @Override
-               public void collect(T element) {
-                       owner.checkAsyncException();
-                       
-                       synchronized (lockingObject) {
-                               final long currentTime = 
this.timeService.getCurrentProcessingTime();
-                               output.collect(reuse.replace(element, 
currentTime));
-
-                               // this is to avoid lock contention in the 
lockingObject by
-                               // sending the watermark before the firing of 
the watermark
-                               // emission task.
-
-                               if (currentTime > nextWatermarkTime) {
-                                       // in case we jumped some watermarks, 
recompute the next watermark time
-                                       final long watermarkTime = currentTime 
- (currentTime % watermarkInterval);
-                                       nextWatermarkTime = watermarkTime + 
watermarkInterval;
-                                       output.emitWatermark(new 
Watermark(watermarkTime));
-
-                                       // we do not need to register another 
timer here
-                                       // because the emitting task will do so.
-                               }
-                       }
-               }
-
-               @Override
-               public void collectWithTimestamp(T element, long timestamp) {
-                       collect(element);
-               }
-
-               @Override
-               public void emitWatermark(Watermark mark) {
-                       owner.checkAsyncException();
-                       
-                       if (mark.getTimestamp() == Long.MAX_VALUE) {
-                               // allow it since this is the special 
end-watermark that for example the Kafka source emits
-                               synchronized (lockingObject) {
-                                       nextWatermarkTime = Long.MAX_VALUE;
-                                       output.emitWatermark(mark);
-                               }
-
-                               // we can shutdown the timer now, no watermarks 
will be needed any more
-                               watermarkTimer.cancel(true);
-                       }
-               }
-
-               @Override
-               public Object getCheckpointLock() {
-                       return lockingObject;
-               }
-
-               @Override
-               public void close() {
-                       watermarkTimer.cancel(true);
-               }
-
-               private class WatermarkEmittingTask implements Triggerable {
-
-                       private final TimeServiceProvider timeService;
-                       private final Object lockingObject;
-                       private final Output<StreamRecord<T>> output;
-
-                       private WatermarkEmittingTask(TimeServiceProvider 
timeService, Object lock, Output<StreamRecord<T>> output) {
-                               this.timeService = timeService;
-                               this.lockingObject = lock;
-                               this.output = output;
-                       }
-
-                       @Override
-                       public void trigger(long timestamp) {
-                               final long currentTime = 
this.timeService.getCurrentProcessingTime();
-
-                               if (currentTime > nextWatermarkTime) {
-                                       // align the watermarks across all 
machines. this will ensure that we
-                                       // don't have watermarks that creep 
along at different intervals because
-                                       // the machine clocks are out of sync
-                                       final long watermarkTime = currentTime 
- (currentTime % watermarkInterval);
-
-                                       synchronized (lockingObject) {
-                                               if (currentTime > 
nextWatermarkTime) {
-                                                       
output.emitWatermark(new Watermark(watermarkTime));
-                                                       nextWatermarkTime += 
watermarkInterval;
-                                               }
-                                       }
-                               }
-
-                               
this.timeService.registerTimer(this.timeService.getCurrentProcessingTime() + 
watermarkInterval,
-                                       new 
WatermarkEmittingTask(this.timeService, lockingObject, output));
-                       }
-               }
-       }
-
-       /**
-        * A SourceContext for event time. Sources may directly attach 
timestamps and generate
-        * watermarks, but if records are emitted without timestamps, no 
timestamps are automatically
-        * generated and attached. The records will simply have no timestamp in 
that case.
-        * 
-        * Streaming topologies can use timestamp assigner functions to 
override the timestamps
-        * assigned here.
-        */
-       public static class ManualWatermarkContext<T> implements 
SourceFunction.SourceContext<T> {
-
-               private final StreamSource<?, ?> owner;
-               private final Object lockingObject;
-               private final Output<StreamRecord<T>> output;
-               private final StreamRecord<T> reuse;
-
-               public ManualWatermarkContext(StreamSource<?, ?> owner, Object 
lockingObject, Output<StreamRecord<T>> output) {
-                       this.owner = owner;
-                       this.lockingObject = lockingObject;
-                       this.output = output;
-                       this.reuse = new StreamRecord<T>(null);
-               }
-
-               @Override
-               public void collect(T element) {
-                       owner.checkAsyncException();
-                       
-                       synchronized (lockingObject) {
-                               output.collect(reuse.replace(element));
-                       }
-               }
-
-               @Override
-               public void collectWithTimestamp(T element, long timestamp) {
-                       owner.checkAsyncException();
-                       
-                       synchronized (lockingObject) {
-                               output.collect(reuse.replace(element, 
timestamp));
-                       }
-               }
-
-               @Override
-               public void emitWatermark(Watermark mark) {
-                       owner.checkAsyncException();
-                       
-                       synchronized (lockingObject) {
-                               output.emitWatermark(mark);
-                       }
-               }
-
-               @Override
-               public Object getCheckpointLock() {
-                       return lockingObject;
-               }
-
-               @Override
-               public void close() {}
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
new file mode 100644
index 0000000..abaf4e7
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * Source contexts for various stream time characteristics.
+ */
+public class StreamSourceContexts {
+
+       /**
+        * Creates the {@link SourceFunction.SourceContext} that matches the given
+        * {@link TimeCharacteristic}:
+        * <ul>
+        * <li>{@link TimeCharacteristic#EventTime}: {@link ManualWatermarkContext}</li>
+        * <li>{@link TimeCharacteristic#IngestionTime}: {@link AutomaticWatermarkContext}</li>
+        * <li>{@link TimeCharacteristic#ProcessingTime}: {@link NonTimestampContext}</li>
+        * </ul>
+        *
+        * @param timeCharacteristic the time characteristic that selects the context type
+        * @param timeService the time service; only handed to the ingestion-time context
+        * @param checkpointLock the lock the returned context synchronizes on while emitting
+        * @param output the output that receives the emitted records and watermarks
+        * @param watermarkInterval the watermark interval; only handed to the ingestion-time context
+        * @param <OUT> the type of the emitted elements
+        * @return the source context for the given time characteristic
+        * @throws IllegalArgumentException if the time characteristic is none of the three above
+        */
+       public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
+                       TimeCharacteristic timeCharacteristic,
+                       TimeServiceProvider timeService,
+                       Object checkpointLock,
+                       Output<StreamRecord<OUT>> output,
+                       long watermarkInterval) {
+
+               switch (timeCharacteristic) {
+                       case EventTime:
+                               return new ManualWatermarkContext<>(checkpointLock, output);
+                       case IngestionTime:
+                               return new AutomaticWatermarkContext<>(timeService, checkpointLock, output, watermarkInterval);
+                       case ProcessingTime:
+                               return new NonTimestampContext<>(checkpointLock, output);
+                       default:
+                               throw new IllegalArgumentException(String.valueOf(timeCharacteristic));
+               }
+       }
+
+       /**
+        * A source context that emits every element without an explicit timestamp:
+        * timestamps handed to {@code collectWithTimestamp} are dropped and
+        * watermarks are never forwarded.
+        */
+       private static class NonTimestampContext<T> implements SourceFunction.SourceContext<T> {
+
+               /** Lock held while a record is pushed to the output. */
+               private final Object checkpointLock;
+
+               /** Receiver of the emitted records. */
+               private final Output<StreamRecord<T>> out;
+
+               /** Single record instance that is re-filled for every emitted element. */
+               private final StreamRecord<T> reusedRecord;
+
+               private NonTimestampContext(Object checkpointLock, Output<StreamRecord<T>> output) {
+                       this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
+                       this.out = Preconditions.checkNotNull(output, "The output cannot be null.");
+                       this.reusedRecord = new StreamRecord<>(null);
+               }
+
+               /** Emits the element under the checkpoint lock, without a timestamp argument. */
+               @Override
+               public void collect(T element) {
+                       synchronized (checkpointLock) {
+                               out.collect(reusedRecord.replace(element));
+                       }
+               }
+
+               /** Behaves exactly like {@link #collect(Object)}; the given timestamp is not used. */
+               @Override
+               public void collectWithTimestamp(T element, long timestamp) {
+                       collect(element);
+               }
+
+               /** Intentionally a no-op: this context never forwards watermarks. */
+               @Override
+               public void emitWatermark(Watermark mark) {
+               }
+
+               @Override
+               public Object getCheckpointLock() {
+                       return checkpointLock;
+               }
+
+               /** Nothing to release. */
+               @Override
+               public void close() {
+               }
+       }
+
+       /**
+        * {@link SourceFunction.SourceContext} to be used for sources with automatic timestamps
+        * and watermark emission.
+        *
+        * <p>Every record is stamped with the current processing time reported by the given
+        * {@link TimeServiceProvider}. Watermarks are aligned to multiples of the watermark
+        * interval and are emitted both from {@link #collect(Object)} and from a timer task
+        * that re-registers itself after every firing.
+        */
+       private static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {
+
+               private final TimeServiceProvider timeService;
+               private final Object lock;
+               private final Output<StreamRecord<T>> output;
+               private final StreamRecord<T> reuse;
+
+               private final long watermarkInterval;
+
+               /**
+                * Handle of the most recently registered watermark timer. It is replaced every time
+                * the emitting task re-registers itself, so that cancellation reaches the timer that
+                * is actually pending instead of only the very first one.
+                */
+               private volatile ScheduledFuture<?> nextWatermarkTimer;
+
+               /** Processing time after which the next watermark is due. */
+               private volatile long nextWatermarkTime;
+
+               /**
+                * Creates the context and registers the first watermark timer one interval from now.
+                *
+                * @param timeService provider of the processing time and of the timer registration
+                * @param checkpointLock lock held while records and watermarks are emitted
+                * @param output receiver of the records and watermarks
+                * @param watermarkInterval interval between watermarks, at least 1
+                */
+               private AutomaticWatermarkContext(
+                       final TimeServiceProvider timeService,
+                       final Object checkpointLock,
+                       final Output<StreamRecord<T>> output,
+                       final long watermarkInterval) {
+
+                       this.timeService = Preconditions.checkNotNull(timeService, "Time Service cannot be null.");
+                       this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
+                       this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
+
+                       // an interval of exactly 1 ms is valid; only smaller values are rejected
+                       Preconditions.checkArgument(watermarkInterval >= 1L, "The watermark interval cannot be smaller than 1 ms.");
+                       this.watermarkInterval = watermarkInterval;
+
+                       this.reuse = new StreamRecord<>(null);
+
+                       long now = this.timeService.getCurrentProcessingTime();
+                       this.nextWatermarkTimer = this.timeService.registerTimer(now + watermarkInterval,
+                               new WatermarkEmittingTask(this.timeService, lock, output));
+               }
+
+               /**
+                * Emits the element with the current processing time as its timestamp and, if a
+                * watermark is due, emits that watermark right away.
+                */
+               @Override
+               public void collect(T element) {
+                       synchronized (lock) {
+                               final long currentTime = this.timeService.getCurrentProcessingTime();
+                               output.collect(reuse.replace(element, currentTime));
+
+                               // this is to avoid lock contention in the lockingObject by
+                               // sending the watermark before the firing of the watermark
+                               // emission task.
+
+                               if (currentTime > nextWatermarkTime) {
+                                       // in case we jumped some watermarks, recompute the next watermark time
+                                       final long watermarkTime = currentTime - (currentTime % watermarkInterval);
+                                       nextWatermarkTime = watermarkTime + watermarkInterval;
+                                       output.emitWatermark(new Watermark(watermarkTime));
+
+                                       // we do not need to register another timer here
+                                       // because the emitting task will do so.
+                               }
+                       }
+               }
+
+               /** Behaves exactly like {@link #collect(Object)}; the given timestamp is not used. */
+               @Override
+               public void collectWithTimestamp(T element, long timestamp) {
+                       collect(element);
+               }
+
+               /**
+                * Forwards only the special end watermark ({@code Long.MAX_VALUE}); every other
+                * watermark handed in by the source is dropped.
+                */
+               @Override
+               public void emitWatermark(Watermark mark) {
+
+                       if (mark.getTimestamp() == Long.MAX_VALUE) {
+                               // allow it since this is the special end-watermark that for example the Kafka source emits
+                               synchronized (lock) {
+                                       nextWatermarkTime = Long.MAX_VALUE;
+                                       output.emitWatermark(mark);
+                               }
+
+                               // we can shutdown the timer now, no watermarks will be needed any more
+                               cancelNextWatermarkTimer();
+                       }
+               }
+
+               @Override
+               public Object getCheckpointLock() {
+                       return lock;
+               }
+
+               /** Cancels the pending watermark timer. */
+               @Override
+               public void close() {
+                       cancelNextWatermarkTimer();
+               }
+
+               /** Cancels the most recently registered watermark timer, if there is one. */
+               private void cancelNextWatermarkTimer() {
+                       // read the volatile field once so the null check and the cancel see the same handle
+                       final ScheduledFuture<?> pendingTimer = nextWatermarkTimer;
+                       if (pendingTimer != null) {
+                               pendingTimer.cancel(true);
+                       }
+               }
+
+               /**
+                * Timer callback that emits a watermark when one is due and then registers
+                * a new instance of itself one interval later.
+                */
+               private class WatermarkEmittingTask implements Triggerable {
+
+                       private final TimeServiceProvider timeService;
+                       private final Object lock;
+                       private final Output<StreamRecord<T>> output;
+
+                       private WatermarkEmittingTask(TimeServiceProvider timeService, Object checkpointLock, Output<StreamRecord<T>> output) {
+                               this.timeService = Preconditions.checkNotNull(timeService, "Time Service cannot be null.");
+                               this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
+                               this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
+                       }
+
+                       @Override
+                       public void trigger(long timestamp) {
+                               final long currentTime = timeService.getCurrentProcessingTime();
+
+                               if (currentTime > nextWatermarkTime) {
+                                       // align the watermarks across all machines. this will ensure that we
+                                       // don't have watermarks that creep along at different intervals because
+                                       // the machine clocks are out of sync
+                                       final long watermarkTime = currentTime - (currentTime % watermarkInterval);
+
+                                       synchronized (lock) {
+                                               // re-check under the lock: collect() may have emitted this watermark already
+                                               if (currentTime > nextWatermarkTime) {
+                                                       output.emitWatermark(new Watermark(watermarkTime));
+                                                       nextWatermarkTime = watermarkTime + watermarkInterval;
+                                               }
+                                       }
+                               }
+
+                               // remember the handle of the follow-up timer so that close() and the
+                               // end-watermark path cancel the timer that is actually pending
+                               long nextWatermark = currentTime + watermarkInterval;
+                               nextWatermarkTimer = this.timeService.registerTimer(
+                                       nextWatermark, new WatermarkEmittingTask(this.timeService, lock, output));
+                       }
+               }
+       }
+
+       /**
+        * A source context for event time: the source attaches timestamps and emits
+        * watermarks itself, and this context passes both on unchanged.
+        *
+        * <p>Elements collected without a timestamp are forwarded without one being
+        * assigned here. Streaming topologies can use timestamp assigner functions to
+        * override the timestamps assigned here.
+        */
+       private static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> {
+
+               /** Lock held around every emission. */
+               private final Object checkpointLock;
+
+               /** Receiver of the records and watermarks. */
+               private final Output<StreamRecord<T>> out;
+
+               /** Single record instance that is re-filled for every emitted element. */
+               private final StreamRecord<T> reusedRecord;
+
+               private ManualWatermarkContext(Object checkpointLock, Output<StreamRecord<T>> output) {
+                       this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
+                       this.out = Preconditions.checkNotNull(output, "The output cannot be null.");
+                       this.reusedRecord = new StreamRecord<>(null);
+               }
+
+               /** Emits the element under the checkpoint lock, without a timestamp argument. */
+               @Override
+               public void collect(T element) {
+                       synchronized (checkpointLock) {
+                               out.collect(reusedRecord.replace(element));
+                       }
+               }
+
+               /** Emits the element under the checkpoint lock, carrying the given timestamp. */
+               @Override
+               public void collectWithTimestamp(T element, long timestamp) {
+                       synchronized (checkpointLock) {
+                               out.collect(reusedRecord.replace(element, timestamp));
+                       }
+               }
+
+               /** Forwards the watermark to the output under the checkpoint lock. */
+               @Override
+               public void emitWatermark(Watermark mark) {
+                       synchronized (checkpointLock) {
+                               out.emitWatermark(mark);
+                       }
+               }
+
+               @Override
+               public Object getCheckpointLock() {
+                       return checkpointLock;
+               }
+
+               /** Nothing to release. */
+               @Override
+               public void close() {
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
index c7ec2ed..4c55055 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
@@ -18,12 +18,14 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 /**
- * Interface for reporting exceptions that are thrown in (possibly) a 
different thread.
+ * An interface marking a task as capable of handling exceptions thrown
+ * by different threads, other than the one executing the task itself.
  */
 public interface AsyncExceptionHandler {
 
        /**
-        * Registers the given exception.
+        * Handles an exception thrown by another thread (e.g. a TriggerTask),
+        * other than the one executing the main task.
+        *
+        * @param message a description of the context in which the failure occurred
+        * @param exception the throwable raised by the other thread
         */
-       void registerAsyncException(AsynchronousException exception);
+       void handleAsyncException(String message, Throwable exception);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
index ea2b07f..9534b3c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.util.Preconditions;
 
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -49,9 +50,9 @@ public class DefaultTimeServiceProvider extends 
TimeServiceProvider {
        private DefaultTimeServiceProvider(AsyncExceptionHandler task,
                                                                        
ScheduledExecutorService threadPoolExecutor,
                                                                        Object 
checkpointLock) {
-               this.task = task;
-               this.timerService = threadPoolExecutor;
-               this.checkpointLock = checkpointLock;
+               this.task = Preconditions.checkNotNull(task);
+               this.timerService = 
Preconditions.checkNotNull(threadPoolExecutor);
+               this.checkpointLock = 
Preconditions.checkNotNull(checkpointLock);
        }
 
        @Override
@@ -99,7 +100,7 @@ public class DefaultTimeServiceProvider extends 
TimeServiceProvider {
                                        target.trigger(timestamp);
                                } catch (Throwable t) {
                                        TimerException asyncException = new 
TimerException(t);
-                                       
exceptionHandler.registerAsyncException(asyncException);
+                                       
exceptionHandler.handleAsyncException("Caught exception while processing 
timer.", asyncException);
                                }
                        }
                }
@@ -109,7 +110,7 @@ public class DefaultTimeServiceProvider extends 
TimeServiceProvider {
        public static DefaultTimeServiceProvider 
createForTesting(ScheduledExecutorService executor, Object checkpointLock) {
                return new DefaultTimeServiceProvider(new 
AsyncExceptionHandler() {
                        @Override
-                       public void 
registerAsyncException(AsynchronousException exception) {
+                       public void handleAsyncException(String message, 
Throwable exception) {
                                exception.printStackTrace();
                        }
                }, executor, checkpointLock);

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index d6d2fb5..cf8853e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -64,7 +64,7 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
                final Object lock = getCheckpointLock();
                
                while (running && inputProcessor.processInput(operator, lock)) {
-                       checkTimerException();
+
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 9802a16..33317fa 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -53,6 +53,7 @@ import 
org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -158,11 +159,6 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
        private List<Collection<OperatorStateHandle>> lazyRestoreOperatorState;
 
-       /**
-        * This field is used to forward an exception that is caught in the 
timer thread or other
-        * asynchronous Threads. Subclasses must ensure that exceptions stored 
here get thrown on the
-        * actual execution Thread. */
-       private volatile AsynchronousException asyncException;
 
        /** The currently active background materialization threads */
        private final ClosableRegistry cancelables = new ClosableRegistry();
@@ -301,9 +297,6 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                        // still let the computation fail
                        tryDisposeAllOperators();
                        disposed = true;
-
-                       // Don't forget to check and throw exceptions that 
happened in async thread one last time
-                       checkTimerException();
                }
                finally {
                        // clean up everything we initialized
@@ -354,19 +347,6 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                }
        }
 
-       /**
-        * Marks task execution failed for an external reason (a reason other 
than the task code itself
-        * throwing an exception). If the task is already in a terminal state
-        * (such as FINISHED, CANCELED, FAILED), or if the task is already 
canceling this does nothing.
-        * Otherwise it sets the state to FAILED, and, if the invokable code is 
running,
-        * starts an asynchronous thread that aborts that code.
-        *
-        * <p>This method never blocks.</p>
-        */
-       public void failExternally(Throwable cause) {
-               getEnvironment().failExternally(cause);
-       }
-
        @Override
        public final void cancel() throws Exception {
                isRunning = false;
@@ -898,27 +878,21 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
        }
 
        /**
-        * Check whether an exception was thrown in a Thread other than the 
main Thread. (For example
-        * in the processing-time trigger Thread). This will rethrow that 
exception in case on
-        * occurred.
+        * Handles an exception thrown by another thread (e.g. a TriggerTask),
+        * other than the one executing the main task by failing the task 
entirely.
+        *
+        * In more detail, it marks task execution failed for an external reason
+        * (a reason other than the task code itself throwing an exception). If 
the task
+        * is already in a terminal state (such as FINISHED, CANCELED, FAILED), 
or if the
+        * task is already canceling this does nothing. Otherwise it sets the 
state to
+        * FAILED, and, if the invokable code is running, starts an 
asynchronous thread
+        * that aborts that code.
         *
-        * <p>This must be called in the main loop of {@code StreamTask} 
subclasses to ensure
-        * that we propagate failures.
+        * <p>This method never blocks.</p>
         */
-       public void checkTimerException() throws AsynchronousException {
-               if (asyncException != null) {
-                       throw asyncException;
-               }
-       }
-
        @Override
-       public void registerAsyncException(AsynchronousException exception) {
-               if (isRunning) {
-                       LOG.error("Asynchronous exception registered.", 
exception);
-               }
-               if (this.asyncException == null) {
-                       this.asyncException = exception;
-               }
+       public void handleAsyncException(String message, Throwable exception) {
+               getEnvironment().failExternally(exception);
        }
 
        // 
------------------------------------------------------------------------
@@ -1030,7 +1004,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                        catch (Exception e) {
                                // registers the exception and tries to fail 
the whole task
                                AsynchronousException asyncException = new 
AsynchronousException(e);
-                               owner.registerAsyncException(asyncException);
+                               owner.handleAsyncException("Failure in 
asynchronous checkpoint materialization", asyncException);
                        }
                        finally {
                                cancelables.unregisterClosable(this);

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 9252063..0197c53 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -88,7 +88,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends 
StreamTask<OUT, TwoInputS
                final Object lock = getCheckpointLock();
                
                while (running && inputProcessor.processInput(operator, lock)) {
-                       checkTimerException();
+
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index e8663f5..10b30d0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StoppableStreamSource;
 import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.operators.StreamSourceContexts;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -187,12 +188,11 @@ public class StreamSourceOperatorTest {
 
                final List<StreamElement> output = new ArrayList<>();
 
-               StreamSource.AutomaticWatermarkContext<String> ctx =
-                       new StreamSource.AutomaticWatermarkContext<>(
-                               operator,
-                               
operator.getContainingTask().getCheckpointLock(),
-                               new CollectorOutput<String>(output),
-                               
operator.getExecutionConfig().getAutoWatermarkInterval());
+               
StreamSourceContexts.getSourceContext(TimeCharacteristic.IngestionTime,
+                       operator.getContainingTask().getTimerService(),
+                       operator.getContainingTask().getCheckpointLock(),
+                       new CollectorOutput<String>(output),
+                       
operator.getExecutionConfig().getAutoWatermarkInterval());
 
                // periodically emit the watermarks
                // even though we start from 1 the watermark are still
@@ -218,7 +218,7 @@ public class StreamSourceOperatorTest {
        private static <T> void setupSourceOperator(StreamSource<T, ?> operator,
                                                                                
                TimeCharacteristic timeChar,
                                                                                
                long watermarkInterval,
-                                                                               
                final TimeServiceProvider timeProvider) {
+                                                                               
                final TestTimeServiceProvider timeProvider) {
 
                ExecutionConfig executionConfig = new ExecutionConfig();
                executionConfig.setAutoWatermarkInterval(watermarkInterval);
@@ -241,9 +241,6 @@ public class StreamSourceOperatorTest {
                doAnswer(new Answer<TimeServiceProvider>() {
                        @Override
                        public TimeServiceProvider answer(InvocationOnMock 
invocation) throws Throwable {
-                               if (timeProvider == null) {
-                                       throw new RuntimeException("The time 
provider is null.");
-                               }
                                return timeProvider;
                        }
                }).when(mockTask).getTimerService();

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
index 60850d8..0351978 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
@@ -23,7 +23,6 @@ import 
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
-import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
@@ -39,6 +38,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
 
@@ -52,28 +52,24 @@ public class TimeProviderTest {
                final OneShotLatch latch = new OneShotLatch();
 
                final Object lock = new Object();
-               TimeServiceProvider timeServiceProvider = 
DefaultTimeServiceProvider.create(
-                               new AsyncExceptionHandler() {
-                                       @Override
-                                       public void 
registerAsyncException(AsynchronousException exception) {
-                                               exception.printStackTrace();
-                                       }
-                               },
-                               Executors.newSingleThreadScheduledExecutor(),
-                               lock);
+               TimeServiceProvider timeServiceProvider = 
DefaultTimeServiceProvider
+                       
.createForTesting(Executors.newSingleThreadScheduledExecutor(), lock);
 
                final List<Long> timestamps = new ArrayList<>();
 
-               long start = System.currentTimeMillis();
                long interval = 50L;
-
                final long noOfTimers = 20;
 
                // we add 2 timers per iteration minus the first that would 
have a negative timestamp
-               final long expectedNoOfTimers = 2 * noOfTimers - 1;
+               final long expectedNoOfTimers = 2 * noOfTimers;
 
                for (int i = 0; i < noOfTimers; i++) {
-                       double nextTimer = start + i * interval;
+
+                       // we add a delay (100ms) so that both timers are 
inserted before the first is processed.
+                       // If not, and given that we add timers out of order, 
we may have a timer firing
+                       // before the next one (with smaller timestamp) is 
added.
+
+                       double nextTimer = 
timeServiceProvider.getCurrentProcessingTime() + 100 + i * interval;
 
                        timeServiceProvider.registerTimer((long) nextTimer, new 
Triggerable() {
                                @Override
@@ -88,17 +84,15 @@ public class TimeProviderTest {
                        // add also out-of-order tasks to verify that eventually
                        // they will be executed in the correct order.
 
-                       if (i > 0) {
-                               timeServiceProvider.registerTimer((long) 
(nextTimer - 10), new Triggerable() {
-                                       @Override
-                                       public void trigger(long timestamp) 
throws Exception {
-                                               timestamps.add(timestamp);
-                                               if (timestamps.size() == 
expectedNoOfTimers) {
-                                                       latch.trigger();
-                                               }
+                       timeServiceProvider.registerTimer((long) (nextTimer - 
10L), new Triggerable() {
+                               @Override
+                               public void trigger(long timestamp) throws 
Exception {
+                                       timestamps.add(timestamp);
+                                       if (timestamps.size() == 
expectedNoOfTimers) {
+                                               latch.trigger();
                                        }
-                               });
-                       }
+                               }
+                       });
                }
 
                if (!latch.isTriggered()) {
@@ -114,15 +108,46 @@ public class TimeProviderTest {
                long lastTs = Long.MIN_VALUE;
                for (long timestamp: timestamps) {
                        Assert.assertTrue(timestamp >= lastTs);
+                       if (lastTs != Long.MIN_VALUE && counter % 2 == 1) {
+                               Assert.assertEquals((timestamp - lastTs), 10);
+                       }
                        lastTs = timestamp;
-
-                       long expectedTs = start + (counter/2) * interval;
-                       Assert.assertEquals(timestamp, (expectedTs + ((counter 
% 2 == 0) ? 0 : 40)));
                        counter++;
                }
        }
 
        @Test
+       public void testDefaultTimeProviderExceptionHandling() throws 
InterruptedException {
+               final OneShotLatch latch = new OneShotLatch();
+
+               final AtomicBoolean exceptionWasThrown = new 
AtomicBoolean(false);
+
+               final Object lock = new Object();
+
+               TimeServiceProvider timeServiceProvider = 
DefaultTimeServiceProvider
+                       .create(new AsyncExceptionHandler() {
+                               @Override
+                               public void handleAsyncException(String 
message, Throwable exception) {
+                                       exceptionWasThrown.compareAndSet(false, 
true);
+                                       latch.trigger();
+                               }
+                       }, Executors.newSingleThreadScheduledExecutor(), lock);
+
+               long now = System.currentTimeMillis();
+               timeServiceProvider.registerTimer(now, new Triggerable() {
+                       @Override
+                       public void trigger(long timestamp) throws Exception {
+                               throw new Exception("Exception in Timer");
+                       }
+               });
+
+               if (!latch.isTriggered()) {
+                       latch.await();
+               }
+               Assert.assertTrue(exceptionWasThrown.get());
+       }
+
+       @Test
        public void testTimerSorting() throws Exception {
 
                final List<Long> result = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index f33da89..30f38e3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -183,13 +183,13 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
        @Test
        public void testWindowTriggerTimeAlignment() throws Exception {
                final Object lock = new Object();
-               final TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
+               TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
                        Executors.newSingleThreadScheduledExecutor(), lock);
 
                try {
                        @SuppressWarnings("unchecked")
                        final Output<StreamRecord<String>> mockOut = 
mock(Output.class);
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
+                       StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
 
                        AccumulatingProcessingTimeWindowOperator<String, 
String, String> op;
 
@@ -201,6 +201,11 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        assertTrue(op.getNextEvaluationTime() % 1000 == 0);
                        op.dispose();
 
+                       timerService.shutdownService();
+                       timerService = 
DefaultTimeServiceProvider.createForTesting(
+                               Executors.newSingleThreadScheduledExecutor(), 
lock);
+                       mockTask = createMockTaskWithTimer(timerService, lock);
+
                        op = new 
AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
                                        StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 1000, 1000);
                        op.setup(mockTask, new StreamConfig(new 
Configuration()), mockOut);
@@ -209,6 +214,11 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        assertTrue(op.getNextEvaluationTime() % 1000 == 0);
                        op.dispose();
 
+                       timerService.shutdownService();
+                       timerService = 
DefaultTimeServiceProvider.createForTesting(
+                               Executors.newSingleThreadScheduledExecutor(), 
lock);
+                       mockTask = createMockTaskWithTimer(timerService, lock);
+
                        op = new 
AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
                                        StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 1500, 1000);
                        op.setup(mockTask, new StreamConfig(new 
Configuration()), mockOut);
@@ -217,6 +227,11 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        assertTrue(op.getNextEvaluationTime() % 1000 == 0);
                        op.dispose();
 
+                       timerService.shutdownService();
+                       timerService = 
DefaultTimeServiceProvider.createForTesting(
+                               Executors.newSingleThreadScheduledExecutor(), 
lock);
+                       mockTask = createMockTaskWithTimer(timerService, lock);
+
                        op = new 
AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
                                        StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 1200, 1100);
                        op.setup(mockTask, new StreamConfig(new 
Configuration()), mockOut);
@@ -243,7 +258,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                try {
                        final int windowSize = 50;
                        final CollectingOutput<Integer> out = new 
CollectingOutput<>(windowSize);
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
 
                        // tumbling window that triggers every 20 milliseconds
                        AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
@@ -297,7 +312,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
                try {
                        final CollectingOutput<Integer> out = new 
CollectingOutput<>(50);
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
                        
                        // tumbling window that triggers every 20 milliseconds
                        AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
@@ -359,7 +374,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
                try {
                        final CollectingOutput<Integer> out = new 
CollectingOutput<>(50);
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
 
                        // tumbling window that triggers every 20 milliseconds
                        AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
@@ -416,7 +431,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
                try {
                        final CollectingOutput<Integer> out = new 
CollectingOutput<>(50);
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
 
                        // tumbling window that triggers every 20 milliseconds
                        AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
@@ -743,7 +758,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
        // 
------------------------------------------------------------------------
 
-       private static StreamTask<?, ?> createMockTask() {
+       private static StreamTask<?, ?> createMockTask(Object lock) {
                Configuration configuration = new Configuration();
                configuration.setString(ConfigConstants.STATE_BACKEND, 
"jobmanager");
 
@@ -751,6 +766,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                when(task.getAccumulatorMap()).thenReturn(new HashMap<String, 
Accumulator<?, ?>>());
                when(task.getName()).thenReturn("Test task name");
                when(task.getExecutionConfig()).thenReturn(new 
ExecutionConfig());
+               when(task.getCheckpointLock()).thenReturn(lock);
 
                final TaskManagerRuntimeInfo mockTaskManagerRuntimeInfo = 
mock(TaskManagerRuntimeInfo.class);
                
when(mockTaskManagerRuntimeInfo.getConfiguration()).thenReturn(configuration);
@@ -765,9 +781,9 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
        }
 
        private static StreamTask<?, ?> createMockTaskWithTimer(
-               final TimeServiceProvider timerService)
+               final TimeServiceProvider timerService, final Object lock)
        {
-               StreamTask<?, ?> mockTask = createMockTask();
+               StreamTask<?, ?> mockTask = createMockTask(lock);
                when(mockTask.getTimerService()).thenReturn(timerService);
                return mockTask;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 826b230..7539c2d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -191,13 +191,13 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
        @Test
        public void testWindowTriggerTimeAlignment() throws Exception {
                final Object lock = new Object();
-               final TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
+               TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
                        Executors.newSingleThreadScheduledExecutor(), lock);
 
                try {
                        @SuppressWarnings("unchecked")
                        final Output<StreamRecord<String>> mockOut = 
mock(Output.class);
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
+                       StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
                        
                        AggregatingProcessingTimeWindowOperator<String, String> 
op;
 
@@ -209,6 +209,11 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        assertTrue(op.getNextEvaluationTime() % 1000 == 0);
                        op.dispose();
 
+                       timerService.shutdownService();
+                       timerService = 
DefaultTimeServiceProvider.createForTesting(
+                               Executors.newSingleThreadScheduledExecutor(), 
lock);
+                       mockTask = createMockTaskWithTimer(timerService, lock);
+
                        op = new 
AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
                                        StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 1000, 1000);
                        op.setup(mockTask, new StreamConfig(new 
Configuration()), mockOut);
@@ -217,6 +222,11 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        assertTrue(op.getNextEvaluationTime() % 1000 == 0);
                        op.dispose();
 
+                       timerService.shutdownService();
+                       timerService = 
DefaultTimeServiceProvider.createForTesting(
+                               Executors.newSingleThreadScheduledExecutor(), 
lock);
+                       mockTask = createMockTaskWithTimer(timerService, lock);
+
                        op = new 
AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
                                        StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 1500, 1000);
                        op.setup(mockTask, new StreamConfig(new 
Configuration()), mockOut);
@@ -225,6 +235,11 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        assertTrue(op.getNextEvaluationTime() % 1000 == 0);
                        op.dispose();
 
+                       timerService.shutdownService();
+                       timerService = 
DefaultTimeServiceProvider.createForTesting(
+                               Executors.newSingleThreadScheduledExecutor(), 
lock);
+                       mockTask = createMockTaskWithTimer(timerService, lock);
+
                        op = new 
AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
                                        StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 1200, 1100);
                        op.setup(mockTask, new StreamConfig(new 
Configuration()), mockOut);
@@ -257,7 +272,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                                        IntSerializer.INSTANCE, 
tupleSerializer,
                                                        windowSize, windowSize);
 
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
 
                        op.setup(mockTask, createTaskConfig(fieldOneSelector, 
IntSerializer.INSTANCE, 10), out);
                        op.open();
@@ -309,7 +324,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        final int windowSize = 50;
                        final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>(windowSize);
 
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
 
                        AggregatingProcessingTimeWindowOperator<Integer, 
Tuple2<Integer, Integer>> op =
                                        new 
AggregatingProcessingTimeWindowOperator<>(
@@ -377,7 +392,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                try {
                        final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>(50);
 
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
 
                        // tumbling window that triggers every 20 milliseconds
                        AggregatingProcessingTimeWindowOperator<Integer, 
Tuple2<Integer, Integer>> op =
@@ -448,7 +463,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
 
                try {
                        final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>(50);
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
 
                        // tumbling window that triggers every 20 milliseconds
                        AggregatingProcessingTimeWindowOperator<Integer, 
Tuple2<Integer, Integer>> op =
@@ -508,7 +523,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
 
                try {
                        final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>();
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
 
                        ReduceFunction<Tuple2<Integer, Integer>> 
failingFunction = new FailingFunction(100);
 
@@ -929,7 +944,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
 
        // 
------------------------------------------------------------------------
        
-       private static StreamTask<?, ?> createMockTask() {
+       private static StreamTask<?, ?> createMockTask(Object lock) {
                Configuration configuration = new Configuration();
                configuration.setString(ConfigConstants.STATE_BACKEND, 
"jobmanager");
 
@@ -937,6 +952,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                when(task.getAccumulatorMap()).thenReturn(new HashMap<String, 
Accumulator<?, ?>>());
                when(task.getName()).thenReturn("Test task name");
                when(task.getExecutionConfig()).thenReturn(new 
ExecutionConfig());
+               when(task.getCheckpointLock()).thenReturn(lock);
 
                final TaskManagerRuntimeInfo mockTaskManagerRuntimeInfo = 
mock(TaskManagerRuntimeInfo.class);
                
when(mockTaskManagerRuntimeInfo.getConfiguration()).thenReturn(configuration);
@@ -947,9 +963,9 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                return task;
        }
 
-       private static StreamTask<?, ?> createMockTaskWithTimer(final 
TimeServiceProvider timerService)
+       private static StreamTask<?, ?> createMockTaskWithTimer(final 
TimeServiceProvider timerService, final Object lock)
        {
-               StreamTask<?, ?> mockTask = createMockTask();
+               StreamTask<?, ?> mockTask = createMockTask(lock);
                when(mockTask.getTimerService()).thenReturn(timerService);
                return mockTask;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index f638ddd..9b773d8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -102,6 +102,8 @@ public class StreamMockEnvironment implements Environment {
 
        private final ExecutionConfig executionConfig;
 
+       private volatile boolean wasFailedExternally = false;
+
        public StreamMockEnvironment(Configuration jobConfig, Configuration 
taskConfig, ExecutionConfig executionConfig,
                                                                        long 
memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
                this.taskInfo = new TaskInfo("", 1, 0, 1, 0);
@@ -325,7 +327,11 @@ public class StreamMockEnvironment implements Environment {
 
        @Override
        public void failExternally(Throwable cause) {
-               throw new UnsupportedOperationException("StreamMockEnvironment 
does not support external task failure.");
+               this.wasFailedExternally = true;
+       }
+
+       public boolean wasFailedExternally() {
+               return wasFailedExternally;
        }
 
        @Override

Reply via email to