[FLINK-2805] [blobmanager] Write JARs to file state backend for recovery. Move the StateBackend enum to the top level, into the org.apache.flink.runtime.state package.
Abstract blob store in blob server for recovery This closes #1227. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c3a4d1d9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c3a4d1d9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c3a4d1d9 Branch: refs/heads/master Commit: c3a4d1d9f720a1da9697d0bbf48f7a3b1f5851b8 Parents: c2989f2 Author: Ufuk Celebi <[email protected]> Authored: Mon Oct 5 14:30:46 2015 +0200 Committer: Till Rohrmann <[email protected]> Committed: Tue Oct 20 00:16:52 2015 +0200 ---------------------------------------------------------------------- .../apache/flink/runtime/blob/BlobServer.java | 105 +++++++++-- .../runtime/blob/BlobServerConnection.java | 52 ++++-- .../apache/flink/runtime/blob/BlobStore.java | 97 ++++++++++ .../apache/flink/runtime/blob/BlobUtils.java | 75 +++++++- .../flink/runtime/blob/FileSystemBlobStore.java | 186 +++++++++++++++++++ .../flink/runtime/blob/VoidBlobStore.java | 61 ++++++ .../flink/runtime/jobmanager/RecoveryMode.java | 12 +- .../flink/runtime/blob/BlobRecoveryITCase.java | 159 ++++++++++++++++ .../BlobLibraryCacheRecoveryITCase.java | 176 ++++++++++++++++++ 9 files changed, 874 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index ef2ef61..d0bed8c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -18,6 +18,14 @@ package org.apache.flink.runtime.blob; +import org.apache.commons.io.FileUtils; +import 
org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -30,13 +38,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.io.FileUtils; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.api.common.JobID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static com.google.common.base.Preconditions.checkNotNull; /** * This class implements the BLOB server. The BLOB server is responsible for listening for incoming requests and @@ -57,12 +59,12 @@ public class BlobServer extends Thread implements BlobService { /** Indicates whether a shutdown of server component has been requested. */ private final AtomicBoolean shutdownRequested = new AtomicBoolean(); - /** Shutdown hook thread to ensure deletion of the storage directory. */ - private final Thread shutdownHook; - /** Is the root directory for file storage */ private final File storageDir; + /** Blob store for recovery */ + private final BlobStore blobStore; + /** Set of currently running threads */ private final Set<BlobServerConnection> activeConnections = new HashSet<BlobServerConnection>(); @@ -70,18 +72,43 @@ public class BlobServer extends Thread implements BlobService { private final int maxConnections; /** + * Shutdown hook thread to ensure deletion of the storage directory (or <code>null</code> if + * the configured recovery mode does not equal{@link RecoveryMode#STANDALONE}) + */ + private final Thread shutdownHook; + + /** * Instantiates a new BLOB server and binds it to a free network port. 
- * + * * @throws IOException * thrown if the BLOB server cannot bind to a free network port */ public BlobServer(Configuration config) throws IOException { + checkNotNull(config, "Configuration"); + + RecoveryMode recoveryMode = RecoveryMode.fromConfig(config); // configure and create the storage directory String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null); this.storageDir = BlobUtils.initStorageDirectory(storageDirectory); LOG.info("Created BLOB server storage directory {}", storageDir); + // No recovery. + if (recoveryMode == RecoveryMode.STANDALONE) { + this.blobStore = new VoidBlobStore(); + } + // Recovery. Check that everything has been setup correctly. This is not clean, but it's + // better to resolve this with some upcoming changes to the state backend setup. + else if (config.containsKey(ConfigConstants.STATE_BACKEND) && + config.containsKey(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH)) { + + this.blobStore = new FileSystemBlobStore(config); + } + // Fallback. + else { + this.blobStore = new VoidBlobStore(); + } + // configure the maximum number of concurrent connections final int maxConnections = config.getInteger( ConfigConstants.BLOB_FETCH_CONCURRENT_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT); @@ -102,8 +129,13 @@ public class BlobServer extends Thread implements BlobService { backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG; } - // Add shutdown hook to delete storage directory - this.shutdownHook = BlobUtils.addShutdownHook(this, LOG); + if (recoveryMode == RecoveryMode.STANDALONE) { + // Add shutdown hook to delete storage directory + this.shutdownHook = BlobUtils.addShutdownHook(this, LOG); + } + else { + this.shutdownHook = null; + } // start the server try { @@ -132,37 +164,43 @@ public class BlobServer extends Thread implements BlobService { * Returns a file handle to the file associated with the given blob key on the blob * server. 
* + * <p><strong>This is only called from the {@link BlobServerConnection}</strong> + * * @param key identifying the file * @return file handle to the file */ - public File getStorageLocation(BlobKey key) { + File getStorageLocation(BlobKey key) { return BlobUtils.getStorageLocation(storageDir, key); } /** * Returns a file handle to the file identified by the given jobID and key. * + * <p><strong>This is only called from the {@link BlobServerConnection}</strong> + * * @param jobID to which the file is associated * @param key to identify the file within the job context * @return file handle to the file */ - public File getStorageLocation(JobID jobID, String key) { + File getStorageLocation(JobID jobID, String key) { return BlobUtils.getStorageLocation(storageDir, jobID, key); } /** * Method which deletes all files associated with the given jobID. * + * <p><strong>This is only called from the {@link BlobServerConnection}</strong> + * * @param jobID all files associated to this jobID will be deleted * @throws IOException */ - public void deleteJobDirectory(JobID jobID) throws IOException { + void deleteJobDirectory(JobID jobID) throws IOException { BlobUtils.deleteJobDirectory(storageDir, jobID); } /** * Returns a temporary file inside the BLOB server's incoming directory. - * + * * @return a temporary file inside the BLOB server's incoming directory */ File createTemporaryFilename() { @@ -170,6 +208,13 @@ public class BlobServer extends Thread implements BlobService { String.format("temp-%08d", tempFileCounter.getAndIncrement())); } + /** + * Returns the blob store. 
+ */ + BlobStore getBlobStore() { + return blobStore; + } + @Override public void run() { try { @@ -245,6 +290,9 @@ public class BlobServer extends Thread implements BlobService { LOG.error("BLOB server failed to properly clean up its storage directory."); } + // Clean up the recovery directory + blobStore.cleanUp(); + // Remove shutdown hook to prevent resource leaks, unless this is invoked by the // shutdown hook itself if (shutdownHook != null && shutdownHook != Thread.currentThread()) { @@ -282,11 +330,26 @@ public class BlobServer extends Thread implements BlobService { final File localFile = BlobUtils.getStorageLocation(storageDir, requiredBlob); - if (!localFile.exists()) { - throw new FileNotFoundException("File " + localFile.getCanonicalPath() + " does not exist."); - } else { + if (localFile.exists()) { return localFile.toURI().toURL(); } + else { + try { + // Try the blob store + blobStore.get(requiredBlob, localFile); + } + catch (Exception e) { + throw new IOException("Failed to copy from blob store.", e); + } + + if (localFile.exists()) { + return localFile.toURI().toURL(); + } + else { + throw new FileNotFoundException("Local file " + localFile + " does not exist " + + "and failed to copy from blob store."); + } + } } /** @@ -305,6 +368,8 @@ public class BlobServer extends Thread implements BlobService { LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath()); } } + + blobStore.delete(key); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java index 793a9d6..d7bba8f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java @@ -18,6 +18,12 @@ package org.apache.flink.runtime.blob; +import com.google.common.io.Files; +import org.apache.flink.api.common.JobID; +import org.apache.flink.util.InstantiationUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.EOFException; import java.io.File; import java.io.FileInputStream; @@ -29,28 +35,21 @@ import java.net.Socket; import java.net.SocketException; import java.security.MessageDigest; -import com.google.common.io.Files; -import org.apache.flink.api.common.JobID; -import org.apache.flink.util.InstantiationUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE; import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE; +import static org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION; +import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION; import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_ID_SCOPE; +import static org.apache.flink.runtime.blob.BlobServerProtocol.MAX_KEY_LENGTH; import static org.apache.flink.runtime.blob.BlobServerProtocol.NAME_ADDRESSABLE; +import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION; +import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR; +import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY; import static org.apache.flink.runtime.blob.BlobUtils.closeSilently; import static org.apache.flink.runtime.blob.BlobUtils.readFully; import static org.apache.flink.runtime.blob.BlobUtils.readLength; import static org.apache.flink.runtime.blob.BlobUtils.writeLength; -import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE; -import static org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION; -import static 
org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION; -import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION; -import static org.apache.flink.runtime.blob.BlobServerProtocol.MAX_KEY_LENGTH; -import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY; -import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR; - /** * A BLOB connection handles a series of requests from a particular BLOB client. */ @@ -181,10 +180,18 @@ class BlobServerConnection extends Thread { JobID jobID = JobID.fromByteArray(jidBytes); String key = readKey(buf, inputStream); blobFile = this.blobServer.getStorageLocation(jobID, key); + + if (!blobFile.exists()) { + blobServer.getBlobStore().get(jobID, key, blobFile); + } } else if (contentAddressable == CONTENT_ADDRESSABLE) { final BlobKey key = BlobKey.readFromInputStream(inputStream); blobFile = blobServer.getStorageLocation(key); + + if (!blobFile.exists()) { + blobServer.getBlobStore().get(key, blobFile); + } } else { throw new IOException("Unknown type of BLOB addressing."); @@ -194,6 +201,7 @@ class BlobServerConnection extends Thread { if (!blobFile.exists()) { throw new IOException("Cannot find required BLOB at " + blobFile.getAbsolutePath()); } + if (blobFile.length() > Integer.MAX_VALUE) { throw new IOException("BLOB size exceeds the maximum size (2 GB)."); } @@ -220,8 +228,7 @@ class BlobServerConnection extends Thread { int blobLen = (int) blobFile.length(); writeLength(blobLen, outputStream); - FileInputStream fis = new FileInputStream(blobFile); - try { + try (FileInputStream fis = new FileInputStream(blobFile)) { int bytesRemaining = blobLen; while (bytesRemaining > 0) { int read = fis.read(buf); @@ -231,8 +238,6 @@ class BlobServerConnection extends Thread { outputStream.write(buf, 0, read); bytesRemaining -= read; } - } finally { - fis.close(); } } catch (SocketException e) { @@ -314,6 +319,9 @@ class BlobServerConnection extends Thread { File storageFile = 
this.blobServer.getStorageLocation(jobID, key); Files.move(incomingFile, storageFile); incomingFile = null; + + blobServer.getBlobStore().put(storageFile, jobID, key); + outputStream.write(RETURN_OKAY); } else { @@ -322,6 +330,8 @@ class BlobServerConnection extends Thread { Files.move(incomingFile, storageFile); incomingFile = null; + blobServer.getBlobStore().put(storageFile, blobKey); + // Return computed key to client for validation outputStream.write(RETURN_OKAY); blobKey.writeToOutputStream(outputStream); @@ -379,6 +389,8 @@ class BlobServerConnection extends Thread { if (blobFile.exists() && !blobFile.delete()) { throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath()); } + + blobServer.getBlobStore().delete(key); } else if (type == NAME_ADDRESSABLE) { byte[] jidBytes = new byte[JobID.SIZE]; @@ -391,6 +403,8 @@ class BlobServerConnection extends Thread { if (blobFile.exists() && !blobFile.delete()) { throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath()); } + + blobServer.getBlobStore().delete(jobID, key); } else if (type == JOB_ID_SCOPE) { byte[] jidBytes = new byte[JobID.SIZE]; @@ -398,6 +412,8 @@ class BlobServerConnection extends Thread { JobID jobID = JobID.fromByteArray(jidBytes); blobServer.deleteJobDirectory(jobID); + + blobServer.getBlobStore().deleteAll(jobID); } else { throw new IOException("Unrecognized addressing type: " + type); http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java new file mode 100644 index 0000000..1e72d91 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.api.common.JobID; + +import java.io.File; + +/** + * A blob store. + */ +interface BlobStore { + + /** + * Copies the local file to the blob store. + * + * @param localFile The file to copy + * @param blobKey The ID for the file in the blob store + * @throws Exception If the copy fails + */ + void put(File localFile, BlobKey blobKey) throws Exception; + + /** + * Copies a local file to the blob store. + * + * <p>The job ID and key make up a composite key for the file. + * + * @param localFile The file to copy + * @param jobId The JobID part of ID for the file in the blob store + * @param key The String part of ID for the file in the blob store + * @throws Exception If the copy fails + */ + void put(File localFile, JobID jobId, String key) throws Exception; + + /** + * Copies a blob to a local file. + * + * @param blobKey The blob ID + * @param localFile The local file to copy to + * @throws Exception If the copy fails + */ + void get(BlobKey blobKey, File localFile) throws Exception; + + /** + * Copies a blob to a local file. 
+ * + * @param jobId The JobID part of ID for the blob + * @param key The String part of ID for the blob + * @param localFile The local file to copy to + * @throws Exception If the copy fails + */ + void get(JobID jobId, String key, File localFile) throws Exception; + + /** + * Deletes a blob. + * + * @param blobKey The blob ID + */ + void delete(BlobKey blobKey); + + /** + * Deletes a blob. + * + * @param jobId The JobID part of ID for the blob + * @param key The String part of ID for the blob + */ + void delete(JobID jobId, String key); + + /** + * Deletes blobs. + * + * @param jobId The JobID part of all blobs to delete + */ + void deleteAll(JobID jobId); + + /** + * Cleans up the store and deletes all blobs. + */ + void cleanUp(); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java index c47ecf2..d8f744b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java @@ -21,14 +21,19 @@ package org.apache.flink.runtime.blob; import com.google.common.io.BaseEncoding; import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.util.IOUtils; import org.slf4j.Logger; import java.io.EOFException; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; +import java.net.URI; import java.nio.charset.Charset; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; @@ -49,12 +54,12 @@ public 
class BlobUtils { /** * The prefix of all BLOB files stored by the BLOB server. */ - private static final String BLOB_FILE_PREFIX = "blob_"; + static final String BLOB_FILE_PREFIX = "blob_"; /** * The prefix of all job-specific directories created by the BLOB server. */ - private static final String JOB_DIR_PREFIX = "job_"; + static final String JOB_DIR_PREFIX = "job_"; /** * The default character set to translate between characters and bytes. @@ -103,7 +108,7 @@ public class BlobUtils { static File getIncomingDirectory(File storageDir) { final File incomingDir = new File(storageDir, "incoming"); - if (!incomingDir.exists() && !incomingDir.mkdir()) { + if (!incomingDir.exists() && !incomingDir.mkdirs()) { throw new RuntimeException("Cannot create directory for incoming files " + incomingDir.getAbsolutePath()); } @@ -119,7 +124,7 @@ public class BlobUtils { private static File getCacheDirectory(File storageDir) { final File cacheDirectory = new File(storageDir, "cache"); - if (!cacheDirectory.exists() && !cacheDirectory.mkdir()) { + if (!cacheDirectory.exists() && !cacheDirectory.mkdirs()) { throw new RuntimeException("Could not create cache directory '" + cacheDirectory.getAbsolutePath() + "'."); } @@ -174,7 +179,7 @@ public class BlobUtils { * the user's key for a BLOB * @return the internal name for the BLOB as used by the BLOB server */ - private static String encodeKey(String key) { + static String encodeKey(String key) { return BaseEncoding.base64().encode(key.getBytes(DEFAULT_CHARSET)); } @@ -327,6 +332,66 @@ public class BlobUtils { } /** + * Returns the path for the given blob key. + * + * <p>The returned path can be used with the state backend for recovery purposes. + * + * <p>This follows the same scheme as {@link #getStorageLocation(File, BlobKey)}. 
+ */ + static String getRecoveryPath(String basePath, BlobKey blobKey) { + // format: $base/cache/blob_$key + return String.format("%s/cache/%s", basePath, BLOB_FILE_PREFIX + blobKey.toString()); + } + + /** + * Returns the path for the given job ID and key. + * + * <p>The returned path can be used with the state backend for recovery purposes. + * + * <p>This follows the same scheme as {@link #getStorageLocation(File, JobID, String)}. + */ + static String getRecoveryPath(String basePath, JobID jobId, String key) { + // format: $base/job_$id/blob_$key + return String.format("%s/%s/%s", basePath, JOB_DIR_PREFIX + jobId.toString(), + BLOB_FILE_PREFIX + encodeKey(key)); + } + + /** + * Returns the path for the given job ID. + * + * <p>The returned path can be used with the state backend for recovery purposes. + */ + static String getRecoveryPath(String basePath, JobID jobId) { + return String.format("%s/%s", basePath, JOB_DIR_PREFIX + jobId.toString()); + } + + /** + * Copies the file from the recovery path to the local file. + */ + static void copyFromRecoveryPath(String recoveryPath, File localBlobFile) throws Exception { + if (recoveryPath == null) { + throw new IllegalStateException("Failed to determine recovery path."); + } + + if (!localBlobFile.createNewFile()) { + throw new IllegalStateException("Failed to create new local file to copy to"); + } + + URI uri = new URI(recoveryPath); + Path path = new Path(recoveryPath); + + if (FileSystem.get(uri).exists(path)) { + try (InputStream is = FileSystem.get(uri).open(path)) { + FileOutputStream fos = new FileOutputStream(localBlobFile); + IOUtils.copyBytes(is, fos); // closes the streams + } + } + else { + throw new IOException("Cannot find required BLOB at '" + recoveryPath + "' for recovery."); + } + } + + /** * Private constructor to prevent instantiation. 
*/ private BlobUtils() { http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java new file mode 100644 index 0000000..8a037ad --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.blob; + +import com.google.common.io.Files; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.util.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Blob store backed by {@link FileSystem}. + */ +class FileSystemBlobStore implements BlobStore { + + private static final Logger LOG = LoggerFactory.getLogger(FileSystemBlobStore.class); + + /** The base path of the blob store */ + private final String basePath; + + FileSystemBlobStore(Configuration config) throws IOException { + StateBackend stateBackend = StateBackend.fromConfig(config); + + if (stateBackend == StateBackend.FILESYSTEM) { + String stateBackendBasePath = config.getString( + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, ""); + + if (stateBackendBasePath.equals("")) { + throw new IllegalConfigurationException(String.format("Missing configuration for " + + "file system state backend recovery path. 
Please specify via " + + "'%s' key.", ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH)); + } + + stateBackendBasePath += "/blob"; + + this.basePath = stateBackendBasePath; + + try { + FileSystem.get(new URI(basePath)).mkdirs(new Path(basePath)); + } + catch (URISyntaxException e) { + throw new IOException(e); + } + + LOG.info("Created blob directory {}.", basePath); + } + else { + // Nothing else support at the moment + throw new IllegalConfigurationException( + String.format("Illegal state backend " + + "configuration '%s'. Please configure '%s' as state " + + "backend and specify the recovery path via '%s' key.", + stateBackend, StateBackend.FILESYSTEM, + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH)); + } + } + + // - Put ------------------------------------------------------------------ + + @Override + public void put(File localFile, BlobKey blobKey) throws Exception { + put(localFile, BlobUtils.getRecoveryPath(basePath, blobKey)); + } + + @Override + public void put(File localFile, JobID jobId, String key) throws Exception { + put(localFile, BlobUtils.getRecoveryPath(basePath, jobId, key)); + } + + private void put(File fromFile, String toBlobPath) throws Exception { + try (OutputStream os = FileSystem.get(new URI(toBlobPath)) + .create(new Path(toBlobPath), true)) { + + LOG.debug("Copying from {} to {}.", fromFile, toBlobPath); + Files.copy(fromFile, os); + } + } + + // - Get ------------------------------------------------------------------ + + @Override + public void get(BlobKey blobKey, File localFile) throws Exception { + get(BlobUtils.getRecoveryPath(basePath, blobKey), localFile); + } + + @Override + public void get(JobID jobId, String key, File localFile) throws Exception { + get(BlobUtils.getRecoveryPath(basePath, jobId, key), localFile); + } + + private void get(String fromBlobPath, File toFile) throws Exception { + checkNotNull(fromBlobPath, "Blob path"); + checkNotNull(toFile, "File"); + + if (!toFile.exists() && !toFile.createNewFile()) { + throw 
new IllegalStateException("Failed to create target file to copy to"); + } + + final URI fromUri = new URI(fromBlobPath); + final Path fromPath = new Path(fromBlobPath); + + if (FileSystem.get(fromUri).exists(fromPath)) { + try (InputStream is = FileSystem.get(fromUri).open(fromPath)) { + FileOutputStream fos = new FileOutputStream(toFile); + + LOG.debug("Copying from {} to {}.", fromBlobPath, toFile); + IOUtils.copyBytes(is, fos); // closes the streams + } + } + else { + throw new IOException(fromBlobPath + " does not exist."); + } + } + + // - Delete --------------------------------------------------------------- + + @Override + public void delete(BlobKey blobKey) { + delete(BlobUtils.getRecoveryPath(basePath, blobKey)); + } + + @Override + public void delete(JobID jobId, String key) { + delete(BlobUtils.getRecoveryPath(basePath, jobId, key)); + } + + @Override + public void deleteAll(JobID jobId) { + delete(BlobUtils.getRecoveryPath(basePath, jobId)); + } + + private void delete(String blobPath) { + try { + LOG.debug("Deleting {}.", blobPath); + + FileSystem.get(new URI(blobPath)).delete(new Path(blobPath), true); + } + catch (Exception e) { + LOG.warn("Failed to delete blob at " + blobPath); + } + } + + @Override + public void cleanUp() { + try { + LOG.debug("Cleaning up {}.", basePath); + + FileSystem.get(new URI(basePath)).delete(new Path(basePath), true); + } + catch (Exception e) { + LOG.error("Failed to clean up recovery directory."); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java new file mode 100644 index 0000000..1b71add --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java @@ 
-0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.api.common.JobID; + +import java.io.File; + +/** + * A blob store doing nothing. + */ +class VoidBlobStore implements BlobStore { + + @Override + public void put(File localFile, BlobKey blobKey) throws Exception { + } + + @Override + public void put(File localFile, JobID jobId, String key) throws Exception { + } + + @Override + public void get(BlobKey blobKey, File localFile) throws Exception { + } + + @Override + public void get(JobID jobId, String key, File localFile) throws Exception { + } + + @Override + public void delete(BlobKey blobKey) { + } + + @Override + public void delete(JobID jobId, String key) { + } + + @Override + public void deleteAll(JobID jobId) { + } + + @Override + public void cleanUp() { + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java index 17322d8..077e34d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java @@ -24,11 +24,11 @@ import org.apache.flink.configuration.Configuration; /** * Recovery mode for Flink's cluster execution. Currently supported modes are: * - * - Standalone: No recovery from JobManager failures - * - ZooKeeper: JobManager high availability via ZooKeeper - * ZooKeeper is used to select a leader among a group of JobManager. This JobManager - * is responsible for the job execution. Upon failure of the leader a new leader is elected - * which will take over the responsibilities of the old leader + * - Standalone: No recovery from JobManager failures + * - ZooKeeper: JobManager high availability via ZooKeeper + * ZooKeeper is used to select a leader among a group of JobManager. This JobManager + * is responsible for the job execution. Upon failure of the leader a new leader is elected + * which will take over the responsibilities of the old leader */ public enum RecoveryMode { STANDALONE, @@ -69,4 +69,4 @@ public enum RecoveryMode { return false; } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java new file mode 100644 index 0000000..0e324a8 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Random; + +import static org.junit.Assert.assertEquals; + +public class BlobRecoveryITCase { + + private File recoveryDir; + + @Before + public void setUp() throws Exception { + recoveryDir = new File(FileUtils.getTempDirectory(), "BlobRecoveryITCaseDir"); + if (!recoveryDir.exists() && !recoveryDir.mkdirs()) { + throw new IllegalStateException("Failed to create temp directory for test"); + } + } + + @After + public void cleanUp() throws Exception { + if (recoveryDir != null) { + FileUtils.deleteDirectory(recoveryDir); + } + } + + /** + * Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are recoverable from any + * participating BlobServer. 
+ */ + @Test + public void testBlobServerRecovery() throws Exception { + Random rand = new Random(); + + BlobServer[] server = new BlobServer[2]; + InetSocketAddress[] serverAddress = new InetSocketAddress[2]; + BlobClient client = null; + + try { + Configuration config = new Configuration(); + config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER"); + config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); + config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, recoveryDir.getPath()); + + for (int i = 0; i < server.length; i++) { + server[i] = new BlobServer(config); + serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort()); + } + + client = new BlobClient(serverAddress[0]); + + // Random data + byte[] expected = new byte[1024]; + rand.nextBytes(expected); + + BlobKey[] keys = new BlobKey[2]; + + // Put data + keys[0] = client.put(expected); // Request 1 + keys[1] = client.put(expected, 32, 256); // Request 2 + + JobID[] jobId = new JobID[] { new JobID(), new JobID() }; + String[] testKey = new String[] { "test-key-1", "test-key-2" }; + + client.put(jobId[0], testKey[0], expected); // Request 3 + client.put(jobId[1], testKey[1], expected, 32, 256); // Request 4 + + // Close the client and connect to the other server + client.close(); + client = new BlobClient(serverAddress[1]); + + // Verify request 1 + try (InputStream is = client.get(keys[0])) { + byte[] actual = new byte[expected.length]; + + BlobUtils.readFully(is, actual, 0, expected.length, null); + + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], actual[i]); + } + } + + // Verify request 2 + try (InputStream is = client.get(keys[1])) { + byte[] actual = new byte[256]; + BlobUtils.readFully(is, actual, 0, 256, null); + + for (int i = 32, j = 0; i < 256; i++, j++) { + assertEquals(expected[i], actual[j]); + } + } + + // Verify request 3 + try (InputStream is = client.get(jobId[0], testKey[0])) { + byte[] actual = new byte[expected.length]; + 
BlobUtils.readFully(is, actual, 0, expected.length, null); + + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], actual[i]); + } + } + + // Verify request 4 + try (InputStream is = client.get(jobId[1], testKey[1])) { + byte[] actual = new byte[256]; + BlobUtils.readFully(is, actual, 0, 256, null); + + for (int i = 32, j = 0; i < 256; i++, j++) { + assertEquals(expected[i], actual[j]); + } + } + } + finally { + for (BlobServer s : server) { + if (s != null) { + s.shutdown(); + } + } + + if (client != null) { + client.close(); + } + } + + // Verify everything is clean + File[] recoveryFiles = recoveryDir.listFiles(); + assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java new file mode 100644 index 0000000..4df8afb --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.execution.librarycache; + +import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobCache; +import org.apache.flink.runtime.blob.BlobClient; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileInputStream; +import java.net.InetSocketAddress; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.assertEquals; + +public class BlobLibraryCacheRecoveryITCase { + + private File recoveryDir; + + @Before + public void setUp() throws Exception { + recoveryDir = new File(FileUtils.getTempDirectory(), "BlobRecoveryITCaseDir"); + if (!recoveryDir.exists() && !recoveryDir.mkdirs()) { + throw new IllegalStateException("Failed to create temp directory for test"); + } + } + + @After + public void cleanUp() throws Exception { + if (recoveryDir != null) { + FileUtils.deleteDirectory(recoveryDir); + } + } + + /** + * Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are recoverable from any + * participating 
BlobLibraryCacheManager. + */ + @Test + public void testRecoveryRegisterAndDownload() throws Exception { + Random rand = new Random(); + + BlobServer[] server = new BlobServer[2]; + InetSocketAddress[] serverAddress = new InetSocketAddress[2]; + BlobLibraryCacheManager[] libServer = new BlobLibraryCacheManager[2]; + BlobCache cache = null; + BlobLibraryCacheManager libCache = null; + + try { + Configuration config = new Configuration(); + config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER"); + config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); + config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, recoveryDir.getPath()); + + for (int i = 0; i < server.length; i++) { + server[i] = new BlobServer(config); + serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort()); + libServer[i] = new BlobLibraryCacheManager(server[i], 3600 * 1000); + } + + // Random data + byte[] expected = new byte[1024]; + rand.nextBytes(expected); + + List<BlobKey> keys = new ArrayList<>(2); + + // Upload some data (libraries) + try (BlobClient client = new BlobClient(serverAddress[0])) { + keys.add(client.put(expected)); // Request 1 + keys.add(client.put(expected, 32, 256)); // Request 2 + } + + // The cache + cache = new BlobCache(serverAddress[0], config); + libCache = new BlobLibraryCacheManager(cache, 3600 * 1000); + + // Register uploaded libraries + JobID jobId = new JobID(); + ExecutionAttemptID executionId = new ExecutionAttemptID(); + libServer[0].registerTask(jobId, executionId, keys, Collections.<URL>emptyList()); + + // Verify key 1 + File f = libCache.getFile(keys.get(0)); + assertEquals(expected.length, f.length()); + + try (FileInputStream fis = new FileInputStream(f)) { + for (int i = 0; i < expected.length && fis.available() > 0; i++) { + assertEquals(expected[i], (byte) fis.read()); + } + + assertEquals(0, fis.available()); + } + + // Shutdown cache and start with other server + cache.shutdown(); + libCache.shutdown(); + + 
cache = new BlobCache(serverAddress[1], config); + libCache = new BlobLibraryCacheManager(cache, 3600 * 1000); + + // Verify key 1 + f = libCache.getFile(keys.get(0)); + assertEquals(expected.length, f.length()); + + try (FileInputStream fis = new FileInputStream(f)) { + for (int i = 0; i < expected.length && fis.available() > 0; i++) { + assertEquals(expected[i], (byte) fis.read()); + } + + assertEquals(0, fis.available()); + } + + // Verify key 2 + f = libCache.getFile(keys.get(1)); + assertEquals(256, f.length()); + + try (FileInputStream fis = new FileInputStream(f)) { + for (int i = 0; i < 256 && fis.available() > 0; i++) { + assertEquals(expected[32 + i], (byte) fis.read()); + } + + assertEquals(0, fis.available()); + } + } + finally { + for (BlobServer s : server) { + if (s != null) { + s.shutdown(); + } + } + + if (cache != null) { + cache.shutdown(); + } + + if (libCache != null) { + libCache.shutdown(); + } + } + + // Verify everything is clean + File[] recoveryFiles = recoveryDir.listFiles(); + assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length); + } +}
