[FLINK-1372] [runtime] Fixes logging settings. The logging is now exclusively 
controlled by the logging properties provided to the system. Removes 
akka.loglevel config parameter.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e197d5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e197d5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e197d5f

Branch: refs/heads/master
Commit: 4e197d5fac7bfc774a8e4e39ed40c831e5848428
Parents: 58e1e44
Author: Till Rohrmann <[email protected]>
Authored: Mon Jan 19 16:13:52 2015 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Tue Jan 20 20:29:03 2015 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  7 --
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 20 ++----
 .../flink/runtime/jobmanager/JobManager.scala   | 67 +++++++++++---------
 .../flink/runtime/taskmanager/TaskManager.scala | 67 ++++++++++++--------
 .../runtime/testingUtils/TestingUtils.scala     |  3 +-
 5 files changed, 83 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4e197d5f/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index df96339..d482e3c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -345,11 +345,6 @@ public final class ConfigConstants {
        public static final String AKKA_LOG_LIFECYCLE_EVENTS = 
"akka.log.lifecycle.events";
 
        /**
-        * Log level for akka
-        */
-       public static final String AKKA_LOG_LEVEL = "akka.loglevel";
-
-       /**
         * Timeout for all blocking calls
         */
        public static final String AKKA_ASK_TIMEOUT = "akka.ask.timeout";
@@ -597,8 +592,6 @@ public final class ConfigConstants {
 
        public static String DEFAULT_AKKA_FRAMESIZE = "10485760b";
 
-       public static String DEFAULT_AKKA_LOG_LEVEL = "ERROR";
-
        public static int DEFAULT_AKKA_ASK_TIMEOUT = 100;
        
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4e197d5f/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 07f5395..dd44587 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -80,15 +80,9 @@ object AkkaUtils {
 
     val logLifecycleEvents = if (lifecycleEvents) "on" else "off"
 
-    val logLevel = configuration.getString(ConfigConstants.AKKA_LOG_LEVEL,
-      ConfigConstants.DEFAULT_AKKA_LOG_LEVEL)
-
     val configString =
       s"""
          |akka {
-         |  loglevel = $logLevel
-         |  stdout-loglevel = $logLevel
-         |
          |  log-dead-letters = $logLifecycleEvents
          |  log-dead-letters-during-shutdown = $logLifecycleEvents
          |
@@ -144,15 +138,9 @@ object AkkaUtils {
 
     val logLifecycleEvents = if (lifecycleEvents) "on" else "off"
 
-    val logLevel = configuration.getString(ConfigConstants.AKKA_LOG_LEVEL,
-      ConfigConstants.DEFAULT_AKKA_LOG_LEVEL)
-
     val configString =
       s"""
          |akka {
-         |  loglevel = $logLevel
-         |  stdout-loglevel = $logLevel
-         |
          |  log-dead-letters = $logLifecycleEvents
          |  log-dead-letters-during-shutdown = $logLifecycleEvents
          |
@@ -204,12 +192,16 @@ object AkkaUtils {
       |
       |  loggers = ["akka.event.slf4j.Slf4jLogger"]
       |  logger-startup-timeout = 30s
-      |  loglevel = "WARNING"
-      |  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+      |  loglevel = "DEBUG"
       |  stdout-loglevel = "WARNING"
       |  jvm-exit-on-fatal-error = off
       |  log-config-on-start = off
+      |
       |  serialize-messages = on
+      |
+      |  debug {
+      |   lifecycle = on
+      |  }
       |}
     """.stripMargin
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/4e197d5f/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 9dbd581..bed8a11 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -105,7 +105,16 @@ class JobManager(val configuration: Configuration)
 
     instanceManager.shutdown()
     scheduler.shutdown()
-    libraryCacheManager.shutdown()
+
+    try {
+      libraryCacheManager.shutdown()
+    } catch {
+      case e: IOException => log.error(e, "Could not properly shutdown the 
library cache manager.")
+    }
+
+    if(log.isDebugEnabled) {
+      log.debug("Job manager {} is completely stopped.", self.path)
+    }
   }
 
   override def receiveWithLogMessages: Receive = {
@@ -134,7 +143,7 @@ class JobManager(val configuration: Configuration)
           sender ! akka.actor.Status.Failure(new 
IllegalArgumentException("JobGraph must not be" +
             " null."))
         } else {
-          log.info(s"Received job ${jobGraph.getJobID} (${jobGraph.getName}).")
+          log.info("Received job {} ({}).", jobGraph.getJobID, 
jobGraph.getName)
 
           if (jobGraph.getNumberOfVertices == 0) {
             sender ! SubmissionFailure(jobGraph.getJobID, new 
IllegalArgumentException("Job is " +
@@ -164,10 +173,8 @@ class JobManager(val configuration: Configuration)
             }
 
             if (log.isDebugEnabled) {
-              log.debug(s"Running master initialization of job 
${jobGraph.getJobID} (${
-                jobGraph
-                  .getName
-              }}).")
+              log.debug("Running master initialization of job {} ({}).",
+                jobGraph.getJobID, jobGraph.getName)
             }
 
             for (vertex <- jobGraph.getVertices.asScala) {
@@ -184,17 +191,15 @@ class JobManager(val configuration: Configuration)
             val sortedTopology = 
jobGraph.getVerticesSortedTopologicallyFromSources
 
             if (log.isDebugEnabled) {
-              log.debug(s"Adding ${sortedTopology.size()} vertices from job 
graph ${
-                jobGraph
-                  .getJobID
-              } (${jobGraph.getName}).")
+              log.debug("Adding {} vertices from job graph {} ({}).",
+                sortedTopology.size(), jobGraph.getJobID, jobGraph.getName)
             }
 
             executionGraph.attachJobGraph(sortedTopology)
 
             if (log.isDebugEnabled) {
-              log.debug(s"Successfully created execution graph from job graph 
" +
-                s"${jobGraph.getJobID} (${jobGraph.getName}).")
+              log.debug("Successfully created execution graph from job graph 
{} ({}).",
+                jobGraph.getJobID, jobGraph.getName)
             }
 
             
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
@@ -210,7 +215,7 @@ class JobManager(val configuration: Configuration)
 
             jobInfo.detach = detach
 
-            log.info(s"Scheduling job ${jobGraph.getName}.")
+            log.info("Scheduling job {}.", jobGraph.getName)
 
             executionGraph.scheduleForExecution(scheduler)
 
@@ -245,7 +250,7 @@ class JobManager(val configuration: Configuration)
     }
 
     case CancelJob(jobID) => {
-      log.info(s"Trying to cancel job with ID ${jobID}.")
+      log.info("Trying to cancel job with ID {}.", jobID)
 
       currentJobs.get(jobID) match {
         case Some((executionGraph, _)) =>
@@ -254,8 +259,8 @@ class JobManager(val configuration: Configuration)
           }
           sender ! CancellationSuccess(jobID)
         case None =>
-          log.info(s"No job found with ID ${jobID}.")
-          sender ! CancellationFailure(jobID, new 
IllegalArgumentException(s"No job found with " +
+          log.info("No job found with ID {}.", jobID)
+          sender ! CancellationFailure(jobID, new IllegalArgumentException("No 
job found with " +
             s"ID ${jobID}."))
       }
     }
@@ -270,8 +275,8 @@ class JobManager(val configuration: Configuration)
             Future {
               originalSender ! executionGraph.updateState(taskExecutionState)
             }
-          case None => log.error(s"Cannot find execution graph for ID 
${taskExecutionState
-            .getJobID} to change state to 
${taskExecutionState.getExecutionState}.")
+          case None => log.error("Cannot find execution graph for ID {} to 
change state to {}.",
+            taskExecutionState.getJobID, taskExecutionState.getExecutionState)
             sender ! false
         }
       }
@@ -283,7 +288,7 @@ class JobManager(val configuration: Configuration)
           val execution = 
executionGraph.getRegisteredExecutions().get(executionAttempt)
 
           if(execution == null){
-            log.error(s"Can not find Execution for attempt 
${executionAttempt}.")
+            log.error("Can not find Execution for attempt {}.", 
executionAttempt)
             null
           }else{
             val slot = execution.getAssignedResource
@@ -298,21 +303,21 @@ class JobManager(val configuration: Configuration)
               case vertex: ExecutionJobVertex => vertex.getSplitAssigner match 
{
                 case splitAssigner: InputSplitAssigner => 
splitAssigner.getNextInputSplit(host)
                 case _ =>
-                  log.error(s"No InputSplitAssigner for vertex ID 
${vertexID}.")
+                  log.error("No InputSplitAssigner for vertex ID {}.", 
vertexID)
                   null
               }
               case _ =>
-                log.error(s"Cannot find execution vertex for vertex ID 
${vertexID}.")
+                log.error("Cannot find execution vertex for vertex ID {}.", 
vertexID)
                 null
           }
         }
         case None =>
-          log.error(s"Cannot find execution graph for job ID ${jobID}.")
+          log.error("Cannot find execution graph for job ID {}.", jobID)
           null
       }
 
       if(log.isDebugEnabled) {
-        log.debug(s"Send next input split ${nextInputSplit}.")
+        log.debug("Send next input split {}.", nextInputSplit)
       }
       sender ! NextInputSplit(nextInputSplit)
     }
@@ -320,8 +325,9 @@ class JobManager(val configuration: Configuration)
     case JobStatusChanged(jobID, newJobStatus, timeStamp, optionalMessage) => {
       currentJobs.get(jobID) match {
         case Some((executionGraph, jobInfo)) => executionGraph.getJobName
-          log.info(s"Status of job ${jobID} (${executionGraph.getJobName}) 
changed to " +
-            s"${newJobStatus}${if(optionalMessage == null) "" else 
optionalMessage}.")
+          log.info("Status of job {} ({}) changed to {}{}.",
+            jobID, executionGraph.getJobName, newJobStatus,
+            if(optionalMessage == null) "" else optionalMessage)
 
           if(newJobStatus.isTerminalState) {
             jobInfo.end = timeStamp
@@ -368,7 +374,7 @@ class JobManager(val configuration: Configuration)
           sender ! ConsumerNotificationResult(executionGraph
             .scheduleOrUpdateConsumers(executionId, partitionIndex))
         case None =>
-          log.error(s"Cannot find execution graph for job ID ${jobId}.")
+          log.error("Cannot find execution graph for job ID {}.", jobId)
           sender ! ConsumerNotificationResult(false, Some(
             new IllegalStateException("Cannot find execution graph for job ID 
" + jobId)))
       }
@@ -422,7 +428,7 @@ class JobManager(val configuration: Configuration)
     }
 
     case Terminated(taskManager) => {
-      log.info(s"Task manager ${taskManager.path} terminated.")
+      log.info("Task manager {} terminated.", taskManager.path)
       instanceManager.unregisterTaskManager(taskManager)
       context.unwatch(taskManager)
     }
@@ -442,14 +448,13 @@ class JobManager(val configuration: Configuration)
       libraryCacheManager.unregisterJob(jobID)
     } catch {
       case t: Throwable =>
-        log.error(t, s"Could not properly unregister job ${jobID} form the 
library cache.")
+        log.error(t, "Could not properly unregister job {} form the library 
cache.", jobID)
     }
   }
 
   private def checkJavaVersion {
-    var javaVersion = System.getProperty("java.version")
-    if (javaVersion.substring(0, 3).toDouble < 1.7) {
-      JobManager.LOG.warn("Warning: Flink is running with Java 6. " +
+    if (System.getProperty("java.version").substring(0, 3).toDouble < 1.7) {
+      log.warning("Warning: Flink is running with Java 6. " +
         "Java 6 is not maintained any more by Oracle or the OpenJDK community. 
" +
         "Flink currently supports Java 6, but may not in future releases," +
         " due to the unavailability of bug fixes security patched.")

http://git-wip-us.apache.org/repos/asf/flink/blob/4e197d5f/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 2872678..62081a3 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -86,7 +86,9 @@ import scala.collection.JavaConverters._
 
   implicit val timeout = tmTimeout
 
-  log.info(s"Starting task manager at ${self.path}.")
+  log.info("Starting task manager at {}.", self.path)
+  log.info("Creating {} task slot(s).", numberOfSlots)
+  log.info("TaskManager connection information {}.", connectionInfo)
 
   val REGISTRATION_DELAY = 0 seconds
   val REGISTRATION_INTERVAL = 10 seconds
@@ -105,9 +107,12 @@ import scala.collection.JavaConverters._
   val waitForRegistration = scala.collection.mutable.Set[ActorRef]()
 
   val profiler = profilingInterval match {
-    case Some(interval) => 
Some(TaskManager.startProfiler(self.path.toSerializationFormat,
-      interval))
-    case None => None
+    case Some(interval) =>
+      log.info("Profiling of jobs is enabled.")
+      Some(TaskManager.startProfiler(self.path.toSerializationFormat, 
interval))
+    case None =>
+      log.info("Profiling of jobs is disabled.")
+      None
   }
 
   var libraryCacheManager: LibraryCacheManager = null
@@ -132,7 +137,7 @@ import scala.collection.JavaConverters._
   }
 
   override def postStop(): Unit = {
-    log.info(s"Stopping task manager ${self.path}.")
+    log.info("Stopping task manager {}.", self.path)
 
     cancelAndClearEverything(new Exception("Task Manager is shutting down."))
 
@@ -161,6 +166,10 @@ import scala.collection.JavaConverters._
         case t: Throwable => log.error(t, "LibraryCacheManager did not 
shutdown properly.")
       }
     }
+
+    if(log.isDebugEnabled){
+      log.debug("Task manager {} is completely stopped.", self.path)
+    }
   }
 
   private def tryJobManagerRegistration(): Unit = {
@@ -180,14 +189,14 @@ import scala.collection.JavaConverters._
       }
       else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) {
 
-        log.info(s"Try to register at master ${jobManagerAkkaURL}. 
${registrationAttempts}. " +
-          s"Attempt")
+        log.info("Try to register at master {}. Attempt #{}", 
jobManagerAkkaURL,
+          registrationAttempts)
         val jobManager = context.actorSelection(jobManagerAkkaURL)
 
         jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, 
numberOfSlots)
       }
       else {
-        log.error("TaskManager could not register at JobManager.")
+        log.error("TaskManager could not register at JobManager.");
         self ! PoisonPill
       }
     }
@@ -200,9 +209,8 @@ import scala.collection.JavaConverters._
 
         context.watch(currentJobManager)
 
-        log.info(s"TaskManager successfully registered at JobManager ${
-          currentJobManager.path.toString
-        }.")
+        log.info("TaskManager successfully registered at JobManager {}.",
+          currentJobManager.path.toString)
 
         setupNetworkEnvironment()
         setupLibraryCacheManager(blobPort)
@@ -274,8 +282,8 @@ import scala.collection.JavaConverters._
     }
 
     case Terminated(jobManager) => {
-      log.info(s"Job manager ${jobManager.path} is no longer reachable. "
-        + "Cancelling all tasks and trying to reregister.")
+      log.info("Job manager {} is no longer reachable. Cancelling all tasks 
and trying to " +
+        "reregister.", jobManager.path)
 
       cancelAndClearEverything(new Throwable("Lost connection to JobManager"))
       tryJobManagerRegistration()
@@ -285,7 +293,7 @@ import scala.collection.JavaConverters._
   def notifyExecutionStateChange(jobID: JobID, executionID: ExecutionAttemptID,
                                  executionState: ExecutionState,
                                  optionalError: Throwable): Unit = {
-    log.info(s"Update execution state to ${executionState}.")
+    log.info("Update execution state to {}.", executionState)
     val futureResponse = (currentJobManager ? UpdateTaskExecutionState(new 
TaskExecutionState
     (jobID, executionID, executionState, optionalError)))(timeout)
 
@@ -301,8 +309,8 @@ import scala.collection.JavaConverters._
           self ! UnregisterTask(executionID)
         }
       case Failure(t) => {
-        log.warning(s"Execution state change notification failed for task 
${executionID} " +
-          s"of job ${jobID}. Cause ${t.getMessage}.")
+        log.warning("Execution state change notification failed for task {} of 
job {}. Cause {}.",
+          executionID, jobID, t.getMessage)
         self ! UnregisterTask(executionID)
       }
     }
@@ -321,12 +329,11 @@ import scala.collection.JavaConverters._
       if (log.isDebugEnabled) {
         startRegisteringTask = System.currentTimeMillis()
       }
-      libraryCacheManager.registerTask(jobID, executionID, 
tdd.getRequiredJarFiles())
+      libraryCacheManager.registerTask(jobID, executionID, 
tdd.getRequiredJarFiles());
 
       if (log.isDebugEnabled) {
-        log.debug(s"Register task ${executionID} took ${
-          (System.currentTimeMillis() - startRegisteringTask) / 1000.0
-        }s")
+        log.debug("Register task {} took {}s", executionID,
+          (System.currentTimeMillis() - startRegisteringTask) / 1000.0)
       }
 
       val userCodeClassLoader = libraryCacheManager.getClassLoader(jobID)
@@ -386,7 +393,7 @@ import scala.collection.JavaConverters._
         val message = if (t.isInstanceOf[CancelTaskException]) {
           "Task was canceled"
         } else {
-          log.error(t, s"Could not instantiate task with execution ID 
${executionID}.")
+          log.error(t, "Could not instantiate task with execution ID {}.", 
executionID)
           ExceptionUtils.stringifyException(t)
         }
 
@@ -398,7 +405,7 @@ import scala.collection.JavaConverters._
 
           libraryCacheManager.unregisterTask(jobID, executionID)
         } catch {
-          case t: Throwable => log.error("Error during cleanup of task 
deployment.", t)
+          case t: Throwable => log.error(t, "Error during cleanup of task 
deployment.")
         }
 
         sender ! new TaskOperationResult(executionID, false, message)
@@ -480,6 +487,9 @@ import scala.collection.JavaConverters._
     if (blobPort > 0) {
       val address = new 
InetSocketAddress(currentJobManager.path.address.host.getOrElse
         ("localhost"), blobPort)
+
+      log.info("Determined BLOB server address to be {}.", address)
+
       libraryCacheManager = new BlobLibraryCacheManager(new 
BlobCache(address), cleanupInterval)
     } else {
       libraryCacheManager = new FallbackLibraryCacheManager
@@ -500,14 +510,14 @@ import scala.collection.JavaConverters._
   }
 
   private def unregisterTask(executionID: ExecutionAttemptID): Unit = {
-    log.info(s"Unregister task with execution ID ${executionID}.")
+    log.info("Unregister task with execution ID {}.", executionID)
     runningTasks.remove(executionID) match {
       case Some(task) =>
         removeAllTaskResources(task)
         libraryCacheManager.unregisterTask(task.getJobID, executionID)
       case None =>
         if (log.isDebugEnabled) {
-          log.debug(s"Cannot find task with ID ${executionID} to unregister.")
+          log.debug("Cannot find task with ID {} to unregister.", executionID)
         }
     }
   }
@@ -537,12 +547,12 @@ import scala.collection.JavaConverters._
   }
 
   private def logMemoryStats(): Unit = {
-    if (log.isDebugEnabled) {
+    if (log.isInfoEnabled) {
       val memoryMXBean = ManagementFactory.getMemoryMXBean()
       val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans().asScala
 
-      log.debug(TaskManager.getMemoryUsageStatsAsString(memoryMXBean))
-      log.debug(TaskManager.getGarbageCollectorStatsAsString(gcMXBeans))
+      log.info(TaskManager.getMemoryUsageStatsAsString(memoryMXBean))
+      log.info(TaskManager.getGarbageCollectorStatsAsString(gcMXBeans))
     }
   }
 }
@@ -685,6 +695,9 @@ object TaskManager {
     } else {
       val fraction = 
configuration.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
         ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
+
+      LOG.info("Using {} of the free heap space for managed memory.", fraction)
+
       ((EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag - 
networkBufferMem) * fraction)
         .toLong
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/4e197d5f/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 37b99b1..014a9ed 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -46,8 +46,7 @@ object TestingUtils {
     s"""akka.daemonic = on
       |akka.test.timefactor = 10
       |akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
-      |akka.loglevel = "OFF"
-      |akka.logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+      |akka.loglevel = "DEBUG"
       |akka.stdout-loglevel = "OFF"
       |akka.jvm-exit-on-fata-error = off
       |akka.log-config-on-start = off

Reply via email to