Repository: flink Updated Branches: refs/heads/release-1.1 fddd89bcd -> bab59dfa7
[FLINK-4329] Fix Streaming File Source Timestamps/Watermarks Handling This closes #2593. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bab59dfa Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bab59dfa Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bab59dfa Branch: refs/heads/release-1.1 Commit: bab59dfa7cf94f4c392c3205ee180e72e1ad7814 Parents: fddd89b Author: kl0u <[email protected]> Authored: Tue Oct 4 15:27:59 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Wed Oct 5 17:52:26 2016 +0200 ---------------------------------------------------------------------- .../ContinuousFileMonitoringFunctionITCase.java | 15 +- .../hdfstests/ContinuousFileMonitoringTest.java | 240 +++++++++++++++---- .../source/ContinuousFileReaderOperator.java | 108 +++++---- .../api/operators/AsyncExceptionChecker.java | 27 +++ .../streaming/api/operators/StreamSource.java | 53 ++-- .../util/OneInputStreamOperatorTestHarness.java | 9 + 6 files changed, 335 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java index e6cd5d9..dd4dc5a 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java @@ -130,7 +130,7 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest TypeInformation<String> typeInfo = 
TypeExtractor.getInputFormatTypes(format); ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format); - TestingSinkFunction sink = new TestingSinkFunction(monitoringFunction); + TestingSinkFunction sink = new TestingSinkFunction(); DataStream<FileInputSplit> splits = env.addSource(monitoringFunction); splits.transform("FileSplitReader", typeInfo, reader).addSink(sink).setParallelism(1); @@ -161,16 +161,10 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest private static class TestingSinkFunction extends RichSinkFunction<String> { - private final ContinuousFileMonitoringFunction src; - private int elementCounter = 0; private Map<Integer, Integer> elementCounters = new HashMap<>(); private Map<Integer, List<String>> collectedContent = new HashMap<>(); - TestingSinkFunction(ContinuousFileMonitoringFunction monitoringFunction) { - this.src = monitoringFunction; - } - @Override public void open(Configuration parameters) throws Exception { // this sink can only work with DOP 1 @@ -200,13 +194,6 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest Assert.assertEquals(cntntStr.toString(), expectedContents.get(fileIdx)); } expectedContents.clear(); - - src.cancel(); - try { - src.close(); - } catch (Exception e) { - e.printStackTrace(); - } } private int getLineNo(String line) { http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java index def9378..9358ba9 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java +++ 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java @@ -27,12 +27,14 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.functions.source.FilePathFilter; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction; import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator; import org.apache.flink.streaming.api.functions.source.FileProcessingMode; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileUtil; @@ -51,8 +53,8 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; public class ContinuousFileMonitoringTest { @@ -106,10 +108,162 @@ public class ContinuousFileMonitoringTest { // TESTS @Test + public void testFileReadingOperatorWithIngestionTime() throws Exception { + Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>(); + Map<Integer, String> expectedFileContents = new HashMap<>(); + + for (int i = 0; i < NO_OF_FILES; i++) { + Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line."); + filesCreated.add(file.f0); + expectedFileContents.put(i, file.f1); + } + + TextInputFormat format = new TextInputFormat(new Path(hdfsURI)); + TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format); + + final long 
watermarkInterval = 10; + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.setAutoWatermarkInterval(watermarkInterval); + + ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format); + reader.setOutputType(typeInfo, executionConfig); + + final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider(); + final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester = + new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider); + tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime); + tester.open(); + + Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic()); + + // test that watermarks are correctly emitted + + ConcurrentLinkedQueue<Object> output = tester.getOutput(); + + timeServiceProvider.setCurrentTime(201); + Assert.assertTrue(output.peek() instanceof Watermark); + Assert.assertEquals(200, ((Watermark) output.poll()).getTimestamp()); + + timeServiceProvider.setCurrentTime(301); + Assert.assertTrue(output.peek() instanceof Watermark); + Assert.assertEquals(300, ((Watermark) output.poll()).getTimestamp()); + + timeServiceProvider.setCurrentTime(401); + Assert.assertTrue(output.peek() instanceof Watermark); + Assert.assertEquals(400, ((Watermark) output.poll()).getTimestamp()); + + timeServiceProvider.setCurrentTime(501); + Assert.assertTrue(output.peek() instanceof Watermark); + Assert.assertEquals(500, ((Watermark) output.poll()).getTimestamp()); + + Assert.assertTrue(output.isEmpty()); + + // create the necessary splits for the test + FileInputSplit[] splits = format.createInputSplits( + reader.getRuntimeContext().getNumberOfParallelSubtasks()); + + // and feed them to the operator + Map<Integer, List<String>> actualFileContents = new HashMap<>(); + + long lastSeenWatermark = Long.MIN_VALUE; + int lineCounter = 0; // counter for the lines read from the splits + int watermarkCounter = 0; + + for 
(FileInputSplit split: splits) { + + // set the next "current processing time". + long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval; + timeServiceProvider.setCurrentTime(nextTimestamp); + + // send the next split to be read and wait until it is fully read, the +1 is for the watermark. + tester.processElement(new StreamRecord<>(split)); + + // NOTE: the following check works because each file fits in one split. + // Otherwise it would fail and wait forever. + // BUT THIS IS JUST FOR THIS TEST + while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) { + Thread.sleep(10); + } + + // verify that the results are the expected ones + for (Object line: tester.getOutput()) { + if (line instanceof StreamRecord) { + StreamRecord<String> element = (StreamRecord<String>) line; + lineCounter++; + + Assert.assertEquals(nextTimestamp, element.getTimestamp()); + + int fileIdx = Character.getNumericValue(element.getValue().charAt(0)); + List<String> content = actualFileContents.get(fileIdx); + if (content == null) { + content = new ArrayList<>(); + actualFileContents.put(fileIdx, content); + } + content.add(element.getValue() + "\n"); + } else if (line instanceof Watermark) { + long watermark = ((Watermark) line).getTimestamp(); + + Assert.assertEquals(nextTimestamp - (nextTimestamp % watermarkInterval), watermark); + Assert.assertTrue(watermark > lastSeenWatermark); + watermarkCounter++; + + lastSeenWatermark = watermark; + } else { + Assert.fail("Unknown element in the list."); + } + } + + // clean the output to be ready for the next split + tester.getOutput().clear(); + } + + // now we are processing one split after the other, + // so all the elements must be here by now. + Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, lineCounter); + + // because we expect one watermark per split. 
+ Assert.assertEquals(splits.length, watermarkCounter); + + // then close the reader gracefully so that the Long.MAX watermark is emitted + synchronized (tester.getCheckpointLock()) { + tester.close(); + } + + for (org.apache.hadoop.fs.Path file: filesCreated) { + hdfs.delete(file, false); + } + + // check if the last element is the LongMax watermark (by now this must be the only element) + Assert.assertEquals(1, tester.getOutput().size()); + Assert.assertTrue(tester.getOutput().peek() instanceof Watermark); + Assert.assertEquals(Long.MAX_VALUE, ((Watermark) tester.getOutput().poll()).getTimestamp()); + + // check if the elements are the expected ones. + Assert.assertEquals(expectedFileContents.size(), actualFileContents.size()); + for (Integer fileIdx: expectedFileContents.keySet()) { + Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx)); + + List<String> cntnt = actualFileContents.get(fileIdx); + Collections.sort(cntnt, new Comparator<String>() { + @Override + public int compare(String o1, String o2) { + return getLineNo(o1) - getLineNo(o2); + } + }); + + StringBuilder cntntStr = new StringBuilder(); + for (String line: cntnt) { + cntntStr.append(line); + } + Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString()); + } + } + + @Test public void testFileReadingOperator() throws Exception { Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>(); Map<Integer, String> expectedFileContents = new HashMap<>(); - for(int i = 0; i < NO_OF_FILES; i++) { + for (int i = 0; i < NO_OF_FILES; i++) { Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line."); filesCreated.add(file.f0); expectedFileContents.put(i, file.f1); @@ -119,10 +273,11 @@ public class ContinuousFileMonitoringTest { TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format); ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format); + 
reader.setOutputType(typeInfo, new ExecutionConfig()); + OneInputStreamOperatorTestHarness<FileInputSplit, String> tester = new OneInputStreamOperatorTestHarness<>(reader); - - reader.setOutputType(typeInfo, new ExecutionConfig()); + tester.setTimeCharacteristic(TimeCharacteristic.EventTime); tester.open(); // create the necessary splits for the test @@ -130,42 +285,42 @@ public class ContinuousFileMonitoringTest { reader.getRuntimeContext().getNumberOfParallelSubtasks()); // and feed them to the operator - for(FileInputSplit split: splits) { + for (FileInputSplit split: splits) { tester.processElement(new StreamRecord<>(split)); } - // then close the reader gracefully + // then close the reader gracefully (and wait to finish reading) synchronized (tester.getCheckpointLock()) { tester.close(); } - /* - * Given that the reader is multithreaded, the test finishes before the reader thread finishes - * reading. This results in files being deleted by the test before being read, thus throwing an exception. - * In addition, even if file deletion happens at the end, the results are not ready for testing. - * To face this, we wait until all the output is collected or until the waiting time exceeds 1000 ms, or 1s. - */ + // the lines received must be the elements in the files +1 for the longMax watermark + // we are in event time, which emits no watermarks, so the last watermark will mark the + // end of the input stream. 
- long start = System.currentTimeMillis(); - Queue<Object> output; - do { - output = tester.getOutput(); - Thread.sleep(50); - } while ((output == null || output.size() != NO_OF_FILES * LINES_PER_FILE) && (System.currentTimeMillis() - start) < 1000); + Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE + 1, tester.getOutput().size()); Map<Integer, List<String>> actualFileContents = new HashMap<>(); - for(Object line: tester.getOutput()) { - StreamRecord<String> element = (StreamRecord<String>) line; - - int fileIdx = Character.getNumericValue(element.getValue().charAt(0)); - List<String> content = actualFileContents.get(fileIdx); - if(content == null) { - content = new ArrayList<>(); - actualFileContents.put(fileIdx, content); + Object lastElement = null; + for (Object line: tester.getOutput()) { + lastElement = line; + if (line instanceof StreamRecord) { + StreamRecord<String> element = (StreamRecord<String>) line; + + int fileIdx = Character.getNumericValue(element.getValue().charAt(0)); + List<String> content = actualFileContents.get(fileIdx); + if (content == null) { + content = new ArrayList<>(); + actualFileContents.put(fileIdx, content); + } + content.add(element.getValue() + "\n"); } - content.add(element.getValue() +"\n"); } + // check if the last element is the LongMax watermark + Assert.assertTrue(lastElement instanceof Watermark); + Assert.assertEquals(Long.MAX_VALUE, ((Watermark) lastElement).getTimestamp()); + Assert.assertEquals(expectedFileContents.size(), actualFileContents.size()); for (Integer fileIdx: expectedFileContents.keySet()) { Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx)); @@ -185,7 +340,7 @@ public class ContinuousFileMonitoringTest { Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString()); } - for(org.apache.hadoop.fs.Path file: filesCreated) { + for (org.apache.hadoop.fs.Path file: filesCreated) { hdfs.delete(file, false); } } @@ -223,13 +378,13 @@ public class 
ContinuousFileMonitoringTest { monitoringFunction.open(new Configuration()); monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound)); - Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES); - for(int i = 0; i < NO_OF_FILES; i++) { + Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size()); + for (int i = 0; i < NO_OF_FILES; i++) { org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(hdfsURI + "/file" + i); Assert.assertTrue(uniqFilesFound.contains(file.toString())); } - for(org.apache.hadoop.fs.Path file: filesCreated) { + for (org.apache.hadoop.fs.Path file: filesCreated) { hdfs.delete(file, false); } } @@ -255,9 +410,10 @@ public class ContinuousFileMonitoringTest { uniqFilesFound.wait(7 * INTERVAL); } } + fc.join(); - Assert.assertTrue(fc.getFilesCreated().size() == NO_OF_FILES); - Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES); + Assert.assertEquals(NO_OF_FILES, fc.getFilesCreated().size()); + Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size()); Set<org.apache.hadoop.fs.Path> filesCreated = fc.getFilesCreated(); Set<String> fileNamesCreated = new HashSet<>(); @@ -265,11 +421,11 @@ public class ContinuousFileMonitoringTest { fileNamesCreated.add(path.toString()); } - for(String file: uniqFilesFound) { + for (String file: uniqFilesFound) { Assert.assertTrue(fileNamesCreated.contains(file)); } - for(org.apache.hadoop.fs.Path file: filesCreated) { + for (org.apache.hadoop.fs.Path file: filesCreated) { hdfs.delete(file, false); } } @@ -301,7 +457,7 @@ public class ContinuousFileMonitoringTest { // wait until all the files are created fc.join(); - Assert.assertTrue(filesCreated.size() == NO_OF_FILES); + Assert.assertEquals(NO_OF_FILES, filesCreated.size()); Set<String> fileNamesCreated = new HashSet<>(); for (org.apache.hadoop.fs.Path path: fc.getFilesCreated()) { @@ -309,11 +465,11 @@ public class ContinuousFileMonitoringTest { } Assert.assertTrue(uniqFilesFound.size() >= 1 && uniqFilesFound.size() < 
fileNamesCreated.size()); - for(String file: uniqFilesFound) { + for (String file: uniqFilesFound) { Assert.assertTrue(fileNamesCreated.contains(file)); } - for(org.apache.hadoop.fs.Path file: filesCreated) { + for (org.apache.hadoop.fs.Path file: filesCreated) { hdfs.delete(file, false); } } @@ -322,7 +478,7 @@ public class ContinuousFileMonitoringTest { private int getLineNo(String line) { String[] tkns = line.split("\\s"); - Assert.assertTrue(tkns.length == 6); + Assert.assertEquals(6, tkns.length); return Integer.parseInt(tkns[tkns.length - 1]); } @@ -345,7 +501,7 @@ public class ContinuousFileMonitoringTest { public void run() { try { - for(int i = 0; i < NO_OF_FILES; i++) { + for (int i = 0; i < NO_OF_FILES; i++) { Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line."); @@ -427,12 +583,12 @@ public class ContinuousFileMonitoringTest { assert (hdfs != null); org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx); - Assert.assertTrue (!hdfs.exists(file)); + Assert.assertFalse(hdfs.exists(file)); org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." 
+ fileName + fileIdx); FSDataOutputStream stream = hdfs.create(tmp); StringBuilder str = new StringBuilder(); - for(int i = 0; i < LINES_PER_FILE; i++) { + for (int i = 0; i < LINES_PER_FILE; i++) { String line = fileIdx +": "+ sampleLine + " " + i +"\n"; str.append(line); stream.write(line.getBytes()); http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index fda5efd..923943f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -29,15 +29,15 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AsyncExceptionChecker; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; -import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.StreamTask; import 
org.apache.flink.streaming.runtime.tasks.StreamTaskState; -import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,39 +46,44 @@ import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Queue; +import static org.apache.flink.util.Preconditions.checkState; import static org.apache.flink.util.Preconditions.checkNotNull; /** * This is the operator that reads the {@link FileInputSplit FileInputSplits} received from - * the preceding {@link ContinuousFileMonitoringFunction}. This operator will receive just the split descriptors - * and then read and emit records. This may lead to increased backpressure. To avoid this, we have another - * thread ({@link SplitReader}) actually reading the splits and emitting the elements, which is separate from - * the thread forwarding the checkpoint barriers. The two threads sync on the {@link StreamTask#getCheckpointLock()} - * so that the checkpoints reflect the current state. + * the preceding {@link ContinuousFileMonitoringFunction}. This operator can have parallelism + * greater than 1, contrary to the {@link ContinuousFileMonitoringFunction} which has + * a parallelism of 1. + * <p/> + * This operator will receive the split descriptors, put them in a queue, and have another + * thread read the actual data from the split. This architecture allows the separation of the + * reading thread, from the one emitting the checkpoint barriers, thus removing any potential + * back-pressure. 
*/ @Internal public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT> - implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT> { + implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT>, AsyncExceptionChecker { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class); + /** A value that serves as a kill-pill to stop the reading thread when no more splits remain. */ private static final FileInputSplit EOS = new FileInputSplit(-1, null, -1, -1, null); - private transient SplitReader<S, OUT> reader; - private transient TimestampedCollector<OUT> collector; - private FileInputFormat<OUT> format; private TypeSerializer<OUT> serializer; private transient Object checkpointLock; + private transient SplitReader<S, OUT> reader; + private transient SourceFunction.SourceContext<OUT> readerContext; private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState; public ContinuousFileReaderOperator(FileInputFormat<OUT> format) { @@ -94,25 +99,34 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A public void open() throws Exception { super.open(); - if (this.serializer == null) { - throw new IllegalStateException("The serializer has not been set. " + - "Probably the setOutputType() was not called and this should not have happened. " + - "Please report it."); - } + checkState(this.reader == null, "The reader is already initialized."); + checkState(this.serializer != null, "The serializer has not been set. " + + "Probably the setOutputType() was not called. 
Please report it."); this.format.setRuntimeContext(getRuntimeContext()); this.format.configure(new Configuration()); - - this.collector = new TimestampedCollector<>(output); this.checkpointLock = getContainingTask().getCheckpointLock(); - Preconditions.checkState(reader == null, "The reader is already initialized."); - - this.reader = new SplitReader<>(format, serializer, collector, checkpointLock, readerState); + // set the reader context based on the time characteristic + final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic(); + + switch (timeCharacteristic) { + case EventTime: + this.readerContext = new StreamSource.ManualWatermarkContext<>(this, this.checkpointLock, this.output); + break; + case IngestionTime: + final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(); + this.readerContext = new StreamSource.AutomaticWatermarkContext<>(this, this.checkpointLock, this.output, watermarkInterval); + break; + case ProcessingTime: + this.readerContext = new StreamSource.NonTimestampContext<>(this, this.checkpointLock, this.output); + break; + default: + throw new Exception(String.valueOf(timeCharacteristic)); + } - // the readerState is needed for the initialization of the reader - // when recovering from a failure. So after the initialization, - // we can set it to null. + // and initialize the split reading thread + this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, readerState); this.readerState = null; this.reader.start(); } @@ -124,7 +138,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A @Override public void processWatermark(Watermark mark) throws Exception { - output.emitWatermark(mark); + // we do nothing because we emit our own watermarks if needed. 
} @Override @@ -158,7 +172,8 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A } } reader = null; - collector = null; + readerContext = null; + readerState = null; format = null; serializer = null; } @@ -179,7 +194,21 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A // called by the StreamTask while having it. checkpointLock.wait(); } - collector.close(); + + // finally if we are closed normally and we are operating on + // event or ingestion time, emit the max watermark indicating + // the end of the stream, like a normal source would do. + + if (readerContext != null) { + readerContext.emitWatermark(Watermark.MAX_WATERMARK); + readerContext.close(); + } + output.close(); + } + + @Override + public void checkAsyncException() { + // do nothing } private class SplitReader<S extends Serializable, OT> extends Thread { @@ -190,7 +219,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A private final TypeSerializer<OT> serializer; private final Object checkpointLock; - private final TimestampedCollector<OT> collector; + private final SourceFunction.SourceContext<OT> readerContext; private final Queue<FileInputSplit> pendingSplits; @@ -202,16 +231,16 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A private SplitReader(FileInputFormat<OT> format, TypeSerializer<OT> serializer, - TimestampedCollector<OT> collector, + SourceFunction.SourceContext<OT> readerContext, Object checkpointLock, Tuple3<List<FileInputSplit>, FileInputSplit, S> restoredState) { this.format = checkNotNull(format, "Unspecified FileInputFormat."); this.serializer = checkNotNull(serializer, "Unspecified Serializer."); + this.readerContext = checkNotNull(readerContext, "Unspecified Reader Context."); + this.checkpointLock = checkNotNull(checkpointLock, "Unspecified checkpoint lock."); - this.pendingSplits = new LinkedList<>(); - this.collector = collector; - 
this.checkpointLock = checkpointLock; + this.pendingSplits = new ArrayDeque<>(); this.isRunning = true; // this is the case where a task recovers from a previous failed attempt @@ -221,7 +250,6 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A S formatState = restoredState.f2; for (FileInputSplit split : pending) { - Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split entry to read: " + split + "."); pendingSplits.add(split); } @@ -231,9 +259,8 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A } private void addSplit(FileInputSplit split) { - Preconditions.checkNotNull(split); + checkNotNull(split, "Cannot insert a null value in the pending splits queue."); synchronized (checkpointLock) { - Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split entry to read: " + split + "."); this.pendingSplits.add(split); } } @@ -269,7 +296,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A checkpointableFormat.reopen(currentSplit, restoredFormatState); } else { // this is the case of a non-checkpointable input format that will reprocess the last split. 
- LOG.info("Format " + this.format.getClass().getName() + " used is not checkpointable."); + LOG.info("Format " + this.format.getClass().getName() + " does not support checkpointing."); format.open(currentSplit); } // reset the restored state to null for the next iteration @@ -301,7 +328,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A synchronized (checkpointLock) { nextElement = format.nextRecord(nextElement); if (nextElement != null) { - collector.collect(nextElement); + readerContext.collect(nextElement); } else { break; } @@ -360,7 +387,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A restoredFormatState; return new Tuple3<>(snapshot, currentSplit, formatState); } else { - LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery."); + LOG.info("The format does not support checkpointing. The current input split will be re-read from start upon recovery."); return new Tuple3<>(snapshot, currentSplit, null); } } else { @@ -431,8 +458,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A S formatState = (S) ois.readObject(); // set the whole reader state for the open() to find. 
- Preconditions.checkState(this.readerState == null, - "The reader state has already been initialized."); + checkState(this.readerState == null, "The reader state has already been initialized."); this.readerState = new Tuple3<>(pendingSplits, currSplit, formatState); div.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AsyncExceptionChecker.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AsyncExceptionChecker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AsyncExceptionChecker.java new file mode 100644 index 0000000..12018a7 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AsyncExceptionChecker.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.operators; + +/** + * An interface used by sources that may throw exceptions + * asynchronously like the {@link StreamSource}. 
+ */ +public interface AsyncExceptionChecker { + + /** Checks if an asynchronous exception was thrown. */ + void checkAsyncException(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index 38c948b..e914240 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -34,7 +34,7 @@ import java.util.concurrent.ScheduledFuture; */ @Internal public class StreamSource<OUT, SRC extends SourceFunction<OUT>> - extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> { + extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT>, AsyncExceptionChecker { private static final long serialVersionUID = 1L; @@ -70,7 +70,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> ctx = new NonTimestampContext<>(this, lockingObject, collector); break; default: - throw new Exception(String.valueOf(timeCharacteristic)); + throw new Exception("Invalid time characteristic: " + String.valueOf(timeCharacteristic)); } // copy to a field to give the 'cancel()' method access @@ -127,7 +127,8 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> * has caused an exception. If one of these threads caused an exception, this method will * throw that exception. 
*/ - void checkAsyncException() { + @Override + public void checkAsyncException() { getContainingTask().checkTimerException(); } @@ -141,12 +142,12 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> */ public static class NonTimestampContext<T> implements SourceFunction.SourceContext<T> { - private final StreamSource<?, ?> owner; + private final AsyncExceptionChecker owner; private final Object lockingObject; private final Output<StreamRecord<T>> output; private final StreamRecord<T> reuse; - public NonTimestampContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) { + public NonTimestampContext(AsyncExceptionChecker owner, Object lockingObject, Output<StreamRecord<T>> output) { this.owner = owner; this.lockingObject = lockingObject; this.output = output; @@ -188,18 +189,19 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> */ public static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> { - private final StreamSource<?, ?> owner; + private final AbstractStreamOperator<T> owner; private final Object lockingObject; private final Output<StreamRecord<T>> output; private final StreamRecord<T> reuse; - + private final AsyncExceptionChecker source; + private final ScheduledFuture<?> watermarkTimer; private final long watermarkInterval; private volatile long nextWatermarkTime; public AutomaticWatermarkContext( - final StreamSource<?, ?> owner, + final AbstractStreamOperator<T> owner, final Object lockingObjectParam, final Output<StreamRecord<T>> outputParam, final long watermarkInterval) { @@ -214,6 +216,16 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> this.watermarkInterval = watermarkInterval; this.reuse = new StreamRecord<T>(null); + // if it is a source, then we cast and cache it + // here so that we do not have to do it in every collect(), + // collectWithTimestamp() and emitWatermark() + + if (!(owner instanceof AsyncExceptionChecker)) { + throw new 
IllegalStateException("The AutomaticWatermarkContext can only be used " + + "with sources that implement the AsyncExceptionChecker interface."); + } + this.source = (AsyncExceptionChecker) owner; + long now = owner.getCurrentProcessingTime(); this.watermarkTimer = owner.registerTimer(now + watermarkInterval, new WatermarkEmittingTask(owner, lockingObjectParam, outputParam)); @@ -221,7 +233,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> @Override public void collect(T element) { - owner.checkAsyncException(); + source.checkAsyncException(); synchronized (lockingObject) { final long currentTime = owner.getCurrentProcessingTime(); @@ -250,7 +262,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> @Override public void emitWatermark(Watermark mark) { - owner.checkAsyncException(); + source.checkAsyncException(); if (mark.getTimestamp() == Long.MAX_VALUE) { // allow it since this is the special end-watermark that for example the Kafka source emits @@ -260,7 +272,9 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> } // we can shutdown the timer now, no watermarks will be needed any more - watermarkTimer.cancel(true); + if (watermarkTimer != null) { + watermarkTimer.cancel(true); + } } } @@ -271,16 +285,18 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> @Override public void close() { - watermarkTimer.cancel(true); + if (watermarkTimer != null) { + watermarkTimer.cancel(true); + } } private class WatermarkEmittingTask implements Triggerable { - private final StreamSource<?, ?> owner; + private final AbstractStreamOperator<T> owner; private final Object lockingObject; private final Output<StreamRecord<T>> output; - private WatermarkEmittingTask(StreamSource<?, ?> src, Object lock, Output<StreamRecord<T>> output) { + private WatermarkEmittingTask(AbstractStreamOperator<T> src, Object lock, Output<StreamRecord<T>> output) { this.owner = src; this.lockingObject = lock; this.output = output; @@ -299,7 
+315,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> synchronized (lockingObject) { if (currentTime > nextWatermarkTime) { output.emitWatermark(new Watermark(watermarkTime)); - nextWatermarkTime += watermarkInterval; + nextWatermarkTime = watermarkTime + watermarkInterval; } } } @@ -320,12 +336,12 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> */ public static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> { - private final StreamSource<?, ?> owner; + private final AsyncExceptionChecker owner; private final Object lockingObject; private final Output<StreamRecord<T>> output; private final StreamRecord<T> reuse; - public ManualWatermarkContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) { + public ManualWatermarkContext(AsyncExceptionChecker owner, Object lockingObject, Output<StreamRecord<T>> output) { this.owner = owner; this.lockingObject = lockingObject; this.output = output; @@ -335,7 +351,6 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> @Override public void collect(T element) { owner.checkAsyncException(); - synchronized (lockingObject) { output.collect(reuse.replace(element)); } @@ -344,7 +359,6 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> @Override public void collectWithTimestamp(T element, long timestamp) { owner.checkAsyncException(); - synchronized (lockingObject) { output.collect(reuse.replace(element, timestamp)); } @@ -353,7 +367,6 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> @Override public void emitWatermark(Watermark mark) { owner.checkAsyncException(); - synchronized (lockingObject) { output.emitWatermark(mark); } http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index 40086c5..5708639 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot; import org.apache.flink.runtime.state.AsynchronousStateHandle; import org.apache.flink.runtime.state.KvStateSnapshot; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; @@ -156,6 +157,14 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { }).when(mockTask).getCurrentProcessingTime(); } + public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) { + this.config.setTimeCharacteristic(timeCharacteristic); + } + + public TimeCharacteristic getTimeCharacteristic() { + return this.config.getTimeCharacteristic(); + } + public void setStateBackend(AbstractStateBackend stateBackend) { this.stateBackend = stateBackend; }
