[FLINK-2873] detect & serve the job manager log files correctly

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7119697
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7119697
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7119697

Branch: refs/heads/master
Commit: e71196972c6acd124bd5ff36ad57dc493cf35e93
Parents: 6c44d93
Author: Maximilian Michels <[email protected]>
Authored: Tue Oct 20 17:07:40 2015 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Tue Oct 20 17:07:40 2015 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  5 +-
 .../runtime/webmonitor/WebRuntimeMonitor.java   | 53 +++-------------
 .../files/StaticFileServerHandler.java          | 15 +----
 .../webmonitor/WebRuntimeMonitorITCase.java     | 13 ++--
 .../runtime/webmonitor/WebMonitorUtils.java     | 63 ++++++++++++++++++++
 .../apache/flink/test/util/TestBaseUtils.java   |  5 +-
 .../flink/test/web/WebFrontendITCase.java       | 10 ++--
 .../flink/yarn/ApplicationMasterBase.scala      |  9 +--
 .../org/apache/flink/yarn/YarnJobManager.scala  |  4 +-
 9 files changed, 96 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index be730a0..fc2087a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -297,7 +297,10 @@ public final class ConfigConstants {
         */
        public static final String JOB_MANAGER_WEB_ARCHIVE_COUNT = 
"jobmanager.web.history";
 
-       public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = 
"jobmanager.web.logpath";
+       /**
+        * The log file location (may be in /log for standalone but under log 
directory when using YARN)
+        */
+       public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = 
"jobmanager.web.log.path";
 
 
        // ------------------------------ Web Client 
------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 40d9f2d..e69165d 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.webmonitor;
 
 import akka.actor.ActorSystem;
-import com.google.common.io.PatternFilenameFilter;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
@@ -32,9 +31,7 @@ import io.netty.handler.codec.http.router.Handler;
 import io.netty.handler.codec.http.router.Router;
 import io.netty.handler.stream.ChunkedWriteHandler;
 import org.apache.commons.io.FileUtils;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
@@ -65,10 +62,8 @@ import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -91,12 +86,6 @@ public class WebRuntimeMonitor implements WebMonitor {
        /** Logger for web frontend startup / shutdown messages */
        private static final Logger LOG = 
LoggerFactory.getLogger(WebRuntimeMonitor.class);
 
-       /** Job manager's log file pattern */
-       public static final FilenameFilter LOG_FILE_PATTERN = new 
PatternFilenameFilter(".*jobmanager[^\\.]*\\.log");
-
-       /** Job manager's stdout file pattern */
-       public static final FilenameFilter STDOUT_FILE_PATTERN = new 
PatternFilenameFilter(".*jobmanager[^\\.]*\\.out");
-
        // 
------------------------------------------------------------------------
 
        /** Guarding concurrent modifications to the server channel pipeline 
during startup and shutdown */
@@ -136,39 +125,11 @@ public class WebRuntimeMonitor implements WebMonitor {
                String fileName = String.format("flink-web-%s", 
UUID.randomUUID().toString());
                webRootDir = new File(System.getProperty("java.io.tmpdir"), 
fileName);
                LOG.info("Using directory {} for the web interface files", 
webRootDir);
-               
-               // figure out where our logs are
-               final String flinkRoot = 
config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);
-               final String defaultLogDirectory = flinkRoot + "/log";
-               final String logDirectories = 
config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
defaultLogDirectory);
-
-               // find out which directory holds the path for log and stdout
-               final ArrayList<String> logPaths = new ArrayList<>();
-               final ArrayList<String> outPaths = new ArrayList<>();
-
-               // yarn allows for multiple log directories. Search in all.
-               for(String paths: logDirectories.split(",")) {
-                       File dir = new File(paths);
-                       if (dir.exists() && dir.isDirectory() && dir.canRead()) 
{
-                               if (dir.listFiles(LOG_FILE_PATTERN).length == 
1) {
-                                       logPaths.add(paths);
-                               }
-                               if (dir.listFiles(STDOUT_FILE_PATTERN).length 
== 1) {
-                                       outPaths.add(paths);
-                               }
-                       }
-               }
 
-               // we don't want any ambiguities. There must be only one log 
and out file.
-               if(logPaths.size() != 1 || outPaths.size() != 1) {
-                       throw new IllegalConfigurationException("The path to 
the log and out files (" +
-                                       logDirectories  + ") is not valid.");
-               }
+               final WebMonitorUtils.LogFiles logFiles = 
WebMonitorUtils.LogFiles.find(config);
 
-               final File logDir = new File(logPaths.get(0));
-               final File outDir = new File(outPaths.get(0));
-               LOG.info("Serving job manager logs from {}", 
logDir.getAbsolutePath());
-               LOG.info("Serving job manager stdout from {}", 
outDir.getAbsolutePath());
+               LOG.info("Serving job manager log from {}", 
logFiles.logFile.getAbsolutePath());
+               LOG.info("Serving job manager stdout from {}", 
logFiles.stdOutFile.getAbsolutePath());
 
                // port configuration
                this.configuredPort = cfg.getWebFrontendPort();
@@ -190,7 +151,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                        // the overview - how many task managers, slots, free 
slots, ...
                        .GET("/overview", handler(new 
ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT)))
 
-                       // job manager configuration, log and stdout
+                       // job manager configuration
                        .GET("/jobmanager/config", handler(new 
JobManagerConfigHandler(config)))
 
                        // overview over jobs
@@ -220,8 +181,10 @@ public class WebRuntimeMonitor implements WebMonitor {
                        .GET("/taskmanagers", handler(new 
TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
                        .GET("/taskmanagers/:" + 
TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new 
TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
 
-                       .GET("/jobmanager/log", new 
StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, 
logDir))
-                       .GET("/jobmanager/stdout", new 
StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, 
outDir))
+                       // log and stdout
+                       .GET("/jobmanager/log", new 
StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, 
logFiles.logFile))
+                       .GET("/jobmanager/stdout", new 
StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, 
logFiles.stdOutFile))
+
                        // this handler serves all the static contents
                        .GET("/:*", new StaticFileServerHandler(retriever, 
jobManagerAddressPromise.future(), timeout, webRootDir));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
index 02dd81e..df330fd 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
@@ -45,7 +45,6 @@ import io.netty.handler.codec.http.router.KeepAliveWrite;
 import io.netty.handler.codec.http.router.Routed;
 import io.netty.util.CharsetUtil;
 import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor;
 import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
 import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
 import org.slf4j.Logger;
@@ -60,7 +59,6 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.FilenameFilter;
 import java.io.RandomAccessFile;
 import java.nio.file.Files;
 import java.text.ParseException;
@@ -166,11 +164,9 @@ public class StaticFileServerHandler extends 
SimpleChannelInboundHandler<Routed>
                                requestPath = requestPath + "index.html";
                        }
 
