Repository: flink Updated Branches: refs/heads/release-1.2 07865aaf8 -> b1ab75f48
[FLINK-5666] [tests] Add blob server clean up tests Previously, deleting in HA mode was only tested with a local file system. This verifies that the delete still works on HDFS. This closes #3222. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b1ab75f4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b1ab75f4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b1ab75f4 Branch: refs/heads/release-1.2 Commit: b1ab75f482a8fa07e2a7f2a2ee31c0d808038cb0 Parents: 07865aa Author: Nico Kruber <n...@data-artisans.com> Authored: Thu Jan 26 20:29:58 2017 +0100 Committer: Ufuk Celebi <u...@apache.org> Committed: Fri Jan 27 10:45:10 2017 +0100 ---------------------------------------------------------------------- .../org/apache/flink/hdfstests/HDFSTest.java | 19 ++++++++++ .../flink/runtime/blob/BlobRecoveryITCase.java | 40 ++++++++++++++++---- .../BlobLibraryCacheRecoveryITCase.java | 8 ++-- 3 files changed, 55 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b1ab75f4/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java index 1df6390..49db0f8 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java @@ -24,10 +24,14 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironmentFactory; import org.apache.flink.api.java.LocalEnvironment; import org.apache.flink.api.java.io.AvroOutputFormat; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.examples.java.wordcount.WordCount; +import org.apache.flink.runtime.blob.BlobRecoveryITCase; import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.util.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -217,6 +221,21 @@ public class HDFSTest { assertFalse(fs.exists(directory)); } + /** + * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from any + * participating BlobServer. + */ + @Test + public void testBlobServerRecovery() throws Exception { + org.apache.flink.configuration.Configuration + config = new org.apache.flink.configuration.Configuration(); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(ConfigConstants.STATE_BACKEND, "ZOOKEEPER"); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI); + + BlobRecoveryITCase.testBlobServerRecovery(config); + } + // package visible static abstract class DopOneTestEnvironment extends ExecutionEnvironment { http://git-wip-us.apache.org/repos/asf/flink/blob/b1ab75f4/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java index 3fe207e..a8eb1d3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java @@ -23,18 +23,24 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.File; +import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Arrays; import java.util.Random; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class BlobRecoveryITCase { @@ -61,6 +67,16 @@ public class BlobRecoveryITCase { */ @Test public void testBlobServerRecovery() throws Exception { + Configuration config = new Configuration(); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, recoveryDir.getPath()); + + testBlobServerRecovery(config); + } + + public static void testBlobServerRecovery(final Configuration config) throws IOException { + String storagePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH); Random rand = new Random(); BlobServer[] server = new BlobServer[2]; @@ -68,10 +84,6 @@ public class BlobRecoveryITCase { BlobClient client = null; try { - Configuration config = new Configuration(); - config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); - config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); - config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, recoveryDir.getPath()); for (int i = 0; i < server.length; i++) { server[i] = new BlobServer(config); @@ -96,6 +108,11 @@ public class BlobRecoveryITCase { client.put(jobId[0], testKey[0], expected); // Request 3 client.put(jobId[1], testKey[1], expected, 32, 256); // Request 4 + // check that the storage directory exists + final Path blobServerPath = new Path(storagePath, "blob"); + FileSystem fs = blobServerPath.getFileSystem(); + assertTrue("Unknown storage dir: " + blobServerPath, fs.exists(blobServerPath)); + // Close the client and connect to the other server client.close(); client = new BlobClient(serverAddress[1], config); @@ -146,6 +163,17 @@ public class BlobRecoveryITCase { client.delete(keys[1]); client.delete(jobId[0], testKey[0]); client.delete(jobId[1], testKey[1]); + + // Verify everything is clean + if (fs.exists(blobServerPath)) { + final org.apache.flink.core.fs.FileStatus[] recoveryFiles = + fs.listStatus(blobServerPath); + ArrayList<String> filenames = new ArrayList<String>(recoveryFiles.length); + for (org.apache.flink.core.fs.FileStatus file: recoveryFiles) { + filenames.add(file.toString()); + } + fail("Unclean state backend: " + filenames); + } } finally { for (BlobServer s : server) { @@ -158,9 +186,5 @@ public class BlobRecoveryITCase { client.close(); } } - - // Verify everything is clean - File[] recoveryFiles = recoveryDir.listFiles(); - assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length); } } http://git-wip-us.apache.org/repos/asf/flink/blob/b1ab75f4/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java index 8fabdf6..a727d51 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java @@ -143,6 +143,10 @@ public class BlobLibraryCacheRecoveryITCase { client.delete(keys.get(0)); client.delete(keys.get(1)); } + + // Verify everything is clean + File[] recoveryFiles = temporaryFolder.getRoot().listFiles(); + assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length); } finally { for (BlobServer s : server) { @@ -159,9 +163,5 @@ public class BlobLibraryCacheRecoveryITCase { libCache.shutdown(); } } - - // Verify everything is clean - File[] recoveryFiles = temporaryFolder.getRoot().listFiles(); - assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length); } }