Repository: spark
Updated Branches:
refs/heads/master bfd75cdfb -> 9a641e7f7
[SPARK-21945][YARN][PYTHON] Make --py-files work with PySpark shell in Yarn
client mode
## What changes were proposed in this pull request?
### Problem
When we run the _PySpark shell in Yarn client mode_, files specified via `--py-files` are
not recognised on the _driver side_.
Here are the steps I took to check:
```bash
$ cat /home/spark/tmp.py
def testtest():
    return 1
```
```bash
$ ./bin/pyspark --master yarn --deploy-mode client --py-files /home/spark/tmp.py
```
```python
>>> def test():
...     import tmp
...     return tmp.testtest()
...
>>> spark.range(1).rdd.map(lambda _: test()).collect() # executor side
[1]
>>> test() # driver side
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 2, in test
ImportError: No module named tmp
```
### How did it happen?
Unlike Yarn cluster mode and Yarn client mode via `spark-submit`, Yarn client mode
with the PySpark shell specifically works as follows:
1. It first runs Python shell via:
https://github.com/apache/spark/blob/3cb82047f2f51af553df09b9323796af507d36f8/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java#L158
as pointed out by tgravescs in the JIRA.
2. This triggers `shell.py`, which submits another application to launch a Py4J
gateway:
https://github.com/apache/spark/blob/209b9361ac8a4410ff797cff1115e1888e2f7e66/python/pyspark/java_gateway.py#L45-L60
3. It runs a Py4J gateway:
https://github.com/apache/spark/blob/3cb82047f2f51af553df09b9323796af507d36f8/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L425
4. It copies (or downloads) `--py-files` into a local temp directory:
https://github.com/apache/spark/blob/3cb82047f2f51af553df09b9323796af507d36f8/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L365-L376
and these files are then set in `spark.submit.pyFiles`.
5. The Py4J JVM is launched, and then the Python paths are set via:
https://github.com/apache/spark/blob/7013eea11cb32b1e0038dc751c485da5c94a484b/python/pyspark/context.py#L209-L216
However, these paths are not actually set, because the files were copied into a temp
directory in step 4, whereas this code path looks under `SparkFiles.getRootDirectory`,
where files are placed only when `SparkContext.addFile()` is called.
In other cluster modes, `spark.files` is set via:
https://github.com/apache/spark/blob/3cb82047f2f51af553df09b9323796af507d36f8/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L554-L555
and those files are explicitly added via:
https://github.com/apache/spark/blob/ecb8b383af1cf1b67f3111c148229e00c9c17c40/core/src/main/scala/org/apache/spark/SparkContext.scala#L395
So we are fine in other modes.
In the case of Yarn client and cluster mode with _spark-submit_, these are handled
manually. In particular, https://github.com/apache/spark/pull/6360 added most of
this logic. In that case, the Python path appears to be set manually via, for example,
`deploy.PythonRunner`; `spark.files` is not used there.
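The driver-side failure above can be reproduced in plain Python, independent of Spark: a module that exists on disk but whose directory is not on `sys.path` cannot be imported. This is only an illustrative sketch; the temp directory below is a hypothetical stand-in for the directory `--py-files` are copied into in step 4, and `tmp_module` stands in for the shipped file.

```python
import os
import sys
import tempfile

# Stand-in for the temp directory --py-files are copied into (step 4 above).
tmp_dir = tempfile.mkdtemp()
with open(os.path.join(tmp_dir, "tmp_module.py"), "w") as f:
    f.write("def testtest():\n    return 1\n")

# The driver looks elsewhere (under SparkFiles.getRootDirectory()), so the
# module's directory never reaches sys.path and the import fails.
try:
    import tmp_module
    found = True
except ImportError:
    found = False
print(found)  # False: tmp_dir is not on sys.path

# Once the module's directory actually is on sys.path, the import works.
sys.path.insert(1, tmp_dir)
import tmp_module
print(tmp_module.testtest())  # 1
```

This is exactly the situation in the shell transcript above: the executor side has the file on its path, while the driver side does not.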
### How does the PR fix the problem?
I tried to keep the approach as isolated as possible: simply copy the .py or .zip
files into `SparkFiles.getRootDirectory()` on the driver side if they are not
already there. Another possible way would be to set `spark.files`, but that triggers
unnecessary extra work and seems more invasive.
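The copy-if-missing approach can be sketched with plain-Python stand-ins (the helper name `add_py_file` and the throwaway directories below are hypothetical; in the actual patch the destination is `SparkFiles.getRootDirectory()` and the logic lives in `SparkContext`):

```python
import os
import shutil
import sys
import tempfile

def add_py_file(path, root_dir):
    """Copy a --py-files entry into root_dir if it is not already there,
    then put it on sys.path -- mirroring the shape of the fix."""
    filename = os.path.basename(path)
    filepath = os.path.join(root_dir, filename)
    if not os.path.exists(filepath):
        # In YARN client mode with the shell, the file was copied into a
        # temp directory instead of root_dir, so copy it over here.
        shutil.copyfile(path, filepath)
    sys.path.insert(1, filepath)
    return filepath

# Demo with throwaway directories standing in for the temp dir and
# SparkFiles.getRootDirectory().
src_dir = tempfile.mkdtemp()
root_dir = tempfile.mkdtemp()
src = os.path.join(src_dir, "tmp.py")
with open(src, "w") as f:
    f.write("def testtest():\n    return 1\n")

dest = add_py_file(src, root_dir)
print(os.path.exists(dest))  # True: the file now lives under root_dir
```

The actual patched code (shown in the diff below) additionally guards the whole block with a `try`/`except` that warns rather than fails, and only path-inserts files matching `PACKAGE_EXTENSIONS`.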
**Before**
```python
>>> def test():
...     import tmp
...     return tmp.testtest()
...
>>> spark.range(1).rdd.map(lambda _: test()).collect()
[1]
>>> test()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 2, in test
ImportError: No module named tmp
```
**After**
```python
>>> def test():
...     import tmp
...     return tmp.testtest()
...
>>> spark.range(1).rdd.map(lambda _: test()).collect()
[1]
>>> test()
1
```
## How was this patch tested?
I manually tested with the PySpark shell in standalone and Yarn cluster modes. `.zip`
and `.py` files were also tested following steps similar to the above. It is difficult
to add an automated test.
Author: hyukjinkwon <[email protected]>
Closes #21267 from HyukjinKwon/SPARK-21945.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9a641e7f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9a641e7f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9a641e7f
Branch: refs/heads/master
Commit: 9a641e7f721d01d283afb09dccefaf32972d3c04
Parents: bfd75cd
Author: hyukjinkwon <[email protected]>
Authored: Thu May 17 12:07:58 2018 +0800
Committer: hyukjinkwon <[email protected]>
Committed: Thu May 17 12:07:58 2018 +0800
----------------------------------------------------------------------
python/pyspark/context.py | 18 +++++++++++++++---
1 file changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/9a641e7f/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index dbb463f..ede3b6a 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -211,9 +211,21 @@ class SparkContext(object):
         for path in self._conf.get("spark.submit.pyFiles", "").split(","):
             if path != "":
                 (dirname, filename) = os.path.split(path)
-                if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
-                    self._python_includes.append(filename)
-                    sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))
+                try:
+                    filepath = os.path.join(SparkFiles.getRootDirectory(), filename)
+                    if not os.path.exists(filepath):
+                        # In case of YARN with shell mode, 'spark.submit.pyFiles' files are
+                        # not added via SparkContext.addFile. Here we check if the file exists,
+                        # try to copy and then add it to the path. See SPARK-21945.
+                        shutil.copyfile(path, filepath)
+                    if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
+                        self._python_includes.append(filename)
+                        sys.path.insert(1, filepath)
+                except Exception:
+                    warnings.warn(
+                        "Failed to add file [%s] specified in 'spark.submit.pyFiles' to "
+                        "Python path:\n  %s" % (path, "\n  ".join(sys.path)),
+                        RuntimeWarning)
         # Create a temporary directory inside spark.local.dir:
         local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())