Repository: spark
Updated Branches:
  refs/heads/master c854b9fcb -> bb96012b7


[SPARK-3679] [PySpark] pickle the exact globals of functions

function.func_code.co_names has all the names used in the function, including
the names of attributes. It will pickle some unnecessary globals if a global
variable has the same name as an attribute (in co_names).

There is a regression introduced by #2144; this reverts part of the changes in that PR.

cc JoshRosen

Author: Davies Liu <[email protected]>

Closes #2522 from davies/globals and squashes the following commits:

dfbccf5 [Davies Liu] fix bug while pickle globals of function


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb96012b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb96012b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb96012b

Branch: refs/heads/master
Commit: bb96012b7360b099a19fecc80f0209b30f118ada
Parents: c854b9f
Author: Davies Liu <[email protected]>
Authored: Wed Sep 24 13:00:05 2014 -0700
Committer: Josh Rosen <[email protected]>
Committed: Wed Sep 24 13:00:05 2014 -0700

----------------------------------------------------------------------
 python/pyspark/cloudpickle.py | 42 ++++++++++++++++++++++++++++++++------
 python/pyspark/tests.py       | 18 ++++++++++++++++
 2 files changed, 54 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bb96012b/python/pyspark/cloudpickle.py
----------------------------------------------------------------------
diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index 32dda38..bb07835 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -52,6 +52,7 @@ from functools import partial
 import itertools
 from copy_reg import _extension_registry, _inverted_registry, _extension_cache
 import new
+import dis
 import traceback
 import platform
 
@@ -61,6 +62,14 @@ PyImp = platform.python_implementation()
 import logging
 cloudLog = logging.getLogger("Cloud.Transport")
 
+#relevant opcodes
+STORE_GLOBAL = chr(dis.opname.index('STORE_GLOBAL'))
+DELETE_GLOBAL = chr(dis.opname.index('DELETE_GLOBAL'))
+LOAD_GLOBAL = chr(dis.opname.index('LOAD_GLOBAL'))
+GLOBAL_OPS = [STORE_GLOBAL, DELETE_GLOBAL, LOAD_GLOBAL]
+
+HAVE_ARGUMENT = chr(dis.HAVE_ARGUMENT)
+EXTENDED_ARG = chr(dis.EXTENDED_ARG)
 
 if PyImp == "PyPy":
     # register builtin type in `new`
@@ -304,16 +313,37 @@ class CloudPickler(pickle.Pickler):
         write(pickle.REDUCE)  # applies _fill_function on the tuple
 
     @staticmethod
-    def extract_code_globals(code):
+    def extract_code_globals(co):
         """
         Find all globals names read or written to by codeblock co
         """
-        names = set(code.co_names)
-        if code.co_consts:   # see if nested function have any global refs
-            for const in code.co_consts:
+        code = co.co_code
+        names = co.co_names
+        out_names = set()
+
+        n = len(code)
+        i = 0
+        extended_arg = 0
+        while i < n:
+            op = code[i]
+
+            i = i+1
+            if op >= HAVE_ARGUMENT:
+                oparg = ord(code[i]) + ord(code[i+1])*256 + extended_arg
+                extended_arg = 0
+                i = i+2
+                if op == EXTENDED_ARG:
+                    extended_arg = oparg*65536L
+                if op in GLOBAL_OPS:
+                    out_names.add(names[oparg])
+        #print 'extracted', out_names, ' from ', names
+
+        if co.co_consts:   # see if nested function have any global refs
+            for const in co.co_consts:
                 if type(const) is types.CodeType:
-                    names |= CloudPickler.extract_code_globals(const)
-        return names
+                    out_names |= CloudPickler.extract_code_globals(const)
+
+        return out_names
 
     def extract_func_data(self, func):
         """

http://git-wip-us.apache.org/repos/asf/spark/blob/bb96012b/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 4483bf8..d1bb203 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -213,6 +213,24 @@ class SerializationTestCase(unittest.TestCase):
         out2 = ser.loads(ser.dumps(out1))
         self.assertEquals(out1, out2)
 
+    def test_func_globals(self):
+
+        class Unpicklable(object):
+            def __reduce__(self):
+                raise Exception("not picklable")
+
+        global exit
+        exit = Unpicklable()
+
+        ser = CloudPickleSerializer()
+        self.assertRaises(Exception, lambda: ser.dumps(exit))
+
+        def foo():
+            sys.exit(0)
+
+        self.assertTrue("exit" in foo.func_code.co_names)
+        ser.dumps(foo)
+
 
 class PySparkTestCase(unittest.TestCase):
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to