Repository: spark
Updated Branches:
  refs/heads/branch-1.4 9dabc1293 -> ca23c3b01


[SPARK-8202] [PYSPARK] fix infinite loop during external sort in PySpark

The batch size during external sort grows up to a maximum of 10000, then shrinks
down to zero, causing an infinite loop.
Since the items usually have similar sizes, we don't need to adjust the batch
size after the first spill.
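
The failure mode can be sketched with a small simulation. This is a hypothetical simplification of the loop in `ExternalSorter.sorted` (names `simulate`, `n_items`, and `max_iters` are illustrative, not from the patch): we pretend every pass spills, so the old `batch //= 2` runs each time. Once `batch` reaches 0, the exit test `len(chunk) < batch` becomes `0 < 0`, which is False, so the loop never terminates.

```python
import itertools

def simulate(n_items, batch=100, max_iters=10000):
    """Simulate the pre-fix batching loop, assuming every pass spills.

    Returns the number of spill iterations on normal exit, or raises once
    the loop stops making progress (the SPARK-8202 infinite loop).
    """
    it = iter(range(n_items))
    iters = 0
    while True:
        # Pick up to `batch` elements, as the real loop does.
        chunk = list(itertools.islice(it, batch))
        if len(chunk) < batch:
            # The only exit: a short (partial) chunk. With batch == 0 this
            # is `0 < 0`, which never fires.
            return iters
        batch //= 2  # old behavior: halve the batch after each spill
        iters += 1
        if batch == 0 and iters > max_iters:
            # Guard so the sketch itself terminates; the real loop spins here.
            raise RuntimeError("infinite loop: batch reached 0")
```

With the default batch of 100, seven halvings (100, 50, 25, 12, 6, 3, 1) consume at most 197 items before `batch` hits 0, so any larger input hangs. The fix keeps the batch size fixed after the first spill instead of halving it.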

cc JoshRosen rxin angelini

Author: Davies Liu <[email protected]>

Closes #6714 from davies/batch_size and squashes the following commits:

b170dfb [Davies Liu] update test
b9be832 [Davies Liu] Merge branch 'batch_size' of github.com:davies/spark into batch_size
6ade745 [Davies Liu] update test
5c21777 [Davies Liu] Update shuffle.py
e746aec [Davies Liu] fix batch size during sort


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca23c3b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca23c3b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca23c3b0

Branch: refs/heads/branch-1.4
Commit: ca23c3b0147de9bcc22e3b9c7b74d20df6402137
Parents: 9dabc12
Author: Davies Liu <[email protected]>
Authored: Thu Jun 18 13:45:58 2015 -0700
Committer: Josh Rosen <[email protected]>
Committed: Thu Jun 18 13:49:32 2015 -0700

----------------------------------------------------------------------
 python/pyspark/shuffle.py | 5 +----
 python/pyspark/tests.py   | 5 ++++-
 2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ca23c3b0/python/pyspark/shuffle.py
----------------------------------------------------------------------
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index 81c420c..67752c0 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -486,7 +486,7 @@ class ExternalSorter(object):
         goes above the limit.
         """
         global MemoryBytesSpilled, DiskBytesSpilled
-        batch, limit = 100, self.memory_limit
+        batch, limit = 100, self._next_limit()
         chunks, current_chunk = [], []
         iterator = iter(iterator)
         while True:
@@ -512,9 +512,6 @@ class ExternalSorter(object):
                     f.close()
                 chunks.append(load(open(path, 'rb')))
                 current_chunk = []
-                gc.collect()
-                batch //= 2
-                limit = self._next_limit()
                 MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20
                 DiskBytesSpilled += os.path.getsize(path)
                 os.unlink(path)  # data will be deleted after close

http://git-wip-us.apache.org/repos/asf/spark/blob/ca23c3b0/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 11b402e..7826542 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -179,9 +179,12 @@ class SorterTests(unittest.TestCase):
                          list(sorter.sorted(l, key=lambda x: -x, reverse=True)))
 
     def test_external_sort(self):
+        class CustomizedSorter(ExternalSorter):
+            def _next_limit(self):
+                return self.memory_limit
         l = list(range(1024))
         random.shuffle(l)
-        sorter = ExternalSorter(1)
+        sorter = CustomizedSorter(1)
         self.assertEqual(sorted(l), list(sorter.sorted(l)))
         self.assertGreater(shuffle.DiskBytesSpilled, 0)
         last = shuffle.DiskBytesSpilled


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
