Repository: spark
Updated Branches:
  refs/heads/branch-2.3 1708de27e -> 28973e152


[SPARK-21945][YARN][PYTHON] Make --py-files work with PySpark shell in Yarn 
client mode

When we run the _PySpark shell in YARN client mode_, files specified via `--py-files` are 
not recognised on the _driver side_.

Here are the steps I took to check:

```bash
$ cat /home/spark/tmp.py
def testtest():
    return 1
```

```bash
$ ./bin/pyspark --master yarn --deploy-mode client --py-files /home/spark/tmp.py
```

```python
>>> def test():
...     import tmp
...     return tmp.testtest()
...
>>> spark.range(1).rdd.map(lambda _: test()).collect()  # executor side
[1]
>>> test()  # driver side
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 2, in test
ImportError: No module named tmp
```

Unlike YARN cluster mode and YARN client mode with spark-submit, YARN client mode 
with the PySpark shell specifically works as follows:

1. It first runs the Python shell via:

https://github.com/apache/spark/blob/3cb82047f2f51af553df09b9323796af507d36f8/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java#L158
 as pointed out by tgravescs in the JIRA.

2. This triggers shell.py, which submits another application to launch a Py4J 
gateway:

https://github.com/apache/spark/blob/209b9361ac8a4410ff797cff1115e1888e2f7e66/python/pyspark/java_gateway.py#L45-L60

3. It runs a Py4J gateway:

https://github.com/apache/spark/blob/3cb82047f2f51af553df09b9323796af507d36f8/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L425

4. It copies (or downloads) `--py-files` into a local temp directory:

https://github.com/apache/spark/blob/3cb82047f2f51af553df09b9323796af507d36f8/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L365-L376

and these files are then set in `spark.submit.pyFiles`.

5. The Py4J JVM is launched and then the Python paths are set via:

https://github.com/apache/spark/blob/7013eea11cb32b1e0038dc751c485da5c94a484b/python/pyspark/context.py#L209-L216
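
For reference, the path-setting logic there before this change (quoted from the "-" 
lines of the diff at the bottom of this mail) assumes the files already exist under 
`SparkFiles.getRootDirectory()`:

```python
# Pre-fix logic in context.py (quoted from the diff below). It builds
# sys.path entries against SparkFiles.getRootDirectory() without checking
# whether the files were ever placed there.
for path in self._conf.get("spark.submit.pyFiles", "").split(","):
    if path != "":
        (dirname, filename) = os.path.split(path)
        if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
            self._python_includes.append(filename)
            sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))
```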

However, these paths are not actually set, because the files were copied into a temp 
directory in step 4, whereas this code path looks under `SparkFiles.getRootDirectory()`, 
where files are placed only when `SparkContext.addFile()` is called.
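
One way to observe the mismatch from the shell (an illustrative check; the actual 
temp paths vary per run):

```python
# Illustrative check: 'spark.submit.pyFiles' points at SparkSubmit's temp
# copies, while context.py builds sys.path entries against
# SparkFiles.getRootDirectory(), where nothing has been placed yet.
import os
from pyspark import SparkFiles

print(sc._conf.get("spark.submit.pyFiles"))  # SparkSubmit's temp copy of tmp.py
root = SparkFiles.getRootDirectory()
print(root, os.listdir(root))                # 'tmp.py' is absent here before the fix
```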

In the other cluster modes, `spark.files` is set via:

https://github.com/apache/spark/blob/3cb82047f2f51af553df09b9323796af507d36f8/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L554-L555

and those files are explicitly added via:

https://github.com/apache/spark/blob/ecb8b383af1cf1b67f3111c148229e00c9c17c40/core/src/main/scala/org/apache/spark/SparkContext.scala#L395

So we are fine in other modes.
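
That is, in those modes the files end up under `SparkFiles.getRootDirectory()` through 
`addFile`, which is exactly where the context.py lookup expects them. A quick 
illustration via the Python API:

```python
# addFile places a copy of the file under SparkFiles.getRootDirectory() on
# the driver, so the lookup in context.py succeeds in those modes.
# (Uses the example file from the steps above.)
import os
from pyspark import SparkFiles

sc.addFile("/home/spark/tmp.py")
assert os.path.exists(os.path.join(SparkFiles.getRootDirectory(), "tmp.py"))
```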

In the case of YARN client and cluster modes with _submit_, these are handled 
manually. In particular, https://github.com/apache/spark/pull/6360 added most of 
the logic. There, the Python path appears to be set manually via, for example, 
`deploy.PythonRunner`. We don't use `spark.files` in this case.
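
Conceptually, that manual handling amounts to prepending the localized py-files to 
`PYTHONPATH` before the driver's Python process starts. A rough sketch of the idea, 
not the actual Scala code (`build_pythonpath` and the paths are made up):

```python
# Rough sketch of what deploy.PythonRunner achieves for spark-submit runs:
# prepend the localized --py-files entries to PYTHONPATH so the Python
# driver can import them. Names and paths here are illustrative only.
import os

def build_pythonpath(py_files, existing=""):
    parts = [p for p in py_files if p]
    if existing:
        parts.append(existing)
    return os.pathsep.join(parts)

env = dict(os.environ)
env["PYTHONPATH"] = build_pythonpath(["/tmp/py-files/tmp.py"], env.get("PYTHONPATH", ""))
```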

I tried to keep the approach as isolated as possible: simply copy the .py or .zip 
files into `SparkFiles.getRootDirectory()` on the driver side if they do not already 
exist there. Another possible way is to set `spark.files`, but that does unrelated 
work as a side effect and seems a bit invasive.
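
Condensed, the idea is what the diff below implements (the `PACKAGE_EXTENSIONS` check 
and the error handling from the real patch are omitted here):

```python
# Condensed form of the fix: for each configured py-file, copy it into
# SparkFiles.getRootDirectory() if it is not already there, then insert it
# into sys.path. 'conf' stands for the SparkConf in use.
import os
import shutil
import sys
from pyspark import SparkFiles

for path in conf.get("spark.submit.pyFiles", "").split(","):
    if path != "":
        filename = os.path.basename(path)
        filepath = os.path.join(SparkFiles.getRootDirectory(), filename)
        if not os.path.exists(filepath):
            shutil.copyfile(path, filepath)
        sys.path.insert(1, filepath)
```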

**Before**

```python
>>> def test():
...     import tmp
...     return tmp.testtest()
...
>>> spark.range(1).rdd.map(lambda _: test()).collect()
[1]
>>> test()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 2, in test
ImportError: No module named tmp
```

**After**

```python
>>> def test():
...     import tmp
...     return tmp.testtest()
...
>>> spark.range(1).rdd.map(lambda _: test()).collect()
[1]
>>> test()
1
```

I manually tested in standalone and YARN cluster modes with the PySpark shell. .zip 
and .py files were also tested with steps similar to those above. It is difficult to 
add an automated test.
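
For the .zip case, the flow was the same with the module packaged into an importable 
zip, for example:

```python
# Package tmp.py into a zip that Python can import from once it is on
# sys.path, then pass it exactly as in the steps above, e.g.:
#   ./bin/pyspark --master yarn --deploy-mode client --py-files /home/spark/tmp.zip
import zipfile

with zipfile.ZipFile("/home/spark/tmp.zip", "w") as zf:
    zf.write("/home/spark/tmp.py", arcname="tmp.py")  # arcname keeps it importable as 'tmp'
```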

Author: hyukjinkwon <gurwls...@apache.org>

Closes #21267 from HyukjinKwon/SPARK-21945.

(cherry picked from commit 9a641e7f721d01d283afb09dccefaf32972d3c04)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28973e15
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28973e15
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28973e15

Branch: refs/heads/branch-2.3
Commit: 28973e152f7de73f82583ab373ffdecc55dcabc2
Parents: 1708de2
Author: hyukjinkwon <gurwls...@apache.org>
Authored: Thu May 17 12:07:58 2018 +0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Thu May 17 12:46:39 2018 -0700

----------------------------------------------------------------------
 python/pyspark/context.py | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/28973e15/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index d1f89e8..8805597 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -211,9 +211,21 @@ class SparkContext(object):
         for path in self._conf.get("spark.submit.pyFiles", "").split(","):
             if path != "":
                 (dirname, filename) = os.path.split(path)
-                if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
-                    self._python_includes.append(filename)
-                    sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))
+                try:
+                    filepath = os.path.join(SparkFiles.getRootDirectory(), filename)
+                    if not os.path.exists(filepath):
+                        # In case of YARN with shell mode, 'spark.submit.pyFiles' files are
+                        # not added via SparkContext.addFile. Here we check if the file exists,
+                        # try to copy and then add it to the path. See SPARK-21945.
+                        shutil.copyfile(path, filepath)
+                    if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
+                        self._python_includes.append(filename)
+                        sys.path.insert(1, filepath)
+                except Exception:
+                    warnings.warn(
+                        "Failed to add file [%s] specified in 'spark.submit.pyFiles' to "
+                        "Python path:\n  %s" % (path, "\n  ".join(sys.path)),
+                        RuntimeWarning)
 
         # Create a temporary directory inside spark.local.dir:
         local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())