-               // in case the files being accessed are logs or stdout files, 
find appropriate paths.
-               if (requestPath.equals("/jobmanager/log")) {
-                       requestPath = "/" + getFileName(rootPath, 
WebRuntimeMonitor.LOG_FILE_PATTERN);
-               } else if (requestPath.equals("/jobmanager/stdout")) {
-                       requestPath = "/" + getFileName(rootPath, 
WebRuntimeMonitor.STDOUT_FILE_PATTERN);
+                       // in case the files being accessed are logs or stdout 
files, find appropriate paths.
+                       if (requestPath.equals("/jobmanager/log") || 
requestPath.equals("/jobmanager/stdout")) {
+                               requestPath = "";
                        }
 
                        Option<Tuple2<ActorGateway, Integer>> jobManager = 
retriever.getJobManagerGatewayAndWebPort();
@@ -371,9 +367,4 @@ public class StaticFileServerHandler extends 
SimpleChannelInboundHandler<Routed>
                String mimeFinal = mimeType != null ? mimeType : 
MimeTypes.getDefaultMimeType();
                response.headers().set(CONTENT_TYPE, mimeFinal);
        }
-
-       private static String getFileName(File directory, FilenameFilter 
pattern) {
-               File[] files = directory.listFiles(pattern);
-               return files.length == 0 ? null : files[0].getName();
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 68b00dc..a3f152d 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -47,6 +47,7 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Scanner;
@@ -84,12 +85,12 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                        ActorRef jmActor = 
flink.jobManagerActors().get().head();
 
                        File logDir = temporaryFolder.newFolder("log");
-                       Files.createFile(new File(logDir, 
"jobmanager.log").toPath());
+                       Path logFile = Files.createFile(new File(logDir, 
"jobmanager.log").toPath());
                        Files.createFile(new File(logDir, 
"jobmanager.out").toPath());
 
                        Configuration monitorConfig = new Configuration();
                        
monitorConfig.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-                       
monitorConfig.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logDir.getAbsolutePath());
+                       
monitorConfig.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logFile.toString());
 
                        // Needs to match the leader address from the leader 
retrieval service
                        String jobManagerAddress = 
AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
@@ -149,11 +150,11 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                                temporaryFolder.getRoot().getPath());
 
                        File logDir = temporaryFolder.newFolder();
