Repository: flink
Updated Branches:
  refs/heads/master 99188390f -> b2e8792b8
[FLINK-5021] Make the ContinuousFileReaderOperator rescalable.

This is the last commit that completes the refactoring of the
ContinuousFileReaderOperator so that it can be rescalable. With this,
the reader can restart from a savepoint with a different parallelism
without compromising the provided exactly-once guarantees.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b2e8792b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b2e8792b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b2e8792b

Branch: refs/heads/master
Commit: b2e8792b8ef07f5e9f676fd83137b3ce4d72cdb0
Parents: 5a90c6b
Author: kl0u <[email protected]>
Authored: Thu Nov 3 11:21:08 2016 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Nov 11 14:05:58 2016 +0100

----------------------------------------------------------------------
 .../hdfstests/ContinuousFileProcessingTest.java |  182 ++++++++++-
 .../source/ContinuousFileReaderOperator.java    |   83 +++--
 .../ContinuousFileProcessingRescalingTest.java  |  316 +++++++++++++++++++
 ...ontinuousFileProcessingCheckpointITCase.java |   34 +-
 4 files changed, 560 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b2e8792b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index 5b14251..6454c11 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.hdfstests;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -36,7 +37,9 @@ import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -387,6 +390,149 @@ public class ContinuousFileProcessingTest {
 		}
 	}
 
+	@Test
+	public void testReaderSnapshotRestore() throws Exception {
+
+		TimestampedFileInputSplit split1 =
+			new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
+
+		TimestampedFileInputSplit split2 =
+			new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null);
+
+		TimestampedFileInputSplit split3 =
+			new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
+
+		TimestampedFileInputSplit split4 =
+			new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
+
+
+		final OneShotLatch latch = new OneShotLatch();
+
+		BlockingFileInputFormat format =
new BlockingFileInputFormat(latch, new Path(hdfsURI)); + TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format); + + ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(format); + initReader.setOutputType(typeInfo, new ExecutionConfig()); + + OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> initTestInstance = + new OneInputStreamOperatorTestHarness<>(initReader); + initTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime); + initTestInstance.open(); + + // create some state in the reader + initTestInstance.processElement(new StreamRecord<>(split1)); + initTestInstance.processElement(new StreamRecord<>(split2)); + initTestInstance.processElement(new StreamRecord<>(split3)); + initTestInstance.processElement(new StreamRecord<>(split4)); + + // take a snapshot of the operator's state. This will be used + // to initialize another reader and compare the results of the + // two operators. + + final OperatorStateHandles snapshot; + synchronized (initTestInstance.getCheckpointLock()) { + snapshot = initTestInstance.snapshot(0L, 0L); + } + + ContinuousFileReaderOperator<FileInputSplit> restoredReader = new ContinuousFileReaderOperator<>( + new BlockingFileInputFormat(latch, new Path(hdfsURI))); + restoredReader.setOutputType(typeInfo, new ExecutionConfig()); + + OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> restoredTestInstance = + new OneInputStreamOperatorTestHarness<>(restoredReader); + restoredTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime); + + restoredTestInstance.initializeState(snapshot); + restoredTestInstance.open(); + + // now let computation start + latch.trigger(); + + // ... and wait for the operators to close gracefully + + synchronized (initTestInstance.getCheckpointLock()) { + initTestInstance.close(); + } + + synchronized (restoredTestInstance.getCheckpointLock()) { + restoredTestInstance.close(); + } + + FileInputSplit fsSplit1 = createSplitFromTimestampedSplit(split1); + FileInputSplit fsSplit2 = createSplitFromTimestampedSplit(split2); + FileInputSplit fsSplit3 = createSplitFromTimestampedSplit(split3); + FileInputSplit fsSplit4 = createSplitFromTimestampedSplit(split4); + + // compare if the results contain what they should contain and also if + // they are the same, as they should. 
+ + Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit1))); + Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit2))); + Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit3))); + Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit4))); + + Assert.assertArrayEquals( + initTestInstance.getOutput().toArray(), + restoredTestInstance.getOutput().toArray() + ); + } + + private FileInputSplit createSplitFromTimestampedSplit(TimestampedFileInputSplit split) { + Preconditions.checkNotNull(split); + + return new FileInputSplit( + split.getSplitNumber(), + split.getPath(), + split.getStart(), + split.getLength(), + split.getHostnames() + ); + } + + private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> { + + private final OneShotLatch latch; + + private FileInputSplit split; + + private boolean reachedEnd; + + BlockingFileInputFormat(OneShotLatch latch, Path filePath) { + super(filePath); + this.latch = latch; + this.reachedEnd = false; + } + + @Override + public void open(FileInputSplit fileSplit) throws IOException { + this.split = fileSplit; + this.reachedEnd = false; + } + + @Override + public boolean reachedEnd() throws IOException { + if (!latch.isTriggered()) { + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return reachedEnd; + } + + @Override + public FileInputSplit nextRecord(FileInputSplit reuse) throws IOException { + this.reachedEnd = true; + return split; + } + + @Override + public void close() { + + } + } + //// Monitoring Function Tests ////// @Test @@ -421,7 +567,7 @@ public class ContinuousFileProcessingTest { FileProcessingMode.PROCESS_ONCE, 1, INTERVAL); final FileVerifyingSourceContext context = - new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction, 0, -1); + new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction); monitoringFunction.open(new Configuration()); monitoringFunction.run(context); @@ -490,8 +636,7 @@ public class ContinuousFileProcessingTest { new ContinuousFileMonitoringFunction<>(format, hdfsURI, FileProcessingMode.PROCESS_ONCE, 1, INTERVAL); - final FileVerifyingSourceContext context = - new FileVerifyingSourceContext(latch, monitoringFunction, 1, -1); + final FileVerifyingSourceContext context = new FileVerifyingSourceContext(latch, monitoringFunction); final Thread t = new Thread() { @Override @@ -499,6 +644,13 @@ public class ContinuousFileProcessingTest { try { monitoringFunction.open(new Configuration()); monitoringFunction.run(context); + + // we would never arrive here if we were in + // PROCESS_CONTINUOUSLY mode. 
+ + // this will trigger the latch + context.close(); + } catch (Exception e) { Assert.fail(e.getMessage()); } @@ -599,11 +751,16 @@ public class ContinuousFileProcessingTest { private final ContinuousFileMonitoringFunction src; private final OneShotLatch latch; private final Set<String> seenFiles; - private final int elementsBeforeNotifying; + private int elementsBeforeNotifying = -1; private int elementsBeforeCanceling = -1; FileVerifyingSourceContext(OneShotLatch latch, + ContinuousFileMonitoringFunction src) { + this(latch, src, -1, -1); + } + + FileVerifyingSourceContext(OneShotLatch latch, ContinuousFileMonitoringFunction src, int elementsBeforeNotifying, int elementsBeforeCanceling) { @@ -621,16 +778,27 @@ public class ContinuousFileProcessingTest { @Override public void collect(TimestampedFileInputSplit element) { String seenFileName = element.getPath().getName(); - this.seenFiles.add(seenFileName); - if (seenFiles.size() == elementsBeforeNotifying) { + + if (seenFiles.size() == elementsBeforeNotifying && !latch.isTriggered()) { latch.trigger(); } - if (elementsBeforeCanceling != -1 && seenFiles.size() == elementsBeforeCanceling) { + if (seenFiles.size() == elementsBeforeCanceling) { src.cancel(); } } + + @Override + public void close() { + // the context was terminated so trigger so + // that all threads that were waiting for this + // are un-blocked. + if (!latch.isTriggered()) { + latch.trigger(); + } + src.cancel(); + } } private static class ModTimeVerifyingSourceContext extends DummySourceContext { http://git-wip-us.apache.org/repos/asf/flink/blob/b2e8792b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index db8e8fd..74c58f9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -20,13 +20,13 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.CheckpointableInputFormat; import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.metrics.Counter; -import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -38,8 +38,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.io.Serializable; import 
java.util.ArrayList; import java.util.List; @@ -61,7 +59,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ @Internal public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT> - implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT>, StreamCheckpointedOperator { + implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT> { private static final long serialVersionUID = 1L; @@ -74,6 +72,8 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU private transient SplitReader<OUT> reader; private transient SourceFunction.SourceContext<OUT> readerContext; + + private ListState<TimestampedFileInputSplit> checkpointedState; private List<TimestampedFileInputSplit> restoredReaderState; public ContinuousFileReaderOperator(FileInputFormat<OUT> format) { @@ -86,6 +86,32 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU } @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + + checkState(this.checkpointedState == null && this.restoredReaderState == null, + "The reader state has already been initialized."); + + checkpointedState = context.getManagedOperatorStateStore().getSerializableListState("splits"); + + int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); + if (context.isRestored()) { + LOG.info("Restoring state for the ContinuousFileReaderOperator (taskIdx={}).", subtaskIdx); + + this.restoredReaderState = new ArrayList<>(); + for (TimestampedFileInputSplit split : this.checkpointedState.get()) { + this.restoredReaderState.add(split); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("ContinuousFileReaderOperator idx {} restored {}.", subtaskIdx, this.restoredReaderState); + } + } else { + LOG.info("No state to restore for the ContinuousFileReaderOperator (taskIdx={}).", subtaskIdx); + } + } + + @Override public void open() throws Exception { super.open(); @@ -182,7 +208,7 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU private class SplitReader<OT> extends Thread { - private volatile boolean isClosed; + private volatile boolean shouldClose; private volatile boolean isRunning; @@ -209,7 +235,7 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU this.readerContext = checkNotNull(readerContext, "Unspecified Reader Context."); this.checkpointLock = checkNotNull(checkpointLock, "Unspecified checkpoint lock."); - this.isClosed = false; + this.shouldClose = false; this.isRunning = true; this.pendingSplits = new PriorityQueue<>(); @@ -250,10 +276,10 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU // 2) if not wait 50 ms and try again to fetch a new split to read if (currentSplit == null) { - if (!this.isClosed) { - checkpointLock.wait(50); - } else { + if (this.shouldClose) { isRunning = false; + } else { + checkpointLock.wait(50); } continue; } @@ -275,7 +301,7 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU this.isSplitOpen = true; } - LOG.info("Reading split: " + currentSplit); + LOG.debug("Reading split: " + currentSplit); try { OT nextElement = serializer.createInstance(); @@ -343,37 +369,30 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU } public void close() { - this.isClosed = true; + this.shouldClose = true; } } // --------------------- Checkpointing 
--------------------------

 	@Override
-	public void snapshotState(FSDataOutputStream os, long checkpointId, long timestamp) throws Exception {
-		final ObjectOutputStream oos = new ObjectOutputStream(os);
+	public void snapshotState(StateSnapshotContext context) throws Exception {
+		super.snapshotState(context);
+
+		checkState(this.checkpointedState != null,
+			"The operator state has not been properly initialized.");
+
+		int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+		this.checkpointedState.clear();
 		List<TimestampedFileInputSplit> readerState = this.reader.getReaderState();
-		oos.writeInt(readerState.size());
 		for (TimestampedFileInputSplit split : readerState) {
-			oos.writeObject(split);
+			// create a new partition for each entry.
+			this.checkpointedState.add(split);
 		}
-		oos.flush();
-	}
-
-	@Override
-	public void restoreState(FSDataInputStream is) throws Exception {
-
-		checkState(this.restoredReaderState == null,
-			"The reader state has already been initialized.");
-
-		final ObjectInputStream ois = new ObjectInputStream(is);
-		int noOfSplits = ois.readInt();
-		List<TimestampedFileInputSplit> pendingSplits = new ArrayList<>(noOfSplits);
-		for (int i = 0; i < noOfSplits; i++) {
-			pendingSplits.add((TimestampedFileInputSplit) ois.readObject());
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("ContinuousFileReaderOperator idx {} checkpointed {}.", subtaskIdx, readerState);
 		}
-		this.restoredReaderState = pendingSplits;
 	}
 }


http://git-wip-us.apache.org/repos/asf/flink/blob/b2e8792b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
new file mode 100644
index 0000000..466ca65
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.io.CheckpointableInputFormat; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator; +import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.Preconditions; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Queue; + +public class ContinuousFileProcessingRescalingTest { + + @Test + public void testReaderScalingDown() throws Exception { + // simulates the scenario of scaling down from 2 to 1 instances + + final OneShotLatch waitingLatch = new OneShotLatch(); + + // create the first instance and let it process the first split till element 5 + final OneShotLatch triggerLatch1 = new OneShotLatch(); + BlockingFileInputFormat format1 = new BlockingFileInputFormat( + triggerLatch1, waitingLatch, new Path("test"), 20, 5); + FileInputSplit[] splits = format1.createInputSplits(2); + + OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness1 = getTestHarness(format1, 2, 0); + testHarness1.open(); + testHarness1.processElement(new StreamRecord<>(getTimestampedSplit(0, splits[0]))); + + // wait until its arrives to element 5 + if (!triggerLatch1.isTriggered()) { + triggerLatch1.await(); + } + + // create the second instance and let it process the second split till element 15 + final OneShotLatch triggerLatch2 = new OneShotLatch(); + BlockingFileInputFormat format2 = new BlockingFileInputFormat( + triggerLatch2, waitingLatch, new Path("test"), 20, 15); + + OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness2 = getTestHarness(format2, 2, 1); + testHarness2.open(); + testHarness2.processElement(new StreamRecord<>(getTimestampedSplit(0, splits[1]))); + + // wait until its arrives to element 15 + if (!triggerLatch2.isTriggered()) { + triggerLatch2.await(); + } + + // 1) clear the outputs of the two previous instances so that + // we can compare their newly produced outputs with the merged one + testHarness1.getOutput().clear(); + testHarness2.getOutput().clear(); + + + // 2) and take the snapshots from the previous instances and merge them + // into a new one which will be then used to initialize a third instance + OperatorStateHandles mergedState = AbstractStreamOperatorTestHarness. 
+ repackageState( + testHarness2.snapshot(0, 0), + testHarness1.snapshot(0, 0) + ); + + // create the third instance + final OneShotLatch wLatch = new OneShotLatch(); + final OneShotLatch tLatch = new OneShotLatch(); + + BlockingFileInputFormat format = new BlockingFileInputFormat(wLatch, tLatch, new Path("test"), 20, 5); + OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness = getTestHarness(format, 1, 0); + + // initialize the state of the new operator with the constructed by + // combining the partial states of the instances above. + testHarness.initializeState(mergedState); + testHarness.open(); + + // now restart the waiting operators + wLatch.trigger(); + tLatch.trigger(); + waitingLatch.trigger(); + + // and wait for the processing to finish + synchronized (testHarness1.getCheckpointLock()) { + testHarness1.close(); + } + synchronized (testHarness2.getCheckpointLock()) { + testHarness2.close(); + } + synchronized (testHarness.getCheckpointLock()) { + testHarness.close(); + } + + Queue<Object> expectedResult = new ArrayDeque<>(); + putElementsInQ(expectedResult, testHarness1.getOutput()); + putElementsInQ(expectedResult, testHarness2.getOutput()); + + Queue<Object> actualResult = new ArrayDeque<>(); + putElementsInQ(actualResult, testHarness.getOutput()); + + Assert.assertEquals(20, actualResult.size()); + Assert.assertArrayEquals(expectedResult.toArray(), actualResult.toArray()); + } + + @Test + public void testReaderScalingUp() throws Exception { + // simulates the scenario of scaling up from 1 to 2 instances + + final OneShotLatch waitingLatch1 = new OneShotLatch(); + final OneShotLatch triggerLatch1 = new OneShotLatch(); + + BlockingFileInputFormat format1 = new BlockingFileInputFormat( + triggerLatch1, waitingLatch1, new Path("test"), 20, 5); + FileInputSplit[] splits = format1.createInputSplits(2); + + OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness1 = getTestHarness(format1, 1, 0); + testHarness1.open(); + + testHarness1.processElement(new StreamRecord<>(getTimestampedSplit(0, splits[0]))); + testHarness1.processElement(new StreamRecord<>(getTimestampedSplit(1, splits[1]))); + + // wait until its arrives to element 5 + if (!triggerLatch1.isTriggered()) { + triggerLatch1.await(); + } + + // this will be the state shared by the 2 new instances. 
+ OperatorStateHandles snapshot = testHarness1.snapshot(0, 0); + + // 1) clear the output of instance so that we can compare it with one created by the new instances, and + // 2) let the operator process the rest of its state + testHarness1.getOutput().clear(); + waitingLatch1.trigger(); + + // create the second instance and let it process the second split till element 15 + final OneShotLatch triggerLatch2 = new OneShotLatch(); + final OneShotLatch waitingLatch2 = new OneShotLatch(); + + BlockingFileInputFormat format2 = new BlockingFileInputFormat( + triggerLatch2, waitingLatch2, new Path("test"), 20, 15); + + OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness2 = getTestHarness(format2, 2, 0); + testHarness2.setup(); + testHarness2.initializeState(snapshot); + testHarness2.open(); + + BlockingFileInputFormat format3 = new BlockingFileInputFormat( + triggerLatch2, waitingLatch2, new Path("test"), 20, 15); + + OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness3 = getTestHarness(format3, 2, 1); + testHarness3.setup(); + testHarness3.initializeState(snapshot); + testHarness3.open(); + + triggerLatch2.trigger(); + waitingLatch2.trigger(); + + // and wait for the processing to finish + synchronized (testHarness1.getCheckpointLock()) { + testHarness1.close(); + } + synchronized (testHarness2.getCheckpointLock()) { + testHarness2.close(); + } + synchronized (testHarness3.getCheckpointLock()) { + testHarness3.close(); + } + + Queue<Object> expectedResult = new ArrayDeque<>(); + putElementsInQ(expectedResult, testHarness1.getOutput()); + + Queue<Object> actualResult = new ArrayDeque<>(); + putElementsInQ(actualResult, testHarness2.getOutput()); + putElementsInQ(actualResult, testHarness3.getOutput()); + + Assert.assertEquals(35, actualResult.size()); + Assert.assertArrayEquals(expectedResult.toArray(), actualResult.toArray()); + } + + private void putElementsInQ(Queue<Object> res, Queue<Object> partial) { + for (Object o : partial) { + if (o instanceof Watermark) { + continue; + } + res.add(o); + } + } + + private OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> getTestHarness( + BlockingFileInputFormat format, int noOfTasks, int taksIdx) throws Exception { + + ContinuousFileReaderOperator<String> reader = new ContinuousFileReaderOperator<>(format); + reader.setOutputType(TypeExtractor.getInputFormatTypes(format), new ExecutionConfig()); + + OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness = + new OneInputStreamOperatorTestHarness<>(reader, 10, noOfTasks, taksIdx); + testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime); + return testHarness; + } + + private TimestampedFileInputSplit getTimestampedSplit(long modTime, FileInputSplit split) { + Preconditions.checkNotNull(split); + return new TimestampedFileInputSplit( + modTime, + split.getSplitNumber(), + split.getPath(), + split.getStart(), + split.getLength(), + split.getHostnames()); + } + + private static class BlockingFileInputFormat + extends FileInputFormat<String> + implements CheckpointableInputFormat<FileInputSplit, Integer> { + + private final OneShotLatch triggerLatch; + private final OneShotLatch waitingLatch; + + private final int elementsBeforeCheckpoint; + private final int linesPerSplit; + + private FileInputSplit split; + + private int state; + + BlockingFileInputFormat(OneShotLatch triggerLatch, + OneShotLatch waitingLatch, + Path filePath, + int sizeOfSplit, + int elementsBeforeCheckpoint) { + super(filePath); 
+ + this.triggerLatch = triggerLatch; + this.waitingLatch = waitingLatch; + this.elementsBeforeCheckpoint = elementsBeforeCheckpoint; + this.linesPerSplit = sizeOfSplit; + + this.state = 0; + } + + @Override + public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException { + FileInputSplit[] splits = new FileInputSplit[minNumSplits]; + for (int i = 0; i < minNumSplits; i++) { + splits[i] = new FileInputSplit(i, getFilePath(), i * linesPerSplit + 1, linesPerSplit, null); + } + return splits; + } + + @Override + public void open(FileInputSplit fileSplit) throws IOException { + this.split = fileSplit; + this.state = 0; + } + + @Override + public void reopen(FileInputSplit split, Integer state) throws IOException { + this.split = split; + this.state = state; + } + + @Override + public Integer getCurrentState() throws IOException { + return state; + } + + @Override + public boolean reachedEnd() throws IOException { + if (state == elementsBeforeCheckpoint) { + triggerLatch.trigger(); + if (!waitingLatch.isTriggered()) { + try { + waitingLatch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + return state == linesPerSplit; + } + + @Override + public String nextRecord(String reuse) throws IOException { + return reachedEnd() ? null : split.getSplitNumber() + ": test line " + state++; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b2e8792b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java index 0e9b054..90d8861 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java @@ -24,7 +24,7 @@ import org.apache.flink.api.java.io.TextInputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; @@ -163,11 +163,11 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran // -------------------------- Task Sink ------------------------------ private static class TestingSinkFunction extends RichSinkFunction<String> - implements Checkpointed<Tuple2<Long, Map<Integer, Set<String>>>>, CheckpointListener { + implements ListCheckpointed<Tuple2<Long, Map<Integer, Set<String>>>>, CheckpointListener { - private boolean hasFailed; + private boolean hasRestoredAfterFailure; - private volatile boolean hasSuccessfulCheckpoints; + private volatile int successfulCheckpoints; private long elementsToFailure; @@ -176,9 +176,9 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran private Map<Integer, Set<String>> actualContent = new HashMap<>(); TestingSinkFunction() { - hasFailed = false; + hasRestoredAfterFailure = false; elementCounter = 
0; - hasSuccessfulCheckpoints = false; + successfulCheckpoints = 0; } @Override @@ -216,13 +216,13 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran throw new SuccessException(); } - // add some latency so that we have at least one checkpoint in - if (!hasFailed && !hasSuccessfulCheckpoints) { + // add some latency so that we have at least two checkpoint in + if (!hasRestoredAfterFailure && successfulCheckpoints < 2) { Thread.sleep(5); } // simulate a node failure - if (!hasFailed && hasSuccessfulCheckpoints && elementCounter >= elementsToFailure) { + if (!hasRestoredAfterFailure && successfulCheckpoints >= 2 && elementCounter >= elementsToFailure) { throw new Exception("Task Failure @ elem: " + elementCounter + " / " + elementsToFailure); } } @@ -237,20 +237,22 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran } @Override - public Tuple2<Long, Map<Integer, Set<String>>> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return new Tuple2<>(elementCounter, actualContent); + public List<Tuple2<Long, Map<Integer, Set<String>>>> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + Tuple2<Long, Map<Integer, Set<String>>> state = new Tuple2<>(elementCounter, actualContent); + return Collections.singletonList(state); } @Override - public void restoreState(Tuple2<Long, Map<Integer, Set<String>>> state) throws Exception { - this.hasFailed = true; - this.elementCounter = state.f0; - this.actualContent = state.f1; + public void restoreState(List<Tuple2<Long, Map<Integer, Set<String>>>> state) throws Exception { + Tuple2<Long, Map<Integer, Set<String>>> s = state.get(0); + this.elementCounter = s.f0; + this.actualContent = s.f1; + this.hasRestoredAfterFailure = this.elementCounter != 0; // because now restore is also called at initialization } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { - hasSuccessfulCheckpoints = true; + this.successfulCheckpoints++; } private int getFileIdx(String line) {
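----------------------------------------------------------------------
The rescaling described in the commit message amounts to snapshotting the reader's
pending splits as operator list state ("splits") and redistributing that state across
subtasks on restore. Below is a minimal sketch of the scale-down path, distilled from
the new ContinuousFileProcessingRescalingTest; the format0/format1/formatRescaled,
split0/split1 and getTestHarness(...) names are placeholders borrowed from that test,
not new API.

	@Test
	public void scaleDownSketch() throws Exception {
		// two parallel reader subtasks, each holding un-read splits in its operator state;
		// each harness wraps its own FileInputFormat instance, as in the rescaling test
		OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> harness0 = getTestHarness(format0, 2, 0);
		OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> harness1 = getTestHarness(format1, 2, 1);
		harness0.open();
		harness1.open();
		harness0.processElement(new StreamRecord<>(getTimestampedSplit(0, split0)));
		harness1.processElement(new StreamRecord<>(getTimestampedSplit(0, split1)));

		// snapshot both subtasks and merge the partial operator states into one handle
		OperatorStateHandles mergedState = AbstractStreamOperatorTestHarness.repackageState(
			harness0.snapshot(0, 0),
			harness1.snapshot(0, 0));

		// a single new subtask restores the merged state and finishes reading
		// every pending split exactly once
		OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> rescaled = getTestHarness(formatRescaled, 1, 0);
		rescaled.initializeState(mergedState);
		rescaled.open();
	}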
