Repository: spark
Updated Branches:
  refs/heads/branch-2.2 6c2489c66 -> d68e0a3a5


[SPARK-20386][SPARK CORE] modify the log info if the block exists on the slave 
already

## What changes were proposed in this pull request?
Modify the added memory size to memSize-originalMemSize if the block already
exists on the slave, since in that case the memory actually added is
memSize-originalMemSize. If originalMemSize is bigger than memSize, then the
log info should say "Removed memory" and the removed size should be
originalMemSize-memSize.

## How was this patch tested?
Multiple runs on existing unit tests

(Please explain how this patch was tested. E.g. unit tests, integration tests, 
manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: eatoncys <[email protected]>

Closes #17683 from eatoncys/SPARK-20386.

(cherry picked from commit 05a451491d535c0828413ce2eb06fe94571069ac)
Signed-off-by: Sean Owen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d68e0a3a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d68e0a3a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d68e0a3a

Branch: refs/heads/branch-2.2
Commit: d68e0a3a5ec39a3cb4358aacfc2bd1c5d783e51e
Parents: 6c2489c
Author: eatoncys <[email protected]>
Authored: Sat Apr 22 12:29:35 2017 +0100
Committer: Sean Owen <[email protected]>
Committed: Sat Apr 22 12:53:06 2017 +0100

----------------------------------------------------------------------
 .../storage/BlockManagerMasterEndpoint.scala    | 52 +++++++++++++-------
 1 file changed, 35 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d68e0a3a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 467c3e0..6f85b9e 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -497,11 +497,17 @@ private[spark] class BlockManagerInfo(
 
     updateLastSeenMs()
 
-    if (_blocks.containsKey(blockId)) {
+    val blockExists = _blocks.containsKey(blockId)
+    var originalMemSize: Long = 0
+    var originalDiskSize: Long = 0
+    var originalLevel: StorageLevel = StorageLevel.NONE
+
+    if (blockExists) {
       // The block exists on the slave already.
       val blockStatus: BlockStatus = _blocks.get(blockId)
-      val originalLevel: StorageLevel = blockStatus.storageLevel
-      val originalMemSize: Long = blockStatus.memSize
+      originalLevel = blockStatus.storageLevel
+      originalMemSize = blockStatus.memSize
+      originalDiskSize = blockStatus.diskSize
 
       if (originalLevel.useMemory) {
         _remainingMem += originalMemSize
@@ -520,32 +526,44 @@ private[spark] class BlockManagerInfo(
         blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 
0)
         _blocks.put(blockId, blockStatus)
         _remainingMem -= memSize
-        logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
-          blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
-          Utils.bytesToString(_remainingMem)))
+        if (blockExists) {
+          logInfo(s"Updated $blockId in memory on ${blockManagerId.hostPort}" +
+            s" (current size: ${Utils.bytesToString(memSize)}," +
+            s" original size: ${Utils.bytesToString(originalMemSize)}," +
+            s" free: ${Utils.bytesToString(_remainingMem)})")
+        } else {
+          logInfo(s"Added $blockId in memory on ${blockManagerId.hostPort}" +
+            s" (size: ${Utils.bytesToString(memSize)}," +
+            s" free: ${Utils.bytesToString(_remainingMem)})")
+        }
       }
       if (storageLevel.useDisk) {
         blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = 
diskSize)
         _blocks.put(blockId, blockStatus)
-        logInfo("Added %s on disk on %s (size: %s)".format(
-          blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
+        if (blockExists) {
+          logInfo(s"Updated $blockId on disk on ${blockManagerId.hostPort}" +
+            s" (current size: ${Utils.bytesToString(diskSize)}," +
+            s" original size: ${Utils.bytesToString(originalDiskSize)})")
+        } else {
+          logInfo(s"Added $blockId on disk on ${blockManagerId.hostPort}" +
+            s" (size: ${Utils.bytesToString(diskSize)})")
+        }
       }
       if (!blockId.isBroadcast && blockStatus.isCached) {
         _cachedBlocks += blockId
       }
-    } else if (_blocks.containsKey(blockId)) {
+    } else if (blockExists) {
       // If isValid is not true, drop the block.
-      val blockStatus: BlockStatus = _blocks.get(blockId)
       _blocks.remove(blockId)
       _cachedBlocks -= blockId
-      if (blockStatus.storageLevel.useMemory) {
-        logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
-          blockId, blockManagerId.hostPort, 
Utils.bytesToString(blockStatus.memSize),
-          Utils.bytesToString(_remainingMem)))
+      if (originalLevel.useMemory) {
+        logInfo(s"Removed $blockId on ${blockManagerId.hostPort} in memory" +
+          s" (size: ${Utils.bytesToString(originalMemSize)}," +
+          s" free: ${Utils.bytesToString(_remainingMem)})")
       }
-      if (blockStatus.storageLevel.useDisk) {
-        logInfo("Removed %s on %s on disk (size: %s)".format(
-          blockId, blockManagerId.hostPort, 
Utils.bytesToString(blockStatus.diskSize)))
+      if (originalLevel.useDisk) {
+        logInfo(s"Removed $blockId on ${blockManagerId.hostPort} on disk" +
+          s" (size: ${Utils.bytesToString(originalDiskSize)})")
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to