This is an automated email from the ASF dual-hosted git repository.

yaniv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-amaterasu.git

commit 886541ccf144c7dfab8c4058b6d85a83197500c1
Author: Nadav Har Tzvi <[email protected]>
AuthorDate: Sat Apr 27 12:26:14 2019 +0300

    Installing action requirements inside the driver and shipping them to the executors
---
 .../runners/providers/PySparkRunnerProvider.scala          |  2 +-
 sdk_python/amaterasu/pyspark/runtime.py                    | 14 +++++++++++++-
 2 files changed, 14 insertions(+), 2 deletions(-)
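
The change works in two steps: the driver downloads the action's requirements locally with pip, then registers each downloaded artifact with the SparkContext so Spark ships it to every executor. Below is a minimal, self-contained sketch of that flow, assuming a requirements.txt in the working directory; it invokes pip through "python -m pip" rather than the commit's pip._internal.main, since pip's internals are not a stable API:

    import os
    import subprocess
    import sys

    from pyspark.sql import SparkSession

    def prepare_user_dependencies(requirements='requirements.txt', dest='job_deps'):
        # Download (not install) every listed requirement into a local
        # directory on the driver.
        subprocess.check_call([sys.executable, '-m', 'pip', 'download',
                               '-r', requirements, '-d', dest])
        return [os.path.join(dest, fname) for fname in os.listdir(dest)]

    spark = SparkSession.builder.getOrCreate()
    for path in prepare_user_dependencies():
        # addPyFile copies the artifact to each executor and makes it
        # importable by the worker processes.
        spark.sparkContext.addPyFile(path)

Note that executors can only import artifacts Python can load from sys.path (.py files and zip-based archives such as eggs and wheels); sdist tarballs fetched by pip download would need extra handling.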

diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
index 919d8e3..0e9fea4 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
@@ -13,7 +13,7 @@ class PySparkRunnerProvider(val env: String, val conf: ClusterConfig) extends Py
       //s" $$SPARK_HOME/conf/spark-env.sh" +
      // s" && env PYSPARK_PYTHON=$getVirtualPythonPath" +
       //s" env PYSPARK_DRIVER_PYTHON=$getVirtualPythonPath" + d
-        s" && $$SPARK_HOME/bin/spark-submit --master yarn-cluster --conf spark.pyspark.virtualenv.enabled=true  --conf spark.pyspark.virtualenv.type=native --conf spark.pyspark.virtualenv.bin.path=$getVirtualPythonBin --conf spark.pyspark.python=$getVirtualPythonPath --files $$SPARK_HOME/conf/hive-site.xml ${actionData.getSrc}"
+        s" && $$SPARK_HOME/bin/spark-submit --master yarn-cluster --conf spark.pyspark.python=$getVirtualPythonPath --files $$SPARK_HOME/conf/hive-site.xml ${actionData.getSrc}"
   }
 
   override def getRunnerResources: Array[String] = {
diff --git a/sdk_python/amaterasu/pyspark/runtime.py b/sdk_python/amaterasu/pyspark/runtime.py
index da69205..c51382d 100644
--- a/sdk_python/amaterasu/pyspark/runtime.py
+++ b/sdk_python/amaterasu/pyspark/runtime.py
@@ -18,9 +18,12 @@ from pyspark import SparkContext, SparkConf
 from pyspark.sql import SparkSession, DataFrame
 from amaterasu.base import BaseAmaContextBuilder, LoaderAmaContext
 from .datasets import DatasetManager
+from pip._internal import main as pip_main
+import zipfile
 import os
 import sys
 
+
 # For some reason, the leader passes "_" as the PYSPARK_PYTHON env variable
 os.environ['PYSPARK_PYTHON'] = os.environ.get('_') or os.environ.get('PYSPARK_PYTHON') or sys.executable
 
@@ -34,6 +37,7 @@ class AmaContextBuilder(BaseAmaContextBuilder):
         except:
             self.spark_conf = SparkConf()
 
+
     def setMaster(self, master_uri) -> "AmaContextBuilder":
         self.spark_conf.setMaster(master_uri)
         return self
@@ -42,9 +46,17 @@ class AmaContextBuilder(BaseAmaContextBuilder):
         self.spark_conf.set(key, value)
         return self
 
+    def prepare_user_dependencies(self):
+        pip_main(["download", "-r", "requirements.txt", '-d', 'job_deps'])
+        return [os.path.join('job_deps', fname) for fname in os.listdir('job_deps')]
+
+
     def build(self) -> "AmaContext":
+        deps_paths = self.prepare_user_dependencies()
         spark = SparkSession.builder.config(conf=self.spark_conf).getOrCreate()
-        sc = spark.sparkContext
+        sc: SparkContext = spark.sparkContext
+        for path in deps_paths:
+            sc.addPyFile(path)
         return AmaContext(self.ama_conf, sc, spark)
 
 
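
For illustration, a hypothetical action using the new builder; the constructor call, the yarn master URI, and the sc attribute on the returned AmaContext are assumptions, and requests stands in for any pure-Python package listed in requirements.txt:

    from amaterasu.pyspark.runtime import AmaContextBuilder

    # build() now downloads the requirements on the driver and ships them
    # to the executors before any task runs.
    ama_context = AmaContextBuilder().setMaster('yarn').build()

    def uses_requirement(i):
        import requests  # resolved from the shipped artifacts on each executor
        return (i, requests.__version__)

    print(ama_context.sc.parallelize(range(3)).map(uses_requirement).collect())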
