Repository: spark

Updated Branches:
  refs/heads/master c86eec584 -> 12738c1ae

SPARK-1115: Catch depickling errors

This surrounds the complete worker code in a try/except block so we
catch any error that arrives. An example would be the depickling
failing for some reason.

@JoshRosen

Author: Bouke van der Bijl <[email protected]>

Closes #644 from bouk/catch-depickling-errors and squashes the following commits:

f0f67cc [Bouke van der Bijl] Lol indentation
0e4d504 [Bouke van der Bijl] Surround the complete python worker with the try block


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12738c1a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12738c1a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12738c1a

Branch: refs/heads/master
Commit: 12738c1aec136acd7f2e3e2f8f2b541db0890630
Parents: c86eec5
Author: Bouke van der Bijl <[email protected]>
Authored: Wed Feb 26 14:50:37 2014 -0800
Committer: Josh Rosen <[email protected]>
Committed: Wed Feb 26 14:51:21 2014 -0800

----------------------------------------------------------------------
 python/pyspark/worker.py | 48 +++++++++++++++++++++----------------------
 1 file changed, 24 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/12738c1a/python/pyspark/worker.py
----------------------------------------------------------------------
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 1586463..4c214ef 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -45,34 +45,34 @@ def report_times(outfile, boot, init, finish):
 
 def main(infile, outfile):
-    boot_time = time.time()
-    split_index = read_int(infile)
-    if split_index == -1:  # for unit tests
-        return
+    try:
+        boot_time = time.time()
+        split_index = read_int(infile)
+        if split_index == -1:  # for unit tests
+            return
 
-    # fetch name of workdir
-    spark_files_dir = utf8_deserializer.loads(infile)
-    SparkFiles._root_directory = spark_files_dir
-    SparkFiles._is_running_on_worker = True
+        # fetch name of workdir
+        spark_files_dir = utf8_deserializer.loads(infile)
+        SparkFiles._root_directory = spark_files_dir
+        SparkFiles._is_running_on_worker = True
 
-    # fetch names and values of broadcast variables
-    num_broadcast_variables = read_int(infile)
-    for _ in range(num_broadcast_variables):
-        bid = read_long(infile)
-        value = pickleSer._read_with_length(infile)
-        _broadcastRegistry[bid] = Broadcast(bid, value)
+        # fetch names and values of broadcast variables
+        num_broadcast_variables = read_int(infile)
+        for _ in range(num_broadcast_variables):
+            bid = read_long(infile)
+            value = pickleSer._read_with_length(infile)
+            _broadcastRegistry[bid] = Broadcast(bid, value)
 
-    # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
-    sys.path.append(spark_files_dir) # *.py files that were added will be copied here
-    num_python_includes = read_int(infile)
-    for _ in range(num_python_includes):
-        filename = utf8_deserializer.loads(infile)
-        sys.path.append(os.path.join(spark_files_dir, filename))
+        # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
+        sys.path.append(spark_files_dir) # *.py files that were added will be copied here
+        num_python_includes = read_int(infile)
+        for _ in range(num_python_includes):
+            filename = utf8_deserializer.loads(infile)
+            sys.path.append(os.path.join(spark_files_dir, filename))
 
-    command = pickleSer._read_with_length(infile)
-    (func, deserializer, serializer) = command
-    init_time = time.time()
-    try:
+        command = pickleSer._read_with_length(infile)
+        (func, deserializer, serializer) = command
+        init_time = time.time()
         iterator = deserializer.load_stream(infile)
         serializer.dump_stream(func(split_index, iterator), outfile)
     except Exception as e:
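
----------------------------------------------------------------------

The shape of the change, reduced to a minimal standalone sketch: before
this patch, only the compute phase (load_stream / func / dump_stream)
sat inside the try block, so an unpickling failure during setup -- for
example while reading the command or a broadcast value with
pickleSer._read_with_length -- crashed the worker without reporting the
error back over the pipe. The sketch below is illustrative only and
assumes a simplified protocol; the sentinel PYTHON_EXCEPTION_THROWN and
the tuple framing here are stand-ins, not the real framing from
pyspark.serializers.

    import io
    import pickle
    import traceback

    PYTHON_EXCEPTION_THROWN = -2  # simplified stand-in sentinel

    def main(infile, outfile):
        try:
            # Setup phase: before the patch this sat OUTSIDE the try
            # block, so an UnpicklingError here went unreported.
            func = pickle.load(infile)
            # Compute phase: this part was already guarded.
            for line in infile:
                outfile.write(func(line))
        except Exception:
            # Mirrors the except branch in worker.py: send the failure
            # back through the same pipe instead of dying silently.
            outfile.write(pickle.dumps((PYTHON_EXCEPTION_THROWN,
                                        traceback.format_exc())))

    # Deliberately corrupt pickle bytes now produce a reported error:
    out = io.BytesIO()
    main(io.BytesIO(b"\x80\x99 not a pickle"), out)
    code, tb = pickle.loads(out.getvalue())
    assert code == PYTHON_EXCEPTION_THROWN

Widening the existing try block rather than adding a second one keeps a
single error-reporting path, which is why the whole worker body simply
gains one level of indentation in the diff above.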
