[FLINK-4329] [streaming api] Fix Streaming File Source Timestamps/Watermarks Handling
This closes #2546 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ff451be Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ff451be Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ff451be Branch: refs/heads/master Commit: 8ff451bec58e9f5800eb77c74c1d7457b776cc94 Parents: c62776f Author: kl0u <[email protected]> Authored: Thu Aug 25 17:38:49 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Oct 5 19:36:13 2016 +0200 ---------------------------------------------------------------------- .../state/RocksDBAsyncSnapshotTest.java | 11 +- .../ContinuousFileMonitoringFunctionITCase.java | 17 +- .../hdfstests/ContinuousFileMonitoringTest.java | 209 ++++++++++++-- .../fs/bucketing/BucketingSinkTest.java | 4 +- .../source/ContinuousFileReaderOperator.java | 96 ++++--- .../streaming/api/operators/StreamSource.java | 275 +----------------- .../api/operators/StreamSourceContexts.java | 284 +++++++++++++++++++ .../runtime/tasks/AsyncExceptionHandler.java | 8 +- .../tasks/DefaultTimeServiceProvider.java | 11 +- .../runtime/tasks/OneInputStreamTask.java | 2 +- .../streaming/runtime/tasks/StreamTask.java | 54 +--- .../runtime/tasks/TwoInputStreamTask.java | 2 +- .../operators/StreamSourceOperatorTest.java | 17 +- .../runtime/operators/TimeProviderTest.java | 79 ++++-- ...AlignedProcessingTimeWindowOperatorTest.java | 34 ++- ...AlignedProcessingTimeWindowOperatorTest.java | 36 ++- .../runtime/tasks/StreamMockEnvironment.java | 8 +- .../KeyedOneInputStreamOperatorTestHarness.java | 4 +- .../util/OneInputStreamOperatorTestHarness.java | 23 +- 19 files changed, 694 insertions(+), 480 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java 
---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index bccbabc..2ebd84a 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -86,7 +86,7 @@ public class RocksDBAsyncSnapshotTest { } /** - * This ensures that asynchronous state handles are actually materialized asynchonously. + * This ensures that asynchronous state handles are actually materialized asynchronously. * * <p>We use latches to block at various stages and see if the code still continues through * the parts that are not asynchronous. If the checkpoint is not done asynchronously the @@ -168,7 +168,6 @@ public class RocksDBAsyncSnapshotTest { while (!field.getBoolean(task)) { Thread.sleep(10); } - } } @@ -189,7 +188,9 @@ public class RocksDBAsyncSnapshotTest { Assert.assertTrue(threadPool.awaitTermination(60_000, TimeUnit.MILLISECONDS)); testHarness.waitForTaskCompletion(); - task.checkTimerException(); + if (mockEnv.wasFailedExternally()) { + Assert.fail("Unexpected exception during execution."); + } } /** @@ -261,8 +262,10 @@ public class RocksDBAsyncSnapshotTest { threadPool.shutdown(); Assert.assertTrue(threadPool.awaitTermination(60_000, TimeUnit.MILLISECONDS)); testHarness.waitForTaskCompletion(); - task.checkTimerException(); + if (mockEnv.wasFailedExternally()) { + throw new AsynchronousException(new InterruptedException("Exception was thrown as expected.")); + } Assert.fail("Operation completed. 
Cancel failed."); } catch (Exception expected) { AsynchronousException asynchronousException = null; http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java index 663345c..079bf04 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java @@ -120,7 +120,7 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + env.setParallelism(4); format.setFilesFilter(FilePathFilter.createDefaultFilter()); ContinuousFileMonitoringFunction<String> monitoringFunction = @@ -130,7 +130,7 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format); ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format); - TestingSinkFunction sink = new TestingSinkFunction(monitoringFunction); + TestingSinkFunction sink = new TestingSinkFunction(); DataStream<FileInputSplit> splits = env.addSource(monitoringFunction); splits.transform("FileSplitReader", typeInfo, reader).addSink(sink).setParallelism(1); @@ -161,16 +161,10 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest private static class TestingSinkFunction extends RichSinkFunction<String> { - private final ContinuousFileMonitoringFunction src; - private int elementCounter = 0; private 
Map<Integer, Integer> elementCounters = new HashMap<>(); private Map<Integer, List<String>> collectedContent = new HashMap<>(); - TestingSinkFunction(ContinuousFileMonitoringFunction monitoringFunction) { - this.src = monitoringFunction; - } - @Override public void open(Configuration parameters) throws Exception { // this sink can only work with DOP 1 @@ -200,13 +194,6 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest Assert.assertEquals(cntntStr.toString(), expectedContents.get(fileIdx)); } expectedContents.clear(); - - src.cancel(); - try { - src.close(); - } catch (Exception e) { - e.printStackTrace(); - } } private int getLineNo(String line) { http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java index 8a700f5..36b5c5e 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java @@ -27,12 +27,14 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.api.common.io.FilePathFilter; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction; import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator; import org.apache.flink.streaming.api.functions.source.FileProcessingMode; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileUtil; @@ -51,7 +53,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.Set; public class ContinuousFileMonitoringTest { @@ -106,6 +107,155 @@ public class ContinuousFileMonitoringTest { // TESTS @Test + public void testFileReadingOperatorWithIngestionTime() throws Exception { + Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>(); + Map<Integer, String> expectedFileContents = new HashMap<>(); + + for(int i = 0; i < NO_OF_FILES; i++) { + Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line."); + filesCreated.add(file.f0); + expectedFileContents.put(i, file.f1); + } + + TextInputFormat format = new TextInputFormat(new Path(hdfsURI)); + TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format); + + final long watermarkInterval = 10; + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.setAutoWatermarkInterval(watermarkInterval); + + ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format); + reader.setOutputType(typeInfo, executionConfig); + + final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider(); + final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester = + new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider); + tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime); + tester.open(); + + Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic()); + + // test that watermarks are correctly emitted + + timeServiceProvider.setCurrentTime(201); + 
timeServiceProvider.setCurrentTime(301); + timeServiceProvider.setCurrentTime(401); + timeServiceProvider.setCurrentTime(501); + + int i = 0; + for(Object line: tester.getOutput()) { + if (!(line instanceof Watermark)) { + Assert.fail("Only watermarks are expected here "); + } + Watermark w = (Watermark) line; + Assert.assertEquals(200 + (i * 100), w.getTimestamp()); + i++; + } + + // clear the output to get the elements only and the final watermark + tester.getOutput().clear(); + Assert.assertEquals(0, tester.getOutput().size()); + + // create the necessary splits for the test + FileInputSplit[] splits = format.createInputSplits( + reader.getRuntimeContext().getNumberOfParallelSubtasks()); + + // and feed them to the operator + Map<Integer, List<String>> actualFileContents = new HashMap<>(); + + long lastSeenWatermark = Long.MIN_VALUE; + int lineCounter = 0; // counter for the lines read from the splits + int watermarkCounter = 0; + + for(FileInputSplit split: splits) { + + // set the next "current processing time". + long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval; + timeServiceProvider.setCurrentTime(nextTimestamp); + + // send the next split to be read and wait until it is fully read. 
+ tester.processElement(new StreamRecord<>(split)); + synchronized (tester.getCheckpointLock()) { + while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) { + tester.getCheckpointLock().wait(10); + } + } + + // verify that the results are the expected + for(Object line: tester.getOutput()) { + if (line instanceof StreamRecord) { + StreamRecord<String> element = (StreamRecord<String>) line; + lineCounter++; + + Assert.assertEquals(nextTimestamp, element.getTimestamp()); + + int fileIdx = Character.getNumericValue(element.getValue().charAt(0)); + List<String> content = actualFileContents.get(fileIdx); + if (content == null) { + content = new ArrayList<>(); + actualFileContents.put(fileIdx, content); + } + content.add(element.getValue() + "\n"); + } else if (line instanceof Watermark) { + long watermark = ((Watermark) line).getTimestamp(); + + Assert.assertEquals(nextTimestamp - (nextTimestamp % watermarkInterval), watermark); + Assert.assertTrue(watermark > lastSeenWatermark); + watermarkCounter++; + + lastSeenWatermark = watermark; + } else { + Assert.fail("Unknown element in the list."); + } + } + + // clean the output to be ready for the next split + tester.getOutput().clear(); + } + + // now we are processing one split after the other, + // so all the elements must be here by now. + Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, lineCounter); + + // because we expect one watermark per split. 
+ Assert.assertEquals(NO_OF_FILES, watermarkCounter); + + // then close the reader gracefully so that the Long.MAX watermark is emitted + synchronized (tester.getCheckpointLock()) { + tester.close(); + } + + for(org.apache.hadoop.fs.Path file: filesCreated) { + hdfs.delete(file, false); + } + + // check if the last element is the LongMax watermark (by now this must be the only element) + Assert.assertEquals(1, tester.getOutput().size()); + Assert.assertTrue(tester.getOutput().peek() instanceof Watermark); + Assert.assertEquals(Long.MAX_VALUE, ((Watermark) tester.getOutput().poll()).getTimestamp()); + + // check if the elements are the expected ones. + Assert.assertEquals(expectedFileContents.size(), actualFileContents.size()); + for (Integer fileIdx: expectedFileContents.keySet()) { + Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx)); + + List<String> cntnt = actualFileContents.get(fileIdx); + Collections.sort(cntnt, new Comparator<String>() { + @Override + public int compare(String o1, String o2) { + return getLineNo(o1) - getLineNo(o2); + } + }); + + StringBuilder cntntStr = new StringBuilder(); + for (String line: cntnt) { + cntntStr.append(line); + } + Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString()); + } + } + + @Test public void testFileReadingOperator() throws Exception { Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>(); Map<Integer, String> expectedFileContents = new HashMap<>(); @@ -119,10 +269,11 @@ public class ContinuousFileMonitoringTest { TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format); ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format); + reader.setOutputType(typeInfo, new ExecutionConfig()); + OneInputStreamOperatorTestHarness<FileInputSplit, String> tester = new OneInputStreamOperatorTestHarness<>(reader); - - reader.setOutputType(typeInfo, new ExecutionConfig()); + 
tester.setTimeCharacteristic(TimeCharacteristic.EventTime); tester.open(); // create the necessary splits for the test @@ -134,38 +285,38 @@ public class ContinuousFileMonitoringTest { tester.processElement(new StreamRecord<>(split)); } - // then close the reader gracefully + // then close the reader gracefully (and wait to finish reading) synchronized (tester.getCheckpointLock()) { tester.close(); } - /* - * Given that the reader is multithreaded, the test finishes before the reader thread finishes - * reading. This results in files being deleted by the test before being read, thus throwing an exception. - * In addition, even if file deletion happens at the end, the results are not ready for testing. - * To face this, we wait until all the output is collected or until the waiting time exceeds 1000 ms, or 1s. - */ + // the lines received must be the elements in the files +1 for the longMax watermark + // we are in event time, which emits no watermarks, so the last watermark will mark the end + // of the input stream. 
- long start = System.currentTimeMillis(); - Queue<Object> output; - do { - output = tester.getOutput(); - Thread.sleep(50); - } while ((output == null || output.size() != NO_OF_FILES * LINES_PER_FILE) && (System.currentTimeMillis() - start) < 1000); + Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE + 1, tester.getOutput().size()); Map<Integer, List<String>> actualFileContents = new HashMap<>(); + Object lastElement = null; for(Object line: tester.getOutput()) { - StreamRecord<String> element = (StreamRecord<String>) line; - - int fileIdx = Character.getNumericValue(element.getValue().charAt(0)); - List<String> content = actualFileContents.get(fileIdx); - if(content == null) { - content = new ArrayList<>(); - actualFileContents.put(fileIdx, content); + lastElement = line; + if (line instanceof StreamRecord) { + StreamRecord<String> element = (StreamRecord<String>) line; + + int fileIdx = Character.getNumericValue(element.getValue().charAt(0)); + List<String> content = actualFileContents.get(fileIdx); + if (content == null) { + content = new ArrayList<>(); + actualFileContents.put(fileIdx, content); + } + content.add(element.getValue() + "\n"); } - content.add(element.getValue() +"\n"); } + // check if the last element is the LongMax watermark + Assert.assertTrue(lastElement instanceof Watermark); + Assert.assertEquals(Long.MAX_VALUE, ((Watermark) lastElement).getTimestamp()); + Assert.assertEquals(expectedFileContents.size(), actualFileContents.size()); for (Integer fileIdx: expectedFileContents.keySet()) { Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx)); @@ -224,7 +375,7 @@ public class ContinuousFileMonitoringTest { monitoringFunction.open(new Configuration()); monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound)); - Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES); + Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size()); for(int i = 0; i < NO_OF_FILES; i++) { 
org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(hdfsURI + "/file" + i); Assert.assertTrue(uniqFilesFound.contains(file.toString())); @@ -268,8 +419,8 @@ public class ContinuousFileMonitoringTest { t.interrupt(); fc.join(); - Assert.assertTrue(fc.getFilesCreated().size() == NO_OF_FILES); - Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES); + Assert.assertEquals(NO_OF_FILES, fc.getFilesCreated().size()); + Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size()); Set<org.apache.hadoop.fs.Path> filesCreated = fc.getFilesCreated(); Set<String> fileNamesCreated = new HashSet<>(); @@ -316,7 +467,7 @@ public class ContinuousFileMonitoringTest { // wait until all the files are created fc.join(); - Assert.assertTrue(filesCreated.size() == NO_OF_FILES); + Assert.assertEquals(NO_OF_FILES, filesCreated.size()); Set<String> fileNamesCreated = new HashSet<>(); for (org.apache.hadoop.fs.Path path: fc.getFilesCreated()) { @@ -337,7 +488,7 @@ public class ContinuousFileMonitoringTest { private int getLineNo(String line) { String[] tkns = line.split("\\s"); - Assert.assertTrue(tkns.length == 6); + Assert.assertEquals(6, tkns.length); return Integer.parseInt(tkns[tkns.length - 1]); } http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java index e274fdd..ac1e3f0 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java +++ 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java @@ -70,7 +70,7 @@ public class BucketingSinkTest { private static org.apache.hadoop.fs.FileSystem dfs; private static String hdfsURI; - private OneInputStreamOperatorTestHarness<String, Object> createTestSink(File dataDir, TimeServiceProvider clock) { + private OneInputStreamOperatorTestHarness<String, Object> createTestSink(File dataDir, TestTimeServiceProvider clock) { BucketingSink<String> sink = new BucketingSink<String>(dataDir.getAbsolutePath()) .setBucketer(new Bucketer<String>() { private static final long serialVersionUID = 1L; @@ -91,7 +91,7 @@ public class BucketingSinkTest { } private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink(BucketingSink<T> sink, - TimeServiceProvider clock) { + TestTimeServiceProvider clock) { return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), new ExecutionConfig(), clock); } http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index 838bee6..35e72a7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -29,14 +29,13 @@ import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.metrics.Counter; import 
org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; -import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.StreamSourceContexts; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,20 +43,24 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; +import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; import java.util.Queue; +import static org.apache.flink.util.Preconditions.checkState; import static org.apache.flink.util.Preconditions.checkNotNull; /** * This is the operator that reads the {@link FileInputSplit FileInputSplits} received from - * the preceding {@link ContinuousFileMonitoringFunction}. This operator will receive just the split descriptors - * and then read and emit records. This may lead to increased backpressure. To avoid this, we have another - * thread ({@link SplitReader}) actually reading the splits and emitting the elements, which is separate from - * the thread forwarding the checkpoint barriers. The two threads sync on the {@link StreamTask#getCheckpointLock()} - * so that the checkpoints reflect the current state. + * the preceding {@link ContinuousFileMonitoringFunction}. This operator can have parallelism + * greater than 1, contrary to the {@link ContinuousFileMonitoringFunction} which has + * a parallelism of 1. 
+ * <p/> + * This operator will receive the split descriptors, put them in a queue, and have another + * thread read the actual data from the split. This architecture allows the separation of the + * reading thread, from the one emitting the checkpoint barriers, thus removing any potential + * back-pressure. */ @Internal public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT> @@ -67,16 +70,16 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class); + /** A value that serves as a kill-pill to stop the reading thread when no more splits remain. */ private static final FileInputSplit EOS = new FileInputSplit(-1, null, -1, -1, null); - private transient SplitReader<S, OUT> reader; - private transient TimestampedCollector<OUT> collector; - private FileInputFormat<OUT> format; private TypeSerializer<OUT> serializer; private transient Object checkpointLock; + private transient SplitReader<S, OUT> reader; + private transient SourceFunction.SourceContext<OUT> readerContext; private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState; public ContinuousFileReaderOperator(FileInputFormat<OUT> format) { @@ -92,25 +95,22 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A public void open() throws Exception { super.open(); - if (this.serializer == null) { - throw new IllegalStateException("The serializer has not been set. " + - "Probably the setOutputType() was not called and this should not have happened. " + - "Please report it."); - } + checkState(this.reader == null, "The reader is already initialized."); + checkState(this.serializer != null, "The serializer has not been set. " + + "Probably the setOutputType() was not called. 
Please report it."); this.format.setRuntimeContext(getRuntimeContext()); this.format.configure(new Configuration()); - - this.collector = new TimestampedCollector<>(output); this.checkpointLock = getContainingTask().getCheckpointLock(); - Preconditions.checkState(reader == null, "The reader is already initialized."); + // set the reader context based on the time characteristic + final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic(); + final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(); + this.readerContext = StreamSourceContexts.getSourceContext( + timeCharacteristic, getTimerService(), checkpointLock, output, watermarkInterval); - this.reader = new SplitReader<>(format, serializer, collector, checkpointLock, readerState); - - // the readerState is needed for the initialization of the reader - // when recovering from a failure. So after the initialization, - // we can set it to null. + // and initialize the split reading thread + this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, readerState); this.readerState = null; this.reader.start(); } @@ -122,7 +122,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A @Override public void processWatermark(Watermark mark) throws Exception { - output.emitWatermark(mark); + // we do nothing because we emit our own watermarks if needed. } @Override @@ -156,7 +156,8 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A } } reader = null; - collector = null; + readerContext = null; + readerState = null; format = null; serializer = null; } @@ -177,7 +178,16 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A // called by the StreamTask while having it. 
checkpointLock.wait(); } - collector.close(); + + // finally if we are closed normally and we are operating on + // event or ingestion time, emit the max watermark indicating + // the end of the stream, like a normal source would do. + + if (readerContext != null) { + readerContext.emitWatermark(Watermark.MAX_WATERMARK); + readerContext.close(); + } + output.close(); } private class SplitReader<S extends Serializable, OT> extends Thread { @@ -188,7 +198,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A private final TypeSerializer<OT> serializer; private final Object checkpointLock; - private final TimestampedCollector<OT> collector; + private final SourceFunction.SourceContext<OT> readerContext; private final Queue<FileInputSplit> pendingSplits; @@ -200,16 +210,16 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A private SplitReader(FileInputFormat<OT> format, TypeSerializer<OT> serializer, - TimestampedCollector<OT> collector, + SourceFunction.SourceContext<OT> readerContext, Object checkpointLock, Tuple3<List<FileInputSplit>, FileInputSplit, S> restoredState) { this.format = checkNotNull(format, "Unspecified FileInputFormat."); this.serializer = checkNotNull(serializer, "Unspecified Serializer."); + this.readerContext = checkNotNull(readerContext, "Unspecified Reader Context."); + this.checkpointLock = checkNotNull(checkpointLock, "Unspecified checkpoint lock."); - this.pendingSplits = new LinkedList<>(); - this.collector = collector; - this.checkpointLock = checkpointLock; + this.pendingSplits = new ArrayDeque<>(); this.isRunning = true; // this is the case where a task recovers from a previous failed attempt @@ -219,7 +229,6 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A S formatState = restoredState.f2; for (FileInputSplit split : pending) { - Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split entry to read: " + split + "."); 
pendingSplits.add(split); } @@ -229,9 +238,8 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A } private void addSplit(FileInputSplit split) { - Preconditions.checkNotNull(split); + checkNotNull(split, "Cannot insert a null value in the pending splits queue."); synchronized (checkpointLock) { - Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split entry to read: " + split + "."); this.pendingSplits.add(split); } } @@ -267,7 +275,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A checkpointableFormat.reopen(currentSplit, restoredFormatState); } else { // this is the case of a non-checkpointable input format that will reprocess the last split. - LOG.info("Format " + this.format.getClass().getName() + " used is not checkpointable."); + LOG.info("Format " + this.format.getClass().getName() + " does not support checkpointing."); format.open(currentSplit); } // reset the restored state to null for the next iteration @@ -299,7 +307,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A synchronized (checkpointLock) { nextElement = format.nextRecord(nextElement); if (nextElement != null) { - collector.collect(nextElement); + readerContext.collect(nextElement); } else { break; } @@ -318,10 +326,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A } } catch (Throwable e) { - if (isRunning) { - LOG.error("Caught exception processing split: ", currentSplit); - } - getContainingTask().failExternally(e); + getContainingTask().handleAsyncException("Caught exception when processing split: " + currentSplit, e); } finally { synchronized (checkpointLock) { LOG.info("Reader terminated, and exiting..."); @@ -358,7 +363,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A restoredFormatState; return new Tuple3<>(snapshot, currentSplit, formatState); } else { - LOG.info("The format used is not 
checkpointable. The current input split will be restarted upon recovery."); + LOG.info("The format does not support checkpointing. The current input split will be re-read from start upon recovery."); return new Tuple3<>(snapshot, currentSplit, null); } } else { @@ -404,7 +409,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A FileInputSplit currSplit = (FileInputSplit) ois.readObject(); // read the pending splits list - List<FileInputSplit> pendingSplits = new LinkedList<>(); + List<FileInputSplit> pendingSplits = new ArrayList<>(); int noOfSplits = ois.readInt(); for (int i = 0; i < noOfSplits; i++) { FileInputSplit split = (FileInputSplit) ois.readObject(); @@ -416,8 +421,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A S formatState = (S) ois.readObject(); // set the whole reader state for the open() to find. - Preconditions.checkState(this.readerState == null, - "The reader state has already been initialized."); + checkState(this.readerState == null, "The reader state has already been initialized."); this.readerState = new Tuple3<>(pendingSplits, currSplit, formatState); } http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index 22987ab..1409ae4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -21,11 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.TimeCharacteristic; import 
org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; - -import java.util.concurrent.ScheduledFuture; /** * {@link StreamOperator} for streaming sources. @@ -57,26 +53,11 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception { final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic(); - final SourceFunction.SourceContext<OUT> ctx; - - switch (timeCharacteristic) { - case EventTime: - ctx = new ManualWatermarkContext<>(this, lockingObject, collector); - break; - case IngestionTime: - ctx = new AutomaticWatermarkContext<>(this, lockingObject, collector, - getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval()); - break; - case ProcessingTime: - ctx = new NonTimestampContext<>(this, lockingObject, collector); - break; - default: - throw new Exception(String.valueOf(timeCharacteristic)); - } + final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(); + + this.ctx = StreamSourceContexts.getSourceContext( + timeCharacteristic, getTimerService(), lockingObject, collector, watermarkInterval); - // copy to a field to give the 'cancel()' method access - this.ctx = ctx; - try { userFunction.run(ctx); @@ -122,252 +103,4 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> protected boolean isCanceledOrStopped() { return canceledOrStopped; } - - /** - * Checks whether any asynchronous thread (checkpoint trigger, timer, watermark generator, ...) - * has caused an exception. If one of these threads caused an exception, this method will - * throw that exception. 
- */ - void checkAsyncException() { - getContainingTask().checkTimerException(); - } - - // ------------------------------------------------------------------------ - // Source contexts for various stream time characteristics - // ------------------------------------------------------------------------ - - /** - * A source context that attached {@code -1} as a timestamp to all records, and that - * does not forward watermarks. - */ - public static class NonTimestampContext<T> implements SourceFunction.SourceContext<T> { - - private final StreamSource<?, ?> owner; - private final Object lockingObject; - private final Output<StreamRecord<T>> output; - private final StreamRecord<T> reuse; - - public NonTimestampContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) { - this.owner = owner; - this.lockingObject = lockingObject; - this.output = output; - this.reuse = new StreamRecord<T>(null); - } - - @Override - public void collect(T element) { - owner.checkAsyncException(); - synchronized (lockingObject) { - output.collect(reuse.replace(element)); - } - } - - @Override - public void collectWithTimestamp(T element, long timestamp) { - // ignore the timestamp - collect(element); - } - - @Override - public void emitWatermark(Watermark mark) { - owner.checkAsyncException(); - // do nothing else - } - - @Override - public Object getCheckpointLock() { - return lockingObject; - } - - @Override - public void close() {} - } - - /** - * {@link SourceFunction.SourceContext} to be used for sources with automatic timestamps - * and watermark emission. 
- */ - public static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> { - - private final StreamSource<?, ?> owner; - private final TimeServiceProvider timeService; - private final Object lockingObject; - private final Output<StreamRecord<T>> output; - private final StreamRecord<T> reuse; - - private final ScheduledFuture<?> watermarkTimer; - private final long watermarkInterval; - - private volatile long nextWatermarkTime; - - public AutomaticWatermarkContext( - final StreamSource<?, ?> owner, - final Object lockingObjectParam, - final Output<StreamRecord<T>> outputParam, - final long watermarkInterval) { - - if (watermarkInterval < 1L) { - throw new IllegalArgumentException("The watermark interval cannot be smaller than one."); - } - - this.owner = owner; - this.timeService = owner.getTimerService(); - this.lockingObject = lockingObjectParam; - this.output = outputParam; - this.watermarkInterval = watermarkInterval; - this.reuse = new StreamRecord<T>(null); - - long now = this.timeService.getCurrentProcessingTime(); - this.watermarkTimer = this.timeService.registerTimer(now + watermarkInterval, - new WatermarkEmittingTask(this.timeService, lockingObjectParam, outputParam)); - } - - @Override - public void collect(T element) { - owner.checkAsyncException(); - - synchronized (lockingObject) { - final long currentTime = this.timeService.getCurrentProcessingTime(); - output.collect(reuse.replace(element, currentTime)); - - // this is to avoid lock contention in the lockingObject by - // sending the watermark before the firing of the watermark - // emission task. 
- - if (currentTime > nextWatermarkTime) { - // in case we jumped some watermarks, recompute the next watermark time - final long watermarkTime = currentTime - (currentTime % watermarkInterval); - nextWatermarkTime = watermarkTime + watermarkInterval; - output.emitWatermark(new Watermark(watermarkTime)); - - // we do not need to register another timer here - // because the emitting task will do so. - } - } - } - - @Override - public void collectWithTimestamp(T element, long timestamp) { - collect(element); - } - - @Override - public void emitWatermark(Watermark mark) { - owner.checkAsyncException(); - - if (mark.getTimestamp() == Long.MAX_VALUE) { - // allow it since this is the special end-watermark that for example the Kafka source emits - synchronized (lockingObject) { - nextWatermarkTime = Long.MAX_VALUE; - output.emitWatermark(mark); - } - - // we can shutdown the timer now, no watermarks will be needed any more - watermarkTimer.cancel(true); - } - } - - @Override - public Object getCheckpointLock() { - return lockingObject; - } - - @Override - public void close() { - watermarkTimer.cancel(true); - } - - private class WatermarkEmittingTask implements Triggerable { - - private final TimeServiceProvider timeService; - private final Object lockingObject; - private final Output<StreamRecord<T>> output; - - private WatermarkEmittingTask(TimeServiceProvider timeService, Object lock, Output<StreamRecord<T>> output) { - this.timeService = timeService; - this.lockingObject = lock; - this.output = output; - } - - @Override - public void trigger(long timestamp) { - final long currentTime = this.timeService.getCurrentProcessingTime(); - - if (currentTime > nextWatermarkTime) { - // align the watermarks across all machines. 
this will ensure that we - // don't have watermarks that creep along at different intervals because - // the machine clocks are out of sync - final long watermarkTime = currentTime - (currentTime % watermarkInterval); - - synchronized (lockingObject) { - if (currentTime > nextWatermarkTime) { - output.emitWatermark(new Watermark(watermarkTime)); - nextWatermarkTime += watermarkInterval; - } - } - } - - this.timeService.registerTimer(this.timeService.getCurrentProcessingTime() + watermarkInterval, - new WatermarkEmittingTask(this.timeService, lockingObject, output)); - } - } - } - - /** - * A SourceContext for event time. Sources may directly attach timestamps and generate - * watermarks, but if records are emitted without timestamps, no timestamps are automatically - * generated and attached. The records will simply have no timestamp in that case. - * - * Streaming topologies can use timestamp assigner functions to override the timestamps - * assigned here. - */ - public static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> { - - private final StreamSource<?, ?> owner; - private final Object lockingObject; - private final Output<StreamRecord<T>> output; - private final StreamRecord<T> reuse; - - public ManualWatermarkContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) { - this.owner = owner; - this.lockingObject = lockingObject; - this.output = output; - this.reuse = new StreamRecord<T>(null); - } - - @Override - public void collect(T element) { - owner.checkAsyncException(); - - synchronized (lockingObject) { - output.collect(reuse.replace(element)); - } - } - - @Override - public void collectWithTimestamp(T element, long timestamp) { - owner.checkAsyncException(); - - synchronized (lockingObject) { - output.collect(reuse.replace(element, timestamp)); - } - } - - @Override - public void emitWatermark(Watermark mark) { - owner.checkAsyncException(); - - synchronized (lockingObject) { - 
output.emitWatermark(mark); - } - } - - @Override - public Object getCheckpointLock() { - return lockingObject; - } - - @Override - public void close() {} - } } http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java new file mode 100644 index 0000000..abaf4e7 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; +import org.apache.flink.util.Preconditions; + +import java.util.concurrent.ScheduledFuture; + +/** + * Source contexts for various stream time characteristics. + */ +public class StreamSourceContexts { + + /** + * Depending on the {@link TimeCharacteristic}, this method will return the adequate + * {@link org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}. That is: + * <ul> + * <li> {@link TimeCharacteristic#IngestionTime} = {@link AutomaticWatermarkContext} + * <li> {@link TimeCharacteristic#ProcessingTime} = {@link NonTimestampContext} + * <li> {@link TimeCharacteristic#EventTime} = {@link ManualWatermarkContext} + * </ul> + * */ + public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext( + TimeCharacteristic timeCharacteristic, TimeServiceProvider timeService, + Object checkpointLock, Output<StreamRecord<OUT>> output, long watermarkInterval) { + + final SourceFunction.SourceContext<OUT> ctx; + switch (timeCharacteristic) { + case EventTime: + ctx = new ManualWatermarkContext<>(checkpointLock, output); + break; + case IngestionTime: + ctx = new AutomaticWatermarkContext<>(timeService, checkpointLock, output, watermarkInterval); + break; + case ProcessingTime: + ctx = new NonTimestampContext<>(checkpointLock, output); + break; + default: + throw new IllegalArgumentException(String.valueOf(timeCharacteristic)); + } + return ctx; + } + + /** + * A source context that attached {@code -1} as a timestamp to all records, and that + * does not forward watermarks. 
+ */ + private static class NonTimestampContext<T> implements SourceFunction.SourceContext<T> { + + private final Object lock; + private final Output<StreamRecord<T>> output; + private final StreamRecord<T> reuse; + + private NonTimestampContext(Object checkpointLock, Output<StreamRecord<T>> output) { + this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null."); + this.output = Preconditions.checkNotNull(output, "The output cannot be null."); + this.reuse = new StreamRecord<>(null); + } + + @Override + public void collect(T element) { + synchronized (lock) { + output.collect(reuse.replace(element)); + } + } + + @Override + public void collectWithTimestamp(T element, long timestamp) { + // ignore the timestamp + collect(element); + } + + @Override + public void emitWatermark(Watermark mark) { + // do nothing + } + + @Override + public Object getCheckpointLock() { + return lock; + } + + @Override + public void close() {} + } + + /** + * {@link SourceFunction.SourceContext} to be used for sources with automatic timestamps + * and watermark emission. 
+ */ + private static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> { + + private final TimeServiceProvider timeService; + private final Object lock; + private final Output<StreamRecord<T>> output; + private final StreamRecord<T> reuse; + + private final ScheduledFuture<?> watermarkTimer; + private final long watermarkInterval; + + private volatile long nextWatermarkTime; + + private AutomaticWatermarkContext( + final TimeServiceProvider timeService, + final Object checkpointLock, + final Output<StreamRecord<T>> output, + final long watermarkInterval) { + + this.timeService = Preconditions.checkNotNull(timeService, "Time Service cannot be null."); + this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null."); + this.output = Preconditions.checkNotNull(output, "The output cannot be null."); + + Preconditions.checkArgument(watermarkInterval >= 1L, "The watermark interval cannot be smaller than 1 ms."); + this.watermarkInterval = watermarkInterval; + + this.reuse = new StreamRecord<>(null); + + long now = this.timeService.getCurrentProcessingTime(); + this.watermarkTimer = this.timeService.registerTimer(now + watermarkInterval, + new WatermarkEmittingTask(this.timeService, lock, output)); + } + + @Override + public void collect(T element) { + synchronized (lock) { + final long currentTime = this.timeService.getCurrentProcessingTime(); + output.collect(reuse.replace(element, currentTime)); + + // this is to avoid lock contention in the lockingObject by + // sending the watermark before the firing of the watermark + // emission task. 
+ + if (currentTime > nextWatermarkTime) { + // in case we jumped some watermarks, recompute the next watermark time + final long watermarkTime = currentTime - (currentTime % watermarkInterval); + nextWatermarkTime = watermarkTime + watermarkInterval; + output.emitWatermark(new Watermark(watermarkTime)); + + // we do not need to register another timer here + // because the emitting task will do so. + } + } + } + + @Override + public void collectWithTimestamp(T element, long timestamp) { + collect(element); + } + + @Override + public void emitWatermark(Watermark mark) { + + if (mark.getTimestamp() == Long.MAX_VALUE) { + // allow it since this is the special end-watermark that for example the Kafka source emits + synchronized (lock) { + nextWatermarkTime = Long.MAX_VALUE; + output.emitWatermark(mark); + } + + // we can shutdown the timer now, no watermarks will be needed any more + if (watermarkTimer != null) { + watermarkTimer.cancel(true); + } + } + } + + @Override + public Object getCheckpointLock() { + return lock; + } + + @Override + public void close() { + if (watermarkTimer != null) { + watermarkTimer.cancel(true); + } + } + + private class WatermarkEmittingTask implements Triggerable { + + private final TimeServiceProvider timeService; + private final Object lock; + private final Output<StreamRecord<T>> output; + + private WatermarkEmittingTask(TimeServiceProvider timeService, Object checkpointLock, Output<StreamRecord<T>> output) { + this.timeService = Preconditions.checkNotNull(timeService, "Time Service cannot be null."); + this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null."); + this.output = Preconditions.checkNotNull(output, "The output cannot be null."); + } + + @Override + public void trigger(long timestamp) { + final long currentTime = timeService.getCurrentProcessingTime(); + + if (currentTime > nextWatermarkTime) { + // align the watermarks across all machines. 
this will ensure that we + // don't have watermarks that creep along at different intervals because + // the machine clocks are out of sync + final long watermarkTime = currentTime - (currentTime % watermarkInterval); + + synchronized (lock) { + if (currentTime > nextWatermarkTime) { + output.emitWatermark(new Watermark(watermarkTime)); + nextWatermarkTime = watermarkTime + watermarkInterval; + } + } + } + + long nextWatermark = currentTime + watermarkInterval; + this.timeService.registerTimer(nextWatermark, new WatermarkEmittingTask(this.timeService, lock, output)); + } + } + } + + /** + * A SourceContext for event time. Sources may directly attach timestamps and generate + * watermarks, but if records are emitted without timestamps, no timestamps are automatically + * generated and attached. The records will simply have no timestamp in that case. + * + * Streaming topologies can use timestamp assigner functions to override the timestamps + * assigned here. + */ + private static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> { + + private final Object lock; + private final Output<StreamRecord<T>> output; + private final StreamRecord<T> reuse; + + private ManualWatermarkContext(Object checkpointLock, Output<StreamRecord<T>> output) { + this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null."); + this.output = Preconditions.checkNotNull(output, "The output cannot be null."); + this.reuse = new StreamRecord<>(null); + } + + @Override + public void collect(T element) { + synchronized (lock) { + output.collect(reuse.replace(element)); + } + } + + @Override + public void collectWithTimestamp(T element, long timestamp) { + synchronized (lock) { + output.collect(reuse.replace(element, timestamp)); + } + } + + @Override + public void emitWatermark(Watermark mark) { + synchronized (lock) { + output.emitWatermark(mark); + } + } + + @Override + public Object getCheckpointLock() { + return lock; + } + + @Override 
+ public void close() {} + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java index c7ec2ed..4c55055 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java @@ -18,12 +18,14 @@ package org.apache.flink.streaming.runtime.tasks; /** - * Interface for reporting exceptions that are thrown in (possibly) a different thread. + * An interface marking a task as capable of handling exceptions thrown + * by different threads, other than the one executing the task itself. */ public interface AsyncExceptionHandler { /** - * Registers the given exception. + * Handles an exception thrown by another thread (e.g. a TriggerTask), + * other than the one executing the main task. 
*/ - void registerAsyncException(AsynchronousException exception); + void handleAsyncException(String message, Throwable exception); } http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java index ea2b07f..9534b3c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.apache.flink.util.Preconditions; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -49,9 +50,9 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider { private DefaultTimeServiceProvider(AsyncExceptionHandler task, ScheduledExecutorService threadPoolExecutor, Object checkpointLock) { - this.task = task; - this.timerService = threadPoolExecutor; - this.checkpointLock = checkpointLock; + this.task = Preconditions.checkNotNull(task); + this.timerService = Preconditions.checkNotNull(threadPoolExecutor); + this.checkpointLock = Preconditions.checkNotNull(checkpointLock); } @Override @@ -99,7 +100,7 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider { target.trigger(timestamp); } catch (Throwable t) { TimerException asyncException = new TimerException(t); - exceptionHandler.registerAsyncException(asyncException); + 
exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException); } } } @@ -109,7 +110,7 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider { public static DefaultTimeServiceProvider createForTesting(ScheduledExecutorService executor, Object checkpointLock) { return new DefaultTimeServiceProvider(new AsyncExceptionHandler() { @Override - public void registerAsyncException(AsynchronousException exception) { + public void handleAsyncException(String message, Throwable exception) { exception.printStackTrace(); } }, executor, checkpointLock); http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index d6d2fb5..cf8853e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -64,7 +64,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO final Object lock = getCheckpointLock(); while (running && inputProcessor.processInput(operator, lock)) { - checkTimerException(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 9802a16..33317fa 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -53,6 +53,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -158,11 +159,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> private List<Collection<OperatorStateHandle>> lazyRestoreOperatorState; - /** - * This field is used to forward an exception that is caught in the timer thread or other - * asynchronous Threads. Subclasses must ensure that exceptions stored here get thrown on the - * actual execution Thread. */ - private volatile AsynchronousException asyncException; /** The currently active background materialization threads */ private final ClosableRegistry cancelables = new ClosableRegistry(); @@ -301,9 +297,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> // still let the computation fail tryDisposeAllOperators(); disposed = true; - - // Don't forget to check and throw exceptions that happened in async thread one last time - checkTimerException(); } finally { // clean up everything we initialized @@ -354,19 +347,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } } - /** - * Marks task execution failed for an external reason (a reason other than the task code itself - * throwing an exception). If the task is already in a terminal state - * (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing. - * Otherwise it sets the state to FAILED, and, if the invokable code is running, - * starts an asynchronous thread that aborts that code. 
- * - * <p>This method never blocks.</p> - */ - public void failExternally(Throwable cause) { - getEnvironment().failExternally(cause); - } - @Override public final void cancel() throws Exception { isRunning = false; @@ -898,27 +878,21 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } /** - * Check whether an exception was thrown in a Thread other than the main Thread. (For example - * in the processing-time trigger Thread). This will rethrow that exception in case on - * occurred. + * Handles an exception thrown by another thread (e.g. a TriggerTask), + * other than the one executing the main task by failing the task entirely. + * + * In more detail, it marks task execution failed for an external reason + * (a reason other than the task code itself throwing an exception). If the task + * is already in a terminal state (such as FINISHED, CANCELED, FAILED), or if the + * task is already canceling this does nothing. Otherwise it sets the state to + * FAILED, and, if the invokable code is running, starts an asynchronous thread + * that aborts that code. * - * <p>This must be called in the main loop of {@code StreamTask} subclasses to ensure - * that we propagate failures. 
+ * <p>This method never blocks.</p> */ - public void checkTimerException() throws AsynchronousException { - if (asyncException != null) { - throw asyncException; - } - } - @Override - public void registerAsyncException(AsynchronousException exception) { - if (isRunning) { - LOG.error("Asynchronous exception registered.", exception); - } - if (this.asyncException == null) { - this.asyncException = exception; - } + public void handleAsyncException(String message, Throwable exception) { + getEnvironment().failExternally(exception); } // ------------------------------------------------------------------------ @@ -1030,7 +1004,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> catch (Exception e) { // registers the exception and tries to fail the whole task AsynchronousException asyncException = new AsynchronousException(e); - owner.registerAsyncException(asyncException); + owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException); } finally { cancelables.unregisterClosable(this); http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index 9252063..0197c53 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -88,7 +88,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS final Object lock = getCheckpointLock(); while (running && inputProcessor.processInput(operator, lock)) { - checkTimerException(); + } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java index e8663f5..10b30d0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java @@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StoppableStreamSource; import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.operators.StreamSourceContexts; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -187,12 +188,11 @@ public class StreamSourceOperatorTest { final List<StreamElement> output = new ArrayList<>(); - StreamSource.AutomaticWatermarkContext<String> ctx = - new StreamSource.AutomaticWatermarkContext<>( - operator, - operator.getContainingTask().getCheckpointLock(), - new CollectorOutput<String>(output), - operator.getExecutionConfig().getAutoWatermarkInterval()); + StreamSourceContexts.getSourceContext(TimeCharacteristic.IngestionTime, + operator.getContainingTask().getTimerService(), + operator.getContainingTask().getCheckpointLock(), + new CollectorOutput<String>(output), + operator.getExecutionConfig().getAutoWatermarkInterval()); // periodically emit the watermarks // even though we start from 1 
the watermark are still @@ -218,7 +218,7 @@ public class StreamSourceOperatorTest { private static <T> void setupSourceOperator(StreamSource<T, ?> operator, TimeCharacteristic timeChar, long watermarkInterval, - final TimeServiceProvider timeProvider) { + final TestTimeServiceProvider timeProvider) { ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.setAutoWatermarkInterval(watermarkInterval); @@ -241,9 +241,6 @@ public class StreamSourceOperatorTest { doAnswer(new Answer<TimeServiceProvider>() { @Override public TimeServiceProvider answer(InvocationOnMock invocation) throws Throwable { - if (timeProvider == null) { - throw new RuntimeException("The time provider is null."); - } return timeProvider; } }).when(mockTask).getTimerService(); http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java index 60850d8..0351978 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java @@ -23,7 +23,6 @@ import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler; -import org.apache.flink.streaming.runtime.tasks.AsynchronousException; import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; @@ -39,6 +38,7 @@ import org.powermock.modules.junit4.PowerMockRunner; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; @@ -52,28 +52,24 @@ public class TimeProviderTest { final OneShotLatch latch = new OneShotLatch(); final Object lock = new Object(); - TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider.create( - new AsyncExceptionHandler() { - @Override - public void registerAsyncException(AsynchronousException exception) { - exception.printStackTrace(); - } - }, - Executors.newSingleThreadScheduledExecutor(), - lock); + TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider + .createForTesting(Executors.newSingleThreadScheduledExecutor(), lock); final List<Long> timestamps = new ArrayList<>(); - long start = System.currentTimeMillis(); long interval = 50L; - final long noOfTimers = 20; // we add 2 timers per iteration minus the first that would have a negative timestamp - final long expectedNoOfTimers = 2 * noOfTimers - 1; + final long expectedNoOfTimers = 2 * noOfTimers; for (int i = 0; i < noOfTimers; i++) { - double nextTimer = start + i * interval; + + // we add a delay (100ms) so that both timers are inserted before the first is processed. + // If not, and given that we add timers out of order, we may have a timer firing + // before the next one (with smaller timestamp) is added. + + double nextTimer = timeServiceProvider.getCurrentProcessingTime() + 100 + i * interval; timeServiceProvider.registerTimer((long) nextTimer, new Triggerable() { @Override @@ -88,17 +84,15 @@ public class TimeProviderTest { // add also out-of-order tasks to verify that eventually // they will be executed in the correct order. 
- if (i > 0) { - timeServiceProvider.registerTimer((long) (nextTimer - 10), new Triggerable() { - @Override - public void trigger(long timestamp) throws Exception { - timestamps.add(timestamp); - if (timestamps.size() == expectedNoOfTimers) { - latch.trigger(); - } + timeServiceProvider.registerTimer((long) (nextTimer - 10L), new Triggerable() { + @Override + public void trigger(long timestamp) throws Exception { + timestamps.add(timestamp); + if (timestamps.size() == expectedNoOfTimers) { + latch.trigger(); } - }); - } + } + }); } if (!latch.isTriggered()) { @@ -114,15 +108,46 @@ public class TimeProviderTest { long lastTs = Long.MIN_VALUE; for (long timestamp: timestamps) { Assert.assertTrue(timestamp >= lastTs); + if (lastTs != Long.MIN_VALUE && counter % 2 == 1) { + Assert.assertEquals((timestamp - lastTs), 10); + } lastTs = timestamp; - - long expectedTs = start + (counter/2) * interval; - Assert.assertEquals(timestamp, (expectedTs + ((counter % 2 == 0) ? 0 : 40))); counter++; } } @Test + public void testDefaultTimeProviderExceptionHandling() throws InterruptedException { + final OneShotLatch latch = new OneShotLatch(); + + final AtomicBoolean exceptionWasThrown = new AtomicBoolean(false); + + final Object lock = new Object(); + + TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider + .create(new AsyncExceptionHandler() { + @Override + public void handleAsyncException(String message, Throwable exception) { + exceptionWasThrown.compareAndSet(false, true); + latch.trigger(); + } + }, Executors.newSingleThreadScheduledExecutor(), lock); + + long now = System.currentTimeMillis(); + timeServiceProvider.registerTimer(now, new Triggerable() { + @Override + public void trigger(long timestamp) throws Exception { + throw new Exception("Exception in Timer"); + } + }); + + if (!latch.isTriggered()) { + latch.await(); + } + Assert.assertTrue(exceptionWasThrown.get()); + } + + @Test public void testTimerSorting() throws Exception { final List<Long> result = 
new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index f33da89..30f38e3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -183,13 +183,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { @Test public void testWindowTriggerTimeAlignment() throws Exception { final Object lock = new Object(); - final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( + TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( Executors.newSingleThreadScheduledExecutor(), lock); try { @SuppressWarnings("unchecked") final Output<StreamRecord<String>> mockOut = mock(Output.class); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); + StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); AccumulatingProcessingTimeWindowOperator<String, String, String> op; @@ -201,6 +201,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { assertTrue(op.getNextEvaluationTime() % 1000 == 0); op.dispose(); + timerService.shutdownService(); + timerService = DefaultTimeServiceProvider.createForTesting( + Executors.newSingleThreadScheduledExecutor(), lock); + mockTask = 
createMockTaskWithTimer(timerService, lock); + op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000); op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); @@ -209,6 +214,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { assertTrue(op.getNextEvaluationTime() % 1000 == 0); op.dispose(); + timerService.shutdownService(); + timerService = DefaultTimeServiceProvider.createForTesting( + Executors.newSingleThreadScheduledExecutor(), lock); + mockTask = createMockTaskWithTimer(timerService, lock); + op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000); op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); @@ -217,6 +227,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { assertTrue(op.getNextEvaluationTime() % 1000 == 0); op.dispose(); + timerService.shutdownService(); + timerService = DefaultTimeServiceProvider.createForTesting( + Executors.newSingleThreadScheduledExecutor(), lock); + mockTask = createMockTaskWithTimer(timerService, lock); + op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100); op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); @@ -243,7 +258,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { try { final int windowSize = 50; final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); // tumbling window that triggers every 20 milliseconds AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = @@ -297,7 +312,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { try 
{ final CollectingOutput<Integer> out = new CollectingOutput<>(50); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); // tumbling window that triggers every 20 milliseconds AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = @@ -359,7 +374,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { try { final CollectingOutput<Integer> out = new CollectingOutput<>(50); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); // tumbling window that triggers every 20 milliseconds AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = @@ -416,7 +431,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { try { final CollectingOutput<Integer> out = new CollectingOutput<>(50); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); // tumbling window that triggers every 20 milliseconds AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = @@ -743,7 +758,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { // ------------------------------------------------------------------------ - private static StreamTask<?, ?> createMockTask() { + private static StreamTask<?, ?> createMockTask(Object lock) { Configuration configuration = new Configuration(); configuration.setString(ConfigConstants.STATE_BACKEND, "jobmanager"); @@ -751,6 +766,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>()); when(task.getName()).thenReturn("Test task name"); when(task.getExecutionConfig()).thenReturn(new ExecutionConfig()); + when(task.getCheckpointLock()).thenReturn(lock); final 
TaskManagerRuntimeInfo mockTaskManagerRuntimeInfo = mock(TaskManagerRuntimeInfo.class); when(mockTaskManagerRuntimeInfo.getConfiguration()).thenReturn(configuration); @@ -765,9 +781,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { } private static StreamTask<?, ?> createMockTaskWithTimer( - final TimeServiceProvider timerService) + final TimeServiceProvider timerService, final Object lock) { - StreamTask<?, ?> mockTask = createMockTask(); + StreamTask<?, ?> mockTask = createMockTask(lock); when(mockTask.getTimerService()).thenReturn(timerService); return mockTask; } http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java index 826b230..7539c2d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java @@ -191,13 +191,13 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { @Test public void testWindowTriggerTimeAlignment() throws Exception { final Object lock = new Object(); - final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( + TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( Executors.newSingleThreadScheduledExecutor(), lock); try { @SuppressWarnings("unchecked") final Output<StreamRecord<String>> 
mockOut = mock(Output.class); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); + StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); AggregatingProcessingTimeWindowOperator<String, String> op; @@ -209,6 +209,11 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { assertTrue(op.getNextEvaluationTime() % 1000 == 0); op.dispose(); + timerService.shutdownService(); + timerService = DefaultTimeServiceProvider.createForTesting( + Executors.newSingleThreadScheduledExecutor(), lock); + mockTask = createMockTaskWithTimer(timerService, lock); + op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000); op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); @@ -217,6 +222,11 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { assertTrue(op.getNextEvaluationTime() % 1000 == 0); op.dispose(); + timerService.shutdownService(); + timerService = DefaultTimeServiceProvider.createForTesting( + Executors.newSingleThreadScheduledExecutor(), lock); + mockTask = createMockTaskWithTimer(timerService, lock); + op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000); op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); @@ -225,6 +235,11 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { assertTrue(op.getNextEvaluationTime() % 1000 == 0); op.dispose(); + timerService.shutdownService(); + timerService = DefaultTimeServiceProvider.createForTesting( + Executors.newSingleThreadScheduledExecutor(), lock); + mockTask = createMockTaskWithTimer(timerService, lock); + op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100); op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); @@ -257,7 +272,7 
@@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSize); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out); op.open(); @@ -309,7 +324,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { final int windowSize = 50; final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = new AggregatingProcessingTimeWindowOperator<>( @@ -377,7 +392,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { try { final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); // tumbling window that triggers every 20 milliseconds AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = @@ -448,7 +463,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { try { final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); // tumbling window that triggers every 20 milliseconds AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = @@ -508,7 +523,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { try { final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(); - final 
StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); ReduceFunction<Tuple2<Integer, Integer>> failingFunction = new FailingFunction(100); @@ -929,7 +944,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { // ------------------------------------------------------------------------ - private static StreamTask<?, ?> createMockTask() { + private static StreamTask<?, ?> createMockTask(Object lock) { Configuration configuration = new Configuration(); configuration.setString(ConfigConstants.STATE_BACKEND, "jobmanager"); @@ -937,6 +952,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>()); when(task.getName()).thenReturn("Test task name"); when(task.getExecutionConfig()).thenReturn(new ExecutionConfig()); + when(task.getCheckpointLock()).thenReturn(lock); final TaskManagerRuntimeInfo mockTaskManagerRuntimeInfo = mock(TaskManagerRuntimeInfo.class); when(mockTaskManagerRuntimeInfo.getConfiguration()).thenReturn(configuration); @@ -947,9 +963,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { return task; } - private static StreamTask<?, ?> createMockTaskWithTimer(final TimeServiceProvider timerService) + private static StreamTask<?, ?> createMockTaskWithTimer(final TimeServiceProvider timerService, final Object lock) { - StreamTask<?, ?> mockTask = createMockTask(); + StreamTask<?, ?> mockTask = createMockTask(lock); when(mockTask.getTimerService()).thenReturn(timerService); return mockTask; } http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index f638ddd..9b773d8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -102,6 +102,8 @@ public class StreamMockEnvironment implements Environment { private final ExecutionConfig executionConfig; + private volatile boolean wasFailedExternally = false; + public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, ExecutionConfig executionConfig, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) { this.taskInfo = new TaskInfo("", 1, 0, 1, 0); @@ -325,7 +327,11 @@ public class StreamMockEnvironment implements Environment { @Override public void failExternally(Throwable cause) { - throw new UnsupportedOperationException("StreamMockEnvironment does not support external task failure."); + this.wasFailedExternally = true; + } + + public boolean wasFailedExternally() { + return wasFailedExternally; } @Override