-                       Files.createFile(new File(logDir, 
"jobmanager.log").toPath());
+                       Path logFile = Files.createFile(new File(logDir, 
"jobmanager.log").toPath());
                        Files.createFile(new File(logDir, 
"jobmanager.out").toPath());
 
                        
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-                       
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logDir.getAbsolutePath());
+                       
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logFile.toString());
 
                        for (int i = 0; i < jobManagerSystem.length; i++) {
                                jobManagerSystem[i] = 
AkkaUtils.createActorSystem(new Configuration(),
@@ -289,12 +290,12 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                try (TestingServer zooKeeper = new TestingServer()) {
 
                        File logDir = temporaryFolder.newFolder();
-                       Files.createFile(new File(logDir, 
"jobmanager.log").toPath());
+                       Path logFile = Files.createFile(new File(logDir, 
"jobmanager.log").toPath());
                        Files.createFile(new File(logDir, 
"jobmanager.out").toPath());
 
                        final Configuration config = new Configuration();
                        
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-                       
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logDir.getAbsolutePath());
+                       
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logFile.toString());
                        config.setString(ConfigConstants.RECOVERY_MODE, 
"ZOOKEEPER");
                        config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, 
zooKeeper.getConnectString());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 4fca270..181d6d6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -19,7 +19,9 @@
 package org.apache.flink.runtime.webmonitor;
 
 import akka.actor.ActorSystem;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -33,6 +35,7 @@ import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.HashMap;
