This is an automated email from the ASF dual-hosted git repository.

yaniv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-amaterasu.git
commit 886541ccf144c7dfab8c4058b6d85a83197500c1
Author: Nadav Har Tzvi <[email protected]>
AuthorDate: Sat Apr 27 12:26:14 2019 +0300

    Installing action requirements inside the driver and shipping them to the executors
---
 .../runners/providers/PySparkRunnerProvider.scala |  2 +-
 sdk_python/amaterasu/pyspark/runtime.py           | 14 +++++++++++++-
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
index 919d8e3..0e9fea4 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
@@ -13,7 +13,7 @@ class PySparkRunnerProvider(val env: String, val conf: ClusterConfig) extends Py
     //s" $$SPARK_HOME/conf/spark-env.sh" +
     // s" && env PYSPARK_PYTHON=$getVirtualPythonPath" +
     //s" env PYSPARK_DRIVER_PYTHON=$getVirtualPythonPath" + d
-    s" && $$SPARK_HOME/bin/spark-submit --master yarn-cluster --conf spark.pyspark.virtualenv.enabled=true --conf spark.pyspark.virtualenv.type=native --conf spark.pyspark.virtualenv.bin.path=$getVirtualPythonBin --conf spark.pyspark.python=$getVirtualPythonPath --files $$SPARK_HOME/conf/hive-site.xml ${actionData.getSrc}"
+    s" && $$SPARK_HOME/bin/spark-submit --master yarn-cluster --conf spark.pyspark.python=$getVirtualPythonPath --files $$SPARK_HOME/conf/hive-site.xml ${actionData.getSrc}"
   }
 
   override def getRunnerResources: Array[String] = {

diff --git a/sdk_python/amaterasu/pyspark/runtime.py b/sdk_python/amaterasu/pyspark/runtime.py
index da69205..c51382d 100644
--- a/sdk_python/amaterasu/pyspark/runtime.py
+++ b/sdk_python/amaterasu/pyspark/runtime.py
@@ -18,9 +18,12 @@ from pyspark import SparkContext, SparkConf
 from pyspark.sql import SparkSession, DataFrame
 from amaterasu.base import BaseAmaContextBuilder, LoaderAmaContext
 from .datasets import DatasetManager
+from pip._internal import main as pip_main
+import zipfile
 import os
 import sys
 
+
 # For some reason, the leader passes "_" as the PYSPARK_PYTHON env variable
 os.environ['PYSPARK_PYTHON'] = os.environ.get('_') or os.environ.get('PYSPARK_PYTHON') or sys.executable
 
@@ -34,6 +37,7 @@ class AmaContextBuilder(BaseAmaContextBuilder):
         except:
             self.spark_conf = SparkConf()
 
+
     def setMaster(self, master_uri) -> "AmaContextBuilder":
         self.spark_conf.setMaster(master_uri)
         return self
@@ -42,9 +46,17 @@ class AmaContextBuilder(BaseAmaContextBuilder):
         self.spark_conf.set(key, value)
         return self
 
+    def prepare_user_dependencies(self):
+        pip_main(["download", "-r", "requirements.txt", '-d', 'job_deps'])
+        return [os.path.join('job_deps', fname) for fname in os.listdir('job_deps')]
+
+
     def build(self) -> "AmaContext":
+        deps_paths = self.prepare_user_dependencies()
         spark = SparkSession.builder.config(conf=self.spark_conf).getOrCreate()
-        sc = spark.sparkContext
+        sc: SparkContext = spark.sparkContext
+        for path in deps_paths:
+            sc.addPyFile(path)
         return AmaContext(self.ama_conf, sc, spark)
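For reference, the pattern this commit introduces in build() is: resolve the action's requirements.txt onto the driver with pip, then register each downloaded archive with SparkContext.addPyFile so Spark copies it to the executors and puts it on their Python path. Below is a minimal standalone sketch of that flow, for illustration only: the ship_requirements helper is hypothetical, requirements.txt is assumed to sit in the working directory, and it shells out to `python -m pip` instead of calling pip._internal.main as the commit does, since pip's internal API is not a stable interface.

# Illustrative sketch only, not Amaterasu code: mirrors the
# prepare_user_dependencies/build flow added in this commit.
import os
import subprocess
import sys

from pyspark.sql import SparkSession


def ship_requirements(spark: SparkSession,
                      requirements: str = 'requirements.txt',
                      dest: str = 'job_deps') -> None:
    # Download the requirement archives onto the driver. Shelling out to
    # `python -m pip` avoids depending on pip's internal API.
    subprocess.check_call([sys.executable, '-m', 'pip', 'download',
                           '-r', requirements, '-d', dest])
    # Register each downloaded archive; Spark ships it to every executor
    # and prepends it to the executors' Python path.
    for fname in os.listdir(dest):
        spark.sparkContext.addPyFile(os.path.join(dest, fname))


spark = SparkSession.builder.getOrCreate()
ship_requirements(spark)

One caveat of shipping raw pip downloads this way: addPyFile makes zip-format archives (pure-Python wheels included) importable on the executors, but any source distributions that pip downloads would still need to be built before they could be imported.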
