Repository: spark
Updated Branches:
  refs/heads/master ad290402a -> 05a451491


[SPARK-20386][SPARK CORE] modify the log info if the block exists on the slave 
already

## What changes were proposed in this pull request?
Modify the added memory size to memSize-originalMemSize if the  block exists on 
the slave already
since if the  block exists, the added memory size should be 
memSize-originalMemSize; if originalMemSize is bigger than memSize ,then the 
log info should be Removed memory, removed size should be 
originalMemSize-memSize

## How was this patch tested?
Multiple runs on existing unit tests

(Please explain how this patch was tested. E.g. unit tests, integration tests, 
manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: eatoncys <[email protected]>

Closes #17683 from eatoncys/SPARK-20386.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/05a45149
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/05a45149
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/05a45149

Branch: refs/heads/master
Commit: 05a451491d535c0828413ce2eb06fe94571069ac
Parents: ad29040
Author: eatoncys <[email protected]>
Authored: Sat Apr 22 12:29:35 2017 +0100
Committer: Sean Owen <[email protected]>
Committed: Sat Apr 22 12:29:35 2017 +0100

----------------------------------------------------------------------
 .../storage/BlockManagerMasterEndpoint.scala    | 52 +++++++++++++-------
 1 file changed, 35 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/05a45149/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 467c3e0..6f85b9e 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -497,11 +497,17 @@ private[spark] class BlockManagerInfo(
 
     updateLastSeenMs()
 
-    if (_blocks.containsKey(blockId)) {
+    val blockExists = _blocks.containsKey(blockId)
+    var originalMemSize: Long = 0
+    var originalDiskSize: Long = 0
+    var originalLevel: StorageLevel = StorageLevel.NONE
+
+    if (blockExists) {
       // The block exists on the slave already.
       val blockStatus: BlockStatus = _blocks.get(blockId)
-      val originalLevel: StorageLevel = blockStatus.storageLevel
-      val originalMemSize: Long = blockStatus.memSize
+      originalLevel = blockStatus.storageLevel
+      originalMemSize = blockStatus.memSize
+      originalDiskSize = blockStatus.diskSize
 
       if (originalLevel.useMemory) {
         _remainingMem += originalMemSize
@@ -520,32 +526,44 @@ private[spark] class BlockManagerInfo(
         blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 
0)
         _blocks.put(blockId, blockStatus)
         _remainingMem -= memSize
-        logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
-          blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
-          Utils.bytesToString(_remainingMem)))
+        if (blockExists) {
+          logInfo(s"Updated $blockId in memory on ${blockManagerId.hostPort}" +
+            s" (current size: ${Utils.bytesToString(memSize)}," +
+            s" original size: ${Utils.bytesToString(originalMemSize)}," +
+            s" free: ${Utils.bytesToString(_remainingMem)})")
+        } else {
+          logInfo(s"Added $blockId in memory on ${blockManagerId.hostPort}" +
+            s" (size: ${Utils.bytesToString(memSize)}," +
+            s" free: ${Utils.bytesToString(_remainingMem)})")
+        }
       }
       if (storageLevel.useDisk) {
         blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = 
diskSize)
         _blocks.put(blockId, blockStatus)
-        logInfo("Added %s on disk on %s (size: %s)".format(
-          blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
+        if (blockExists) {
+          logInfo(s"Updated $blockId on disk on ${blockManagerId.hostPort}" +
+            s" (current size: ${Utils.bytesToString(diskSize)}," +
+            s" original size: ${Utils.bytesToString(originalDiskSize)})")
+        } else {
+          logInfo(s"Added $blockId on disk on ${blockManagerId.hostPort}" +
+            s" (size: ${Utils.bytesToString(diskSize)})")
+        }
       }
       if (!blockId.isBroadcast && blockStatus.isCached) {
         _cachedBlocks += blockId
       }
-    } else if (_blocks.containsKey(blockId)) {
+    } else if (blockExists) {
       // If isValid is not true, drop the block.
-      val blockStatus: BlockStatus = _blocks.get(blockId)
       _blocks.remove(blockId)
       _cachedBlocks -= blockId
-      if (blockStatus.storageLevel.useMemory) {
-        logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
-          blockId, blockManagerId.hostPort, 
Utils.bytesToString(blockStatus.memSize),
-          Utils.bytesToString(_remainingMem)))
+      if (originalLevel.useMemory) {
+        logInfo(s"Removed $blockId on ${blockManagerId.hostPort} in memory" +
+          s" (size: ${Utils.bytesToString(originalMemSize)}," +
+          s" free: ${Utils.bytesToString(_remainingMem)})")
       }
-      if (blockStatus.storageLevel.useDisk) {
-        logInfo("Removed %s on %s on disk (size: %s)".format(
-          blockId, blockManagerId.hostPort, 
Utils.bytesToString(blockStatus.diskSize)))
+      if (originalLevel.useDisk) {
+        logInfo(s"Removed $blockId on ${blockManagerId.hostPort} on disk" +
+          s" (size: ${Utils.bytesToString(originalDiskSize)})")
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to