Repository: flink
Updated Branches:
  refs/heads/release-1.2 07865aaf8 -> b1ab75f48


[FLINK-5666] [tests] Add blob server clean up tests

Previously, deleting blobs in HA mode was only tested against a local file system.
This change verifies that deletion also works when the recovery path is on HDFS.
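
In code terms, the new HDFS variant just points the BlobServer's HA storage path at the mini cluster URI and reuses the shared recovery/clean-up test. A minimal sketch, assuming "hdfsURI" is provided by the test's HDFS setup (the exact version is in the diff below):

    org.apache.flink.configuration.Configuration config =
            new org.apache.flink.configuration.Configuration();
    config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
    config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI); // e.g. "hdfs://localhost:<port>/"
    // same put/get/delete round trip as the local file system test
    BlobRecoveryITCase.testBlobServerRecovery(config);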

This closes #3222.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b1ab75f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b1ab75f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b1ab75f4

Branch: refs/heads/release-1.2
Commit: b1ab75f482a8fa07e2a7f2a2ee31c0d808038cb0
Parents: 07865aa
Author: Nico Kruber <n...@data-artisans.com>
Authored: Thu Jan 26 20:29:58 2017 +0100
Committer: Ufuk Celebi <u...@apache.org>
Committed: Fri Jan 27 10:45:10 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/hdfstests/HDFSTest.java    | 19 ++++++++++
 .../flink/runtime/blob/BlobRecoveryITCase.java  | 40 ++++++++++++++++----
 .../BlobLibraryCacheRecoveryITCase.java         |  8 ++--
 3 files changed, 55 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b1ab75f4/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
index 1df6390..49db0f8 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
@@ -24,10 +24,14 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.examples.java.wordcount.WordCount;
+import org.apache.flink.runtime.blob.BlobRecoveryITCase;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -217,6 +221,21 @@ public class HDFSTest {
                assertFalse(fs.exists(directory));
        }
 
+       /**
+        * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from any
+        * participating BlobServer.
+        */
+       @Test
+       public void testBlobServerRecovery() throws Exception {
+               org.apache.flink.configuration.Configuration
+                       config = new org.apache.flink.configuration.Configuration();
+               config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+               config.setString(ConfigConstants.STATE_BACKEND, "ZOOKEEPER");
+               config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
+
+               BlobRecoveryITCase.testBlobServerRecovery(config);
+       }
+
        // package visible
        static abstract class DopOneTestEnvironment extends ExecutionEnvironment {
                

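For context, the "hdfsURI" used above is assumed to come from the Hadoop MiniDFSCluster that HDFSTest already starts for its other tests, roughly along these lines (not part of this diff, names may differ):

    org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
    MiniDFSCluster hdfsCluster = new MiniDFSCluster.Builder(hdConf).build();
    // the BlobServer recovery directory then lives directly on the mini cluster
    String hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":"
            + hdfsCluster.getNameNodePort() + "/";
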
http://git-wip-us.apache.org/repos/asf/flink/blob/b1ab75f4/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index 3fe207e..a8eb1d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -23,18 +23,24 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class BlobRecoveryITCase {
 
@@ -61,6 +67,16 @@ public class BlobRecoveryITCase {
         */
        @Test
        public void testBlobServerRecovery() throws Exception {
+               Configuration config = new Configuration();
+               config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+               config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+               config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, recoveryDir.getPath());
+
+               testBlobServerRecovery(config);
+       }
+
+       public static void testBlobServerRecovery(final Configuration config) throws IOException {
+               String storagePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH);
                Random rand = new Random();
 
                BlobServer[] server = new BlobServer[2];
@@ -68,10 +84,6 @@ public class BlobRecoveryITCase {
                BlobClient client = null;
 
                try {
-                       Configuration config = new Configuration();
-                       config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-                       config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
-                       config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, recoveryDir.getPath());
 
                        for (int i = 0; i < server.length; i++) {
                                server[i] = new BlobServer(config);
@@ -96,6 +108,11 @@ public class BlobRecoveryITCase {
                        client.put(jobId[0], testKey[0], expected); // Request 3
                        client.put(jobId[1], testKey[1], expected, 32, 256); // Request 4
 
+                       // check that the storage directory exists
+                       final Path blobServerPath = new Path(storagePath, "blob");
+                       FileSystem fs = blobServerPath.getFileSystem();
+                       assertTrue("Unknown storage dir: " + blobServerPath, fs.exists(blobServerPath));
+
                        // Close the client and connect to the other server
                        client.close();
                        client = new BlobClient(serverAddress[1], config);
@@ -146,6 +163,17 @@ public class BlobRecoveryITCase {
                        client.delete(keys[1]);
                        client.delete(jobId[0], testKey[0]);
                        client.delete(jobId[1], testKey[1]);
+
+                       // Verify everything is clean
+                       if (fs.exists(blobServerPath)) {
+                               final org.apache.flink.core.fs.FileStatus[] recoveryFiles =
+                                       fs.listStatus(blobServerPath);
+                               ArrayList<String> filenames = new ArrayList<String>(recoveryFiles.length);
+                               for (org.apache.flink.core.fs.FileStatus file: recoveryFiles) {
+                                       filenames.add(file.toString());
+                               }
+                               fail("Unclean state backend: " + filenames);
+                       }
                }
                finally {
                        for (BlobServer s : server) {
@@ -158,9 +186,5 @@ public class BlobRecoveryITCase {
                                client.close();
                        }
                }
-
-               // Verify everything is clean
-               File[] recoveryFiles = recoveryDir.listFiles();
-               assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
        }
 }
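
The notable change above is that the clean-up check now goes through Flink's FileSystem abstraction instead of java.io.File, so the same assertion covers both the local recovery directory and HDFS. Roughly, the pattern is (illustrative paths only):

    org.apache.flink.core.fs.Path blobServerPath =
            new org.apache.flink.core.fs.Path("file:///tmp/recovery", "blob"); // or an hdfs:// URI
    org.apache.flink.core.fs.FileSystem fs = blobServerPath.getFileSystem(); // resolved from the URI scheme
    if (fs.exists(blobServerPath) && fs.listStatus(blobServerPath).length > 0) {
            // any left-over entries mean the servers did not clean up after delete()
    }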

http://git-wip-us.apache.org/repos/asf/flink/blob/b1ab75f4/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index 8fabdf6..a727d51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -143,6 +143,10 @@ public class BlobLibraryCacheRecoveryITCase {
                                client.delete(keys.get(0));
                                client.delete(keys.get(1));
                        }
+
+                       // Verify everything is clean
+                       File[] recoveryFiles = temporaryFolder.getRoot().listFiles();
+                       assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
                }
                finally {
                        for (BlobServer s : server) {
@@ -159,9 +163,5 @@ public class BlobLibraryCacheRecoveryITCase {
                                libCache.shutdown();
                        }
                }
-
-               // Verify everything is clean
-               File[] recoveryFiles = temporaryFolder.getRoot().listFiles();
-               assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
        }
 }
