Repository: flink
Updated Branches:
  refs/heads/release-1.1 fddd89bcd -> bab59dfa7


[FLINK-4329] Fix Streaming File Source Timestamps/Watermarks Handling

This closes #2593.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bab59dfa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bab59dfa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bab59dfa

Branch: refs/heads/release-1.1
Commit: bab59dfa7cf94f4c392c3205ee180e72e1ad7814
Parents: fddd89b
Author: kl0u <[email protected]>
Authored: Tue Oct 4 15:27:59 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Wed Oct 5 17:52:26 2016 +0200

----------------------------------------------------------------------
 .../ContinuousFileMonitoringFunctionITCase.java |  15 +-
 .../hdfstests/ContinuousFileMonitoringTest.java | 240 +++++++++++++++----
 .../source/ContinuousFileReaderOperator.java    | 108 +++++----
 .../api/operators/AsyncExceptionChecker.java    |  27 +++
 .../streaming/api/operators/StreamSource.java   |  53 ++--
 .../util/OneInputStreamOperatorTestHarness.java |   9 +
 6 files changed, 335 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
index e6cd5d9..dd4dc5a 100644
--- 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
+++ 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
@@ -130,7 +130,7 @@ public class ContinuousFileMonitoringFunctionITCase extends 
StreamingProgramTest
 
                        TypeInformation<String> typeInfo = 
TypeExtractor.getInputFormatTypes(format);
                        ContinuousFileReaderOperator<String, ?> reader = new 
ContinuousFileReaderOperator<>(format);
-                       TestingSinkFunction sink = new 
TestingSinkFunction(monitoringFunction);
+                       TestingSinkFunction sink = new TestingSinkFunction();
 
                        DataStream<FileInputSplit> splits = 
env.addSource(monitoringFunction);
                        splits.transform("FileSplitReader", typeInfo, 
reader).addSink(sink).setParallelism(1);
@@ -161,16 +161,10 @@ public class ContinuousFileMonitoringFunctionITCase 
extends StreamingProgramTest
 
        private static class TestingSinkFunction extends 
RichSinkFunction<String> {
 
-               private final ContinuousFileMonitoringFunction src;
-
                private int elementCounter = 0;
                private Map<Integer, Integer> elementCounters = new HashMap<>();
                private Map<Integer, List<String>> collectedContent = new 
HashMap<>();
 
-               TestingSinkFunction(ContinuousFileMonitoringFunction 
monitoringFunction) {
-                       this.src = monitoringFunction;
-               }
-
                @Override
                public void open(Configuration parameters) throws Exception {
                        // this sink can only work with DOP 1
@@ -200,13 +194,6 @@ public class ContinuousFileMonitoringFunctionITCase 
extends StreamingProgramTest
                                Assert.assertEquals(cntntStr.toString(), 
expectedContents.get(fileIdx));
                        }
                        expectedContents.clear();
-
-                       src.cancel();
-                       try {
-                               src.close();
-                       } catch (Exception e) {
-                               e.printStackTrace();
-                       }
                }
 
                private int getLineNo(String line) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
index def9378..9358ba9 100644
--- 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -27,12 +27,14 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
 import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileUtil;
