Repository: spark
Updated Branches:
  refs/heads/branch-1.1 09b8a3ce0 -> 6ec137450


[SPARK-2952] Enable logging actor messages at DEBUG level

Example messages:
```
14/08/09 21:37:01 DEBUG BlockManagerMasterActor: [actor] received message RegisterBlockManager(BlockManagerId(0, rxin-mbp, 58092, 0),278302556,Actor[akka.tcp://spark@rxin-mbp:58088/user/BlockManagerActor1#-63596539]) from Actor[akka.tcp://spark@rxin-mbp:58088/temp/$c]

14/08/09 21:37:01 DEBUG BlockManagerMasterActor: [actor] handled message (0.279 ms) RegisterBlockManager(BlockManagerId(0, rxin-mbp, 58092, 0),278302556,Actor[akka.tcp://spark@rxin-mbp:58088/user/BlockManagerActor1#-63596539]) from Actor[akka.tcp://spark@rxin-mbp:58088/temp/$c]
```

cc @mengxr @tdas @pwendell

Author: Reynold Xin <[email protected]>

Closes #1870 from rxin/actorLogging and squashes the following commits:

c531ee5 [Reynold Xin] Added license header for ActorLogReceive.
f6b1ebe [Reynold Xin] [SPARK-2952] Enable logging actor messages at DEBUG level

(cherry picked from commit 37338666655909502e424b4639d680271d6d4c12)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ec13745
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ec13745
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ec13745

Branch: refs/heads/branch-1.1
Commit: 6ec13745093e983836098c5828a4d4f4e8cc2f54
Parents: 09b8a3c
Author: Reynold Xin <[email protected]>
Authored: Mon Aug 11 15:25:21 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Mon Aug 11 15:25:33 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/HeartbeatReceiver.scala    |  7 ++-
 .../org/apache/spark/MapOutputTracker.scala     |  4 +-
 .../scala/org/apache/spark/deploy/Client.scala  |  8 ++-
 .../apache/spark/deploy/client/AppClient.scala  |  6 +-
 .../org/apache/spark/deploy/master/Master.scala |  6 +-
 .../org/apache/spark/deploy/worker/Worker.scala |  6 +-
 .../spark/deploy/worker/WorkerWatcher.scala     |  8 ++-
 .../executor/CoarseGrainedExecutorBackend.scala |  7 ++-
 .../cluster/CoarseGrainedSchedulerBackend.scala |  9 ++-
 .../spark/scheduler/local/LocalBackend.scala    |  8 +--
 .../spark/storage/BlockManagerMasterActor.scala | 11 ++--
 .../spark/storage/BlockManagerSlaveActor.scala  |  5 +-
 .../org/apache/spark/util/ActorLogReceive.scala | 64 ++++++++++++++++++++
 13 files changed, 111 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6ec13745/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala 
b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 24ccce2..83ae57b 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -21,6 +21,7 @@ import akka.actor.Actor
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.scheduler.TaskScheduler
+import org.apache.spark.util.ActorLogReceive
 
 /**
  * A heartbeat from executors to the driver. This is a shared message used by 
several internal
@@ -36,8 +37,10 @@ private[spark] case class 
HeartbeatResponse(reregisterBlockManager: Boolean)
 /**
  * Lives in the driver to receive heartbeats from executors..
  */
