Repository: spark
Updated Branches:
  refs/heads/master 446c45bd8 -> 8a333d2da


[SPARK-14243][CORE] update task metrics when removing blocks

## What changes were proposed in this pull request?

This PR uses `incUpdatedBlockStatuses` to update
`updatedBlockStatuses` when removing blocks, ensuring that `BlockManager`
correctly updates `updatedBlockStatuses`.

## How was this patch tested?

test("updated block statuses") in BlockManagerSuite.scala

Author: jeanlyn <[email protected]>

Closes #12091 from jeanlyn/updateBlock.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a333d2d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a333d2d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a333d2d

Branch: refs/heads/master
Commit: 8a333d2da859fd593bda183413630bc3757529c9
Parents: 446c45b
Author: jeanlyn <[email protected]>
Authored: Thu Mar 31 12:04:42 2016 -0700
Committer: Andrew Or <[email protected]>
Committed: Thu Mar 31 12:04:42 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/storage/BlockManager.scala     |  7 +++++--
 .../org/apache/spark/storage/BlockManagerSuite.scala      | 10 ++++++++++
 2 files changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8a333d2d/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0c7763f..3014caf 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1264,9 +1264,12 @@ private[spark] class BlockManager(
             "the disk, memory, or external block store")
         }
         blockInfoManager.removeBlock(blockId)
+        val removeBlockStatus = getCurrentBlockStatus(blockId, info)
         if (tellMaster && info.tellMaster) {
-          val status = getCurrentBlockStatus(blockId, info)
-          reportBlockStatus(blockId, info, status)
+          reportBlockStatus(blockId, info, removeBlockStatus)
+        }
+        Option(TaskContext.get()).foreach { c =>
+          c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, 
removeBlockStatus)))
         }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8a333d2d/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 6fc32cb..9f3a775 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -928,6 +928,16 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     assert(!store.diskStore.contains("list3"), "list3 was in disk store")
     assert(!store.diskStore.contains("list4"), "list4 was in disk store")
     assert(!store.diskStore.contains("list5"), "list5 was in disk store")
+
+    // remove block - list2 should be removed from disk
+    val updatedBlocks6 = getUpdatedBlocks {
+      store.removeBlock(
+        "list2", tellMaster = true)
+    }
+    assert(updatedBlocks6.size === 1)
+    assert(updatedBlocks6.head._1 === TestBlockId("list2"))
+    assert(updatedBlocks6.head._2.storageLevel == StorageLevel.NONE)
+    assert(!store.diskStore.contains("list2"), "list2 was in disk store")
   }
 
   test("query block statuses") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to