@@ -51,8 +53,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 public class ContinuousFileMonitoringTest {
 
@@ -106,10 +108,162 @@ public class ContinuousFileMonitoringTest {
        //                                              TESTS
 
        @Test
+       public void testFileReadingOperatorWithIngestionTime() throws Exception 
{
+               Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+               Map<Integer, String> expectedFileContents = new HashMap<>();
+
+               for (int i = 0; i < NO_OF_FILES; i++) {
+                       Tuple2<org.apache.hadoop.fs.Path, String> file = 
fillWithData(hdfsURI, "file", i, "This is test line.");
+                       filesCreated.add(file.f0);
+                       expectedFileContents.put(i, file.f1);
+               }
+
+               TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+               TypeInformation<String> typeInfo = 
TypeExtractor.getInputFormatTypes(format);
+
+               final long watermarkInterval = 10;
+               ExecutionConfig executionConfig = new ExecutionConfig();
+               executionConfig.setAutoWatermarkInterval(watermarkInterval);
+
+               ContinuousFileReaderOperator<String, ?> reader = new 
ContinuousFileReaderOperator<>(format);
+               reader.setOutputType(typeInfo, executionConfig);
+
+               final TestTimeServiceProvider timeServiceProvider = new 
TestTimeServiceProvider();
+               final OneInputStreamOperatorTestHarness<FileInputSplit, String> 
tester =
+                       new OneInputStreamOperatorTestHarness<>(reader, 
executionConfig, timeServiceProvider);
+               tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               tester.open();
+
+               Assert.assertEquals(TimeCharacteristic.IngestionTime, 
tester.getTimeCharacteristic());
+
+               // test that watermarks are correctly emitted
+
+               ConcurrentLinkedQueue<Object> output = tester.getOutput();
+
+               timeServiceProvider.setCurrentTime(201);
+               Assert.assertTrue(output.peek() instanceof Watermark);
+               Assert.assertEquals(200, ((Watermark) 
output.poll()).getTimestamp());
+
+               timeServiceProvider.setCurrentTime(301);
+               Assert.assertTrue(output.peek() instanceof Watermark);
+               Assert.assertEquals(300, ((Watermark) 
output.poll()).getTimestamp());
+
+               timeServiceProvider.setCurrentTime(401);
+               Assert.assertTrue(output.peek() instanceof Watermark);
+               Assert.assertEquals(400, ((Watermark) 
output.poll()).getTimestamp());
+
+               timeServiceProvider.setCurrentTime(501);
+               Assert.assertTrue(output.peek() instanceof Watermark);
+               Assert.assertEquals(500, ((Watermark) 
output.poll()).getTimestamp());
+
+               Assert.assertTrue(output.isEmpty());
+
+               // create the necessary splits for the test
+               FileInputSplit[] splits = format.createInputSplits(
+                       
reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+               // and feed them to the operator
+               Map<Integer, List<String>> actualFileContents = new HashMap<>();
+
+               long lastSeenWatermark = Long.MIN_VALUE;
+               int lineCounter = 0;    // counter for the lines read from the 
splits
+               int watermarkCounter = 0;
+
+               for (FileInputSplit split: splits) {
+
+                       // set the next "current processing time".
+                       long nextTimestamp = 
timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
+                       timeServiceProvider.setCurrentTime(nextTimestamp);
+
+                       // send the next split to be read and wait until it is 
fully read, the +1 is for the watermark.
+                       tester.processElement(new StreamRecord<>(split));
+
+                       // NOTE: the following check works because each file 
fits in one split.
+                       // Otherwise it would fail and wait forever.
+                       // BUT THIS IS JUST FOR THIS TEST
+                       while (tester.getOutput().isEmpty() || 
tester.getOutput().size() != (LINES_PER_FILE + 1)) {
+                               Thread.sleep(10);
+                       }
+
+                       // verify that the results are the expected
+                       for (Object line: tester.getOutput()) {
+                               if (line instanceof StreamRecord) {
+                                       StreamRecord<String> element = 
(StreamRecord<String>) line;
+                                       lineCounter++;
+
+                                       Assert.assertEquals(nextTimestamp, 
element.getTimestamp());
+
+                                       int fileIdx = 
Character.getNumericValue(element.getValue().charAt(0));
+                                       List<String> content = 
actualFileContents.get(fileIdx);
+                                       if (content == null) {
+                                               content = new ArrayList<>();
+                                               actualFileContents.put(fileIdx, 
content);
+                                       }
+                                       content.add(element.getValue() + "\n");
+                               } else if (line instanceof Watermark) {
+                                       long watermark = ((Watermark) 
line).getTimestamp();
+
+                                       Assert.assertEquals(nextTimestamp - 
(nextTimestamp % watermarkInterval), watermark);
+                                       Assert.assertTrue(watermark > 
lastSeenWatermark);
+                                       watermarkCounter++;
+
+                                       lastSeenWatermark = watermark;
+                               } else {
+                                       Assert.fail("Unknown element in the 
list.");
+                               }
+                       }
+
+                       // clean the output to be ready for the next split
+                       tester.getOutput().clear();
+               }
+
+               // now we are processing one split after the other,
+               // so all the elements must be here by now.
+               Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, lineCounter);
+
+               // because we expect one watermark per split.
+               Assert.assertEquals(splits.length, watermarkCounter);
+
+               // then close the reader gracefully so that the Long.MAX 
watermark is emitted
+               synchronized (tester.getCheckpointLock()) {
+                       tester.close();
+               }
+
+               for (org.apache.hadoop.fs.Path file: filesCreated) {
+                       hdfs.delete(file, false);
+               }
+
+               // check if the last element is the LongMax watermark (by now 
this must be the only element)
+               Assert.assertEquals(1, tester.getOutput().size());
+               Assert.assertTrue(tester.getOutput().peek() instanceof 
Watermark);
+               Assert.assertEquals(Long.MAX_VALUE, ((Watermark) 
tester.getOutput().poll()).getTimestamp());
+
+               // check if the elements are the expected ones.
+               Assert.assertEquals(expectedFileContents.size(), 
actualFileContents.size());
+               for (Integer fileIdx: expectedFileContents.keySet()) {
+                       Assert.assertTrue("file" + fileIdx + " not found", 
actualFileContents.keySet().contains(fileIdx));
+
+                       List<String> cntnt = actualFileContents.get(fileIdx);
+                       Collections.sort(cntnt, new Comparator<String>() {
+                               @Override
+                               public int compare(String o1, String o2) {
+                                       return getLineNo(o1) - getLineNo(o2);
+                               }
+                       });
+
+                       StringBuilder cntntStr = new StringBuilder();
+                       for (String line: cntnt) {
+                               cntntStr.append(line);
+                       }
+                       Assert.assertEquals(expectedFileContents.get(fileIdx), 
cntntStr.toString());
+               }
+       }
+
+       @Test
        public void testFileReadingOperator() throws Exception {
                Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
                Map<Integer, String> expectedFileContents = new HashMap<>();
-               for(int i = 0; i < NO_OF_FILES; i++) {
+               for (int i = 0; i < NO_OF_FILES; i++) {
                        Tuple2<org.apache.hadoop.fs.Path, String> file = 
fillWithData(hdfsURI, "file", i, "This is test line.");
                        filesCreated.add(file.f0);
                        expectedFileContents.put(i, file.f1);
@@ -119,10 +273,11 @@ public class ContinuousFileMonitoringTest {
                TypeInformation<String> typeInfo = 
TypeExtractor.getInputFormatTypes(format);
 
                ContinuousFileReaderOperator<String, ?> reader = new 
ContinuousFileReaderOperator<>(format);
+               reader.setOutputType(typeInfo, new ExecutionConfig());
+
                OneInputStreamOperatorTestHarness<FileInputSplit, String> 
tester =
                        new OneInputStreamOperatorTestHarness<>(reader);
-
-               reader.setOutputType(typeInfo, new ExecutionConfig());
+               tester.setTimeCharacteristic(TimeCharacteristic.EventTime);
                tester.open();
 
                // create the necessary splits for the test
@@ -130,42 +285,42 @@ public class ContinuousFileMonitoringTest {
                        
reader.getRuntimeContext().getNumberOfParallelSubtasks());
 
                // and feed them to the operator
-               for(FileInputSplit split: splits) {
+               for (FileInputSplit split: splits) {
                        tester.processElement(new StreamRecord<>(split));
                }
 
-               // then close the reader gracefully
+               // then close the reader gracefully (and wait to finish reading)
                synchronized (tester.getCheckpointLock()) {
                        tester.close();
                }
 
-               /*
-               * Given that the reader is multithreaded, the test finishes 
before the reader thread finishes
-               * reading. This results in files being deleted by the test 
before being read, thus throwing an exception.
-               * In addition, even if file deletion happens at the end, the 
results are not ready for testing.
-               * To face this, we wait until all the output is collected or 
until the waiting time exceeds 1000 ms, or 1s.
-               */
+               // the lines received must be the elements in the files +1 for 
the longMax watermark
+               // we are in event time, which emits no watermarks, so the last 
watermark will mark the
+               // end of the input stream.
 
-               long start = System.currentTimeMillis();
-               Queue<Object> output;
-               do {
-                       output = tester.getOutput();
-                       Thread.sleep(50);
-               } while ((output == null || output.size() != NO_OF_FILES * 
LINES_PER_FILE) && (System.currentTimeMillis() - start) < 1000);
+               Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE + 1, 
tester.getOutput().size());
 
                Map<Integer, List<String>> actualFileContents = new HashMap<>();
-               for(Object line: tester.getOutput()) {
-                       StreamRecord<String> element = (StreamRecord<String>) 
line;
-
-                       int fileIdx = 
Character.getNumericValue(element.getValue().charAt(0));
-                       List<String> content = actualFileContents.get(fileIdx);
-                       if(content == null) {
-                               content = new ArrayList<>();
-                               actualFileContents.put(fileIdx, content);
+               Object lastElement = null;
+               for (Object line: tester.getOutput()) {
+                       lastElement = line;
+                       if (line instanceof StreamRecord) {
+                               StreamRecord<String> element = 
(StreamRecord<String>) line;
+
+                               int fileIdx = 
Character.getNumericValue(element.getValue().charAt(0));
+                               List<String> content = 
actualFileContents.get(fileIdx);
+                               if (content == null) {
+                                       content = new ArrayList<>();
+                                       actualFileContents.put(fileIdx, 
content);
+                               }
+                               content.add(element.getValue() + "\n");
                        }
-                       content.add(element.getValue() +"\n");
                }
 
+               // check if the last element is the LongMax watermark
+               Assert.assertTrue(lastElement instanceof Watermark);
+               Assert.assertEquals(Long.MAX_VALUE, ((Watermark) 
lastElement).getTimestamp());
+
                Assert.assertEquals(expectedFileContents.size(), 
actualFileContents.size());
                for (Integer fileIdx: expectedFileContents.keySet()) {
                        Assert.assertTrue("file" + fileIdx + " not found", 
actualFileContents.keySet().contains(fileIdx));
@@ -185,7 +340,7 @@ public class ContinuousFileMonitoringTest {
                        Assert.assertEquals(expectedFileContents.get(fileIdx), 
cntntStr.toString());
                }
 
-               for(org.apache.hadoop.fs.Path file: filesCreated) {
+               for (org.apache.hadoop.fs.Path file: filesCreated) {
                        hdfs.delete(file, false);
                }
        }
@@ -223,13 +378,13 @@ public class ContinuousFileMonitoringTest {
                monitoringFunction.open(new Configuration());
                monitoringFunction.run(new 
TestingSourceContext(monitoringFunction, uniqFilesFound));
 
-               Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
-               for(int i = 0; i < NO_OF_FILES; i++) {
+               Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
+               for (int i = 0; i < NO_OF_FILES; i++) {
                        org.apache.hadoop.fs.Path file = new 
org.apache.hadoop.fs.Path(hdfsURI + "/file" + i);
                        
Assert.assertTrue(uniqFilesFound.contains(file.toString()));
                }
 
-               for(org.apache.hadoop.fs.Path file: filesCreated) {
+               for (org.apache.hadoop.fs.Path file: filesCreated) {
                        hdfs.delete(file, false);
                }
        }
@@ -255,9 +410,10 @@ public class ContinuousFileMonitoringTest {
                                uniqFilesFound.wait(7 * INTERVAL);
                        }
                }
+               fc.join();
 
-               Assert.assertTrue(fc.getFilesCreated().size() == NO_OF_FILES);
-               Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
+               Assert.assertEquals(NO_OF_FILES, fc.getFilesCreated().size());
+               Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
 
                Set<org.apache.hadoop.fs.Path> filesCreated = 
fc.getFilesCreated();
                Set<String> fileNamesCreated = new HashSet<>();
@@ -265,11 +421,11 @@ public class ContinuousFileMonitoringTest {
                        fileNamesCreated.add(path.toString());
                }
 
-               for(String file: uniqFilesFound) {
+               for (String file: uniqFilesFound) {
                        Assert.assertTrue(fileNamesCreated.contains(file));
                }
 
-               for(org.apache.hadoop.fs.Path file: filesCreated) {
+               for (org.apache.hadoop.fs.Path file: filesCreated) {
                        hdfs.delete(file, false);
                }
        }
@@ -301,7 +457,7 @@ public class ContinuousFileMonitoringTest {
                // wait until all the files are created
                fc.join();
 
-               Assert.assertTrue(filesCreated.size() == NO_OF_FILES);
+               Assert.assertEquals(NO_OF_FILES, filesCreated.size());
 
                Set<String> fileNamesCreated = new HashSet<>();
                for (org.apache.hadoop.fs.Path path: fc.getFilesCreated()) {
@@ -309,11 +465,11 @@ public class ContinuousFileMonitoringTest {
                }
 
                Assert.assertTrue(uniqFilesFound.size() >= 1 && 
uniqFilesFound.size() < fileNamesCreated.size());
-               for(String file: uniqFilesFound) {
+               for (String file: uniqFilesFound) {
                        Assert.assertTrue(fileNamesCreated.contains(file));
                }
 
-               for(org.apache.hadoop.fs.Path file: filesCreated) {
+               for (org.apache.hadoop.fs.Path file: filesCreated) {
                        hdfs.delete(file, false);
                }
        }
@@ -322,7 +478,7 @@ public class ContinuousFileMonitoringTest {
 
        private int getLineNo(String line) {
                String[] tkns = line.split("\\s");
-               Assert.assertTrue(tkns.length == 6);
+               Assert.assertEquals(6, tkns.length);
                return Integer.parseInt(tkns[tkns.length - 1]);
        }
 
@@ -345,7 +501,7 @@ public class ContinuousFileMonitoringTest {
 
                public void run() {
                        try {
-                               for(int i = 0; i < NO_OF_FILES; i++) {
+                               for (int i = 0; i < NO_OF_FILES; i++) {
                                        Tuple2<org.apache.hadoop.fs.Path, 
String> file =
                                                fillWithData(hdfsURI, "file", 
i, "This is test line.");
 
@@ -427,12 +583,12 @@ public class ContinuousFileMonitoringTest {
                assert (hdfs != null);
 
                org.apache.hadoop.fs.Path file = new 
org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
-               Assert.assertTrue (!hdfs.exists(file));
+               Assert.assertFalse(hdfs.exists(file));
 
                org.apache.hadoop.fs.Path tmp = new 
org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
                FSDataOutputStream stream = hdfs.create(tmp);
                StringBuilder str = new StringBuilder();
-               for(int i = 0; i < LINES_PER_FILE; i++) {
+               for (int i = 0; i < LINES_PER_FILE; i++) {
                        String line = fileIdx +": "+ sampleLine + " " + i +"\n";
                        str.append(line);
                        stream.write(line.getBytes());

http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index fda5efd..923943f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -29,15 +29,15 @@ import 
org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AsyncExceptionChecker;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,39 +46,44 @@ import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * This is the operator that reads the {@link FileInputSplit FileInputSplits} 
received from
- * the preceding {@link ContinuousFileMonitoringFunction}. This operator will 
receive just the split descriptors
- * and then read and emit records. This may lead to increased backpressure. To 
avoid this, we have another
- * thread ({@link SplitReader}) actually reading the splits and emitting the 
elements, which is separate from
- * the thread forwarding the checkpoint barriers. The two threads sync on the 
{@link StreamTask#getCheckpointLock()}
- * so that the checkpoints reflect the current state.
+ * the preceding {@link ContinuousFileMonitoringFunction}. This operator can 
have parallelism
+ * greater than 1, contrary to the {@link ContinuousFileMonitoringFunction} 
which has
+ * a parallelism of 1.
+ * <p/>
+ * This operator will receive the split descriptors, put them in a queue, and 
have another
+ * thread read the actual data from the split. This architecture allows the 
separation of the
+ * reading thread, from the one emitting the checkpoint barriers, thus 
removing any potential
+ * back-pressure.
  */
 @Internal
 public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends 
AbstractStreamOperator<OUT>
-       implements OneInputStreamOperator<FileInputSplit, OUT>, 
OutputTypeConfigurable<OUT> {
+       implements OneInputStreamOperator<FileInputSplit, OUT>, 
OutputTypeConfigurable<OUT>, AsyncExceptionChecker {
 
        private static final long serialVersionUID = 1L;
 
        private static final Logger LOG = 
LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
 
+       /** A value that serves as a kill-pill to stop the reading thread when 
no more splits remain. */
        private static final FileInputSplit EOS = new FileInputSplit(-1, null, 
-1, -1, null);
 
-       private transient SplitReader<S, OUT> reader;
-       private transient TimestampedCollector<OUT> collector;
-
        private FileInputFormat<OUT> format;
        private TypeSerializer<OUT> serializer;
 
        private transient Object checkpointLock;
 
+       private transient SplitReader<S, OUT> reader;
+       private transient SourceFunction.SourceContext<OUT> readerContext;
        private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;
 
        public ContinuousFileReaderOperator(FileInputFormat<OUT> format) {
@@ -94,25 +99,34 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
        public void open() throws Exception {
                super.open();
 
-               if (this.serializer == null) {
-                       throw new IllegalStateException("The serializer has not 
been set. " +
-                               "Probably the setOutputType() was not called 
and this should not have happened. " +
-                               "Please report it.");
-               }
+               checkState(this.reader == null, "The reader is already 
initialized.");
+               checkState(this.serializer != null, "The serializer has not 
been set. " +
+                       "Probably the setOutputType() was not called. Please 
report it.");
 
                this.format.setRuntimeContext(getRuntimeContext());
                this.format.configure(new Configuration());
-
-               this.collector = new TimestampedCollector<>(output);
                this.checkpointLock = getContainingTask().getCheckpointLock();
 
-               Preconditions.checkState(reader == null, "The reader is already 
initialized.");
-
-               this.reader = new SplitReader<>(format, serializer, collector, 
checkpointLock, readerState);
+               // set the reader context based on the time characteristic
+               final TimeCharacteristic timeCharacteristic = 
getOperatorConfig().getTimeCharacteristic();
+
+               switch (timeCharacteristic) {
+                       case EventTime:
+                               this.readerContext = new 
StreamSource.ManualWatermarkContext<>(this, this.checkpointLock, this.output);
+                               break;
+                       case IngestionTime:
+                               final long watermarkInterval = 
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
+                               this.readerContext = new 
StreamSource.AutomaticWatermarkContext<>(this, this.checkpointLock, 
this.output, watermarkInterval);
+                               break;
+                       case ProcessingTime:
+                               this.readerContext = new 
StreamSource.NonTimestampContext<>(this, this.checkpointLock, this.output);
+                               break;
+                       default:
+                               throw new 
Exception(String.valueOf(timeCharacteristic));
+               }
 
-               // the readerState is needed for the initialization of the 
reader
-               // when recovering from a failure. So after the initialization,
-               // we can set it to null.
+               // and initialize the split reading thread
+               this.reader = new SplitReader<>(format, serializer, 
readerContext, checkpointLock, readerState);
                this.readerState = null;
                this.reader.start();
        }
@@ -124,7 +138,7 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
 
        @Override
        public void processWatermark(Watermark mark) throws Exception {
-               output.emitWatermark(mark);
+               // we do nothing because we emit our own watermarks if needed.
        }
 
        @Override
@@ -158,7 +172,8 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                        }
                }
                reader = null;
-               collector = null;
+               readerContext = null;
+               readerState = null;
                format = null;
                serializer = null;
        }
@@ -179,7 +194,21 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                        // called by the StreamTask while having it.
                        checkpointLock.wait();
                }
-               collector.close();
+
+               // finally if we are closed normally and we are operating on
+               // event or ingestion time, emit the max watermark indicating
+               // the end of the stream, like a normal source would do.
+
+               if (readerContext != null) {
+                       readerContext.emitWatermark(Watermark.MAX_WATERMARK);
+                       readerContext.close();
+               }
+               output.close();
+       }
+
+       @Override
+       public void checkAsyncException() {
+               // do nothing
        }
 
        private class SplitReader<S extends Serializable, OT> extends Thread {
@@ -190,7 +219,7 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                private final TypeSerializer<OT> serializer;
 
                private final Object checkpointLock;
-               private final TimestampedCollector<OT> collector;
+               private final SourceFunction.SourceContext<OT> readerContext;
 
                private final Queue<FileInputSplit> pendingSplits;
 
@@ -202,16 +231,16 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
 
                private SplitReader(FileInputFormat<OT> format,
                                        TypeSerializer<OT> serializer,
-                                       TimestampedCollector<OT> collector,
+                                       SourceFunction.SourceContext<OT> 
readerContext,
                                        Object checkpointLock,
                                        Tuple3<List<FileInputSplit>, 
FileInputSplit, S> restoredState) {
 
                        this.format = checkNotNull(format, "Unspecified 
FileInputFormat.");
                        this.serializer = checkNotNull(serializer, "Unspecified 
Serializer.");
+                       this.readerContext = checkNotNull(readerContext, 
"Unspecified Reader Context.");
+                       this.checkpointLock = checkNotNull(checkpointLock, 
"Unspecified checkpoint lock.");
 
-                       this.pendingSplits = new LinkedList<>();
-                       this.collector = collector;
-                       this.checkpointLock = checkpointLock;
+                       this.pendingSplits = new ArrayDeque<>();
                        this.isRunning = true;
 
                        // this is the case where a task recovers from a 
previous failed attempt
@@ -221,7 +250,6 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                                S formatState = restoredState.f2;
 
                                for (FileInputSplit split : pending) {
-                                       
Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split 
entry to read: " + split + ".");
                                        pendingSplits.add(split);
                                }
 
@@ -231,9 +259,8 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                }
 
                private void addSplit(FileInputSplit split) {
-                       Preconditions.checkNotNull(split);
+                       checkNotNull(split, "Cannot insert a null value in the 
pending splits queue.");
                        synchronized (checkpointLock) {
-                               
Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split 
entry to read: " + split + ".");
                                this.pendingSplits.add(split);
                        }
                }
@@ -269,7 +296,7 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                                                                
checkpointableFormat.reopen(currentSplit, restoredFormatState);
                                                        } else {
                                                                // this is the 
case of a non-checkpointable input format that will reprocess the last split.
-                                                               
LOG.info("Format " + this.format.getClass().getName() + " used is not 
checkpointable.");
+                                                               
LOG.info("Format " + this.format.getClass().getName() + " does not support 
checkpointing.");
                                                                
format.open(currentSplit);
                                                        }
                                                        // reset the restored 
state to null for the next iteration
@@ -301,7 +328,7 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                                                        synchronized 
(checkpointLock) {
                                                                nextElement = 
format.nextRecord(nextElement);
                                                                if (nextElement 
!= null) {
-                                                                       
collector.collect(nextElement);
+                                                                       
readerContext.collect(nextElement);
                                                                } else {
                                                                        break;
                                                                }
@@ -360,7 +387,7 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                                                        restoredFormatState;
                                        return new Tuple3<>(snapshot, 
currentSplit, formatState);
                                } else {
-                                       LOG.info("The format used is not 
checkpointable. The current input split will be restarted upon recovery.");
+                                       LOG.info("The format does not support 
checkpointing. The current input split will be re-read from start upon 
recovery.");
                                        return new Tuple3<>(snapshot, 
currentSplit, null);
                                }
                        } else {
@@ -431,8 +458,7 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                S formatState = (S) ois.readObject();
 
                // set the whole reader state for the open() to find.
-               Preconditions.checkState(this.readerState == null,
-                       "The reader state has already been initialized.");
+               checkState(this.readerState == null, "The reader state has 
already been initialized.");
 
                this.readerState = new Tuple3<>(pendingSplits, currSplit, 
formatState);
                div.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AsyncExceptionChecker.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AsyncExceptionChecker.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AsyncExceptionChecker.java
new file mode 100644
index 0000000..12018a7
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AsyncExceptionChecker.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+/**
+ * An interface implemented by operators that may throw exceptions
+ * asynchronously, like the {@link StreamSource}.
+ */
+public interface AsyncExceptionChecker {
+
+       /** Checks if an asynchronous exception was thrown. */
+       void checkAsyncException();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 38c948b..e914240 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -34,7 +34,7 @@ import java.util.concurrent.ScheduledFuture;
  */
 @Internal
 public class StreamSource<OUT, SRC extends SourceFunction<OUT>> 
-               extends AbstractUdfStreamOperator<OUT, SRC> implements 
StreamOperator<OUT> {
+               extends AbstractUdfStreamOperator<OUT, SRC> implements 
StreamOperator<OUT>, AsyncExceptionChecker {
 
        private static final long serialVersionUID = 1L;
        
@@ -70,7 +70,7 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
                                ctx = new NonTimestampContext<>(this, 
lockingObject, collector);
                                break;
                        default:
-                               throw new 
Exception(String.valueOf(timeCharacteristic));
+                               throw new Exception("Invalid time 
characteristic: " + String.valueOf(timeCharacteristic));
                }
 
                // copy to a field to give the 'cancel()' method access
@@ -127,7 +127,8 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
         * has caused an exception. If one of these threads caused an 
exception, this method will
         * throw that exception.
         */
-       void checkAsyncException() {
+       @Override
+       public void checkAsyncException() {
                getContainingTask().checkTimerException();
        }
 
@@ -141,12 +142,12 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
         */
        public static class NonTimestampContext<T> implements 
SourceFunction.SourceContext<T> {
 
-               private final StreamSource<?, ?> owner;
+               private final AsyncExceptionChecker owner;
                private final Object lockingObject;
                private final Output<StreamRecord<T>> output;
                private final StreamRecord<T> reuse;
 
-               public NonTimestampContext(StreamSource<?, ?> owner, Object 
lockingObject, Output<StreamRecord<T>> output) {
+               public NonTimestampContext(AsyncExceptionChecker owner, Object 
lockingObject, Output<StreamRecord<T>> output) {
                        this.owner = owner;
                        this.lockingObject = lockingObject;
                        this.output = output;
@@ -188,18 +189,19 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
         */
        public static class AutomaticWatermarkContext<T> implements 
SourceFunction.SourceContext<T> {
 
-               private final StreamSource<?, ?> owner;
+               private final AbstractStreamOperator<T> owner;
                private final Object lockingObject;
                private final Output<StreamRecord<T>> output;
                private final StreamRecord<T> reuse;
-               
+               private final AsyncExceptionChecker source;
+
                private final ScheduledFuture<?> watermarkTimer;
                private final long watermarkInterval;
 
                private volatile long nextWatermarkTime;
 
                public AutomaticWatermarkContext(
-                               final StreamSource<?, ?> owner,
+                               final AbstractStreamOperator<T> owner,
                                final Object lockingObjectParam,
                                final Output<StreamRecord<T>> outputParam,
                                final long watermarkInterval) {
@@ -214,6 +216,16 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
                        this.watermarkInterval = watermarkInterval;
                        this.reuse = new StreamRecord<T>(null);
 
+                       // the owner must be an AsyncExceptionChecker; check
+                       // once and cache the cast here instead of casting in
+                       // collect(), collectWithTimestamp() and emitWatermark()
+
+                       if (!(owner instanceof AsyncExceptionChecker)) {
+                               throw new IllegalStateException("The 
AutomaticWatermarkContext can only be used " +
+                                       "with sources that implement the 
AsyncExceptionChecker interface.");
+                       }
+                       this.source = (AsyncExceptionChecker) owner;
+
                        long now = owner.getCurrentProcessingTime();
                        this.watermarkTimer = owner.registerTimer(now + 
watermarkInterval,
                                new WatermarkEmittingTask(owner, 
lockingObjectParam, outputParam));
@@ -221,7 +233,7 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
 
                @Override
                public void collect(T element) {
-                       owner.checkAsyncException();
+                       source.checkAsyncException();
                        
                        synchronized (lockingObject) {
                                final long currentTime = 
owner.getCurrentProcessingTime();
@@ -250,7 +262,7 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
 
                @Override
                public void emitWatermark(Watermark mark) {
-                       owner.checkAsyncException();
+                       source.checkAsyncException();
                        
                        if (mark.getTimestamp() == Long.MAX_VALUE) {
                                // allow it since this is the special 
end-watermark that for example the Kafka source emits
@@ -260,7 +272,9 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
                                }
 
                                // we can shutdown the timer now, no watermarks 
will be needed any more
-                               watermarkTimer.cancel(true);
+                               if (watermarkTimer != null) {
+                                       watermarkTimer.cancel(true);
+                               }
                        }
                }
 
@@ -271,16 +285,18 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
 
                @Override
                public void close() {
-                       watermarkTimer.cancel(true);
+                       if (watermarkTimer != null) {
+                               watermarkTimer.cancel(true);
+                       }
                }
 
                private class WatermarkEmittingTask implements Triggerable {
 
-                       private final StreamSource<?, ?> owner;
+                       private final AbstractStreamOperator<T> owner;
                        private final Object lockingObject;
                        private final Output<StreamRecord<T>> output;
 
-                       private WatermarkEmittingTask(StreamSource<?, ?> src, 
Object lock, Output<StreamRecord<T>> output) {
+                       private WatermarkEmittingTask(AbstractStreamOperator<T> 
src, Object lock, Output<StreamRecord<T>> output) {
                                this.owner = src;
                                this.lockingObject = lock;
                                this.output = output;
@@ -299,7 +315,7 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
                                        synchronized (lockingObject) {
                                                if (currentTime > 
nextWatermarkTime) {
                                                        
output.emitWatermark(new Watermark(watermarkTime));
-                                                       nextWatermarkTime += 
watermarkInterval;
+                                                       nextWatermarkTime = 
watermarkTime + watermarkInterval;
                                                }
                                        }
                                }
@@ -320,12 +336,12 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
         */
        public static class ManualWatermarkContext<T> implements 
SourceFunction.SourceContext<T> {
 
-               private final StreamSource<?, ?> owner;
+               private final AsyncExceptionChecker owner;
                private final Object lockingObject;
                private final Output<StreamRecord<T>> output;
                private final StreamRecord<T> reuse;
 
-               public ManualWatermarkContext(StreamSource<?, ?> owner, Object 
lockingObject, Output<StreamRecord<T>> output) {
+               public ManualWatermarkContext(AsyncExceptionChecker owner, 
Object lockingObject, Output<StreamRecord<T>> output) {
                        this.owner = owner;
                        this.lockingObject = lockingObject;
                        this.output = output;
@@ -335,7 +351,6 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
                @Override
                public void collect(T element) {
                        owner.checkAsyncException();
-                       
                        synchronized (lockingObject) {
                                output.collect(reuse.replace(element));
                        }
@@ -344,7 +359,6 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
                @Override
                public void collectWithTimestamp(T element, long timestamp) {
                        owner.checkAsyncException();
-                       
                        synchronized (lockingObject) {
                                output.collect(reuse.replace(element, 
timestamp));
                        }
@@ -353,7 +367,6 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
                @Override
                public void emitWatermark(Watermark mark) {
                        owner.checkAsyncException();
-                       
                        synchronized (lockingObject) {
                                output.emitWatermark(mark);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 40086c5..5708639 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
 import org.apache.flink.runtime.state.AsynchronousStateHandle;
 import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
@@ -156,6 +157,14 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
                }).when(mockTask).getCurrentProcessingTime();
        }
 
+       public void setTimeCharacteristic(TimeCharacteristic 
timeCharacteristic) {
+               this.config.setTimeCharacteristic(timeCharacteristic);
+       }
+
+       public TimeCharacteristic getTimeCharacteristic() {
+               return this.config.getTimeCharacteristic();
+       }
+
        public void setStateBackend(AbstractStateBackend stateBackend) {
                this.stateBackend = stateBackend;
        }

Reply via email to