[FLINK-2873] detect & serve the job manager log files correctly
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7119697 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7119697 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7119697 Branch: refs/heads/master Commit: e71196972c6acd124bd5ff36ad57dc493cf35e93 Parents: 6c44d93 Author: Maximilian Michels <[email protected]> Authored: Tue Oct 20 17:07:40 2015 +0200 Committer: Maximilian Michels <[email protected]> Committed: Tue Oct 20 17:07:40 2015 +0200 ---------------------------------------------------------------------- .../flink/configuration/ConfigConstants.java | 5 +- .../runtime/webmonitor/WebRuntimeMonitor.java | 53 +++------------- .../files/StaticFileServerHandler.java | 15 +---- .../webmonitor/WebRuntimeMonitorITCase.java | 13 ++-- .../runtime/webmonitor/WebMonitorUtils.java | 63 ++++++++++++++++++++ .../apache/flink/test/util/TestBaseUtils.java | 5 +- .../flink/test/web/WebFrontendITCase.java | 10 ++-- .../flink/yarn/ApplicationMasterBase.scala | 9 +-- .../org/apache/flink/yarn/YarnJobManager.scala | 4 +- 9 files changed, 96 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index be730a0..fc2087a 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -297,7 +297,10 @@ public final class ConfigConstants { */ public static final String JOB_MANAGER_WEB_ARCHIVE_COUNT = "jobmanager.web.history"; - public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.logpath"; + /** + * The log file location (may be in /log for standalone but under log directory when using YARN) + */ + public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.log.path"; // ------------------------------ Web Client ------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index 40d9f2d..e69165d 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.webmonitor; import akka.actor.ActorSystem; -import com.google.common.io.PatternFilenameFilter; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; @@ -32,9 +31,7 @@ import io.netty.handler.codec.http.router.Handler; import io.netty.handler.codec.http.router.Router; import io.netty.handler.stream.ChunkedWriteHandler; import org.apache.commons.io.FileUtils; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler; @@ -65,10 +62,8 @@ import scala.concurrent.Promise; import scala.concurrent.duration.FiniteDuration; import java.io.File; -import java.io.FilenameFilter; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -91,12 +86,6 @@ public class WebRuntimeMonitor implements WebMonitor { /** Logger for web frontend startup / shutdown messages */ private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class); - /** Job manager's log file pattern */ - public static final FilenameFilter LOG_FILE_PATTERN = new PatternFilenameFilter(".*jobmanager[^\\.]*\\.log"); - - /** Job manager's stdout file pattern */ - public static final FilenameFilter STDOUT_FILE_PATTERN = new PatternFilenameFilter(".*jobmanager[^\\.]*\\.out"); - // ------------------------------------------------------------------------ /** Guarding concurrent modifications to the server channel pipeline during startup and shutdown */ @@ -136,39 +125,11 @@ public class WebRuntimeMonitor implements WebMonitor { String fileName = String.format("flink-web-%s", UUID.randomUUID().toString()); webRootDir = new File(System.getProperty("java.io.tmpdir"), fileName); LOG.info("Using directory {} for the web interface files", webRootDir); - - // figure out where our logs are - final String flinkRoot = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null); - final String defaultLogDirectory = flinkRoot + "/log"; - final String logDirectories = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, defaultLogDirectory); - - // find out which directory holds the path for log and stdout - final ArrayList<String> logPaths = new ArrayList<>(); - final ArrayList<String> outPaths = new ArrayList<>(); - - // yarn allows for multiple log directories. Search in all. - for(String paths: logDirectories.split(",")) { - File dir = new File(paths); - if (dir.exists() && dir.isDirectory() && dir.canRead()) { - if (dir.listFiles(LOG_FILE_PATTERN).length == 1) { - logPaths.add(paths); - } - if (dir.listFiles(STDOUT_FILE_PATTERN).length == 1) { - outPaths.add(paths); - } - } - } - // we don't want any ambiguities. There must be only one log and out file. - if(logPaths.size() != 1 || outPaths.size() != 1) { - throw new IllegalConfigurationException("The path to the log and out files (" + - logDirectories + ") is not valid."); - } + final WebMonitorUtils.LogFiles logFiles = WebMonitorUtils.LogFiles.find(config); - final File logDir = new File(logPaths.get(0)); - final File outDir = new File(outPaths.get(0)); - LOG.info("Serving job manager logs from {}", logDir.getAbsolutePath()); - LOG.info("Serving job manager stdout from {}", outDir.getAbsolutePath()); + LOG.info("Serving job manager log from {}", logFiles.logFile.getAbsolutePath()); + LOG.info("Serving job manager stdout from {}", logFiles.stdOutFile.getAbsolutePath()); // port configuration this.configuredPort = cfg.getWebFrontendPort(); @@ -190,7 +151,7 @@ public class WebRuntimeMonitor implements WebMonitor { // the overview - how many task managers, slots, free slots, ... .GET("/overview", handler(new ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT))) - // job manager configuration, log and stdout + // job manager configuration .GET("/jobmanager/config", handler(new JobManagerConfigHandler(config))) // overview over jobs @@ -220,8 +181,10 @@ public class WebRuntimeMonitor implements WebMonitor { .GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT))) .GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT))) - .GET("/jobmanager/log", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logDir)) - .GET("/jobmanager/stdout", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, outDir)) + // log and stdout + .GET("/jobmanager/log", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.logFile)) + .GET("/jobmanager/stdout", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile)) + // this handler serves all the static contents .GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, webRootDir)); http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java index 02dd81e..df330fd 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java @@ -45,7 +45,6 @@ import io.netty.handler.codec.http.router.KeepAliveWrite; import io.netty.handler.codec.http.router.Routed; import io.netty.util.CharsetUtil; import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor; import org.apache.flink.runtime.webmonitor.JobManagerRetriever; import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils; import org.slf4j.Logger; @@ -60,7 +59,6 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; -import java.io.FilenameFilter; import java.io.RandomAccessFile; import java.nio.file.Files; import java.text.ParseException; @@ -166,11 +164,9 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed> requestPath = requestPath + "index.html"; } - // in case the files being accessed are logs or stdout files, find appropriate paths. - if (requestPath.equals("/jobmanager/log")) { - requestPath = "/" + getFileName(rootPath, WebRuntimeMonitor.LOG_FILE_PATTERN); - } else if (requestPath.equals("/jobmanager/stdout")) { - requestPath = "/" + getFileName(rootPath, WebRuntimeMonitor.STDOUT_FILE_PATTERN); + // in case the files being accessed are logs or stdout files, find appropriate paths. + if (requestPath.equals("/jobmanager/log") || requestPath.equals("/jobmanager/stdout")) { + requestPath = ""; } Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort(); @@ -371,9 +367,4 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed> String mimeFinal = mimeType != null ? mimeType : MimeTypes.getDefaultMimeType(); response.headers().set(CONTENT_TYPE, mimeFinal); } - - private static String getFileName(File directory, FilenameFilter pattern) { - File[] files = directory.listFiles(pattern); - return files.length == 0 ? null : files[0].getName(); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java index 68b00dc..a3f152d 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java @@ -47,6 +47,7 @@ import scala.concurrent.duration.FiniteDuration; import java.io.File; import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.Scanner; @@ -84,12 +85,12 @@ public class WebRuntimeMonitorITCase extends TestLogger { ActorRef jmActor = flink.jobManagerActors().get().head(); File logDir = temporaryFolder.newFolder("log"); - Files.createFile(new File(logDir, "jobmanager.log").toPath()); + Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath()); Files.createFile(new File(logDir, "jobmanager.out").toPath()); Configuration monitorConfig = new Configuration(); monitorConfig.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); - monitorConfig.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.getAbsolutePath()); + monitorConfig.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString()); // Needs to match the leader address from the leader retrieval service String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor); @@ -149,11 +150,11 @@ public class WebRuntimeMonitorITCase extends TestLogger { temporaryFolder.getRoot().getPath()); File logDir = temporaryFolder.newFolder(); - Files.createFile(new File(logDir, "jobmanager.log").toPath()); + Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath()); Files.createFile(new File(logDir, "jobmanager.out").toPath()); config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); - config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.getAbsolutePath()); + config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString()); for (int i = 0; i < jobManagerSystem.length; i++) { jobManagerSystem[i] = AkkaUtils.createActorSystem(new Configuration(), @@ -289,12 +290,12 @@ public class WebRuntimeMonitorITCase extends TestLogger { try (TestingServer zooKeeper = new TestingServer()) { File logDir = temporaryFolder.newFolder(); - Files.createFile(new File(logDir, "jobmanager.log").toPath()); + Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath()); Files.createFile(new File(logDir, "jobmanager.out").toPath()); final Configuration config = new Configuration(); config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); - config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.getAbsolutePath()); + config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString()); config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER"); config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zooKeeper.getConnectString()); http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java index 4fca270..181d6d6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java @@ -19,7 +19,9 @@ package org.apache.flink.runtime.webmonitor; import akka.actor.ActorSystem; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; @@ -33,6 +35,7 @@ import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.HashMap; @@ -48,6 +51,66 @@ public final class WebMonitorUtils { private static final Logger LOG = LoggerFactory.getLogger(WebMonitorUtils.class); /** + * Singleton to hold the log and stdout file + */ + public static class LogFiles { + + private static LogFiles INSTANCE; + + public final File logFile; + public final File stdOutFile; + + private LogFiles(String logFile) { + this.logFile = checkFileLocation(logFile); + String stdOutFile = logFile.replaceFirst("\\.log$", ".out"); + this.stdOutFile = checkFileLocation(stdOutFile);; + } + + /** + * Verify log file location + * @param logFilePath Path to log file + * @return File or null if not a valid log file + */ + private static File checkFileLocation (String logFilePath) { + File logFile = new File(logFilePath); + if (logFile.exists() && logFile.canRead()) { + return logFile; + } else { + throw new IllegalConfigurationException("Job manager log file was supposed to be at " + + logFile.getAbsolutePath() + " but it does not exist or is not readable."); + } + } + + /** + * Finds the Flink log directory using log.file Java property that is set during startup. + */ + public static LogFiles find(Configuration config) { + if (INSTANCE == null) { + + /** Figure out log file location based on 'log.file' VM argument **/ + final String logEnv = "log.file"; + String logFilePath = System.getProperty(logEnv); + + if (logFilePath == null) { + LOG.warn("Log file environment variable '{}' is not set.", logEnv); + logFilePath = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, null); + } + + if (logFilePath == null) { + throw new IllegalConfigurationException("JobManager log file not found. " + + "Can't serve log files. Log file location couldn't be determined via the " + + logEnv + " environment variable or the config constant " + + ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY); + } + + INSTANCE = new LogFiles(logFilePath); + } + + return INSTANCE; + } + } + + /** * Starts the web runtime monitor. Because the actual implementation of the runtime monitor is * in another project, we load the runtime monitor dynamically. * <p/> http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index ce3adce..b2bfd6b 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -59,6 +59,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -131,7 +132,7 @@ public class TestBaseUtils extends TestLogger { logDir = File.createTempFile("TestBaseUtils-logdir", null); Assert.assertTrue("Unable to delete temp file", logDir.delete()); Assert.assertTrue("Unable to create temp directory", logDir.mkdir()); - Files.createFile(new File(logDir, "jobmanager.log").toPath()); + Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath()); Files.createFile(new File(logDir, "jobmanager.out").toPath()); config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE); @@ -141,7 +142,7 @@ public class TestBaseUtils extends TestLogger { config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT); config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081); - config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.toString()); + config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString()); ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, singleActorSystem, mode); http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java index bea96d3..9c37a95 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java @@ -115,14 +115,13 @@ public class WebFrontendITCase extends MultipleProgramsTestBase { @Test public void getLogAndStdoutFiles() { try { - String logPath = cluster.configuration().getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, null); - Assert.assertNotNull(logPath); + WebMonitorUtils.LogFiles logFiles = WebMonitorUtils.LogFiles.find(cluster.configuration()); - FileUtils.writeStringToFile(new File(logPath, "jobmanager.log"), "job manager log"); + FileUtils.writeStringToFile(logFiles.logFile, "job manager log"); String logs = getFromHTTP("http://localhost:" + port + "/jobmanager/log"); Assert.assertTrue(logs.contains("job manager log")); - FileUtils.writeStringToFile(new File(logPath, "jobmanager.out"), "job manager out"); + FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out"); logs = getFromHTTP("http://localhost:" + port + "/jobmanager/stdout"); Assert.assertTrue(logs.contains("job manager out")); }catch(Throwable e) { @@ -138,8 +137,7 @@ public class WebFrontendITCase extends MultipleProgramsTestBase { JSONArray array = new JSONArray(config); Map<String, String> conf = WebMonitorUtils.fromKeyValueJsonArray(array); - Assert.assertEquals(logDir.toString(), - conf.get(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY)); + Assert.assertTrue(conf.get(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY).startsWith(logDir.toString())); Assert.assertEquals( cluster.configuration().getString("taskmanager.numberOfTaskSlots", null), conf.get(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS)); http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala index f3892dd..73fc951 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala @@ -93,8 +93,6 @@ abstract class ApplicationMasterBase { val currDir = env.get(Environment.PWD.key()) require(currDir != null, "Current directory unknown.") - val logDirs = env.get(Environment.LOG_DIRS.key()) - val streamingMode = if(ApplicationMasterBase.hasStreamingMode(env)) { log.info("Starting ApplicationMaster/JobManager in streaming mode") StreamingMode.STREAMING @@ -119,8 +117,7 @@ abstract class ApplicationMasterBase { // if a web monitor shall be started, set the port to random binding if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) { - config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDirs) - config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); // set port to 0. + config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); } val (actorSystem, jmActor, archiveActor, webMonitor) = @@ -147,7 +144,7 @@ abstract class ApplicationMasterBase { // generate configuration file for TaskManagers generateConfigurationFile(s"$currDir/$MODIFIED_CONF_FILE", currDir, akkaHostname, - jobManagerPort, webServerPort, logDirs, slots, taskManagerCount, + jobManagerPort, webServerPort, slots, taskManagerCount, dynamicPropertiesEncodedString) val hadoopConfig = new YarnConfiguration(); @@ -184,7 +181,6 @@ abstract class ApplicationMasterBase { ownHostname: String, jobManagerPort: Int, jobManagerWebPort: Int, - logDirs: String, slots: Int, taskManagerCount: Int, dynamicPropertiesEncodedString: String) @@ -202,7 +198,6 @@ abstract class ApplicationMasterBase { output.println(s"${ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY}: $ownHostname") output.println(s"${ConfigConstants.JOB_MANAGER_IPC_PORT_KEY}: $jobManagerPort") - output.println(s"${ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY}: $logDirs") output.println(s"${ConfigConstants.JOB_MANAGER_WEB_PORT_KEY}: $jobManagerWebPort") http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index 4ada21e..c8a9480 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -691,8 +691,8 @@ class YarnJobManager( } tmCommand ++= s" ${taskManagerRunnerClass.getName} --configDir . 1> " + - s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager-stdout.log 2> " + - s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager-stderr.log" + s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager.out 2> " + + s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager.err" tmCommand ++= " --streamingMode" if(streamingMode) {
