Here’s a repro for a very similar issue where Spark hangs on the UDF, which I think is related to the SPARK_HOME issue. I posted the repro on the EMR forum <https://forums.aws.amazon.com/thread.jspa?messageID=791019>, but in case you can’t access it:
1. I’m running EMR 5.6.0, Spark 2.1.1, and Python 3.5.1.

2. Create a simple Python package by creating a directory called udftest.

3. Inside udftest, put an empty __init__.py and a nothing.py.

4. nothing.py should have the following contents:

    from pyspark.sql.types import IntegerType
    from pyspark.sql.functions import udf

    def do_nothing(s: int) -> int:
        return s

    do_nothing_udf = udf(do_nothing, IntegerType())

5. From your home directory (the one that contains your udftest package), create a ZIP that we will ship to YARN:

    pushd udftest/
    zip -rq ../udftest.zip *
    popd

6. Start a PySpark shell with our test package:

    export PYSPARK_PYTHON=python3
    pyspark \
        --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=$PYSPARK_PYTHON" \
        --archives "udftest.zip#udftest"

7. Now try to use the UDF. It will hang:

    from udftest.nothing import do_nothing_udf
    spark.range(10).select(do_nothing_udf('id')).show()  # hangs

8. The strange thing is, if you define the exact same UDF directly in the active PySpark shell, it works fine! It’s only when you import it from a user-defined module that you see this issue. (A sketch of a possible workaround follows below.)
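Incidentally, the hang and the SPARK_HOME traceback quoted below look like two faces of the same problem: the traceback shows that merely importing the user module on an executor runs a module-level udf() call, which eagerly calls SparkContext.getOrCreate() inside the worker. If that’s right, one workaround is to keep the plain function at module level and only wrap it in udf() on the driver. A minimal sketch of nothing.py rewritten that way (make_do_nothing_udf is my own naming, not anything from Spark):

    # udftest/nothing.py -- lazy-wrapping workaround sketch
    from pyspark.sql.types import IntegerType
    from pyspark.sql.functions import udf

    def do_nothing(s: int) -> int:
        return s

    def make_do_nothing_udf():
        # udf() is only called here, on the driver, after the
        # SparkContext already exists -- so importing this module
        # on an executor never touches SparkContext.getOrCreate().
        return udf(do_nothing, IntegerType())

Then in the shell:

    from udftest.nothing import make_do_nothing_udf
    do_nothing_udf = make_do_nothing_udf()
    spark.range(10).select(do_nothing_udf('id')).show()

I haven’t verified this on EMR myself, so treat it as a sketch rather than a confirmed fix.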
On Thu, Jun 22, 2017 at 12:08 PM Nick Chammas <nicholas.cham...@gmail.com> wrote:

> I’m seeing a strange issue on EMR which I posted about here
> <https://forums.aws.amazon.com/thread.jspa?threadID=248805&tstart=0&messageID=790954#790954>.
>
> In brief, when I try to import a UDF I’ve defined, Python somehow fails
> to find Spark. This exact code works for me locally and works on our
> on-premises CDH cluster under YARN.
>
> This is the traceback:
>
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 318, in show
>     print(self._jdf.showString(n, 20))
>   File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
>   File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
>     return f(*a, **kw)
>   File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o89.showString.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-10-97-35-12.ec2.internal, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/worker.py", line 161, in main
>     func, profiler, deserializer, serializer = read_udfs(pickleSer, infile)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/worker.py", line 91, in read_udfs
>     _, udf = read_single_udf(pickleSer, infile)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/worker.py", line 78, in read_single_udf
>     f, return_type = read_command(pickleSer, infile)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/worker.py", line 54, in read_command
>     command = serializer._read_with_length(file)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
>     return self.loads(obj)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/serializers.py", line 451, in loads
>     return pickle.loads(obj, encoding=encoding)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/splinkr/person.py", line 7, in <module>
>     from splinkr.util import repartition_to_size
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/splinkr/util.py", line 34, in <module>
>     containsNull=False,
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/sql/functions.py", line 1872, in udf
>     return UserDefinedFunction(f, returnType)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/sql/functions.py", line 1830, in __init__
>     self._judf = self._create_judf(name)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/sql/functions.py", line 1834, in _create_judf
>     sc = SparkContext.getOrCreate()
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/context.py", line 310, in getOrCreate
>     SparkContext(conf=conf or SparkConf())
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/context.py", line 115, in __init__
>     SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/context.py", line 259, in _ensure_initialized
>     SparkContext._gateway = gateway or launch_gateway(conf)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/java_gateway.py", line 77, in launch_gateway
>     proc = Popen(command, stdin=PIPE, preexec_fn=preexec_func, env=env)
>   File "/usr/lib64/python3.5/subprocess.py", line 950, in __init__
>     restore_signals, start_new_session)
>   File "/usr/lib64/python3.5/subprocess.py", line 1544, in _execute_child
>     raise child_exception_type(errno_num, err_msg)
> FileNotFoundError: [Errno 2] No such file or directory: './bin/spark-submit'
>
> Does anyone have clues about what might be going on?
>
> Nick
>
> ------------------------------
> View this message in context: Trouble with PySpark UDFs and SPARK_HOME only on EMR
> <http://apache-spark-user-list.1001560.n3.nabble.com/Trouble-with-PySpark-UDFs-and-SPARK-HOME-only-on-EMR-tp28778.html>
> Sent from the Apache Spark User List mailing list archive
> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
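Reading the quoted traceback bottom-up, the chain seems to be: the executor unpickles the UDF, which imports splinkr.util; a module-level udf() call there runs UserDefinedFunction.__init__, which eagerly calls SparkContext.getOrCreate(); and launch_gateway then tries to exec ./bin/spark-submit relative to the container’s working directory, where no such file exists. To see what the environment actually looks like inside an EMR executor, here’s a quick diagnostic sketch (it assumes an active PySpark shell with spark defined; check_executor_env is just my throwaway name):

    def check_executor_env(_):
        # This generator runs inside the executor's Python worker,
        # not on the driver.
        import os
        yield {
            'SPARK_HOME': os.environ.get('SPARK_HOME'),
            'cwd': os.getcwd(),
            'spark_submit_exists': os.path.exists('./bin/spark-submit'),
        }

    print(spark.sparkContext
               .parallelize([0], 1)
               .mapPartitions(check_executor_env)
               .collect())

If SPARK_HOME turns out to be unset or empty in the container, that would explain why the relative ./bin/spark-submit lookup fails there but works locally and on CDH.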