@@ -48,6 +51,66 @@ public final class WebMonitorUtils {
        private static final Logger LOG = 
LoggerFactory.getLogger(WebMonitorUtils.class);
 
        /**
+        * Singleton to hold the log and stdout file
+        */
+       public static class LogFiles {
+
+               private static LogFiles INSTANCE;
+
+               public final File logFile;
+               public final File stdOutFile;
+
+               private LogFiles(String logFile) {
+                       this.logFile = checkFileLocation(logFile);
+                       String stdOutFile = logFile.replaceFirst("\\.log$", 
".out");
+                       this.stdOutFile = checkFileLocation(stdOutFile);;
+               }
+
+               /**
+                * Verify log file location
+                * @param logFilePath Path to log file
+                * @return File or null if not a valid log file
+                */
+               private static File checkFileLocation (String logFilePath) {
+                       File logFile = new File(logFilePath);
+                       if (logFile.exists() && logFile.canRead()) {
+                               return logFile;
+                       } else {
+                               throw new IllegalConfigurationException("Job 
manager log file was supposed to be at " +
+                                               logFile.getAbsolutePath() + " 
but it does not exist or is not readable.");
+                       }
+               }
+
+               /**
+                * Finds the Flink log directory using log.file Java property 
that is set during startup.
+                */
+               public static LogFiles find(Configuration config) {
+                       if (INSTANCE == null) {
+
+                               /** Figure out log file location based on 
'log.file' VM argument **/
+                               final String logEnv = "log.file";
+                               String logFilePath = System.getProperty(logEnv);
+
+                               if (logFilePath == null) {
+                                       LOG.warn("Log file environment variable 
'{}' is not set.", logEnv);
+                                       logFilePath = 
config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, null);
+                               }
+
+                               if (logFilePath == null) {
+                                       throw new 
IllegalConfigurationException("JobManager log file not found. " +
+                                                       "Can't serve log files. 
Log file location couldn't be determined via the " +
+                                                       logEnv + " environment 
variable or the config constant " +
+                                                       
ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY);
+                               }
+
+                               INSTANCE = new LogFiles(logFilePath);
+                       }
+
+                       return INSTANCE;
+               }
+       }
+
+       /**
         * Starts the web runtime monitor. Because the actual implementation of 
the runtime monitor is
         * in another project, we load the runtime monitor dynamically.
         * <p/>

http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index ce3adce..b2bfd6b 100644
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -59,6 +59,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -131,7 +132,7 @@ public class TestBaseUtils extends TestLogger {
                logDir = File.createTempFile("TestBaseUtils-logdir", null);
                Assert.assertTrue("Unable to delete temp file", 
logDir.delete());
                Assert.assertTrue("Unable to create temp directory", 
logDir.mkdir());
-               Files.createFile(new File(logDir, "jobmanager.log").toPath());
+               Path logFile = Files.createFile(new File(logDir, 
"jobmanager.log").toPath());
                Files.createFile(new File(logDir, "jobmanager.out").toPath());
 
                config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 
TASK_MANAGER_MEMORY_SIZE);
@@ -141,7 +142,7 @@ public class TestBaseUtils extends TestLogger {
                config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, 
DEFAULT_AKKA_STARTUP_TIMEOUT);
 
                config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 
8081);
-               config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logDir.toString());
+               config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logFile.toString());
 
                ForkableFlinkMiniCluster cluster =  new 
ForkableFlinkMiniCluster(config, singleActorSystem, mode);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index bea96d3..9c37a95 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -115,14 +115,13 @@ public class WebFrontendITCase extends 
MultipleProgramsTestBase {
        @Test
        public void getLogAndStdoutFiles() {
                try {
-                       String logPath = 
cluster.configuration().getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
null);
-                       Assert.assertNotNull(logPath);
+                       WebMonitorUtils.LogFiles logFiles = 
WebMonitorUtils.LogFiles.find(cluster.configuration());
 
-                       FileUtils.writeStringToFile(new File(logPath, 
"jobmanager.log"), "job manager log");
+                       FileUtils.writeStringToFile(logFiles.logFile, "job 
manager log");
                        String logs = getFromHTTP("http://localhost:"; + port + 
"/jobmanager/log");
                        Assert.assertTrue(logs.contains("job manager log"));
 
-                       FileUtils.writeStringToFile(new File(logPath, 
"jobmanager.out"), "job manager out");
+                       FileUtils.writeStringToFile(logFiles.stdOutFile, "job 
manager out");
                        logs = getFromHTTP("http://localhost:"; + port + 
"/jobmanager/stdout");
                        Assert.assertTrue(logs.contains("job manager out"));
                }catch(Throwable e) {
@@ -138,8 +137,7 @@ public class WebFrontendITCase extends 
MultipleProgramsTestBase {
                        JSONArray array = new JSONArray(config);
 
                        Map<String, String> conf = 
WebMonitorUtils.fromKeyValueJsonArray(array);
-                       Assert.assertEquals(logDir.toString(),
-                                       
conf.get(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY));
+                       
Assert.assertTrue(conf.get(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY).startsWith(logDir.toString()));
                        Assert.assertEquals(
                                        
cluster.configuration().getString("taskmanager.numberOfTaskSlots", null),
                                        
conf.get(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS));

http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
index f3892dd..73fc951 100644
--- 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
+++ 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
@@ -93,8 +93,6 @@ abstract class ApplicationMasterBase {
       val currDir = env.get(Environment.PWD.key())
       require(currDir != null, "Current directory unknown.")
 
-      val logDirs = env.get(Environment.LOG_DIRS.key())
-
       val streamingMode = if(ApplicationMasterBase.hasStreamingMode(env)) {
         log.info("Starting ApplicationMaster/JobManager in streaming mode")
         StreamingMode.STREAMING
@@ -119,8 +117,7 @@ abstract class ApplicationMasterBase {
 
       // if a web monitor shall be started, set the port to random binding
       if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) 
{
-        config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDirs)
-        config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); // set 
port to 0.
+        config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
       }
 
       val (actorSystem, jmActor, archiveActor, webMonitor) =
@@ -147,7 +144,7 @@ abstract class ApplicationMasterBase {
 
       // generate configuration file for TaskManagers
       generateConfigurationFile(s"$currDir/$MODIFIED_CONF_FILE", currDir, 
akkaHostname,
-        jobManagerPort, webServerPort, logDirs, slots, taskManagerCount,
+        jobManagerPort, webServerPort, slots, taskManagerCount,
         dynamicPropertiesEncodedString)
 
       val hadoopConfig = new YarnConfiguration();
@@ -184,7 +181,6 @@ abstract class ApplicationMasterBase {
     ownHostname: String,
     jobManagerPort: Int,
     jobManagerWebPort: Int,
-    logDirs: String,
     slots: Int,
     taskManagerCount: Int,
     dynamicPropertiesEncodedString: String)
@@ -202,7 +198,6 @@ abstract class ApplicationMasterBase {
     output.println(s"${ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY}: 
$ownHostname")
     output.println(s"${ConfigConstants.JOB_MANAGER_IPC_PORT_KEY}: 
$jobManagerPort")
 
-    output.println(s"${ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY}: 
$logDirs")
     output.println(s"${ConfigConstants.JOB_MANAGER_WEB_PORT_KEY}: 
$jobManagerWebPort")
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 4ada21e..c8a9480 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -691,8 +691,8 @@ class YarnJobManager(
     }
 
     tmCommand ++= s" ${taskManagerRunnerClass.getName} --configDir . 1> " +
-      s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager-stdout.log 
2> " +
-      s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager-stderr.log"
+      s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager.out 2> " +
+      s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager.err"
 
     tmCommand ++= " --streamingMode"
     if(streamingMode) {

Reply via email to