[FLINK-6830] [fileSource] Port continuous file reader migration tests for Flink 1.3

This commit also consolidates all Flink 1.1 and 1.2 migration tests into
a single ContinuousFileProcessingMigrationTest class. Parameterization
is used to test restore from different previous Flink versions.

This closes #4059.
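
For context while reading the diff below: the consolidated class relies on
JUnit's Parameterized runner, which instantiates the test class once per
(migration version, snapshot mod time) pair, so a single restore test covers
both the Flink 1.1 legacy-checkpoint restore path and the 1.2+ state-handle
path behind MigrationTestUtil.restoreFromSnapshot(...). A minimal,
self-contained sketch of that pattern follows; the class and member names
here are illustrative stand-ins, not the committed code (which uses
MigrationVersion and Flink's Tuple2, as shown in the diff):

    import java.util.Arrays;
    import java.util.Collection;

    import org.junit.Assert;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    // Hypothetical sketch of the parameterized migration-test pattern.
    @RunWith(Parameterized.class)
    public class MigrationRestoreSketch {

        // One row per previous Flink version: the version tag selects the
        // snapshot resource file; the long is the mod time baked into it.
        @Parameterized.Parameters(name = "Migration Savepoint / Mod Time: {0}")
        public static Collection<Object[]> parameters() {
            return Arrays.asList(new Object[][] {
                {"1.1", 1482144479339L},
                {"1.2", 1493116191000L},
                {"1.3", 1496532000000L},
            });
        }

        private final String testMigrateVersion;
        private final long expectedModTime;

        public MigrationRestoreSketch(String version, long modTime) {
            this.testMigrateVersion = version;
            this.expectedModTime = modTime;
        }

        @Test
        public void testRestore() {
            // The real test resolves a per-version snapshot resource like
            // this, restores operator state from it, and then asserts
            // against expectedModTime.
            String resource = "reader-migration-test-flink" + testMigrateVersion + "-snapshot";
            Assert.assertTrue(resource.contains(testMigrateVersion));
            Assert.assertTrue(expectedModTime > 0L);
        }
    }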


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d4a646a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d4a646a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d4a646a0

Branch: refs/heads/release-1.3
Commit: d4a646a035366918a100f64428c471464870b8d0
Parents: e5a435b
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Authored: Sun Jun 4 01:42:04 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Wed Jun 7 19:11:43 2017 +0200

----------------------------------------------------------------------
 ...inuousFileProcessingFrom11MigrationTest.java | 402 ------------------
 ...inuousFileProcessingFrom12MigrationTest.java | 366 ----------------
 .../ContinuousFileProcessingMigrationTest.java  | 423 +++++++++++++++++++
 ...gration-test-1496532000000-flink1.3-snapshot | Bin 0 -> 537 bytes
 .../reader-migration-test-flink1.3-snapshot     | Bin 0 -> 2823 bytes
 5 files changed, 423 insertions(+), 768 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d4a646a0/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom11MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom11MigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom11MigrationTest.java
deleted file mode 100644
index ec5e1ad..0000000
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom11MigrationTest.java
+++ /dev/null
@@ -1,402 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hdfstests;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.io.TextInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
-import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
-import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-
-public class ContinuousFileProcessingFrom11MigrationTest {
-
-       private static final int NO_OF_FILES = 5;
-       private static final int LINES_PER_FILE = 10;
-
-       private static final long INTERVAL = 100;
-
-       private static File baseDir;
-
-       private static FileSystem hdfs;
-       private static String hdfsURI;
-       private static MiniDFSCluster hdfsCluster;
-
-       @ClassRule
-       public static TemporaryFolder tempFolder = new TemporaryFolder();
-
-       //                                              PREPARING FOR THE TESTS
-
-       @BeforeClass
-       public static void createHDFS() {
-               try {
-                       baseDir = tempFolder.newFolder().getAbsoluteFile();
-                       FileUtil.fullyDelete(baseDir);
-
-                       Configuration hdConf = new Configuration();
-                       hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
-                       hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set.
-
-                       MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
-                       hdfsCluster = builder.build();
-
-                       hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
-                       hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
-
-               } catch(Throwable e) {
-                       e.printStackTrace();
-                       Assert.fail("Test failed " + e.getMessage());
-               }
-       }
-
-       @AfterClass
-       public static void destroyHDFS() {
-               try {
-                       FileUtil.fullyDelete(baseDir);
-                       hdfsCluster.shutdown();
-               } catch (Throwable t) {
-                       throw new RuntimeException(t);
-               }
-       }
-
-       private static String getResourceFilename(String filename) {
-               ClassLoader cl = ContinuousFileProcessingFrom11MigrationTest.class.getClassLoader();
-               URL resource = cl.getResource(filename);
-               return resource.getFile();
-       }
-
-       //                                              END OF PREPARATIONS
-
-       //                                              TESTS
-
-       @Test
-       public void testReaderSnapshotRestore() throws Exception {
-
-               /*
-
-               FileInputSplit split1 =
-                       new FileInputSplit(3, new Path("test/test1"), 0, 100, null);
-               FileInputSplit split2 =
-                       new FileInputSplit(2, new Path("test/test2"), 101, 200, null);
-               FileInputSplit split3 =
-                       new FileInputSplit(1, new Path("test/test2"), 0, 100, null);
-               FileInputSplit split4 =
-                       new FileInputSplit(0, new Path("test/test3"), 0, 100, null);
-
-               final OneShotLatch latch = new OneShotLatch();
-               BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(hdfsURI));
-               TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
-               ContinuousFileReaderOperator<FileInputSplit, ?> initReader = new ContinuousFileReaderOperator<>(format);
-               initReader.setOutputType(typeInfo, new ExecutionConfig());
-               OneInputStreamOperatorTestHarness<FileInputSplit, FileInputSplit> initTestInstance =
-                       new OneInputStreamOperatorTestHarness<>(initReader);
-               initTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime);
-               initTestInstance.open();
-               // create some state in the reader
-               initTestInstance.processElement(new StreamRecord<>(split1));
-               initTestInstance.processElement(new StreamRecord<>(split2));
-               initTestInstance.processElement(new StreamRecord<>(split3));
-               initTestInstance.processElement(new StreamRecord<>(split4));
-               // take a snapshot of the operator's state. This will be used
-               // to initialize another reader and compare the results of the
-               // two operators.
-               final StreamTaskState snapshot;
-               synchronized (initTestInstance.getCheckpointLock()) {
-                       snapshot = initTestInstance.snapshot(0L, 0L);
-               }
-
-               initTestInstance.snaphotToFile(snapshot, "src/test/resources/reader-migration-test-flink1.1-snapshot");
-
-               */
-               TimestampedFileInputSplit split1 =
-                       new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
-
-               TimestampedFileInputSplit split2 =
-                       new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null);
-
-               TimestampedFileInputSplit split3 =
-                       new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
-
-               TimestampedFileInputSplit split4 =
-                       new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
-
-
-               final OneShotLatch latch = new OneShotLatch();
-
-               BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(hdfsURI));
-               TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
-
-               ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(format);
-               initReader.setOutputType(typeInfo, new ExecutionConfig());
-
-               OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> initTestInstance =
-                       new OneInputStreamOperatorTestHarness<>(initReader);
-               initTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime);
-
-               initTestInstance.setup();
-               initTestInstance.initializeStateFromLegacyCheckpoint(getResourceFilename("reader-migration-test-flink1.1-snapshot"));
-               initTestInstance.open();
-
-               latch.trigger();
-
-               // ... and wait for the operators to close gracefully
-
-               synchronized (initTestInstance.getCheckpointLock()) {
-                       initTestInstance.close();
-               }
-
-               FileInputSplit fsSplit1 = createSplitFromTimestampedSplit(split1);
-               FileInputSplit fsSplit2 = createSplitFromTimestampedSplit(split2);
-               FileInputSplit fsSplit3 = createSplitFromTimestampedSplit(split3);
-               FileInputSplit fsSplit4 = createSplitFromTimestampedSplit(split4);
-
-               // compare if the results contain what they should contain and also if
-               // they are the same, as they should.
-
-               Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit1)));
-               Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit2)));
-               Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit3)));
-               Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit4)));
-       }
-
-       private FileInputSplit createSplitFromTimestampedSplit(TimestampedFileInputSplit split) {
-               Preconditions.checkNotNull(split);
-
-               return new FileInputSplit(
-                       split.getSplitNumber(),
-                       split.getPath(),
-                       split.getStart(),
-                       split.getLength(),
-                       split.getHostnames()
-               );
-       }
-
-       private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> {
-
-               private static final long serialVersionUID = -6727603565381560267L;
-
-               private final OneShotLatch latch;
-
-               private FileInputSplit split;
-
-               private boolean reachedEnd;
-
-               BlockingFileInputFormat(OneShotLatch latch, Path filePath) {
-                       super(filePath);
-                       this.latch = latch;
-                       this.reachedEnd = false;
-               }
-
-               @Override
-               public void open(FileInputSplit fileSplit) throws IOException {
-                       this.split = fileSplit;
-                       this.reachedEnd = false;
-               }
-
-               @Override
-               public boolean reachedEnd() throws IOException {
-                       if (!latch.isTriggered()) {
-                               try {
-                                       latch.await();
-                               } catch (InterruptedException e) {
-                                       e.printStackTrace();
-                               }
-                       }
-                       return reachedEnd;
-               }
-
-               @Override
-               public FileInputSplit nextRecord(FileInputSplit reuse) throws IOException {
-                       this.reachedEnd = true;
-                       return split;
-               }
-
-               @Override
-               public void close() {
-
-               }
-       }
-
-       ////                                            Monitoring Function Tests                                              //////
-
-       @Test
-       public void testFunctionRestore() throws Exception {
-
-               /*
-               org.apache.hadoop.fs.Path path = null;
-               long fileModTime = Long.MIN_VALUE;
-               for (int i = 0; i < 1; i++) {
-                       Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
-                       path = file.f0;
-                       fileModTime = hdfs.getFileStatus(file.f0).getModificationTime();
-               }
-
-               TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
-
-               final ContinuousFileMonitoringFunction<String> monitoringFunction =
-                       new ContinuousFileMonitoringFunction<>(format, format.getFilePath().toString(), new PathFilter(), FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
-
-               StreamSource<FileInputSplit, ContinuousFileMonitoringFunction<String>> src =
-                       new StreamSource<>(monitoringFunction);
-
-               final OneInputStreamOperatorTestHarness<Void, FileInputSplit> testHarness =
-                       new OneInputStreamOperatorTestHarness<>(src);
-               testHarness.open();
-
-               final Throwable[] error = new Throwable[1];
-
-               final OneShotLatch latch = new OneShotLatch();
-
-               // run the source asynchronously
-               Thread runner = new Thread() {
-                       @Override
-                       public void run() {
-                               try {
-                                       monitoringFunction.run(new DummySourceContext() {
-                                               @Override
-                                               public void collect(FileInputSplit element) {
-                                                       latch.trigger();
-                                               }
-                                       });
-                               }
-                               catch (Throwable t) {
-                                       t.printStackTrace();
-                                       error[0] = t;
-                               }
-                       }
-               };
-               runner.start();
-
-               if (!latch.isTriggered()) {
-                       latch.await();
-               }
-
-               StreamTaskState snapshot = testHarness.snapshot(0, 0);
-               testHarness.snaphotToFile(snapshot, "src/test/resources/monitoring-function-migration-test-" + fileModTime +"-flink1.1-snapshot");
-               monitoringFunction.cancel();
-               runner.join();
-
-               testHarness.close();
-               */
-
-               Long expectedModTime = Long.parseLong("1482144479339");
-               TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
-
-               final ContinuousFileMonitoringFunction<String> monitoringFunction =
-                       new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
-
-               StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
-                       new StreamSource<>(monitoringFunction);
-
-               final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
-                       new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
-               testHarness.setup();
-               testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename("monitoring-function-migration-test-1482144479339-flink1.1-snapshot"));
-               testHarness.open();
-
-               Assert.assertEquals((long) expectedModTime, monitoringFunction.getGlobalModificationTime());
-
-       }
-
-       ///////////                             Source Contexts Used by the tests                               /////////////////
-
-       private static abstract class DummySourceContext
-               implements SourceFunction.SourceContext<TimestampedFileInputSplit> {
-
-               private final Object lock = new Object();
-
-               @Override
-               public void collectWithTimestamp(TimestampedFileInputSplit element, long timestamp) {
-               }
-
-               @Override
-               public void emitWatermark(Watermark mark) {
-               }
-
-               @Override
-               public Object getCheckpointLock() {
-                       return lock;
-               }
-
-               @Override
-               public void close() {
-               }
-       }
-
-       /////////                               Auxiliary Methods                               /////////////
-
-       /**
-        * Create a file with pre-determined String format of the form:
-        * {@code fileIdx +": "+ sampleLine +" "+ lineNo}.
-        * */
-       private Tuple2<org.apache.hadoop.fs.Path, String> createFileAndFillWithData(
-               String base, String fileName, int fileIdx, String sampleLine) throws IOException {
-
-               assert (hdfs != null);
-
-               org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
-               Assert.assertFalse(hdfs.exists(file));
-
-               org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
-               FSDataOutputStream stream = hdfs.create(tmp);
-               StringBuilder str = new StringBuilder();
-               for (int i = 0; i < LINES_PER_FILE; i++) {
-                       String line = fileIdx +": "+ sampleLine + " " + i +"\n";
-                       str.append(line);
-                       stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
-               }
-               stream.close();
-
-               hdfs.rename(tmp, file);
-
-               Assert.assertTrue("No result file present", hdfs.exists(file));
-               return new Tuple2<>(file, str.toString());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d4a646a0/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom12MigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom12MigrationTest.java
deleted file mode 100644
index bf09447..0000000
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom12MigrationTest.java
+++ /dev/null
@@ -1,366 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hdfstests;
-
-import java.io.FileOutputStream;
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.io.TextInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
-import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
-import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OperatorSnapshotUtil;
-import org.apache.flink.util.Preconditions;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-
-public class ContinuousFileProcessingFrom12MigrationTest {
-
-       private static final int LINES_PER_FILE = 10;
-
-       private static final long INTERVAL = 100;
-
-       @ClassRule
-       public static TemporaryFolder tempFolder = new TemporaryFolder();
-
-       /**
-        * Manually run this to write binary snapshot data. Remove @Ignore to run.
-        */
-       @Ignore
-       @Test
-       public void writeReaderSnapshot() throws Exception {
-
-               File testFolder = tempFolder.newFolder();
-
-               TimestampedFileInputSplit split1 =
-                               new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
-
-               TimestampedFileInputSplit split2 =
-                               new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null);
-
-               TimestampedFileInputSplit split3 =
-                               new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
-
-               TimestampedFileInputSplit split4 =
-                               new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
-
-               // this always blocks to ensure that the reader doesn't do any actual processing so that
-               // we keep the state for the four splits
-               final OneShotLatch blockingLatch = new OneShotLatch();
-               BlockingFileInputFormat format = new BlockingFileInputFormat(blockingLatch, new Path(testFolder.getAbsolutePath()));
-
-               TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
-               ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(
-                               format);
-               initReader.setOutputType(typeInfo, new ExecutionConfig());
-               OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> testHarness =
-                               new OneInputStreamOperatorTestHarness<>(initReader);
-               testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
-               testHarness.open();
-               // create some state in the reader
-               testHarness.processElement(new StreamRecord<>(split1));
-               testHarness.processElement(new StreamRecord<>(split2));
-               testHarness.processElement(new StreamRecord<>(split3));
-               testHarness.processElement(new StreamRecord<>(split4));
-               // take a snapshot of the operator's state. This will be used
-               // to initialize another reader and compare the results of the
-               // two operators.
-
-               final OperatorStateHandles snapshot;
-               synchronized (testHarness.getCheckpointLock()) {
-                       snapshot = testHarness.snapshot(0L, 0L);
-               }
-
-               OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/reader-migration-test-flink1.2-snapshot");
-       }
-
-       @Test
-       public void testReaderRestore() throws Exception {
-               File testFolder = tempFolder.newFolder();
-
-               final OneShotLatch latch = new OneShotLatch();
-
-               BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(testFolder.getAbsolutePath()));
-               TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
-
-               ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(format);
-               initReader.setOutputType(typeInfo, new ExecutionConfig());
-
-               OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> testHarness =
-                       new OneInputStreamOperatorTestHarness<>(initReader);
-               testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
-
-               testHarness.setup();
-               OperatorStateHandles operatorStateHandles = OperatorSnapshotUtil.readStateHandle(
-                               OperatorSnapshotUtil.getResourceFilename(
-                                               "reader-migration-test-flink1.2-snapshot"));
-               testHarness.initializeState(operatorStateHandles);
-               testHarness.open();
-
-               latch.trigger();
-
-               // ... and wait for the operators to close gracefully
-
-               synchronized (testHarness.getCheckpointLock()) {
-                       testHarness.close();
-               }
-
-               TimestampedFileInputSplit split1 =
-                               new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
-
-               TimestampedFileInputSplit split2 =
-                               new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null);
-
-               TimestampedFileInputSplit split3 =
-                               new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
-
-               TimestampedFileInputSplit split4 =
-                               new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
-
-               // compare if the results contain what they should contain and also if
-               // they are the same, as they should.
-
-               Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split1)));
-               Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split2)));
-               Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split3)));
-               Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split4)));
-       }
-
-       /**
-        * Manually run this to write binary snapshot data. Remove @Ignore to run.
-        */
-       @Ignore
-       @Test
-       public void writeMonitoringSourceSnapshot() throws Exception {
-
-               File testFolder = tempFolder.newFolder();
-
-               long fileModTime = Long.MIN_VALUE;
-               for (int i = 0; i < 1; i++) {
-                       Tuple2<File, String> file = createFileAndFillWithData(testFolder, "file", i, "This is test line.");
-                       fileModTime = file.f0.lastModified();
-               }
-
-               TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath()));
-
-               final ContinuousFileMonitoringFunction<String> monitoringFunction =
-                       new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
-
-               StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
-                       new StreamSource<>(monitoringFunction);
-
-               final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
-                               new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
-
-               testHarness.open();
-
-               final Throwable[] error = new Throwable[1];
-
-               final OneShotLatch latch = new OneShotLatch();
-
-               // run the source asynchronously
-               Thread runner = new Thread() {
-                       @Override
-                       public void run() {
-                               try {
-                                       monitoringFunction.run(new DummySourceContext() {
-                                               @Override
-                                               public void collect(TimestampedFileInputSplit element) {
-                                                       latch.trigger();
-                                               }
-
-                                               @Override
-                                               public void markAsTemporarilyIdle() {
-
-                                               }
-                                       });
-                               }
-                               catch (Throwable t) {
-                                       t.printStackTrace();
-                                       error[0] = t;
-                               }
-                       }
-               };
-               runner.start();
-
-               if (!latch.isTriggered()) {
-                       latch.await();
-               }
-
-               final OperatorStateHandles snapshot;
-               synchronized (testHarness.getCheckpointLock()) {
-                       snapshot = testHarness.snapshot(0L, 0L);
-               }
-
-               OperatorSnapshotUtil.writeStateHandle(
-                               snapshot,
-                               "src/test/resources/monitoring-function-migration-test-" + fileModTime +"-flink1.2-snapshot");
-
-               monitoringFunction.cancel();
-               runner.join();
-
-               testHarness.close();
-       }
-
-       @Test
-       public void testMonitoringSourceRestore() throws Exception {
-
-               File testFolder = tempFolder.newFolder();
-
-               Long expectedModTime = Long.parseLong("1493116191000");
-               TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath()));
-
-               final ContinuousFileMonitoringFunction<String> monitoringFunction =
-                       new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
-
-               StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
-                       new StreamSource<>(monitoringFunction);
-
-               final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
-                       new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
-
-               testHarness.setup();
-               OperatorStateHandles operatorStateHandles = OperatorSnapshotUtil.readStateHandle(
-                               OperatorSnapshotUtil.getResourceFilename(
-                                               "monitoring-function-migration-test-1493116191000-flink1.2-snapshot"));
-
-               testHarness.initializeState(operatorStateHandles);
-               testHarness.open();
-
-               Assert.assertEquals((long) expectedModTime, monitoringFunction.getGlobalModificationTime());
-
-       }
-
-       private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> {
-
-               private static final long serialVersionUID = -6727603565381560267L;
-
-               private final OneShotLatch latch;
-
-               private FileInputSplit split;
-
-               private boolean reachedEnd;
-
-               BlockingFileInputFormat(OneShotLatch latch, Path filePath) {
-                       super(filePath);
-                       this.latch = latch;
-                       this.reachedEnd = false;
-               }
-
-               @Override
-               public void open(FileInputSplit fileSplit) throws IOException {
-                       this.split = fileSplit;
-                       this.reachedEnd = false;
-               }
-
-               @Override
-               public boolean reachedEnd() throws IOException {
-                       if (!latch.isTriggered()) {
-                               try {
-                                       latch.await();
-                               } catch (InterruptedException e) {
-                                       e.printStackTrace();
-                               }
-                       }
-                       return reachedEnd;
-               }
-
-               @Override
-               public FileInputSplit nextRecord(FileInputSplit reuse) throws IOException {
-                       this.reachedEnd = true;
-                       return split;
-               }
-
-               @Override
-               public void close() {
-
-               }
-       }
-
-       private static abstract class DummySourceContext
-               implements SourceFunction.SourceContext<TimestampedFileInputSplit> {
-
-               private final Object lock = new Object();
-
-               @Override
-               public void collectWithTimestamp(TimestampedFileInputSplit element, long timestamp) {
-               }
-
-               @Override
-               public void emitWatermark(Watermark mark) {
-               }
-
-               @Override
-               public Object getCheckpointLock() {
-                       return lock;
-               }
-
-               @Override
-               public void close() {
-               }
-       }
-
-       /**
-        * Create a file with pre-determined String format of the form:
-        * {@code fileIdx +": "+ sampleLine +" "+ lineNo}.
-        * */
-       private Tuple2<File, String> createFileAndFillWithData(
-               File base, String fileName, int fileIdx, String sampleLine) throws IOException {
-
-               File file = new File(base, fileName + fileIdx);
-               Assert.assertFalse(file.exists());
-
-               File tmp = new File(base, "." + fileName + fileIdx);
-               FileOutputStream stream = new FileOutputStream(tmp);
-               StringBuilder str = new StringBuilder();
-               for (int i = 0; i < LINES_PER_FILE; i++) {
-                       String line = fileIdx +": "+ sampleLine + " " + i +"\n";
-                       str.append(line);
-                       stream.write(line.getBytes());
-               }
-               stream.close();
-
-               FileUtils.moveFile(tmp, file);
-
-               Assert.assertTrue("No result file present", file.exists());
-               return new Tuple2<>(file, str.toString());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d4a646a0/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
new file mode 100644
index 0000000..89776eb
--- /dev/null
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hdfstests;
+
+import java.io.FileOutputStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OperatorSnapshotUtil;
+import org.apache.flink.streaming.util.migration.MigrationTestUtil;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Tests that verify the migration from previous Flink version snapshots.
+ */
+@RunWith(Parameterized.class)
+public class ContinuousFileProcessingMigrationTest {
+
+       private static final int LINES_PER_FILE = 10;
+
+       private static final long INTERVAL = 100;
+
+       @Parameterized.Parameters(name = "Migration Savepoint / Mod Time: {0}")
+       public static Collection<Tuple2<MigrationVersion, Long>> parameters () {
+               return Arrays.asList(
+                       Tuple2.of(MigrationVersion.v1_1, 1482144479339L),
+                       Tuple2.of(MigrationVersion.v1_2, 1493116191000L),
+                       Tuple2.of(MigrationVersion.v1_3, 1496532000000L));
+       }
+
+       /**
+        * TODO change this to the corresponding savepoint version to be written (e.g. {@link MigrationVersion#v1_3} for 1.3)
+        * TODO and remove all @Ignore annotations on write*Snapshot() methods to generate savepoints
+        */
+       private final MigrationVersion flinkGenerateSavepointVersion = null;
+
+       private final MigrationVersion testMigrateVersion;
+       private final Long expectedModTime;
+
+       public ContinuousFileProcessingMigrationTest(Tuple2<MigrationVersion, Long> migrationVersionAndModTime) {
+               this.testMigrateVersion = migrationVersionAndModTime.f0;
+               this.expectedModTime = migrationVersionAndModTime.f1;
+       }
+
+       @ClassRule
+       public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+       /**
+        * Manually run this to write binary snapshot data. Remove @Ignore to run.
+        */
+       @Ignore
+       @Test
+       public void writeReaderSnapshot() throws Exception {
+
+               File testFolder = tempFolder.newFolder();
+
+               TimestampedFileInputSplit split1 =
+                               new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
+
+               TimestampedFileInputSplit split2 =
+                               new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null);
+
+               TimestampedFileInputSplit split3 =
+                               new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
+
+               TimestampedFileInputSplit split4 =
+                               new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
+
+               // this always blocks to ensure that the reader doesn't do any actual processing so that
+               // we keep the state for the four splits
+               final OneShotLatch blockingLatch = new OneShotLatch();
+               BlockingFileInputFormat format = new BlockingFileInputFormat(blockingLatch, new Path(testFolder.getAbsolutePath()));
+
+               TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
+               ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(
+                               format);
+               initReader.setOutputType(typeInfo, new ExecutionConfig());
+               OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> testHarness =
+                               new OneInputStreamOperatorTestHarness<>(initReader);
+               testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+               testHarness.open();
+               // create some state in the reader
+               testHarness.processElement(new StreamRecord<>(split1));
+               testHarness.processElement(new StreamRecord<>(split2));
+               testHarness.processElement(new StreamRecord<>(split3));
+               testHarness.processElement(new StreamRecord<>(split4));
+               // take a snapshot of the operator's state. This will be used
+               // to initialize another reader and compare the results of the
+               // two operators.
+
+               final OperatorStateHandles snapshot;
+               synchronized (testHarness.getCheckpointLock()) {
+                       snapshot = testHarness.snapshot(0L, 0L);
+               }
+
+               OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/reader-migration-test-flink" + flinkGenerateSavepointVersion + "-snapshot");
+       }
+
+       @Test
+       public void testReaderRestore() throws Exception {
+               File testFolder = tempFolder.newFolder();
+
+               final OneShotLatch latch = new OneShotLatch();
+
+               BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(testFolder.getAbsolutePath()));
+               TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+               ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(format);
+               initReader.setOutputType(typeInfo, new ExecutionConfig());
+
+               OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> testHarness =
+                       new OneInputStreamOperatorTestHarness<>(initReader);
+               testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               testHarness.setup();
+
+               MigrationTestUtil.restoreFromSnapshot(
+                       testHarness,
+                       OperatorSnapshotUtil.getResourceFilename(
+                               "reader-migration-test-flink" + testMigrateVersion + "-snapshot"),
+                       testMigrateVersion);
+
+               testHarness.open();
+
+               latch.trigger();
+
+               // ... and wait for the operators to close gracefully
+
+               synchronized (testHarness.getCheckpointLock()) {
+                       testHarness.close();
+               }
+
+               TimestampedFileInputSplit split1 =
+                               new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
+
+               TimestampedFileInputSplit split2 =
+                               new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null);
+
+               TimestampedFileInputSplit split3 =
+                               new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
+
+               TimestampedFileInputSplit split4 =
+                               new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
+
+               // compare if the results contain what they should contain and also if
+               // they are the same, as they should.
+
+               if (testMigrateVersion == MigrationVersion.v1_1) {
+                       Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(createSplitFromTimestampedSplit(split1))));
+                       Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(createSplitFromTimestampedSplit(split2))));
+                       Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(createSplitFromTimestampedSplit(split3))));
+                       Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(createSplitFromTimestampedSplit(split4))));
+               } else {
+                       Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split1)));
+                       Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split2)));
+                       Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split3)));
+                       Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split4)));
+               }
+       }
+
+       /**
+        * Manually run this to write binary snapshot data. Remove @Ignore to run.
+        */
+       @Ignore
+       @Test
+       public void writeMonitoringSourceSnapshot() throws Exception {
+
+               File testFolder = tempFolder.newFolder();
+
+               long fileModTime = Long.MIN_VALUE;
+               for (int i = 0; i < 1; i++) {
+                       Tuple2<File, String> file = createFileAndFillWithData(testFolder, "file", i, "This is test line.");
+                       fileModTime = file.f0.lastModified();
+               }
+
+               TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath()));
+
+               final ContinuousFileMonitoringFunction<String> monitoringFunction =
+                       new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+
+               StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
+                       new StreamSource<>(monitoringFunction);
+
+               final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
+                               new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
+
+               testHarness.open();
+
+               final Throwable[] error = new Throwable[1];
+
+               final OneShotLatch latch = new OneShotLatch();
+
+               // run the source asynchronously
+               Thread runner = new Thread() {
+                       @Override
+                       public void run() {
+                               try {
+                                       monitoringFunction.run(new DummySourceContext() {
+                                               @Override
+                                               public void collect(TimestampedFileInputSplit element) {
+                                                       latch.trigger();
+                                               }
+
+                                               @Override
+                                               public void markAsTemporarilyIdle() {
+
+                                               }
+                                       });
+                               }
+                               catch (Throwable t) {
+                                       t.printStackTrace();
+                                       error[0] = t;
+                               }
+                       }
+               };
+               runner.start();
+
+               if (!latch.isTriggered()) {
+                       latch.await();
+               }
+
+               final OperatorStateHandles snapshot;
+               synchronized (testHarness.getCheckpointLock()) {
+                       snapshot = testHarness.snapshot(0L, 0L);
+               }
+
+               OperatorSnapshotUtil.writeStateHandle(
+                               snapshot,
+                               "src/test/resources/monitoring-function-migration-test-" + fileModTime + "-flink" + flinkGenerateSavepointVersion + "-snapshot");
+
+               monitoringFunction.cancel();
+               runner.join();
+
+               testHarness.close();
+       }
+
+       @Test
+       public void testMonitoringSourceRestore() throws Exception {
+
+               File testFolder = tempFolder.newFolder();
+
+               TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath()));
+
+               final ContinuousFileMonitoringFunction<String> monitoringFunction =
+                       new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+
+               StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
+                       new StreamSource<>(monitoringFunction);
+
+               final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
+                       new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
+
+               testHarness.setup();
+
+               MigrationTestUtil.restoreFromSnapshot(
+                       testHarness,
+                       OperatorSnapshotUtil.getResourceFilename(
+                               "monitoring-function-migration-test-" + expectedModTime + "-flink" + testMigrateVersion + "-snapshot"),
+                       testMigrateVersion);
+
+               testHarness.open();
+
+               Assert.assertEquals((long) expectedModTime, monitoringFunction.getGlobalModificationTime());
+
+       }
+
+       private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> {
+
+               private static final long serialVersionUID = -6727603565381560267L;
+
+               private final OneShotLatch latch;
+
+               private FileInputSplit split;
+
+               private boolean reachedEnd;
+
+               BlockingFileInputFormat(OneShotLatch latch, Path filePath) {
+                       super(filePath);
+                       this.latch = latch;
+                       this.reachedEnd = false;
+               }
+
+               @Override
+               public void open(FileInputSplit fileSplit) throws IOException {
+                       this.split = fileSplit;
+                       this.reachedEnd = false;
+               }
+
+               @Override
+               public boolean reachedEnd() throws IOException {
+                       if (!latch.isTriggered()) {
+                               try {
+                                       latch.await();
+                               } catch (InterruptedException e) {
+                                       e.printStackTrace();
+                               }
+                       }
+                       return reachedEnd;
+               }
+
+               @Override
+               public FileInputSplit nextRecord(FileInputSplit reuse) throws IOException {
+                       this.reachedEnd = true;
+                       return split;
+               }
+
+               @Override
+               public void close() {
+
+               }
+       }
+
+       private static abstract class DummySourceContext
+               implements SourceFunction.SourceContext<TimestampedFileInputSplit> {
+
+               private final Object lock = new Object();
+
+               @Override
+               public void collectWithTimestamp(TimestampedFileInputSplit element, long timestamp) {
+               }
+
+               @Override
+               public void emitWatermark(Watermark mark) {
+               }
+
+               @Override
+               public Object getCheckpointLock() {
+                       return lock;
+               }
+
+               @Override
+               public void close() {
+               }
+       }
+
+       /**
+        * Create a file with pre-determined String format of the form:
+        * {@code fileIdx +": "+ sampleLine +" "+ lineNo}.
+        * */
+       private Tuple2<File, String> createFileAndFillWithData(
+               File base, String fileName, int fileIdx, String sampleLine) throws IOException {
+
+               File file = new File(base, fileName + fileIdx);
+               Assert.assertFalse(file.exists());
+
+               File tmp = new File(base, "." + fileName + fileIdx);
+               FileOutputStream stream = new FileOutputStream(tmp);
+               StringBuilder str = new StringBuilder();
+               for (int i = 0; i < LINES_PER_FILE; i++) {
+                       String line = fileIdx +": "+ sampleLine + " " + i +"\n";
+                       str.append(line);
+                       stream.write(line.getBytes());
+               }
+               stream.close();
+
+               FileUtils.moveFile(tmp, file);
+
+               Assert.assertTrue("No result file present", file.exists());
+               return new Tuple2<>(file, str.toString());
+       }
+
+       private FileInputSplit createSplitFromTimestampedSplit(TimestampedFileInputSplit split) {
+               checkNotNull(split);
+
+               return new FileInputSplit(
+                       split.getSplitNumber(),
+                       split.getPath(),
+                       split.getStart(),
+                       split.getLength(),
+                       split.getHostnames()
+               );
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d4a646a0/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1496532000000-flink1.3-snapshot
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1496532000000-flink1.3-snapshot b/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1496532000000-flink1.3-snapshot
new file mode 100644
index 0000000..7ed677b
Binary files /dev/null and b/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1496532000000-flink1.3-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/d4a646a0/flink-fs-tests/src/test/resources/reader-migration-test-flink1.3-snapshot
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/resources/reader-migration-test-flink1.3-snapshot b/flink-fs-tests/src/test/resources/reader-migration-test-flink1.3-snapshot
new file mode 100644
index 0000000..bb612bd
Binary files /dev/null and b/flink-fs-tests/src/test/resources/reader-migration-test-flink1.3-snapshot differ
