Repository: spark
Updated Branches:
  refs/heads/master c05d11bb3 -> 3776f2f28


Add Python includes to path before depickling broadcast values

This fixes https://issues.apache.org/jira/browse/SPARK-1731 by adding the 
Python includes to the PYTHONPATH before depickling the broadcast values

@airhorns

Author: Bouke van der Bijl <boukevanderb...@gmail.com>

Closes #656 from bouk/python-includes-before-broadcast and squashes the 
following commits:

7b0dfe4 [Bouke van der Bijl] Add Python includes to path before depickling 
broadcast values


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3776f2f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3776f2f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3776f2f2

Branch: refs/heads/master
Commit: 3776f2f283842543ff766398292532c6e94221cc
Parents: c05d11b
Author: Bouke van der Bijl <boukevanderb...@gmail.com>
Authored: Sat May 10 13:02:13 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Sat May 10 13:02:13 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/api/python/PythonRDD.scala | 10 +++++-----
 python/pyspark/worker.py                              | 14 +++++++-------
 2 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3776f2f2/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index fecd976..388b838 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -179,6 +179,11 @@ private[spark] class PythonRDD[T: ClassTag](
         dataOut.writeInt(split.index)
         // sparkFilesDir
         PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut)
+        // Python includes (*.zip and *.egg files)
+        dataOut.writeInt(pythonIncludes.length)
+        for (include <- pythonIncludes) {
+          PythonRDD.writeUTF(include, dataOut)
+        }
         // Broadcast variables
         dataOut.writeInt(broadcastVars.length)
         for (broadcast <- broadcastVars) {
@@ -186,11 +191,6 @@ private[spark] class PythonRDD[T: ClassTag](
           dataOut.writeInt(broadcast.value.length)
           dataOut.write(broadcast.value)
         }
-        // Python includes (*.zip and *.egg files)
-        dataOut.writeInt(pythonIncludes.length)
-        for (include <- pythonIncludes) {
-          PythonRDD.writeUTF(include, dataOut)
-        }
         dataOut.flush()
         // Serialized command:
         dataOut.writeInt(command.length)

http://git-wip-us.apache.org/repos/asf/spark/blob/3776f2f2/python/pyspark/worker.py
----------------------------------------------------------------------
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 4c214ef..f43210c 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -56,13 +56,6 @@ def main(infile, outfile):
         SparkFiles._root_directory = spark_files_dir
         SparkFiles._is_running_on_worker = True
 
-        # fetch names and values of broadcast variables
-        num_broadcast_variables = read_int(infile)
-        for _ in range(num_broadcast_variables):
-            bid = read_long(infile)
-            value = pickleSer._read_with_length(infile)
-            _broadcastRegistry[bid] = Broadcast(bid, value)
-
         # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
         sys.path.append(spark_files_dir) # *.py files that were added will be copied here
         num_python_includes =  read_int(infile)
@@ -70,6 +63,13 @@ def main(infile, outfile):
             filename = utf8_deserializer.loads(infile)
             sys.path.append(os.path.join(spark_files_dir, filename))
 
+        # fetch names and values of broadcast variables
+        num_broadcast_variables = read_int(infile)
+        for _ in range(num_broadcast_variables):
+            bid = read_long(infile)
+            value = pickleSer._read_with_length(infile)
+            _broadcastRegistry[bid] = Broadcast(bid, value)
+
         command = pickleSer._read_with_length(infile)
         (func, deserializer, serializer) = command
         init_time = time.time()

Reply via email to