Repository: spark
Updated Branches:
  refs/heads/master e053c5581 -> 59f84a953


[SPARK-1687] [PySpark] picklable namedtuple

Add a hook that replaces the original namedtuple with a picklable one, so that
namedtuples can be used in RDDs.

PS: pyspark should be imported BEFORE "from collections import namedtuple"
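
For illustration, a minimal usage sketch of the intended behavior (assumes a
local Spark installation; the SparkContext arguments and the Point class are
illustrative, not part of this patch):

    import pyspark                       # importing pyspark first installs the namedtuple hook
    from collections import namedtuple   # must come AFTER "import pyspark"
    from pyspark import SparkContext

    Point = namedtuple("Point", "x y")   # defined in __main__, now picklable

    sc = SparkContext("local", "namedtuple-demo")
    rdd = sc.parallelize([Point(1, 2), Point(3, 4)])
    print(rdd.map(lambda p: p.x + p.y).collect())   # [3, 7]
    sc.stop()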

Author: Davies Liu <davies....@gmail.com>

Closes #1623 from davies/namedtuple and squashes the following commits:

045dad8 [Davies Liu] remove unrelated code changes
4132f32 [Davies Liu] address comment
55b1c1a [Davies Liu] fix tests
61f86eb [Davies Liu] replace all the reference of namedtuple to new hacked one
98df6c6 [Davies Liu] Merge branch 'master' of github.com:apache/spark into namedtuple
f7b1bde [Davies Liu] add hack for CloudPickleSerializer
0c5c849 [Davies Liu] Merge branch 'master' of github.com:apache/spark into namedtuple
21991e6 [Davies Liu] hack namedtuple in __main__ module, make it picklable.
93b03b8 [Davies Liu] pickable namedtuple


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/59f84a95
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/59f84a95
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/59f84a95

Branch: refs/heads/master
Commit: 59f84a9531f7974a053fd4963ce9afd88273ea4c
Parents: e053c55
Author: Davies Liu <davies....@gmail.com>
Authored: Mon Aug 4 12:13:41 2014 -0700
Committer: Josh Rosen <joshro...@apache.org>
Committed: Mon Aug 4 12:13:41 2014 -0700

----------------------------------------------------------------------
 python/pyspark/serializers.py | 60 ++++++++++++++++++++++++++++++++++++++
 python/pyspark/tests.py       | 19 ++++++++++++
 2 files changed, 79 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/59f84a95/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 03b31ae..1b52c14 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -65,6 +65,9 @@ from itertools import chain, izip, product
 import marshal
 import struct
 import sys
+import types
+import collections
+
 from pyspark import cloudpickle
 
 
@@ -267,6 +270,63 @@ class NoOpSerializer(FramedSerializer):
         return obj
 
 
+# Hook namedtuple, make it picklable
+
+__cls = {}
+
+
+def _restore(name, fields, value):
+    """ Restore an object of namedtuple"""
+    k = (name, fields)
+    cls = __cls.get(k)
+    if cls is None:
+        cls = collections.namedtuple(name, fields)
+        __cls[k] = cls
+    return cls(*value)
+
+
+def _hack_namedtuple(cls):
+    """ Make class generated by namedtuple picklable """
+    name = cls.__name__
+    fields = cls._fields
+    def __reduce__(self):
+        return (_restore, (name, fields, tuple(self)))
+    cls.__reduce__ = __reduce__
+    return cls
+
+
+def _hijack_namedtuple():
+    """ Hack namedtuple() to make it picklable """
+    global _old_namedtuple # or it will put in closure
+
+    def _copy_func(f):
+        return types.FunctionType(f.func_code, f.func_globals, f.func_name,
+                f.func_defaults, f.func_closure)
+
+    _old_namedtuple = _copy_func(collections.namedtuple)
+
+    def namedtuple(name, fields, verbose=False, rename=False):
+        cls = _old_namedtuple(name, fields, verbose, rename)
+        return _hack_namedtuple(cls)
+
+    # replace namedtuple with new one
+    collections.namedtuple.func_globals["_old_namedtuple"] = _old_namedtuple
+    collections.namedtuple.func_globals["_hack_namedtuple"] = _hack_namedtuple
+    collections.namedtuple.func_code = namedtuple.func_code
+
+    # hack the cls already generated by namedtuple
+    # those created in other module can be pickled as normal,
+    # so only hack those in __main__ module
+    for n, o in sys.modules["__main__"].__dict__.iteritems():
+        if (type(o) is type and o.__base__ is tuple
+            and hasattr(o, "_fields")
+            and "__reduce__" not in o.__dict__):
+            _hack_namedtuple(o) # hack inplace
+
+
+_hijack_namedtuple()
+
+
 class PickleSerializer(FramedSerializer):
     """
     Serializes objects using Python's cPickle serializer:

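For context, pickle normally serializes an instance by referencing its class by
module path, which fails for namedtuple classes generated at runtime in __main__
because the workers cannot import them. The patch sidesteps this by giving such
classes a __reduce__ that ships (name, fields, values) and rebuilds the class on
the other side. A minimal standalone sketch of the same technique, independent of
the patch (the Point class and _restore helper here are illustrative):

    import collections
    import pickle

    _cls_cache = {}

    def _restore(name, fields, values):
        # Re-create the namedtuple class from its name and fields, then the instance.
        cls = _cls_cache.get((name, fields))
        if cls is None:
            cls = collections.namedtuple(name, fields)
            _cls_cache[(name, fields)] = cls
        return cls(*values)

    Point = collections.namedtuple("Point", "x y")
    # Serialize instances as (name, fields, values) instead of a reference to the
    # Point class, which would not be importable on a remote worker.
    Point.__reduce__ = lambda self: (_restore, (type(self).__name__, self._fields, tuple(self)))

    p2 = pickle.loads(pickle.dumps(Point(1, 2), 2))
    print(p2)  # Point(x=1, y=2)
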
http://git-wip-us.apache.org/repos/asf/spark/blob/59f84a95/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index acc3c30..4ac94ba 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -112,6 +112,17 @@ class TestMerger(unittest.TestCase):
         m._cleanup()
 
 
+class SerializationTestCase(unittest.TestCase):
+
+    def test_namedtuple(self):
+        from collections import namedtuple
+        from cPickle import dumps, loads
+        P = namedtuple("P", "x y")
+        p1 = P(1, 3)
+        p2 = loads(dumps(p1, 2))
+        self.assertEquals(p1, p2)
+
+
 class PySparkTestCase(unittest.TestCase):
 
     def setUp(self):
@@ -298,6 +309,14 @@ class TestRDDFunctions(PySparkTestCase):
         self.assertEqual([1], rdd.map(itemgetter(1)).collect())
         self.assertEqual([(2, 3)], rdd.map(itemgetter(2, 3)).collect())
 
+    def test_namedtuple_in_rdd(self):
+        from collections import namedtuple
+        Person = namedtuple("Person", "id firstName lastName")
+        jon = Person(1, "Jon", "Doe")
+        jane = Person(2, "Jane", "Doe")
+        theDoes = self.sc.parallelize([jon, jane])
+        self.assertEquals([jon, jane], theDoes.collect())
+
 
 class TestIO(PySparkTestCase):
 

