Repository: spark
Updated Branches:
  refs/heads/branch-1.2 633d67cb7 -> 8ecabf4b7


[SPARK-4384] [PySpark] improve sort spilling

If there are big broadcasts (or other large objects) in the Python worker, the 
free memory available for sorting can be too small, so the sorter keeps 
spilling small files to disk and eventually fails with too many open files.

This PR delays spilling until the used memory goes over the limit and has 
started to increase since the last spill. This increases the size of the 
spilled files, improving stability and performance in these cases. (We also do 
this in ExternalAggregator.)
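The backoff strategy can be sketched standalone as below; `memory_limit` and 
`get_used_memory` are hypothetical stand-ins for PySpark's internals, not the 
actual shuffle module API:

```python
def next_limit(memory_limit, used_memory):
    """Return the next spill threshold.

    If the previous spill did not release memory (e.g. it is pinned by
    broadcasts), raise the threshold to 5% above current usage, so the
    next spill happens only after memory grows again and writes a
    larger file instead of many tiny ones.
    """
    return max(memory_limit, used_memory * 1.05)


# Plenty of headroom: keep the configured limit.
print(next_limit(512, 100))   # 512

# Usage already above the limit: back off to usage * 1.05.
print(next_limit(512, 1000))  # 1050.0
```

Because the threshold is recomputed after every spill, the spill-file size 
grows with memory pressure rather than staying fixed.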

Author: Davies Liu <[email protected]>

Closes #3252 from davies/sort and squashes the following commits:

711fb6c [Davies Liu] improve sort spilling

(cherry picked from commit 73c8ea84a668f443eb18ce15ba97023da041d808)
Signed-off-by: Josh Rosen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ecabf4b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ecabf4b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ecabf4b

Branch: refs/heads/branch-1.2
Commit: 8ecabf4b7678d788faba6a202e883855be0c9f99
Parents: 633d67c
Author: Davies Liu <[email protected]>
Authored: Wed Nov 19 15:45:37 2014 -0800
Committer: Josh Rosen <[email protected]>
Committed: Wed Nov 19 15:46:50 2014 -0800

----------------------------------------------------------------------
 python/pyspark/shuffle.py | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8ecabf4b/python/pyspark/shuffle.py
----------------------------------------------------------------------
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index 5931e92..10a7ccd 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -478,13 +478,21 @@ class ExternalSorter(object):
             os.makedirs(d)
         return os.path.join(d, str(n))
 
+    def _next_limit(self):
+        """
+        Return the next memory limit. If the memory is not released
+        after spilling, it will dump the data only when the used memory
+        starts to increase.
+        """
+        return max(self.memory_limit, get_used_memory() * 1.05)
+
     def sorted(self, iterator, key=None, reverse=False):
         """
         Sort the elements in iterator, do external sort when the memory
         goes above the limit.
         """
         global MemoryBytesSpilled, DiskBytesSpilled
-        batch = 100
+        batch, limit = 100, self._next_limit()
         chunks, current_chunk = [], []
         iterator = iter(iterator)
         while True:
@@ -504,6 +512,7 @@ class ExternalSorter(object):
                 chunks.append(self.serializer.load_stream(open(path)))
                 current_chunk = []
                 gc.collect()
+                limit = self._next_limit()
                 MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
                 DiskBytesSpilled += os.path.getsize(path)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