-private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) extends Actor 
{
-  override def receive = {
+private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
+  extends Actor with ActorLogReceive with Logging {
+
+  override def receiveWithLogging = {
     case Heartbeat(executorId, taskMetrics, blockManagerId) =>
       val response = HeartbeatResponse(
         !scheduler.executorHeartbeatReceived(executorId, taskMetrics, 
blockManagerId))

http://git-wip-us.apache.org/repos/asf/spark/blob/6ec13745/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 8940917..51705c8 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -38,10 +38,10 @@ private[spark] case object StopMapOutputTracker extends 
MapOutputTrackerMessage
 
 /** Actor class for MapOutputTrackerMaster */
 private[spark] class MapOutputTrackerMasterActor(tracker: 
MapOutputTrackerMaster, conf: SparkConf)
-  extends Actor with Logging {
+  extends Actor with ActorLogReceive with Logging {
   val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
 
-  def receive = {
+  override def receiveWithLogging = {
     case GetMapOutputStatuses(shuffleId: Int) =>
       val hostPort = sender.path.address.hostPort
       logInfo("Asked to send map output locations for shuffle " + shuffleId + 
" to " + hostPort)

http://git-wip-us.apache.org/repos/asf/spark/blob/6ec13745/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala 
b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index c070037..065ddda 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -27,12 +27,14 @@ import org.apache.log4j.{Level, Logger}
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils}
 
 /**
  * Proxy that relays messages to the driver.
  */
-private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) 
extends Actor with Logging {
+private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
+  extends Actor with ActorLogReceive with Logging {
+
   var masterActor: ActorSelection = _
   val timeout = AkkaUtils.askTimeout(conf)
 
@@ -114,7 +116,7 @@ private class ClientActor(driverArgs: ClientArguments, 
conf: SparkConf) extends
     }
   }
 
-  override def receive = {
+  override def receiveWithLogging = {
 
     case SubmitDriverResponse(success, driverId, message) =>
       println(message)

http://git-wip-us.apache.org/repos/asf/spark/blob/6ec13745/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala 
b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index d38e9e7..3279005 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.util.{ActorLogReceive, Utils, AkkaUtils}
 
 /**
  * Interface allowing applications to speak with a Spark deploy cluster. Takes 
a master URL,
@@ -56,7 +56,7 @@ private[spark] class AppClient(
   var registered = false
   var activeMasterUrl: String = null
 
-  class ClientActor extends Actor with Logging {
+  class ClientActor extends Actor with ActorLogReceive with Logging {
     var master: ActorSelection = null
     var alreadyDisconnected = false  // To avoid calling 
listener.disconnected() multiple times
     var alreadyDead = false  // To avoid calling listener.dead() multiple times
@@ -119,7 +119,7 @@ private[spark] class AppClient(
         .contains(remoteUrl.hostPort)
     }
 
-    override def receive = {
+    override def receiveWithLogging = {
       case RegisteredApplication(appId_, masterUrl) =>
         appId = appId_
         registered = true

http://git-wip-us.apache.org/repos/asf/spark/blob/6ec13745/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index a70ecdb..cfa2c02 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -42,14 +42,14 @@ import org.apache.spark.deploy.master.ui.MasterWebUI
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
 
 private[spark] class Master(
     host: String,
     port: Int,
     webUiPort: Int,
     val securityMgr: SecurityManager)
-  extends Actor with Logging {
+  extends Actor with ActorLogReceive with Logging {
 
   import context.dispatcher   // to use Akka's scheduler.schedule()
 
@@ -167,7 +167,7 @@ private[spark] class Master(
     context.stop(leaderElectionAgent)
   }
 
-  override def receive = {
+  override def receiveWithLogging = {
     case ElectedLeader => {
       val (storedApps, storedDrivers, storedWorkers) = 
persistenceEngine.readPersistedData()
       state = if (storedApps.isEmpty && storedDrivers.isEmpty && 
storedWorkers.isEmpty) {

http://git-wip-us.apache.org/repos/asf/spark/blob/6ec13745/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 458d994..da4fa2f 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
 
 /**
   * @param masterUrls Each url should look like spark://host:port.
@@ -51,7 +51,7 @@ private[spark] class Worker(
     workDirPath: String = null,
     val conf: SparkConf,
     val securityMgr: SecurityManager)
-  extends Actor with Logging {
+  extends Actor with ActorLogReceive with Logging {
   import context.dispatcher
 
   Utils.checkHost(host, "Expected hostname")
@@ -187,7 +187,7 @@ private[spark] class Worker(
     }
   }
 
-  override def receive = {
+  override def receiveWithLogging = {
     case RegisteredWorker(masterUrl, masterWebUiUrl) =>
       logInfo("Successfully registered with master " + masterUrl)
       registered = true

http://git-wip-us.apache.org/repos/asf/spark/blob/6ec13745/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index 530c147..6d0d0bb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -22,13 +22,15 @@ import akka.remote.{AssociatedEvent, AssociationErrorEvent, 
AssociationEvent, Di
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.DeployMessages.SendHeartbeat
+import org.apache.spark.util.ActorLogReceive
 
 /**
  * Actor which connects to a worker process and terminates the JVM if the 
connection is severed.
  * Provides fate sharing between a worker and its associated child processes.
  */
-private[spark] class WorkerWatcher(workerUrl: String) extends Actor
-    with Logging {
+private[spark] class WorkerWatcher(workerUrl: String)
+  extends Actor with ActorLogReceive with Logging {
+
   override def preStart() {
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
 
@@ -48,7 +50,7 @@ private[spark] class WorkerWatcher(workerUrl: String) extends 
Actor
 
   def exitNonZero() = if (isTesting) isShutDown = true else System.exit(-1)
 
-  override def receive = {
+  override def receiveWithLogging = {
     case AssociatedEvent(localAddress, remoteAddress, inbound) if 
isWorker(remoteAddress) =>
       logInfo(s"Successfully connected to $workerUrl")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6ec13745/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 1f46a0f..13af5b6 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -31,14 +31,15 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
 
 private[spark] class CoarseGrainedExecutorBackend(
     driverUrl: String,
     executorId: String,
     hostPort: String,
     cores: Int,
-    sparkProperties: Seq[(String, String)]) extends Actor with ExecutorBackend 
with Logging {
+    sparkProperties: Seq[(String, String)])
+  extends Actor with ActorLogReceive with ExecutorBackend with Logging {
 
   Utils.checkHostPort(hostPort, "Expected hostport")
 
@@ -52,7 +53,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
   }
 
-  override def receive = {
+  override def receiveWithLogging = {
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
       // Make this host instead of hostPort ?

http://git-wip-us.apache.org/repos/asf/spark/blob/6ec13745/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 33500d9..2a3711a 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -30,7 +30,7 @@ import akka.remote.{DisassociatedEvent, 
RemotingLifecycleEvent}
 import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
 import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, 
TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
+import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, 
Utils}
 import org.apache.spark.ui.JettyUtils
 
 /**
@@ -61,7 +61,10 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
     conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
   val createTime = System.currentTimeMillis()
 
-  class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
+  class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with 
ActorLogReceive {
+
+    override protected def log = CoarseGrainedSchedulerBackend.this.log
+
     private val executorActor = new HashMap[String, ActorRef]
     private val executorAddress = new HashMap[String, Address]
     private val executorHost = new HashMap[String, String]
@@ -79,7 +82,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
       context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, 
ReviveOffers)
     }
 
-    def receive = {
+    def receiveWithLogging = {
       case RegisterExecutor(executorId, hostPort, cores) =>
         Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
         if (executorActor.contains(executorId)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/6ec13745/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala 
b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 3d1cf31..bec9502 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -23,9 +23,9 @@ import akka.actor.{Actor, ActorRef, Props}
 
 import org.apache.spark.{Logging, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.executor.{TaskMetrics, Executor, ExecutorBackend}
+import org.apache.spark.executor.{Executor, ExecutorBackend}
 import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, 
WorkerOffer}
-import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.ActorLogReceive
 
 private case class ReviveOffers()
 
@@ -43,7 +43,7 @@ private case class StopExecutor()
 private[spark] class LocalActor(
   scheduler: TaskSchedulerImpl,
   executorBackend: LocalBackend,
-  private val totalCores: Int) extends Actor with Logging {
+  private val totalCores: Int) extends Actor with ActorLogReceive with Logging 
{
 
   private var freeCores = totalCores
 
@@ -53,7 +53,7 @@ private[spark] class LocalActor(
   val executor = new Executor(
     localExecutorId, localExecutorHostname, scheduler.conf.getAll, isLocal = 
true)
 
-  def receive = {
+  override def receiveWithLogging = {
     case ReviveOffers =>
       reviveOffers()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6ec13745/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index bd31e3c..3ab0770 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -31,7 +31,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils}
 
 /**
  * BlockManagerMasterActor is an actor on the master node to track statuses of
@@ -39,7 +39,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
  */
 private[spark]
 class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, 
listenerBus: LiveListenerBus)
-  extends Actor with Logging {
+  extends Actor with ActorLogReceive with Logging {
 
   // Mapping from block manager id to the block manager's information.
   private val blockManagerInfo = new mutable.HashMap[BlockManagerId, 
BlockManagerInfo]
@@ -55,8 +55,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: 
SparkConf, listenerBus
   val slaveTimeout = conf.getLong("spark.storage.blockManagerSlaveTimeoutMs",
     math.max(conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 
45000))
 
-  val checkTimeoutInterval = 
conf.getLong("spark.storage.blockManagerTimeoutIntervalMs",
-    60000)
+  val checkTimeoutInterval = 
conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000)
 
   var timeoutCheckingTask: Cancellable = null
 
@@ -67,9 +66,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: 
SparkConf, listenerBus
     super.preStart()
   }
 
-  def receive = {
+  override def receiveWithLogging = {
     case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
-      logInfo("received a register")
       register(blockManagerId, maxMemSize, slaveActor)
       sender ! true
 
@@ -118,7 +116,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: 
SparkConf, listenerBus
       sender ! true
 
     case StopBlockManagerMaster =>
-      logInfo("Stopping BlockManagerMaster")
       sender ! true
       if (timeoutCheckingTask != null) {
         timeoutCheckingTask.cancel()

http://git-wip-us.apache.org/repos/asf/spark/blob/6ec13745/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
index 6d4db06..c194e0f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
@@ -23,6 +23,7 @@ import akka.actor.{ActorRef, Actor}
 
 import org.apache.spark.{Logging, MapOutputTracker}
 import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.ActorLogReceive
 
 /**
  * An actor to take commands from the master to execute options. For example,
@@ -32,12 +33,12 @@ private[storage]
 class BlockManagerSlaveActor(
     blockManager: BlockManager,
     mapOutputTracker: MapOutputTracker)
-  extends Actor with Logging {
+  extends Actor with ActorLogReceive with Logging {
 
   import context.dispatcher
 
   // Operations that involve removing blocks may be slow and should be done 
asynchronously
-  override def receive = {
+  override def receiveWithLogging = {
     case RemoveBlock(blockId) =>
       doAsync[Boolean]("removing block " + blockId, sender) {
         blockManager.removeBlock(blockId)

http://git-wip-us.apache.org/repos/asf/spark/blob/6ec13745/core/src/main/scala/org/apache/spark/util/ActorLogReceive.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ActorLogReceive.scala 
b/core/src/main/scala/org/apache/spark/util/ActorLogReceive.scala
new file mode 100644
index 0000000..332d0cb
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ActorLogReceive.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import akka.actor.Actor
+import org.slf4j.Logger
+
+/**
+ * A trait to enable logging all Akka actor messages. Here's an example of 
using this:
+ *
+ * {{{
+ *   class BlockManagerMasterActor extends Actor with ActorLogReceive with 
Logging {
+ *     ...
+ *     override def receiveWithLogging = {
+ *       case GetLocations(blockId) =>
+ *         sender ! getLocations(blockId)
+ *       ...
+ *     }
+ *     ...
+ *   }
+ * }}}
+ *
+ */
+private[spark] trait ActorLogReceive {
+  self: Actor =>
+
+  override def receive: Actor.Receive = new Actor.Receive {
+
+    private val _receiveWithLogging = receiveWithLogging
+
+    override def isDefinedAt(o: Any): Boolean = 
_receiveWithLogging.isDefinedAt(o)
+
+    override def apply(o: Any): Unit = {
+      if (log.isDebugEnabled) {
+        log.debug(s"[actor] received message $o from ${self.sender}")
+      }
+      val start = System.nanoTime
+      _receiveWithLogging.apply(o)
+      val timeTaken = (System.nanoTime - start).toDouble / 1000000
+      if (log.isDebugEnabled) {
+        log.debug(s"[actor] handled message ($timeTaken ms) $o from 
${self.sender}")
+      }
+    }
+  }
+
+  def receiveWithLogging: Actor.Receive
+
+  protected def log: Logger
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to