Repository: flink Updated Branches: refs/heads/release-1.4 df0526172 -> c701a335b
[FLINK-8473][webUI] Improve error behavior of JarListHandler This closes #5331. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/20be204b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/20be204b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/20be204b Branch: refs/heads/release-1.4 Commit: 20be204b96edd5c92683013a4c5af9ea4096acca Parents: df05261 Author: zentol <[email protected]> Authored: Mon Jan 22 13:29:34 2018 +0100 Committer: zentol <[email protected]> Committed: Wed Jan 24 10:42:16 2018 +0100 ---------------------------------------------------------------------- .../runtime/webmonitor/HttpRequestHandler.java | 4 +++ .../runtime/webmonitor/WebRuntimeMonitor.java | 32 +++++++++++++++----- .../webmonitor/handlers/JarActionHandler.java | 11 +++++++ .../webmonitor/handlers/JarDeleteHandler.java | 13 ++++++++ .../webmonitor/handlers/JarListHandler.java | 11 +++++++ 5 files changed, 64 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/20be204b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java index f8b51d7..a0fda9d 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java @@ -133,6 +133,10 @@ public class HttpRequestHandler extends SimpleChannelInboundHandler<HttpObject> String name = file.getFilename(); File target = new File(tmpDir, UUID.randomUUID() + "_" + name); + if (!tmpDir.exists()) { + WebRuntimeMonitor.logExternalUploadDirDeletion(tmpDir); + WebRuntimeMonitor.checkAndCreateUploadDir(tmpDir); + } file.renameTo(target); QueryStringEncoder encoder = new QueryStringEncoder(currentRequestPath); http://git-wip-us.apache.org/repos/asf/flink/blob/20be204b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index fe5f106..e7bb157 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -188,13 +188,7 @@ public class WebRuntimeMonitor implements WebMonitor { if (webSubmitAllow) { // create storage for uploads this.uploadDir = getUploadDir(config); - // the upload directory should either 1. exist and writable or 2. can be created and writable - if (!(uploadDir.exists() && uploadDir.canWrite()) && !(uploadDir.mkdirs() && uploadDir.canWrite())) { - throw new IOException( - String.format("Jar upload directory %s cannot be created or is not writable.", - uploadDir.getAbsolutePath())); - } - LOG.info("Using directory {} for web frontend JAR file uploads", uploadDir); + checkAndCreateUploadDir(uploadDir); } else { this.uploadDir = null; @@ -578,4 +572,28 @@ public class WebRuntimeMonitor implements WebMonitor { boolean uploadDirSpecified = configuration.contains(WebOptions.UPLOAD_DIR); return uploadDirSpecified ? baseDir : new File(baseDir, "flink-web-" + UUID.randomUUID()); } + + public static void logExternalUploadDirDeletion(File uploadDir) { + LOG.warn("Jar storage directory {} has been deleted externally. Previously uploaded jars are no longer available.", uploadDir.getAbsolutePath()); + } + + /** + * Checks whether the given directory exists and is writable. If it doesn't exist this method will attempt to create + * it. + * + * @param uploadDir directory to check + * @throws IOException if the directory does not exist and cannot be created, or if the directory isn't writable + */ + public static synchronized void checkAndCreateUploadDir(File uploadDir) throws IOException { + if (uploadDir.exists() && uploadDir.canWrite()) { + LOG.info("Using directory {} for web frontend JAR file uploads.", uploadDir); + } else if (uploadDir.mkdirs() && uploadDir.canWrite()) { + LOG.info("Created directory {} for web frontend JAR file uploads.", uploadDir); + } else { + LOG.warn("Jar upload directory {} cannot be created or is not writable.", uploadDir.getAbsolutePath()); + throw new IOException( + String.format("Jar upload directory %s cannot be created or is not writable.", + uploadDir.getAbsolutePath())); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/20be204b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java index c601a8d..eea3a87 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java @@ -37,11 +37,13 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler; import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; +import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; import java.io.File; +import java.io.IOException; import java.io.StringWriter; import java.net.URISyntaxException; import java.net.URL; @@ -66,6 +68,15 @@ public abstract class JarActionHandler extends AbstractJsonRequestHandler { // generate the graph JobGraph graph = null; + if (!jarDir.exists()) { + WebRuntimeMonitor.logExternalUploadDirDeletion(jarDir); + try { + WebRuntimeMonitor.checkAndCreateUploadDir(jarDir); + } catch (IOException ioe) { + // the following code will throw an exception since the jar can't be found + } + } + PackagedProgram program = new PackagedProgram( new File(jarDir, config.getJarFile()), config.getEntryClass(), http://git-wip-us.apache.org/repos/asf/flink/blob/20be204b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java index fb7fb40..c1a5f60 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java @@ -21,12 +21,14 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler; import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; +import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor; import org.apache.flink.util.FlinkException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; import java.io.File; import java.io.FilenameFilter; +import java.io.IOException; import java.io.StringWriter; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -64,6 +66,17 @@ public class JarDeleteHandler extends AbstractJsonRequestHandler { return name.equals(file); } }); + + if (list == null) { + WebRuntimeMonitor.logExternalUploadDirDeletion(jarDir); + try { + WebRuntimeMonitor.checkAndCreateUploadDir(jarDir); + } catch (IOException ioe) { + // entire directory doesn't exist anymore, continue as if deletion succeeded + } + list = new File[0]; + } + boolean success = false; for (File f: list) { // although next to impossible for multiple files, we still delete them. http://git-wip-us.apache.org/repos/asf/flink/blob/20be204b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java index 59a436f..2b56ecd 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler; import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler; +import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor; import org.apache.flink.util.FlinkException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; @@ -77,6 +78,16 @@ public class JarListHandler extends AbstractJsonRequestHandler { } }); + if (list == null) { + WebRuntimeMonitor.logExternalUploadDirDeletion(jarDir); + try { + WebRuntimeMonitor.checkAndCreateUploadDir(jarDir); + } catch (IOException ioe) { + // re-throwing an exception here breaks the UI + } + list = new File[0]; + } + // last modified ascending order Arrays.sort(list, (f1, f2) -> Long.compare(f2.lastModified(), f1.lastModified()));
