[FLINK-1372] [runtime] Fixes logging settings. The logging is now exclusively controlled by the logging properties provided to the system. Removes akka.loglevel config parameter.
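With this change the actor system always logs through akka.event.slf4j.Slf4jLogger (the default actor-system config pins Akka's internal loglevel to "DEBUG", as the AkkaUtils hunk below shows), so the effective verbosity is decided entirely by the configuration of the SLF4J backend. As an illustrative sketch only, assuming the stock log4j 1.x backend that Flink configures via conf/log4j.properties (these exact entries are not part of this commit), Akka's output is now tuned like any other logger:

    # Hypothetical conf/log4j.properties entries (standard log4j 1.x syntax).
    # The akka.loglevel config key is gone; the logging backend is now the
    # single point of control for Akka's verbosity.
    log4j.rootLogger=INFO, file
    log4j.logger.akka=WARN

Since the removed akka.event.slf4j.Slf4jLoggingFilter no longer discards events up front, disabled levels are now filtered out on the SLF4J side instead.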
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e197d5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e197d5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e197d5f

Branch: refs/heads/master
Commit: 4e197d5fac7bfc774a8e4e39ed40c831e5848428
Parents: 58e1e44
Author: Till Rohrmann <[email protected]>
Authored: Mon Jan 19 16:13:52 2015 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Tue Jan 20 20:29:03 2015 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  7 --
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 20 ++----
 .../flink/runtime/jobmanager/JobManager.scala   | 67 +++++++++++---------
 .../flink/runtime/taskmanager/TaskManager.scala | 67 ++++++++++++--------
 .../runtime/testingUtils/TestingUtils.scala     |  3 +-
 5 files changed, 83 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4e197d5f/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index df96339..d482e3c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -345,11 +345,6 @@ public final class ConfigConstants {
     public static final String AKKA_LOG_LIFECYCLE_EVENTS = "akka.log.lifecycle.events";
 
     /**
-     * Log level for akka
-     */
-    public static final String AKKA_LOG_LEVEL = "akka.loglevel";
-
-    /**
      * Timeout for all blocking calls
      */
     public static final String AKKA_ASK_TIMEOUT = "akka.ask.timeout";
@@ -597,8 +592,6 @@ public final class ConfigConstants {
 
     public static String DEFAULT_AKKA_FRAMESIZE = "10485760b";
 
-    public static String DEFAULT_AKKA_LOG_LEVEL = "ERROR";
-
     public static int DEFAULT_AKKA_ASK_TIMEOUT = 100;

http://git-wip-us.apache.org/repos/asf/flink/blob/4e197d5f/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 07f5395..dd44587 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -80,15 +80,9 @@ object AkkaUtils {
 
     val logLifecycleEvents = if (lifecycleEvents) "on" else "off"
 
-    val logLevel = configuration.getString(ConfigConstants.AKKA_LOG_LEVEL,
-      ConfigConstants.DEFAULT_AKKA_LOG_LEVEL)
-
     val configString = s"""
       |akka {
-      | loglevel = $logLevel
-      | stdout-loglevel = $logLevel
-      |
      | log-dead-letters = $logLifecycleEvents
      | log-dead-letters-during-shutdown = $logLifecycleEvents
      |
@@ -144,15 +138,9 @@ object AkkaUtils {
 
     val logLifecycleEvents = if (lifecycleEvents) "on" else "off"
 
-    val logLevel = configuration.getString(ConfigConstants.AKKA_LOG_LEVEL,
-      ConfigConstants.DEFAULT_AKKA_LOG_LEVEL)
-
     val configString = s"""
       |akka {
-      | loglevel = $logLevel
-      | stdout-loglevel = $logLevel
-      |
      | log-dead-letters = $logLifecycleEvents
      | log-dead-letters-during-shutdown = $logLifecycleEvents
      |
@@ -204,12 +192,16 @@ object AkkaUtils {
      |
      | loggers = ["akka.event.slf4j.Slf4jLogger"]
      | logger-startup-timeout = 30s
-     | loglevel = "WARNING"
-     | logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+     | loglevel = "DEBUG"
      | stdout-loglevel = "WARNING"
      | jvm-exit-on-fatal-error = off
      | log-config-on-start = off
+     |
      | serialize-messages = on
+     |
+     | debug {
+     |   lifecycle = on
+     | }
      |}
    """.stripMargin
  }

http://git-wip-us.apache.org/repos/asf/flink/blob/4e197d5f/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 9dbd581..bed8a11 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -105,7 +105,16 @@ class JobManager(val configuration: Configuration)
 
     instanceManager.shutdown()
     scheduler.shutdown()
-    libraryCacheManager.shutdown()
+
+    try {
+      libraryCacheManager.shutdown()
+    } catch {
+      case e: IOException => log.error(e, "Could not properly shutdown the library cache manager.")
+    }
+
+    if(log.isDebugEnabled) {
+      log.debug("Job manager {} is completely stopped.", self.path)
+    }
   }
 
   override def receiveWithLogMessages: Receive = {
@@ -134,7 +143,7 @@ class JobManager(val configuration: Configuration)
         sender ! akka.actor.Status.Failure(new IllegalArgumentException("JobGraph must not be" +
           " null."))
       } else {
-        log.info(s"Received job ${jobGraph.getJobID} (${jobGraph.getName}).")
+        log.info("Received job {} ({}).", jobGraph.getJobID, jobGraph.getName)
 
         if (jobGraph.getNumberOfVertices == 0) {
           sender ! SubmissionFailure(jobGraph.getJobID, new IllegalArgumentException("Job is " +
@@ -164,10 +173,8 @@ class JobManager(val configuration: Configuration)
           }
 
           if (log.isDebugEnabled) {
-            log.debug(s"Running master initialization of job ${jobGraph.getJobID} (${
-              jobGraph
-                .getName
-            }}).")
+            log.debug("Running master initialization of job {} ({}).",
+              jobGraph.getJobID, jobGraph.getName)
           }
 
           for (vertex <- jobGraph.getVertices.asScala) {
@@ -184,17 +191,15 @@ class JobManager(val configuration: Configuration)
           val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources
 
           if (log.isDebugEnabled) {
-            log.debug(s"Adding ${sortedTopology.size()} vertices from job graph ${
-              jobGraph
-                .getJobID
-            } (${jobGraph.getName}).")
+            log.debug("Adding {} vertices from job graph {} ({}).",
+              sortedTopology.size(), jobGraph.getJobID, jobGraph.getName)
           }
 
           executionGraph.attachJobGraph(sortedTopology)
 
           if (log.isDebugEnabled) {
-            log.debug(s"Successfully created execution graph from job graph " +
-              s"${jobGraph.getJobID} (${jobGraph.getName}).")
+            log.debug("Successfully created execution graph from job graph {} ({}).",
+              jobGraph.getJobID, jobGraph.getName)
           }
 
           executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
@@ -210,7 +215,7 @@ class JobManager(val configuration: Configuration)
 
           jobInfo.detach = detach
 
-          log.info(s"Scheduling job ${jobGraph.getName}.")
+          log.info("Scheduling job {}.", jobGraph.getName)
 
           executionGraph.scheduleForExecution(scheduler)
 
@@ -245,7 +250,7 @@ class JobManager(val configuration: Configuration)
     }
 
     case CancelJob(jobID) => {
-      log.info(s"Trying to cancel job with ID ${jobID}.")
+      log.info("Trying to cancel job with ID {}.", jobID)
 
       currentJobs.get(jobID) match {
         case Some((executionGraph, _)) =>
@@ -254,8 +259,8 @@ class JobManager(val configuration: Configuration)
           }
           sender ! CancellationSuccess(jobID)
         case None =>
-          log.info(s"No job found with ID ${jobID}.")
-          sender ! CancellationFailure(jobID, new IllegalArgumentException(s"No job found with " +
+          log.info("No job found with ID {}.", jobID)
+          sender ! CancellationFailure(jobID, new IllegalArgumentException("No job found with " +
             s"ID ${jobID}."))
       }
     }
@@ -270,8 +275,8 @@ class JobManager(val configuration: Configuration)
           Future {
             originalSender ! executionGraph.updateState(taskExecutionState)
           }
-        case None => log.error(s"Cannot find execution graph for ID ${taskExecutionState
-          .getJobID} to change state to ${taskExecutionState.getExecutionState}.")
+        case None => log.error("Cannot find execution graph for ID {} to change state to {}.",
+          taskExecutionState.getJobID, taskExecutionState.getExecutionState)
           sender ! false
       }
     }
@@ -283,7 +288,7 @@ class JobManager(val configuration: Configuration)
       val execution = executionGraph.getRegisteredExecutions().get(executionAttempt)
 
       if(execution == null){
-        log.error(s"Can not find Execution for attempt ${executionAttempt}.")
+        log.error("Can not find Execution for attempt {}.", executionAttempt)
         null
       }else{
         val slot = execution.getAssignedResource
@@ -298,21 +303,21 @@ class JobManager(val configuration: Configuration)
             case vertex: ExecutionJobVertex => vertex.getSplitAssigner match {
               case splitAssigner: InputSplitAssigner => splitAssigner.getNextInputSplit(host)
               case _ =>
-                log.error(s"No InputSplitAssigner for vertex ID ${vertexID}.")
+                log.error("No InputSplitAssigner for vertex ID {}.", vertexID)
                 null
             }
             case _ =>
-              log.error(s"Cannot find execution vertex for vertex ID ${vertexID}.")
+              log.error("Cannot find execution vertex for vertex ID {}.", vertexID)
              null
          }
        case None =>
-          log.error(s"Cannot find execution graph for job ID ${jobID}.")
+          log.error("Cannot find execution graph for job ID {}.", jobID)
           null
       }
 
       if(log.isDebugEnabled) {
-        log.debug(s"Send next input split ${nextInputSplit}.")
+        log.debug("Send next input split {}.", nextInputSplit)
       }
 
       sender ! NextInputSplit(nextInputSplit)
     }
@@ -320,8 +325,9 @@ class JobManager(val configuration: Configuration)
     case JobStatusChanged(jobID, newJobStatus, timeStamp, optionalMessage) => {
       currentJobs.get(jobID) match {
         case Some((executionGraph, jobInfo)) => executionGraph.getJobName
-          log.info(s"Status of job ${jobID} (${executionGraph.getJobName}) changed to " +
-            s"${newJobStatus}${if(optionalMessage == null) "" else optionalMessage}.")
+          log.info("Status of job {} ({}) changed to {}{}.",
+            jobID, executionGraph.getJobName, newJobStatus,
+            if(optionalMessage == null) "" else optionalMessage)
 
           if(newJobStatus.isTerminalState) {
             jobInfo.end = timeStamp
@@ -368,7 +374,7 @@ class JobManager(val configuration: Configuration)
           sender ! ConsumerNotificationResult(executionGraph
             .scheduleOrUpdateConsumers(executionId, partitionIndex))
         case None =>
-          log.error(s"Cannot find execution graph for job ID ${jobId}.")
+          log.error("Cannot find execution graph for job ID {}.", jobId)
           sender ! ConsumerNotificationResult(false, Some(
             new IllegalStateException("Cannot find execution graph for job ID " + jobId)))
       }
@@ -422,7 +428,7 @@ class JobManager(val configuration: Configuration)
     }
 
     case Terminated(taskManager) => {
-      log.info(s"Task manager ${taskManager.path} terminated.")
+      log.info("Task manager {} terminated.", taskManager.path)
       instanceManager.unregisterTaskManager(taskManager)
       context.unwatch(taskManager)
     }
@@ -442,14 +448,13 @@ class JobManager(val configuration: Configuration)
       libraryCacheManager.unregisterJob(jobID)
     } catch {
       case t: Throwable =>
-        log.error(t, s"Could not properly unregister job ${jobID} form the library cache.")
+        log.error(t, "Could not properly unregister job {} form the library cache.", jobID)
     }
   }
 
   private def checkJavaVersion {
-    var javaVersion = System.getProperty("java.version")
-    if (javaVersion.substring(0, 3).toDouble < 1.7) {
-      JobManager.LOG.warn("Warning: Flink is running with Java 6. " +
+    if (System.getProperty("java.version").substring(0, 3).toDouble < 1.7) {
+      log.warning("Warning: Flink is running with Java 6. " +
         "Java 6 is not maintained any more by Oracle or the OpenJDK community. " +
         "Flink currently supports Java 6, but may not in future releases," +
         " due to the unavailability of bug fixes security patched.")

http://git-wip-us.apache.org/repos/asf/flink/blob/4e197d5f/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 2872678..62081a3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -86,7 +86,9 @@ import scala.collection.JavaConverters._
 
   implicit val timeout = tmTimeout
 
-  log.info(s"Starting task manager at ${self.path}.")
+  log.info("Starting task manager at {}.", self.path)
+  log.info("Creating {} task slot(s).", numberOfSlots)
+  log.info("TaskManager connection information {}.", connectionInfo)
 
   val REGISTRATION_DELAY = 0 seconds
   val REGISTRATION_INTERVAL = 10 seconds
@@ -105,9 +107,12 @@ import scala.collection.JavaConverters._
   val waitForRegistration = scala.collection.mutable.Set[ActorRef]()
 
   val profiler = profilingInterval match {
-    case Some(interval) => Some(TaskManager.startProfiler(self.path.toSerializationFormat,
-      interval))
-    case None => None
+    case Some(interval) =>
+      log.info("Profiling of jobs is enabled.")
+      Some(TaskManager.startProfiler(self.path.toSerializationFormat, interval))
+    case None =>
+      log.info("Profiling of jobs is disabled.")
+      None
   }
 
   var libraryCacheManager: LibraryCacheManager = null
@@ -132,7 +137,7 @@ import scala.collection.JavaConverters._
   }
 
   override def postStop(): Unit = {
-    log.info(s"Stopping task manager ${self.path}.")
+    log.info("Stopping task manager {}.", self.path)
 
     cancelAndClearEverything(new Exception("Task Manager is shutting down."))
 
@@ -161,6 +166,10 @@ import scala.collection.JavaConverters._
       case t: Throwable => log.error(t, "LibraryCacheManager did not shutdown properly.")
     }
   }
+
+  if(log.isDebugEnabled){
+    log.debug("Task manager {} is completely stopped.", self.path)
+  }
 }
 
 private def tryJobManagerRegistration(): Unit = {
@@ -180,14 +189,14 @@ import scala.collection.JavaConverters._
     } else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) {
 
-      log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " +
-        s"Attempt")
+      log.info("Try to register at master {}. Attempt #{}", jobManagerAkkaURL,
+        registrationAttempts)
       val jobManager = context.actorSelection(jobManagerAkkaURL)
 
       jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
     } else {
-      log.error("TaskManager could not register at JobManager.")
+      log.error("TaskManager could not register at JobManager.");
       self ! PoisonPill
     }
   }
@@ -200,9 +209,8 @@ import scala.collection.JavaConverters._
 
       context.watch(currentJobManager)
 
-      log.info(s"TaskManager successfully registered at JobManager ${
-        currentJobManager.path.toString
-      }.")
+      log.info("TaskManager successfully registered at JobManager {}.",
+        currentJobManager.path.toString)
 
       setupNetworkEnvironment()
       setupLibraryCacheManager(blobPort)
@@ -274,8 +282,8 @@ import scala.collection.JavaConverters._
     }
 
     case Terminated(jobManager) => {
-      log.info(s"Job manager ${jobManager.path} is no longer reachable. "
-        + "Cancelling all tasks and trying to reregister.")
+      log.info("Job manager {} is no longer reachable. Cancelling all tasks and trying to " +
+        "reregister.", jobManager.path)
 
       cancelAndClearEverything(new Throwable("Lost connection to JobManager"))
       tryJobManagerRegistration()
@@ -285,7 +293,7 @@ import scala.collection.JavaConverters._
 
   def notifyExecutionStateChange(jobID: JobID, executionID: ExecutionAttemptID,
                                  executionState: ExecutionState, optionalError: Throwable): Unit = {
-    log.info(s"Update execution state to ${executionState}.")
+    log.info("Update execution state to {}.", executionState)
     val futureResponse = (currentJobManager ? UpdateTaskExecutionState(new TaskExecutionState
     (jobID, executionID, executionState, optionalError)))(timeout)
@@ -301,8 +309,8 @@ import scala.collection.JavaConverters._
         self ! UnregisterTask(executionID)
       }
       case Failure(t) => {
-        log.warning(s"Execution state change notification failed for task ${executionID} " +
-          s"of job ${jobID}. Cause ${t.getMessage}.")
+        log.warning("Execution state change notification failed for task {} of job {}. Cause {}.",
+          executionID, jobID, t.getMessage)
         self ! UnregisterTask(executionID)
       }
     }
@@ -321,12 +329,11 @@ import scala.collection.JavaConverters._
       if (log.isDebugEnabled) {
         startRegisteringTask = System.currentTimeMillis()
       }
-      libraryCacheManager.registerTask(jobID, executionID, tdd.getRequiredJarFiles())
+      libraryCacheManager.registerTask(jobID, executionID, tdd.getRequiredJarFiles());
 
      if (log.isDebugEnabled) {
-        log.debug(s"Register task ${executionID} took ${
-          (System.currentTimeMillis() - startRegisteringTask) / 1000.0
-        }s")
+        log.debug("Register task {} took {}s", executionID,
+          (System.currentTimeMillis() - startRegisteringTask) / 1000.0)
       }
 
       val userCodeClassLoader = libraryCacheManager.getClassLoader(jobID)
@@ -386,7 +393,7 @@ import scala.collection.JavaConverters._
         val message = if (t.isInstanceOf[CancelTaskException]) {
           "Task was canceled"
         } else {
-          log.error(t, s"Could not instantiate task with execution ID ${executionID}.")
+          log.error(t, "Could not instantiate task with execution ID {}.", executionID)
           ExceptionUtils.stringifyException(t)
         }
 
@@ -398,7 +405,7 @@ import scala.collection.JavaConverters._
         try {
           libraryCacheManager.unregisterTask(jobID, executionID)
         } catch {
-          case t: Throwable => log.error("Error during cleanup of task deployment.", t)
+          case t: Throwable => log.error(t, "Error during cleanup of task deployment.")
         }
 
         sender ! new TaskOperationResult(executionID, false, message)
@@ -480,6 +487,9 @@ import scala.collection.JavaConverters._
     if (blobPort > 0) {
       val address = new InetSocketAddress(currentJobManager.path.address.host.getOrElse
         ("localhost"), blobPort)
+
+      log.info("Determined BLOB server address to be {}.", address)
+
       libraryCacheManager = new BlobLibraryCacheManager(new BlobCache(address), cleanupInterval)
     } else {
       libraryCacheManager = new FallbackLibraryCacheManager
@@ -500,14 +510,14 @@ import scala.collection.JavaConverters._
   }
 
   private def unregisterTask(executionID: ExecutionAttemptID): Unit = {
-    log.info(s"Unregister task with execution ID ${executionID}.")
+    log.info("Unregister task with execution ID {}.", executionID)
 
     runningTasks.remove(executionID) match {
       case Some(task) =>
        removeAllTaskResources(task)
        libraryCacheManager.unregisterTask(task.getJobID, executionID)
      case None =>
        if (log.isDebugEnabled) {
-          log.debug(s"Cannot find task with ID ${executionID} to unregister.")
+          log.debug("Cannot find task with ID {} to unregister.", executionID)
        }
    }
  }
@@ -537,12 +547,12 @@ import scala.collection.JavaConverters._
   }
 
   private def logMemoryStats(): Unit = {
-    if (log.isDebugEnabled) {
+    if (log.isInfoEnabled) {
       val memoryMXBean = ManagementFactory.getMemoryMXBean()
       val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans().asScala
 
-      log.debug(TaskManager.getMemoryUsageStatsAsString(memoryMXBean))
-      log.debug(TaskManager.getGarbageCollectorStatsAsString(gcMXBeans))
+      log.info(TaskManager.getMemoryUsageStatsAsString(memoryMXBean))
+      log.info(TaskManager.getGarbageCollectorStatsAsString(gcMXBeans))
     }
   }
 }
@@ -685,6 +695,9 @@ object TaskManager {
     } else {
       val fraction = configuration.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
         ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
+
+      LOG.info("Using {} of the free heap space for managed memory.", fraction)
+
       ((EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag - networkBufferMem) * fraction)
         .toLong
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/4e197d5f/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 37b99b1..014a9ed 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -46,8 +46,7 @@ object TestingUtils {
     s"""akka.daemonic = on
       |akka.test.timefactor = 10
       |akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
-      |akka.loglevel = "OFF"
-      |akka.logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+      |akka.loglevel = "DEBUG"
      |akka.stdout-loglevel = "OFF"
      |akka.jvm-exit-on-fata-error = off
      |akka.log-config-on-start = off
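
Beyond the configuration change, the diff also rewrites string-interpolated log calls (s"... ${x} ...") into parameterized templates and moves the Throwable to the first argument of log.error. With Akka's LoggingAdapter the template is only rendered when the level is enabled, so DEBUG/INFO call sites no longer pay for string construction when the level is off. A minimal standalone sketch of the pattern (actor and message names are illustrative, not taken from this commit):

    // Sketch only: the parameterized-logging style this commit adopts,
    // shown on a plain Akka actor rather than on Flink code.
    import akka.actor.Actor
    import akka.event.Logging

    class WorkerActor extends Actor {
      val log = Logging(context.system, this)

      def receive = {
        case jobId: String =>
          // "{}" placeholders are substituted only if INFO is enabled, unlike
          // s"..." interpolation, which builds the string eagerly.
          log.info("Received job {}.", jobId)

        case t: Throwable =>
          // The LoggingAdapter takes the cause first, then the template; this
          // is why the diff flips log.error(msg, t) into log.error(t, msg).
          log.error(t, "Could not process message in actor {}.", self.path)
      }
    }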
