[FLINK-6830] [fileSource] Port continuous file reader migration tests for Flink 1.3
This commit also consolidates all Flink 1.1 and 1.2 migration tests into a single ContinuousFileProcessingMigrationTest class. Parameterization is used to test restores from the different previous Flink versions (a minimal sketch of this pattern follows the diff at the end of this message).

This closes #4059.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d4a646a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d4a646a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d4a646a0

Branch: refs/heads/release-1.3
Commit: d4a646a035366918a100f64428c471464870b8d0
Parents: e5a435b
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Authored: Sun Jun 4 01:42:04 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Wed Jun 7 19:11:43 2017 +0200

----------------------------------------------------------------------
 ...inuousFileProcessingFrom11MigrationTest.java | 402 ------------------
 ...inuousFileProcessingFrom12MigrationTest.java | 366 ----------------
 .../ContinuousFileProcessingMigrationTest.java | 423 +++++++++++++++++++
 ...gration-test-1496532000000-flink1.3-snapshot | Bin 0 -> 537 bytes
 .../reader-migration-test-flink1.3-snapshot | Bin 0 -> 2823 bytes
 5 files changed, 423 insertions(+), 768 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d4a646a0/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom11MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom11MigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom11MigrationTest.java
deleted file mode 100644
index ec5e1ad..0000000
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom11MigrationTest.java
+++ /dev/null
@@ -1,402 +0,0 @@
-/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.flink.hdfstests; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.io.FileInputFormat; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.io.TextInputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; -import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction; -import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator; -import org.apache.flink.streaming.api.functions.source.FileProcessingMode; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit; -import org.apache.flink.streaming.api.operators.StreamSource; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.util.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.File; -import java.io.IOException; -import java.net.URL; - -public class ContinuousFileProcessingFrom11MigrationTest { - - private static final int NO_OF_FILES = 5; - private static final int LINES_PER_FILE = 10; - - private static final long INTERVAL = 100; - - private static File baseDir; - - private static FileSystem hdfs; - private static String hdfsURI; - private static MiniDFSCluster hdfsCluster; - - @ClassRule - public static TemporaryFolder tempFolder = new TemporaryFolder(); - - // PREPARING FOR THE TESTS - - @BeforeClass - public static void createHDFS() { - try { - baseDir = tempFolder.newFolder().getAbsoluteFile(); - FileUtil.fullyDelete(baseDir); - - Configuration hdConf = new Configuration(); - hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); - hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set. 
- - MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf); - hdfsCluster = builder.build(); - - hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/"; - hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf); - - } catch(Throwable e) { - e.printStackTrace(); - Assert.fail("Test failed " + e.getMessage()); - } - } - - @AfterClass - public static void destroyHDFS() { - try { - FileUtil.fullyDelete(baseDir); - hdfsCluster.shutdown(); - } catch (Throwable t) { - throw new RuntimeException(t); - } - } - - private static String getResourceFilename(String filename) { - ClassLoader cl = ContinuousFileProcessingFrom11MigrationTest.class.getClassLoader(); - URL resource = cl.getResource(filename); - return resource.getFile(); - } - - // END OF PREPARATIONS - - // TESTS - - @Test - public void testReaderSnapshotRestore() throws Exception { - - /* - - FileInputSplit split1 = - new FileInputSplit(3, new Path("test/test1"), 0, 100, null); - FileInputSplit split2 = - new FileInputSplit(2, new Path("test/test2"), 101, 200, null); - FileInputSplit split3 = - new FileInputSplit(1, new Path("test/test2"), 0, 100, null); - FileInputSplit split4 = - new FileInputSplit(0, new Path("test/test3"), 0, 100, null); - - final OneShotLatch latch = new OneShotLatch(); - BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(hdfsURI)); - TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format); - ContinuousFileReaderOperator<FileInputSplit, ?> initReader = new ContinuousFileReaderOperator<>(format); - initReader.setOutputType(typeInfo, new ExecutionConfig()); - OneInputStreamOperatorTestHarness<FileInputSplit, FileInputSplit> initTestInstance = - new OneInputStreamOperatorTestHarness<>(initReader); - initTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime); - initTestInstance.open(); - // create some state in the reader - initTestInstance.processElement(new StreamRecord<>(split1)); - initTestInstance.processElement(new StreamRecord<>(split2)); - initTestInstance.processElement(new StreamRecord<>(split3)); - initTestInstance.processElement(new StreamRecord<>(split4)); - // take a snapshot of the operator's state. This will be used - // to initialize another reader and compare the results of the - // two operators. 
- final StreamTaskState snapshot; - synchronized (initTestInstance.getCheckpointLock()) { - snapshot = initTestInstance.snapshot(0L, 0L); - } - - initTestInstance.snaphotToFile(snapshot, "src/test/resources/reader-migration-test-flink1.1-snapshot"); - - */ - TimestampedFileInputSplit split1 = - new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null); - - TimestampedFileInputSplit split2 = - new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null); - - TimestampedFileInputSplit split3 = - new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null); - - TimestampedFileInputSplit split4 = - new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null); - - - final OneShotLatch latch = new OneShotLatch(); - - BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(hdfsURI)); - TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format); - - ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(format); - initReader.setOutputType(typeInfo, new ExecutionConfig()); - - OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> initTestInstance = - new OneInputStreamOperatorTestHarness<>(initReader); - initTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime); - - initTestInstance.setup(); - initTestInstance.initializeStateFromLegacyCheckpoint(getResourceFilename("reader-migration-test-flink1.1-snapshot")); - initTestInstance.open(); - - latch.trigger(); - - // ... and wait for the operators to close gracefully - - synchronized (initTestInstance.getCheckpointLock()) { - initTestInstance.close(); - } - - FileInputSplit fsSplit1 = createSplitFromTimestampedSplit(split1); - FileInputSplit fsSplit2 = createSplitFromTimestampedSplit(split2); - FileInputSplit fsSplit3 = createSplitFromTimestampedSplit(split3); - FileInputSplit fsSplit4 = createSplitFromTimestampedSplit(split4); - - // compare if the results contain what they should contain and also if - // they are the same, as they should. 
- - Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit1))); - Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit2))); - Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit3))); - Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit4))); - } - - private FileInputSplit createSplitFromTimestampedSplit(TimestampedFileInputSplit split) { - Preconditions.checkNotNull(split); - - return new FileInputSplit( - split.getSplitNumber(), - split.getPath(), - split.getStart(), - split.getLength(), - split.getHostnames() - ); - } - - private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> { - - private static final long serialVersionUID = -6727603565381560267L; - - private final OneShotLatch latch; - - private FileInputSplit split; - - private boolean reachedEnd; - - BlockingFileInputFormat(OneShotLatch latch, Path filePath) { - super(filePath); - this.latch = latch; - this.reachedEnd = false; - } - - @Override - public void open(FileInputSplit fileSplit) throws IOException { - this.split = fileSplit; - this.reachedEnd = false; - } - - @Override - public boolean reachedEnd() throws IOException { - if (!latch.isTriggered()) { - try { - latch.await(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - return reachedEnd; - } - - @Override - public FileInputSplit nextRecord(FileInputSplit reuse) throws IOException { - this.reachedEnd = true; - return split; - } - - @Override - public void close() { - - } - } - - //// Monitoring Function Tests ////// - - @Test - public void testFunctionRestore() throws Exception { - - /* - org.apache.hadoop.fs.Path path = null; - long fileModTime = Long.MIN_VALUE; - for (int i = 0; i < 1; i++) { - Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line."); - path = file.f0; - fileModTime = hdfs.getFileStatus(file.f0).getModificationTime(); - } - - TextInputFormat format = new TextInputFormat(new Path(hdfsURI)); - - final ContinuousFileMonitoringFunction<String> monitoringFunction = - new ContinuousFileMonitoringFunction<>(format, format.getFilePath().toString(), new PathFilter(), FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL); - - StreamSource<FileInputSplit, ContinuousFileMonitoringFunction<String>> src = - new StreamSource<>(monitoringFunction); - - final OneInputStreamOperatorTestHarness<Void, FileInputSplit> testHarness = - new OneInputStreamOperatorTestHarness<>(src); - testHarness.open(); - - final Throwable[] error = new Throwable[1]; - - final OneShotLatch latch = new OneShotLatch(); - - // run the source asynchronously - Thread runner = new Thread() { - @Override - public void run() { - try { - monitoringFunction.run(new DummySourceContext() { - @Override - public void collect(FileInputSplit element) { - latch.trigger(); - } - }); - } - catch (Throwable t) { - t.printStackTrace(); - error[0] = t; - } - } - }; - runner.start(); - - if (!latch.isTriggered()) { - latch.await(); - } - - StreamTaskState snapshot = testHarness.snapshot(0, 0); - testHarness.snaphotToFile(snapshot, "src/test/resources/monitoring-function-migration-test-" + fileModTime +"-flink1.1-snapshot"); - monitoringFunction.cancel(); - runner.join(); - - testHarness.close(); - */ - - Long expectedModTime = Long.parseLong("1482144479339"); - TextInputFormat format = new TextInputFormat(new Path(hdfsURI)); - - final ContinuousFileMonitoringFunction<String> monitoringFunction = - new 
ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL); - - StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src = - new StreamSource<>(monitoringFunction); - - final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness = - new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0); - testHarness.setup(); - testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename("monitoring-function-migration-test-1482144479339-flink1.1-snapshot")); - testHarness.open(); - - Assert.assertEquals((long) expectedModTime, monitoringFunction.getGlobalModificationTime()); - - } - - /////////// Source Contexts Used by the tests ///////////////// - - private static abstract class DummySourceContext - implements SourceFunction.SourceContext<TimestampedFileInputSplit> { - - private final Object lock = new Object(); - - @Override - public void collectWithTimestamp(TimestampedFileInputSplit element, long timestamp) { - } - - @Override - public void emitWatermark(Watermark mark) { - } - - @Override - public Object getCheckpointLock() { - return lock; - } - - @Override - public void close() { - } - } - - ///////// Auxiliary Methods ///////////// - - /** - * Create a file with pre-determined String format of the form: - * {@code fileIdx +": "+ sampleLine +" "+ lineNo}. - * */ - private Tuple2<org.apache.hadoop.fs.Path, String> createFileAndFillWithData( - String base, String fileName, int fileIdx, String sampleLine) throws IOException { - - assert (hdfs != null); - - org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx); - Assert.assertFalse(hdfs.exists(file)); - - org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx); - FSDataOutputStream stream = hdfs.create(tmp); - StringBuilder str = new StringBuilder(); - for (int i = 0; i < LINES_PER_FILE; i++) { - String line = fileIdx +": "+ sampleLine + " " + i +"\n"; - str.append(line); - stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET)); - } - stream.close(); - - hdfs.rename(tmp, file); - - Assert.assertTrue("No result file present", hdfs.exists(file)); - return new Tuple2<>(file, str.toString()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d4a646a0/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom12MigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom12MigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom12MigrationTest.java deleted file mode 100644 index bf09447..0000000 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom12MigrationTest.java +++ /dev/null @@ -1,366 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.hdfstests; - -import java.io.FileOutputStream; -import org.apache.commons.io.FileUtils; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.io.FileInputFormat; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.io.TextInputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; -import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction; -import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator; -import org.apache.flink.streaming.api.functions.source.FileProcessingMode; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit; -import org.apache.flink.streaming.api.operators.StreamSource; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; -import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.OperatorSnapshotUtil; -import org.apache.flink.util.Preconditions; -import org.junit.Assert; -import org.junit.ClassRule; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.File; -import java.io.IOException; - -public class ContinuousFileProcessingFrom12MigrationTest { - - private static final int LINES_PER_FILE = 10; - - private static final long INTERVAL = 100; - - @ClassRule - public static TemporaryFolder tempFolder = new TemporaryFolder(); - - /** - * Manually run this to write binary snapshot data. Remove @Ignore to run. 
- */ - @Ignore - @Test - public void writeReaderSnapshot() throws Exception { - - File testFolder = tempFolder.newFolder(); - - TimestampedFileInputSplit split1 = - new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null); - - TimestampedFileInputSplit split2 = - new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null); - - TimestampedFileInputSplit split3 = - new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null); - - TimestampedFileInputSplit split4 = - new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null); - - // this always blocks to ensure that the reader doesn't to any actual processing so that - // we keep the state for the four splits - final OneShotLatch blockingLatch = new OneShotLatch(); - BlockingFileInputFormat format = new BlockingFileInputFormat(blockingLatch, new Path(testFolder.getAbsolutePath())); - - TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format); - ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>( - format); - initReader.setOutputType(typeInfo, new ExecutionConfig()); - OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> testHarness = - new OneInputStreamOperatorTestHarness<>(initReader); - testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime); - testHarness.open(); - // create some state in the reader - testHarness.processElement(new StreamRecord<>(split1)); - testHarness.processElement(new StreamRecord<>(split2)); - testHarness.processElement(new StreamRecord<>(split3)); - testHarness.processElement(new StreamRecord<>(split4)); - // take a snapshot of the operator's state. This will be used - // to initialize another reader and compare the results of the - // two operators. - - final OperatorStateHandles snapshot; - synchronized (testHarness.getCheckpointLock()) { - snapshot = testHarness.snapshot(0L, 0L); - } - - OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/reader-migration-test-flink1.2-snapshot"); - } - - @Test - public void testReaderRestore() throws Exception { - File testFolder = tempFolder.newFolder(); - - final OneShotLatch latch = new OneShotLatch(); - - BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(testFolder.getAbsolutePath())); - TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format); - - ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(format); - initReader.setOutputType(typeInfo, new ExecutionConfig()); - - OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> testHarness = - new OneInputStreamOperatorTestHarness<>(initReader); - testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime); - - testHarness.setup(); - OperatorStateHandles operatorStateHandles = OperatorSnapshotUtil.readStateHandle( - OperatorSnapshotUtil.getResourceFilename( - "reader-migration-test-flink1.2-snapshot")); - testHarness.initializeState(operatorStateHandles); - testHarness.open(); - - latch.trigger(); - - // ... 
and wait for the operators to close gracefully - - synchronized (testHarness.getCheckpointLock()) { - testHarness.close(); - } - - TimestampedFileInputSplit split1 = - new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null); - - TimestampedFileInputSplit split2 = - new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null); - - TimestampedFileInputSplit split3 = - new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null); - - TimestampedFileInputSplit split4 = - new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null); - - // compare if the results contain what they should contain and also if - // they are the same, as they should. - - Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split1))); - Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split2))); - Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split3))); - Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split4))); - } - - /** - * Manually run this to write binary snapshot data. Remove @Ignore to run. - */ - @Ignore - @Test - public void writeMonitoringSourceSnapshot() throws Exception { - - File testFolder = tempFolder.newFolder(); - - long fileModTime = Long.MIN_VALUE; - for (int i = 0; i < 1; i++) { - Tuple2<File, String> file = createFileAndFillWithData(testFolder, "file", i, "This is test line."); - fileModTime = file.f0.lastModified(); - } - - TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath())); - - final ContinuousFileMonitoringFunction<String> monitoringFunction = - new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL); - - StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src = - new StreamSource<>(monitoringFunction); - - final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness = - new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0); - - testHarness.open(); - - final Throwable[] error = new Throwable[1]; - - final OneShotLatch latch = new OneShotLatch(); - - // run the source asynchronously - Thread runner = new Thread() { - @Override - public void run() { - try { - monitoringFunction.run(new DummySourceContext() { - @Override - public void collect(TimestampedFileInputSplit element) { - latch.trigger(); - } - - @Override - public void markAsTemporarilyIdle() { - - } - }); - } - catch (Throwable t) { - t.printStackTrace(); - error[0] = t; - } - } - }; - runner.start(); - - if (!latch.isTriggered()) { - latch.await(); - } - - final OperatorStateHandles snapshot; - synchronized (testHarness.getCheckpointLock()) { - snapshot = testHarness.snapshot(0L, 0L); - } - - OperatorSnapshotUtil.writeStateHandle( - snapshot, - "src/test/resources/monitoring-function-migration-test-" + fileModTime +"-flink1.2-snapshot"); - - monitoringFunction.cancel(); - runner.join(); - - testHarness.close(); - } - - @Test - public void testMonitoringSourceRestore() throws Exception { - - File testFolder = tempFolder.newFolder(); - - Long expectedModTime = Long.parseLong("1493116191000"); - TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath())); - - final ContinuousFileMonitoringFunction<String> monitoringFunction = - new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL); - - StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src = - new 
StreamSource<>(monitoringFunction); - - final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness = - new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0); - - testHarness.setup(); - OperatorStateHandles operatorStateHandles = OperatorSnapshotUtil.readStateHandle( - OperatorSnapshotUtil.getResourceFilename( - "monitoring-function-migration-test-1493116191000-flink1.2-snapshot")); - - testHarness.initializeState(operatorStateHandles); - testHarness.open(); - - Assert.assertEquals((long) expectedModTime, monitoringFunction.getGlobalModificationTime()); - - } - - private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> { - - private static final long serialVersionUID = -6727603565381560267L; - - private final OneShotLatch latch; - - private FileInputSplit split; - - private boolean reachedEnd; - - BlockingFileInputFormat(OneShotLatch latch, Path filePath) { - super(filePath); - this.latch = latch; - this.reachedEnd = false; - } - - @Override - public void open(FileInputSplit fileSplit) throws IOException { - this.split = fileSplit; - this.reachedEnd = false; - } - - @Override - public boolean reachedEnd() throws IOException { - if (!latch.isTriggered()) { - try { - latch.await(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - return reachedEnd; - } - - @Override - public FileInputSplit nextRecord(FileInputSplit reuse) throws IOException { - this.reachedEnd = true; - return split; - } - - @Override - public void close() { - - } - } - - private static abstract class DummySourceContext - implements SourceFunction.SourceContext<TimestampedFileInputSplit> { - - private final Object lock = new Object(); - - @Override - public void collectWithTimestamp(TimestampedFileInputSplit element, long timestamp) { - } - - @Override - public void emitWatermark(Watermark mark) { - } - - @Override - public Object getCheckpointLock() { - return lock; - } - - @Override - public void close() { - } - } - - /** - * Create a file with pre-determined String format of the form: - * {@code fileIdx +": "+ sampleLine +" "+ lineNo}. - * */ - private Tuple2<File, String> createFileAndFillWithData( - File base, String fileName, int fileIdx, String sampleLine) throws IOException { - - File file = new File(base, fileName + fileIdx); - Assert.assertFalse(file.exists()); - - File tmp = new File(base, "." 
+ fileName + fileIdx); - FileOutputStream stream = new FileOutputStream(tmp); - StringBuilder str = new StringBuilder(); - for (int i = 0; i < LINES_PER_FILE; i++) { - String line = fileIdx +": "+ sampleLine + " " + i +"\n"; - str.append(line); - stream.write(line.getBytes()); - } - stream.close(); - - FileUtils.moveFile(tmp, file); - - Assert.assertTrue("No result file present", file.exists()); - return new Tuple2<>(file, str.toString()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d4a646a0/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java new file mode 100644 index 0000000..89776eb --- /dev/null +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.hdfstests; + +import java.io.FileOutputStream; +import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.io.TextInputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction; +import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator; +import org.apache.flink.streaming.api.functions.source.FileProcessingMode; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OperatorSnapshotUtil; +import org.apache.flink.streaming.util.migration.MigrationTestUtil; +import org.apache.flink.streaming.util.migration.MigrationVersion; + +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Tests that verify the migration from previous Flink version snapshots. + */ +@RunWith(Parameterized.class) +public class ContinuousFileProcessingMigrationTest { + + private static final int LINES_PER_FILE = 10; + + private static final long INTERVAL = 100; + + @Parameterized.Parameters(name = "Migration Savepoint / Mod Time: {0}") + public static Collection<Tuple2<MigrationVersion, Long>> parameters () { + return Arrays.asList( + Tuple2.of(MigrationVersion.v1_1, 1482144479339L), + Tuple2.of(MigrationVersion.v1_2, 1493116191000L), + Tuple2.of(MigrationVersion.v1_3, 1496532000000L)); + } + + /** + * TODO change this to the corresponding savepoint version to be written (e.g. {@link MigrationVersion#v1_3} for 1.3) + * TODO and remove all @Ignore annotations on write*Snapshot() methods to generate savepoints + */ + private final MigrationVersion flinkGenerateSavepointVersion = null; + + private final MigrationVersion testMigrateVersion; + private final Long expectedModTime; + + public ContinuousFileProcessingMigrationTest(Tuple2<MigrationVersion, Long> migrationVersionAndModTime) { + this.testMigrateVersion = migrationVersionAndModTime.f0; + this.expectedModTime = migrationVersionAndModTime.f1; + } + + @ClassRule + public static TemporaryFolder tempFolder = new TemporaryFolder(); + + /** + * Manually run this to write binary snapshot data. Remove @Ignore to run. 
+ */ + @Ignore + @Test + public void writeReaderSnapshot() throws Exception { + + File testFolder = tempFolder.newFolder(); + + TimestampedFileInputSplit split1 = + new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null); + + TimestampedFileInputSplit split2 = + new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null); + + TimestampedFileInputSplit split3 = + new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null); + + TimestampedFileInputSplit split4 = + new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null); + + // this always blocks to ensure that the reader doesn't do any actual processing so that + // we keep the state for the four splits + final OneShotLatch blockingLatch = new OneShotLatch(); + BlockingFileInputFormat format = new BlockingFileInputFormat(blockingLatch, new Path(testFolder.getAbsolutePath())); + + TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format); + ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>( + format); + initReader.setOutputType(typeInfo, new ExecutionConfig()); + OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> testHarness = + new OneInputStreamOperatorTestHarness<>(initReader); + testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime); + testHarness.open(); + // create some state in the reader + testHarness.processElement(new StreamRecord<>(split1)); + testHarness.processElement(new StreamRecord<>(split2)); + testHarness.processElement(new StreamRecord<>(split3)); + testHarness.processElement(new StreamRecord<>(split4)); + // take a snapshot of the operator's state. This will be used + // to initialize another reader and compare the results of the + // two operators. + + final OperatorStateHandles snapshot; + synchronized (testHarness.getCheckpointLock()) { + snapshot = testHarness.snapshot(0L, 0L); + } + + OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/reader-migration-test-flink" + flinkGenerateSavepointVersion + "-snapshot"); + } + + @Test + public void testReaderRestore() throws Exception { + File testFolder = tempFolder.newFolder(); + + final OneShotLatch latch = new OneShotLatch(); + + BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(testFolder.getAbsolutePath())); + TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format); + + ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(format); + initReader.setOutputType(typeInfo, new ExecutionConfig()); + + OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> testHarness = + new OneInputStreamOperatorTestHarness<>(initReader); + testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime); + + testHarness.setup(); + + MigrationTestUtil.restoreFromSnapshot( + testHarness, + OperatorSnapshotUtil.getResourceFilename( + "reader-migration-test-flink" + testMigrateVersion + "-snapshot"), + testMigrateVersion); + + testHarness.open(); + + latch.trigger(); + + // ...
and wait for the operators to close gracefully + + synchronized (testHarness.getCheckpointLock()) { + testHarness.close(); + } + + TimestampedFileInputSplit split1 = + new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null); + + TimestampedFileInputSplit split2 = + new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null); + + TimestampedFileInputSplit split3 = + new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null); + + TimestampedFileInputSplit split4 = + new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null); + + // verify that the restored reader emitted the expected splits; for 1.1 + // snapshots the reader emitted plain FileInputSplits, hence the conversion below. + + if (testMigrateVersion == MigrationVersion.v1_1) { + Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(createSplitFromTimestampedSplit(split1)))); + Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(createSplitFromTimestampedSplit(split2)))); + Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(createSplitFromTimestampedSplit(split3)))); + Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(createSplitFromTimestampedSplit(split4)))); + } else { + Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split1))); + Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split2))); + Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split3))); + Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split4))); + } + } + + /** + * Manually run this to write binary snapshot data. Remove @Ignore to run. + */ + @Ignore + @Test + public void writeMonitoringSourceSnapshot() throws Exception { + + File testFolder = tempFolder.newFolder(); + + long fileModTime = Long.MIN_VALUE; + for (int i = 0; i < 1; i++) { + Tuple2<File, String> file = createFileAndFillWithData(testFolder, "file", i, "This is test line."); + fileModTime = file.f0.lastModified(); + } + + TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath())); + + final ContinuousFileMonitoringFunction<String> monitoringFunction = + new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL); + + StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src = + new StreamSource<>(monitoringFunction); + + final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness = + new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0); + + testHarness.open(); + + final Throwable[] error = new Throwable[1]; + + final OneShotLatch latch = new OneShotLatch(); + + // run the source asynchronously + Thread runner = new Thread() { + @Override + public void run() { + try { + monitoringFunction.run(new DummySourceContext() { + @Override + public void collect(TimestampedFileInputSplit element) { + latch.trigger(); + } + + @Override + public void markAsTemporarilyIdle() { + + } + }); + } + catch (Throwable t) { + t.printStackTrace(); + error[0] = t; + } + } + }; + runner.start(); + + if (!latch.isTriggered()) { + latch.await(); + } + + final OperatorStateHandles snapshot; + synchronized (testHarness.getCheckpointLock()) { + snapshot = testHarness.snapshot(0L, 0L); + } + + OperatorSnapshotUtil.writeStateHandle( + snapshot, + "src/test/resources/monitoring-function-migration-test-" + fileModTime + "-flink" + flinkGenerateSavepointVersion + "-snapshot"); + + monitoringFunction.cancel(); + runner.join(); + +
testHarness.close(); + } + + @Test + public void testMonitoringSourceRestore() throws Exception { + + File testFolder = tempFolder.newFolder(); + + TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath())); + + final ContinuousFileMonitoringFunction<String> monitoringFunction = + new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL); + + StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src = + new StreamSource<>(monitoringFunction); + + final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness = + new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0); + + testHarness.setup(); + + MigrationTestUtil.restoreFromSnapshot( + testHarness, + OperatorSnapshotUtil.getResourceFilename( + "monitoring-function-migration-test-" + expectedModTime + "-flink" + testMigrateVersion + "-snapshot"), + testMigrateVersion); + + testHarness.open(); + + Assert.assertEquals((long) expectedModTime, monitoringFunction.getGlobalModificationTime()); + + } + + private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> { + + private static final long serialVersionUID = -6727603565381560267L; + + private final OneShotLatch latch; + + private FileInputSplit split; + + private boolean reachedEnd; + + BlockingFileInputFormat(OneShotLatch latch, Path filePath) { + super(filePath); + this.latch = latch; + this.reachedEnd = false; + } + + @Override + public void open(FileInputSplit fileSplit) throws IOException { + this.split = fileSplit; + this.reachedEnd = false; + } + + @Override + public boolean reachedEnd() throws IOException { + if (!latch.isTriggered()) { + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return reachedEnd; + } + + @Override + public FileInputSplit nextRecord(FileInputSplit reuse) throws IOException { + this.reachedEnd = true; + return split; + } + + @Override + public void close() { + + } + } + + private static abstract class DummySourceContext + implements SourceFunction.SourceContext<TimestampedFileInputSplit> { + + private final Object lock = new Object(); + + @Override + public void collectWithTimestamp(TimestampedFileInputSplit element, long timestamp) { + } + + @Override + public void emitWatermark(Watermark mark) { + } + + @Override + public Object getCheckpointLock() { + return lock; + } + + @Override + public void close() { + } + } + + /** + * Create a file with pre-determined String format of the form: + * {@code fileIdx +": "+ sampleLine +" "+ lineNo}. + * */ + private Tuple2<File, String> createFileAndFillWithData( + File base, String fileName, int fileIdx, String sampleLine) throws IOException { + + File file = new File(base, fileName + fileIdx); + Assert.assertFalse(file.exists()); + + File tmp = new File(base, "." 
+ fileName + fileIdx); + FileOutputStream stream = new FileOutputStream(tmp); + StringBuilder str = new StringBuilder(); + for (int i = 0; i < LINES_PER_FILE; i++) { + String line = fileIdx +": "+ sampleLine + " " + i +"\n"; + str.append(line); + stream.write(line.getBytes()); + } + stream.close(); + + FileUtils.moveFile(tmp, file); + + Assert.assertTrue("No result file present", file.exists()); + return new Tuple2<>(file, str.toString()); + } + + private FileInputSplit createSplitFromTimestampedSplit(TimestampedFileInputSplit split) { + checkNotNull(split); + + return new FileInputSplit( + split.getSplitNumber(), + split.getPath(), + split.getStart(), + split.getLength(), + split.getHostnames() + ); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d4a646a0/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1496532000000-flink1.3-snapshot ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1496532000000-flink1.3-snapshot b/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1496532000000-flink1.3-snapshot new file mode 100644 index 0000000..7ed677b Binary files /dev/null and b/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1496532000000-flink1.3-snapshot differ http://git-wip-us.apache.org/repos/asf/flink/blob/d4a646a0/flink-fs-tests/src/test/resources/reader-migration-test-flink1.3-snapshot ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/resources/reader-migration-test-flink1.3-snapshot b/flink-fs-tests/src/test/resources/reader-migration-test-flink1.3-snapshot new file mode 100644 index 0000000..bb612bd Binary files /dev/null and b/flink-fs-tests/src/test/resources/reader-migration-test-flink1.3-snapshot differ
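
As referenced in the commit message, the consolidation rests on JUnit 4's Parameterized runner. Below is a minimal, self-contained sketch of that pattern. It is illustrative only and not part of the commit: MigrationPatternSketch and its Version enum are hypothetical stand-ins for the real test class and Flink's MigrationVersion (whose string form is "1.1"/"1.2"/"1.3" rather than the constant names used here), and the version/modification-time pairs are the ones listed in the diff above.

    import java.util.Arrays;
    import java.util.Collection;

    import org.junit.Assert;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    /** Hypothetical stand-in for the parameterized migration test; illustrative only. */
    @RunWith(Parameterized.class)
    public class MigrationPatternSketch {

        /** Stand-in for Flink's MigrationVersion enum. */
        enum Version { v1_1, v1_2, v1_3 }

        /** One test run per (savepoint version, expected modification time) pair. */
        @Parameterized.Parameters(name = "Migration Savepoint / Mod Time: {0}")
        public static Collection<Object[]> parameters() {
            return Arrays.asList(new Object[][] {
                    {Version.v1_1, 1482144479339L},
                    {Version.v1_2, 1493116191000L},
                    {Version.v1_3, 1496532000000L},
            });
        }

        private final Version testMigrateVersion;
        private final long expectedModTime;

        public MigrationPatternSketch(Version testMigrateVersion, long expectedModTime) {
            this.testMigrateVersion = testMigrateVersion;
            this.expectedModTime = expectedModTime;
        }

        @Test
        public void testRestore() {
            // A real restore test resolves the version-specific snapshot resource,
            // initializes an operator test harness from it, and asserts on the
            // emitted output (see ContinuousFileProcessingMigrationTest above).
            // Here we only show that each parameter pair reaches the test body.
            String resource = "reader-migration-test-flink" + testMigrateVersion + "-snapshot";
            Assert.assertTrue(resource.contains(testMigrateVersion.name()));
            Assert.assertTrue(expectedModTime > 0);
        }
    }

The actual class passes each pair as a single Tuple2<MigrationVersion, Long> constructor argument rather than the unpacked values shown here; JUnit's Parameterized runner accepts both forms (single non-array parameters since JUnit 4.12). Per the TODO in the new class, savepoints for a new version are generated by pointing flinkGenerateSavepointVersion at that version and temporarily removing the @Ignore annotations from the write*Snapshot() methods.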