Repository: flink
Updated Branches:
  refs/heads/master 99188390f -> b2e8792b8


[FLINK-5021] Make the ContinuousFileReaderOperator rescalable.

This is the last in the series of commits that refactor the
ContinuousFileReaderOperator to make it rescalable.
With this, the reader can restart from a savepoint with a
different parallelism without compromising the provided
exactly-once guarantees.
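
The gist of the change: the reader's pending splits now live in Flink's
managed operator list state, which the runtime re-partitions across
subtasks when the job is rescaled. Below is a minimal sketch of that
pattern, not the committed code; MySplit and pendingSplits() are
placeholders for TimestampedFileInputSplit and the reader's actual
split queue:

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.runtime.state.StateInitializationContext;
    import org.apache.flink.runtime.state.StateSnapshotContext;
    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;

    public class RescalableReaderSketch<OUT> extends AbstractStreamOperator<OUT> {

        // placeholder for TimestampedFileInputSplit
        public static class MySplit implements Serializable {}

        private transient ListState<MySplit> checkpointedState;
        private transient List<MySplit> restoredSplits;

        // hypothetical accessor for the splits that have been received
        // but not yet fully read
        private List<MySplit> pendingSplits() {
            return new ArrayList<>();
        }

        @Override
        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);

            // managed operator state: on restore, the runtime hands each
            // subtask a re-partitioned slice of the checkpointed list
            checkpointedState = context.getManagedOperatorStateStore()
                .getSerializableListState("splits");

            if (context.isRestored()) {
                restoredSplits = new ArrayList<>();
                for (MySplit split : checkpointedState.get()) {
                    restoredSplits.add(split);
                }
            }
        }

        @Override
        public void snapshotState(StateSnapshotContext context) throws Exception {
            super.snapshotState(context);

            // one list entry per pending split, so entries can be
            // redistributed freely when the parallelism changes
            checkpointedState.clear();
            for (MySplit split : pendingSplits()) {
                checkpointedState.add(split);
            }
        }
    }

Because each split is an independent list entry, a savepoint taken at
one parallelism can be restored at another, with the splits simply
re-dealt to the new subtasks; the diffs below implement this for the
reader and adapt the tests accordingly.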


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b2e8792b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b2e8792b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b2e8792b

Branch: refs/heads/master
Commit: b2e8792b8ef07f5e9f676fd83137b3ce4d72cdb0
Parents: 5a90c6b
Author: kl0u <[email protected]>
Authored: Thu Nov 3 11:21:08 2016 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Nov 11 14:05:58 2016 +0100

----------------------------------------------------------------------
 .../hdfstests/ContinuousFileProcessingTest.java | 182 ++++++++++-
 .../source/ContinuousFileReaderOperator.java    |  83 +++--
 .../ContinuousFileProcessingRescalingTest.java  | 316 +++++++++++++++++++
 ...ontinuousFileProcessingCheckpointITCase.java |  34 +-
 4 files changed, 560 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b2e8792b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index 5b14251..6454c11 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.hdfstests;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -36,7 +37,9 @@ import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -387,6 +390,149 @@ public class ContinuousFileProcessingTest {
                }
        }
 
+       @Test
+       public void testReaderSnapshotRestore() throws Exception {
+
+               TimestampedFileInputSplit split1 =
+                       new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
+
+               TimestampedFileInputSplit split2 =
+                       new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null);
+
+               TimestampedFileInputSplit split3 =
+                       new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
+
+               TimestampedFileInputSplit split4 =
+                       new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
+
+
+               final OneShotLatch latch = new OneShotLatch();
+
+               BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(hdfsURI));
+               TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+               ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(format);
+               initReader.setOutputType(typeInfo, new ExecutionConfig());
+
+               OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> initTestInstance =
+                       new OneInputStreamOperatorTestHarness<>(initReader);
+               initTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime);
+               initTestInstance.open();
+
+               // create some state in the reader
+               initTestInstance.processElement(new StreamRecord<>(split1));
+               initTestInstance.processElement(new StreamRecord<>(split2));
+               initTestInstance.processElement(new StreamRecord<>(split3));
+               initTestInstance.processElement(new StreamRecord<>(split4));
+
+               // take a snapshot of the operator's state. This will be used
+               // to initialize another reader and compare the results of the
+               // two operators.
+
+               final OperatorStateHandles snapshot;
+               synchronized (initTestInstance.getCheckpointLock()) {
+                       snapshot = initTestInstance.snapshot(0L, 0L);
+               }
+
+               ContinuousFileReaderOperator<FileInputSplit> restoredReader = new ContinuousFileReaderOperator<>(
+                       new BlockingFileInputFormat(latch, new Path(hdfsURI)));
+               restoredReader.setOutputType(typeInfo, new ExecutionConfig());
+
+               OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> restoredTestInstance =
+                       new OneInputStreamOperatorTestHarness<>(restoredReader);
+               restoredTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               restoredTestInstance.initializeState(snapshot);
+               restoredTestInstance.open();
+
+               // now let computation start
+               latch.trigger();
+
+               // ... and wait for the operators to close gracefully
+
+               synchronized (initTestInstance.getCheckpointLock()) {
+                       initTestInstance.close();
+               }
+
+               synchronized (restoredTestInstance.getCheckpointLock()) {
+                       restoredTestInstance.close();
+               }
+
+               FileInputSplit fsSplit1 = createSplitFromTimestampedSplit(split1);
+               FileInputSplit fsSplit2 = createSplitFromTimestampedSplit(split2);
+               FileInputSplit fsSplit3 = createSplitFromTimestampedSplit(split3);
+               FileInputSplit fsSplit4 = createSplitFromTimestampedSplit(split4);
+
+               // verify that both outputs contain the expected splits
+               // and that the two outputs are identical.
+
+               Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit1)));
+               Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit2)));
+               Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit3)));
+               Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit4)));
+
+               Assert.assertArrayEquals(
+                       initTestInstance.getOutput().toArray(),
+                       restoredTestInstance.getOutput().toArray()
+               );
+       }
+
+       private FileInputSplit createSplitFromTimestampedSplit(TimestampedFileInputSplit split) {
+               Preconditions.checkNotNull(split);
+
+               return new FileInputSplit(
+                       split.getSplitNumber(),
+                       split.getPath(),
+                       split.getStart(),
+                       split.getLength(),
+                       split.getHostnames()
+               );
+       }
+
+       private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> {
+
+               private final OneShotLatch latch;
+
+               private FileInputSplit split;
+
+               private boolean reachedEnd;
+
+               BlockingFileInputFormat(OneShotLatch latch, Path filePath) {
+                       super(filePath);
+                       this.latch = latch;
+                       this.reachedEnd = false;
+               }
+
+               @Override
+               public void open(FileInputSplit fileSplit) throws IOException {
+                       this.split = fileSplit;
+                       this.reachedEnd = false;
+               }
+
+               @Override
+               public boolean reachedEnd() throws IOException {
+                       if (!latch.isTriggered()) {
+                               try {
+                                       latch.await();
+                               } catch (InterruptedException e) {
+                                       e.printStackTrace();
+                               }
+                       }
+                       return reachedEnd;
+               }
+
+               @Override
+               public FileInputSplit nextRecord(FileInputSplit reuse) throws IOException {
+                       this.reachedEnd = true;
+                       return split;
+               }
+
+               @Override
+               public void close() {
+
+               }
+       }
+
        ////                            Monitoring Function Tests                              //////
 
        @Test
@@ -421,7 +567,7 @@ public class ContinuousFileProcessingTest {
                                FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
 
                final FileVerifyingSourceContext context =
-                       new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction, 0, -1);
+                       new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction);
 
                monitoringFunction.open(new Configuration());
                monitoringFunction.run(context);
@@ -490,8 +636,7 @@ public class ContinuousFileProcessingTest {
                        new ContinuousFileMonitoringFunction<>(format, hdfsURI,
                                FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
 
-               final FileVerifyingSourceContext context =
-                       new FileVerifyingSourceContext(latch, monitoringFunction, 1, -1);
+               final FileVerifyingSourceContext context = new FileVerifyingSourceContext(latch, monitoringFunction);
 
                final Thread t = new Thread() {
                        @Override
@@ -499,6 +644,13 @@ public class ContinuousFileProcessingTest {
                                try {
                                        monitoringFunction.open(new Configuration());
                                        monitoringFunction.run(context);
+
+                                       // we would never arrive here if we were in
+                                       // PROCESS_CONTINUOUSLY mode.
+
+                                       // this will trigger the latch
+                                       context.close();
+
                                } catch (Exception e) {
                                        Assert.fail(e.getMessage());
                                }
@@ -599,11 +751,16 @@ public class ContinuousFileProcessingTest {
                private final ContinuousFileMonitoringFunction src;
                private final OneShotLatch latch;
                private final Set<String> seenFiles;
-               private final int elementsBeforeNotifying;
 
+               private int elementsBeforeNotifying = -1;
                private int elementsBeforeCanceling = -1;
 
                FileVerifyingSourceContext(OneShotLatch latch,
+                                                                  ContinuousFileMonitoringFunction src) {
+                       this(latch, src, -1, -1);
+               }
+
+               FileVerifyingSourceContext(OneShotLatch latch,
                                                                ContinuousFileMonitoringFunction src,
                                                                int elementsBeforeNotifying,
                                                                int elementsBeforeCanceling) {
@@ -621,16 +778,27 @@ public class ContinuousFileProcessingTest {
                @Override
                public void collect(TimestampedFileInputSplit element) {
                        String seenFileName = element.getPath().getName();
-
                        this.seenFiles.add(seenFileName);
-                       if (seenFiles.size() == elementsBeforeNotifying) {
+
+                       if (seenFiles.size() == elementsBeforeNotifying && !latch.isTriggered()) {
                                latch.trigger();
                        }
 
-                       if (elementsBeforeCanceling != -1 && seenFiles.size() == elementsBeforeCanceling) {
+                       if (seenFiles.size() == elementsBeforeCanceling) {
                                src.cancel();
                        }
                }
+
+               @Override
+               public void close() {
+                       // the context was closed, so trigger the latch to
+                       // un-block all threads that were waiting on it.
+                       if (!latch.isTriggered()) {
+                               latch.trigger();
+                       }
+                       src.cancel();
+               }
        }
 
        private static class ModTimeVerifyingSourceContext extends DummySourceContext {

http://git-wip-us.apache.org/repos/asf/flink/blob/b2e8792b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index db8e8fd..74c58f9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -20,13 +20,13 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.CheckpointableInputFormat;
 import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.metrics.Counter;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -38,8 +38,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
@@ -61,7 +59,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 @Internal
 public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
-       implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT>, StreamCheckpointedOperator {
+       implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
 
        private static final long serialVersionUID = 1L;
 
@@ -74,6 +72,8 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 
        private transient SplitReader<OUT> reader;
        private transient SourceFunction.SourceContext<OUT> readerContext;
+
+       private ListState<TimestampedFileInputSplit> checkpointedState;
        private List<TimestampedFileInputSplit> restoredReaderState;
 
        public ContinuousFileReaderOperator(FileInputFormat<OUT> format) {
@@ -86,6 +86,32 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
        }
 
        @Override
+       public void initializeState(StateInitializationContext context) throws Exception {
+               super.initializeState(context);
+
+               checkState(this.checkpointedState == null && this.restoredReaderState == null,
+                       "The reader state has already been initialized.");
+
+               checkpointedState = context.getManagedOperatorStateStore().getSerializableListState("splits");
+
+               int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+               if (context.isRestored()) {
+                       LOG.info("Restoring state for the ContinuousFileReaderOperator (taskIdx={}).", subtaskIdx);
+
+                       this.restoredReaderState = new ArrayList<>();
+                       for (TimestampedFileInputSplit split : this.checkpointedState.get()) {
+                               this.restoredReaderState.add(split);
+                       }
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("ContinuousFileReaderOperator idx {} restored {}.", subtaskIdx, this.restoredReaderState);
+                       }
+               } else {
+                       LOG.info("No state to restore for the ContinuousFileReaderOperator (taskIdx={}).", subtaskIdx);
+               }
+       }
+
+       @Override
        public void open() throws Exception {
                super.open();
 
@@ -182,7 +208,7 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 
        private class SplitReader<OT> extends Thread {
 
-               private volatile boolean isClosed;
+               private volatile boolean shouldClose;
 
                private volatile boolean isRunning;
 
@@ -209,7 +235,7 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
                        this.readerContext = checkNotNull(readerContext, "Unspecified Reader Context.");
                        this.checkpointLock = checkNotNull(checkpointLock, "Unspecified checkpoint lock.");
 
-                       this.isClosed = false;
+                       this.shouldClose = false;
                        this.isRunning = true;
 
                        this.pendingSplits = new PriorityQueue<>();
@@ -250,10 +276,10 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
                                                        //   2) if not wait 50 ms and try again to fetch a new split to read

                                                        if (currentSplit == null) {
-                                                               if (!this.isClosed) {
-                                                                       checkpointLock.wait(50);
-                                                               } else {
+                                                               if (this.shouldClose) {
                                                                        isRunning = false;
+                                                               } else {
+                                                                       checkpointLock.wait(50);
                                                                }
                                                                continue;
                                                        }
@@ -275,7 +301,7 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
                                                this.isSplitOpen = true;
                                        }

-                                       LOG.info("Reading split: " + currentSplit);
+                                       LOG.debug("Reading split: " + currentSplit);
 
                                        try {
                                                OT nextElement = serializer.createInstance();
@@ -343,37 +369,30 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
                }
 
                public void close() {
-                       this.isClosed = true;
+                       this.shouldClose = true;
                }
        }
 
        //      ---------------------                   Checkpointing                   --------------------------
 
        @Override
-       public void snapshotState(FSDataOutputStream os, long checkpointId, long timestamp) throws Exception {
-               final ObjectOutputStream oos = new ObjectOutputStream(os);
+       public void snapshotState(StateSnapshotContext context) throws Exception {
+               super.snapshotState(context);
+
+               checkState(this.checkpointedState != null,
+                       "The operator state has not been properly initialized.");
+
+               int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

+               this.checkpointedState.clear();
                List<TimestampedFileInputSplit> readerState = this.reader.getReaderState();
-               oos.writeInt(readerState.size());
                for (TimestampedFileInputSplit split : readerState) {
-                       oos.writeObject(split);
+                       // create a new partition for each entry.
+                       this.checkpointedState.add(split);
                }
-               oos.flush();
-       }
-
-       @Override
-       public void restoreState(FSDataInputStream is) throws Exception {
-
-               checkState(this.restoredReaderState == null,
-                       "The reader state has already been initialized.");
-
-               final ObjectInputStream ois = new ObjectInputStream(is);
 
-               int noOfSplits = ois.readInt();
-               int noOfSplits = ois.readInt();
-               List<TimestampedFileInputSplit> pendingSplits = new ArrayList<>(noOfSplits);
-               for (int i = 0; i < noOfSplits; i++) {
-                       pendingSplits.add((TimestampedFileInputSplit) ois.readObject());
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("ContinuousFileReaderOperator idx {} checkpointed {}.", subtaskIdx, readerState);
                }
-               this.restoredReaderState = pendingSplits;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b2e8792b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
new file mode 100644
index 0000000..466ca65
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Preconditions;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+public class ContinuousFileProcessingRescalingTest {
+
+       @Test
+       public void testReaderScalingDown() throws Exception {
+               // simulates the scenario of scaling down from 2 to 1 instances
+
+               final OneShotLatch waitingLatch = new OneShotLatch();
+
+               // create the first instance and let it process the first split till element 5
+               final OneShotLatch triggerLatch1 = new OneShotLatch();
+               BlockingFileInputFormat format1 = new BlockingFileInputFormat(
+                       triggerLatch1, waitingLatch, new Path("test"), 20, 5);
+               FileInputSplit[] splits = format1.createInputSplits(2);
+
+               OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness1 = getTestHarness(format1, 2, 0);
+               testHarness1.open();
+               testHarness1.processElement(new StreamRecord<>(getTimestampedSplit(0, splits[0])));
+
+               // wait until it arrives at element 5
+               if (!triggerLatch1.isTriggered()) {
+                       triggerLatch1.await();
+               }
+
+               // create the second instance and let it process the second split till element 15
+               final OneShotLatch triggerLatch2 = new OneShotLatch();
+               BlockingFileInputFormat format2 = new BlockingFileInputFormat(
+                       triggerLatch2, waitingLatch, new Path("test"), 20, 15);
+
+               OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness2 = getTestHarness(format2, 2, 1);
+               testHarness2.open();
+               testHarness2.processElement(new StreamRecord<>(getTimestampedSplit(0, splits[1])));
+
+               // wait until it arrives at element 15
+               if (!triggerLatch2.isTriggered()) {
+                       triggerLatch2.await();
+               }
+
+               // 1) clear the outputs of the two previous instances so that
+               // we can compare their newly produced outputs with the merged one
+               testHarness1.getOutput().clear();
+               testHarness2.getOutput().clear();
+
+
+               // 2) and take the snapshots from the previous instances and merge them
+               // into a new one which will then be used to initialize a third instance
+               OperatorStateHandles mergedState = AbstractStreamOperatorTestHarness.
+                       repackageState(
+                               testHarness2.snapshot(0, 0),
+                               testHarness1.snapshot(0, 0)
+                       );
+
+               // create the third instance
+               final OneShotLatch wLatch = new OneShotLatch();
+               final OneShotLatch tLatch = new OneShotLatch();
+
+               BlockingFileInputFormat format = new BlockingFileInputFormat(wLatch, tLatch, new Path("test"), 20, 5);
+               OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness = getTestHarness(format, 1, 0);
+
+               // initialize the state of the new operator with the state constructed by
+               // combining the partial states of the instances above.
+               testHarness.initializeState(mergedState);
+               testHarness.open();
+
+               // now restart the waiting operators
+               wLatch.trigger();
+               tLatch.trigger();
+               waitingLatch.trigger();
+
+               // and wait for the processing to finish
+               synchronized (testHarness1.getCheckpointLock()) {
+                       testHarness1.close();
+               }
+               synchronized (testHarness2.getCheckpointLock()) {
+                       testHarness2.close();
+               }
+               synchronized (testHarness.getCheckpointLock()) {
+                       testHarness.close();
+               }
+
+               Queue<Object> expectedResult = new ArrayDeque<>();
+               putElementsInQ(expectedResult, testHarness1.getOutput());
+               putElementsInQ(expectedResult, testHarness2.getOutput());
+
+               Queue<Object> actualResult = new ArrayDeque<>();
+               putElementsInQ(actualResult, testHarness.getOutput());
+
+               Assert.assertEquals(20, actualResult.size());
+               Assert.assertArrayEquals(expectedResult.toArray(), actualResult.toArray());
+       }
+
+       @Test
+       public void testReaderScalingUp() throws Exception {
+               // simulates the scenario of scaling up from 1 to 2 instances
+
+               final OneShotLatch waitingLatch1 = new OneShotLatch();
+               final OneShotLatch triggerLatch1 = new OneShotLatch();
+
+               BlockingFileInputFormat format1 = new BlockingFileInputFormat(
+                       triggerLatch1, waitingLatch1, new Path("test"), 20, 5);
+               FileInputSplit[] splits = format1.createInputSplits(2);
+
+               OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness1 = getTestHarness(format1, 1, 0);
+               testHarness1.open();
+
+               testHarness1.processElement(new StreamRecord<>(getTimestampedSplit(0, splits[0])));
+               testHarness1.processElement(new StreamRecord<>(getTimestampedSplit(1, splits[1])));
+
+               // wait until it arrives at element 5
+               if (!triggerLatch1.isTriggered()) {
+                       triggerLatch1.await();
+               }
+
+               // this will be the state shared by the 2 new instances.
+               OperatorStateHandles snapshot = testHarness1.snapshot(0, 0);
+
+               // 1) clear the output of the instance so that we can compare it with the one created by the new instances, and
+               // 2) let the operator process the rest of its state
+               testHarness1.getOutput().clear();
+               waitingLatch1.trigger();
+
+               // create the second instance and let it process the second split till element 15
+               final OneShotLatch triggerLatch2 = new OneShotLatch();
+               final OneShotLatch waitingLatch2 = new OneShotLatch();
+
+               BlockingFileInputFormat format2 = new BlockingFileInputFormat(
+                       triggerLatch2, waitingLatch2, new Path("test"), 20, 15);
+
+               OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness2 = getTestHarness(format2, 2, 0);
+               testHarness2.setup();
+               testHarness2.initializeState(snapshot);
+               testHarness2.open();
+
+               BlockingFileInputFormat format3 = new BlockingFileInputFormat(
+                       triggerLatch2, waitingLatch2, new Path("test"), 20, 15);
+
+               OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness3 = getTestHarness(format3, 2, 1);
+               testHarness3.setup();
+               testHarness3.initializeState(snapshot);
+               testHarness3.open();
+
+               triggerLatch2.trigger();
+               waitingLatch2.trigger();
+
+               // and wait for the processing to finish
+               synchronized (testHarness1.getCheckpointLock()) {
+                       testHarness1.close();
+               }
+               synchronized (testHarness2.getCheckpointLock()) {
+                       testHarness2.close();
+               }
+               synchronized (testHarness3.getCheckpointLock()) {
+                       testHarness3.close();
+               }
+
+               Queue<Object> expectedResult = new ArrayDeque<>();
+               putElementsInQ(expectedResult, testHarness1.getOutput());
+
+               Queue<Object> actualResult = new ArrayDeque<>();
+               putElementsInQ(actualResult, testHarness2.getOutput());
+               putElementsInQ(actualResult, testHarness3.getOutput());
+
+               Assert.assertEquals(35, actualResult.size());
+               Assert.assertArrayEquals(expectedResult.toArray(), actualResult.toArray());
+       }
+
+       private void putElementsInQ(Queue<Object> res, Queue<Object> partial) {
+               for (Object o : partial) {
+                       if (o instanceof Watermark) {
+                               continue;
+                       }
+                       res.add(o);
+               }
+       }
+
+       private OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> getTestHarness(
+               BlockingFileInputFormat format, int noOfTasks, int taskIdx) throws Exception {
+
+               ContinuousFileReaderOperator<String> reader = new ContinuousFileReaderOperator<>(format);
+               reader.setOutputType(TypeExtractor.getInputFormatTypes(format), new ExecutionConfig());
+
+               OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness =
+                       new OneInputStreamOperatorTestHarness<>(reader, 10, noOfTasks, taskIdx);
+               testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+               return testHarness;
+       }
+
+       private TimestampedFileInputSplit getTimestampedSplit(long modTime, FileInputSplit split) {
+               Preconditions.checkNotNull(split);
+               return new TimestampedFileInputSplit(
+                       modTime,
+                       split.getSplitNumber(),
+                       split.getPath(),
+                       split.getStart(),
+                       split.getLength(),
+                       split.getHostnames());
+       }
+
+       private static class BlockingFileInputFormat
+               extends FileInputFormat<String>
+               implements CheckpointableInputFormat<FileInputSplit, Integer> {
+
+               private final OneShotLatch triggerLatch;
+               private final OneShotLatch waitingLatch;
+
+               private final int elementsBeforeCheckpoint;
+               private final int linesPerSplit;
+
+               private FileInputSplit split;
+
+               private int state;
+
+               BlockingFileInputFormat(OneShotLatch triggerLatch,
+                                                               OneShotLatch waitingLatch,
+                                                               Path filePath,
+                                                               int sizeOfSplit,
+                                                               int elementsBeforeCheckpoint) {
+                       super(filePath);
+
+                       this.triggerLatch = triggerLatch;
+                       this.waitingLatch = waitingLatch;
+                       this.elementsBeforeCheckpoint = elementsBeforeCheckpoint;
+                       this.linesPerSplit = sizeOfSplit;
+
+                       this.state = 0;
+               }
+
+               @Override
+               public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+                       FileInputSplit[] splits = new FileInputSplit[minNumSplits];
+                       for (int i = 0; i < minNumSplits; i++) {
+                               splits[i] = new FileInputSplit(i, getFilePath(), i * linesPerSplit + 1, linesPerSplit, null);
+                       }
+                       return splits;
+               }
+
+               @Override
+               public void open(FileInputSplit fileSplit) throws IOException {
+                       this.split = fileSplit;
+                       this.state = 0;
+               }
+
+               @Override
+               public void reopen(FileInputSplit split, Integer state) throws IOException {
+                       this.split = split;
+                       this.state = state;
+               }
+
+               @Override
+               public Integer getCurrentState() throws IOException {
+                       return state;
+               }
+
+               @Override
+               public boolean reachedEnd() throws IOException {
+                       if (state == elementsBeforeCheckpoint) {
+                               triggerLatch.trigger();
+                               if (!waitingLatch.isTriggered()) {
+                                       try {
+                                               waitingLatch.await();
+                                       } catch (InterruptedException e) {
+                                               e.printStackTrace();
+                                       }
+                               }
+                       }
+                       return state == linesPerSplit;
+               }
+
+               @Override
+               public String nextRecord(String reuse) throws IOException {
+                       return reachedEnd() ? null : split.getSplitNumber() + ": test line " + state++;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b2e8792b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
index 0e9b054..90d8861 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -163,11 +163,11 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
        // --------------------------                   Task Sink                       ------------------------------
 
        private static class TestingSinkFunction extends RichSinkFunction<String>
-               implements Checkpointed<Tuple2<Long, Map<Integer, Set<String>>>>, CheckpointListener {
+               implements ListCheckpointed<Tuple2<Long, Map<Integer, Set<String>>>>, CheckpointListener {
 
-               private boolean hasFailed;
+               private boolean hasRestoredAfterFailure;
 
-               private volatile boolean hasSuccessfulCheckpoints;
+               private volatile int successfulCheckpoints;
 
                private long elementsToFailure;
 
@@ -176,9 +176,9 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
                private Map<Integer, Set<String>> actualContent = new HashMap<>();
 
                TestingSinkFunction() {
-                       hasFailed = false;
+                       hasRestoredAfterFailure = false;
                        elementCounter = 0;
-                       hasSuccessfulCheckpoints = false;
+                       successfulCheckpoints = 0;
                }
 
                @Override
@@ -216,13 +216,13 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
                                throw new SuccessException();
                        }
 
-                       // add some latency so that we have at least one checkpoint in
-                       if (!hasFailed && !hasSuccessfulCheckpoints) {
+                       // add some latency so that we have at least two checkpoints in between
+                       if (!hasRestoredAfterFailure && successfulCheckpoints < 2) {
                                Thread.sleep(5);
                        }
 
                        // simulate a node failure
-                       if (!hasFailed && hasSuccessfulCheckpoints && elementCounter >= elementsToFailure) {
+                       if (!hasRestoredAfterFailure && successfulCheckpoints >= 2 && elementCounter >= elementsToFailure) {
                                throw new Exception("Task Failure @ elem: " + elementCounter + " / " + elementsToFailure);
                        }
                }
@@ -237,20 +237,22 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
                }
 
                @Override
-               public Tuple2<Long, Map<Integer, Set<String>>> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-                       return new Tuple2<>(elementCounter, actualContent);
+               public List<Tuple2<Long, Map<Integer, Set<String>>>> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+                       Tuple2<Long, Map<Integer, Set<String>>> state = new Tuple2<>(elementCounter, actualContent);
+                       return Collections.singletonList(state);
                }
 
                @Override
-               public void restoreState(Tuple2<Long, Map<Integer, Set<String>>> state) throws Exception {
-                       this.hasFailed = true;
-                       this.elementCounter = state.f0;
-                       this.actualContent = state.f1;
+               public void restoreState(List<Tuple2<Long, Map<Integer, Set<String>>>> state) throws Exception {
+                       Tuple2<Long, Map<Integer, Set<String>>> s = state.get(0);
+                       this.elementCounter = s.f0;
+                       this.actualContent = s.f1;
+                       this.hasRestoredAfterFailure = this.elementCounter != 0; // because restore is now also called at initialization
                }
 
                @Override
                public void notifyCheckpointComplete(long checkpointId) throws Exception {
-                       hasSuccessfulCheckpoints = true;
+                       this.successfulCheckpoints++;
                }
 
                private int getFileIdx(String line) {

Reply via email to