Repository: spark
Updated Branches:
  refs/heads/master bf49c2213 -> 8948ad3fb


[SPARK-7339] [PYSPARK] PySpark shuffle spill memory sometimes is not correct

In PySpark we measure the memory used before and after a spill and report the 
difference as memorySpilled. If the before value is smaller than the after 
value, the difference is negative, and in that scenario 0 is the more 
reasonable value. (The sketch after the table below shows how the negative 
values arise.)

Below are the task results from the HistoryServer for a job we tested:
Index | ID | Attempt | Status  | Locality Level | Executor ID / Host | Launch Time         | Duration | GC Time | Input Size / Records     | Write Time | Shuffle Write Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk) | Errors
0     | 0  | 0       | SUCCESS | NODE_LOCAL     | 3 / vm119          | 2015/05/04 17:31:06 | 21 s     | 0.1 s   | 128.1 MB (hadoop) / 3237 | 70 ms      | 10.1 MB / 2529               | 0.0 B                  | 5.7 MB               |
2     | 2  | 0       | SUCCESS | NODE_LOCAL     | 1 / vm118          | 2015/05/04 17:31:06 | 22 s     | 89 ms   | 128.1 MB (hadoop) / 3205 | 0.1 s      | 10.1 MB / 2529               | -1048576.0 B           | 5.9 MB               |
1     | 1  | 0       | SUCCESS | NODE_LOCAL     | 2 / vm117          | 2015/05/04 17:31:06 | 22 s     | 0.1 s   | 128.1 MB (hadoop) / 3271 | 68 ms      | 10.1 MB / 2529               | -1048576.0 B           | 5.6 MB               |
4     | 4  | 0       | SUCCESS | NODE_LOCAL     | 2 / vm117          | 2015/05/04 17:31:06 | 22 s     | 0.1 s   | 128.1 MB (hadoop) / 3192 | 51 ms      | 10.1 MB / 2529               | -1048576.0 B           | 5.9 MB               |
3     | 3  | 0       | SUCCESS | NODE_LOCAL     | 3 / vm119          | 2015/05/04 17:31:06 | 22 s     | 0.1 s   | 128.1 MB (hadoop) / 3262 | 51 ms      | 10.1 MB / 2529               | 1024.0 KB              | 5.8 MB               |
5     | 5  | 0       | SUCCESS | NODE_LOCAL     | 1 / vm118          | 2015/05/04 17:31:06 | 22 s     | 89 ms   | 128.1 MB (hadoop) / 3256 | 93 ms      | 10.1 MB / 2529               | -1048576.0 B           | 5.7 MB               |

/cc davies

Author: linweizhong <linweizh...@huawei.com>

Closes #5887 from Sephiroth-Lin/spark-7339 and squashes the following commits:

9186c81 [linweizhong] Use max function to get a nonnegative value
d41672b [linweizhong] Update MemoryBytesSpilled when memorySpilled > 0


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8948ad3f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8948ad3f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8948ad3f

Branch: refs/heads/master
Commit: 8948ad3fb5d5d095d3942855960d735f27d97dd5
Parents: bf49c22
Author: linweizhong <linweizh...@huawei.com>
Authored: Tue May 26 08:35:39 2015 -0700
Committer: Davies Liu <dav...@databricks.com>
Committed: Tue May 26 08:35:39 2015 -0700

----------------------------------------------------------------------
 python/pyspark/shuffle.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8948ad3f/python/pyspark/shuffle.py
----------------------------------------------------------------------
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index 1d0b16c..81c420c 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -362,7 +362,7 @@ class ExternalMerger(Merger):
 
         self.spills += 1
         gc.collect()  # release the memory as much as possible
-        MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+        MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20
 
     def items(self):
         """ Return all merged items as iterator """
@@ -515,7 +515,7 @@ class ExternalSorter(object):
                 gc.collect()
                 batch //= 2
                 limit = self._next_limit()
-                MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+                MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20
                 DiskBytesSpilled += os.path.getsize(path)
                 os.unlink(path)  # data will be deleted after close
 
@@ -630,7 +630,7 @@ class ExternalList(object):
         self.values = []
         gc.collect()
         DiskBytesSpilled += self._file.tell() - pos
-        MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+        MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20
 
 
 class ExternalListOfList(ExternalList):
@@ -794,7 +794,7 @@ class ExternalGroupBy(ExternalMerger):
 
         self.spills += 1
         gc.collect()  # release the memory as much as possible
-        MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+        MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20
 
     def _merged_items(self, index):
         size = sum(os.path.getsize(os.path.join(self._get_spill_dir(j), str(index)))

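The same clamped expression is repeated at all four spill sites touched by the 
diff. A hypothetical helper (not part of this patch, which keeps the 
expression inline) makes the intent explicit:

    def spilled_bytes(before_mb, after_mb):
        """Convert a memory delta in MB into a nonnegative byte count.

        gc.collect() does not always return freed memory to the OS, so the
        post-spill measurement can exceed the pre-spill one; report such
        negative deltas as a spill of 0 bytes rather than a negative size.
        """
        return max(before_mb - after_mb, 0) << 20

    # e.g. MemoryBytesSpilled += spilled_bytes(used_memory, get_used_memory())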
