Repository: incubator-airflow
Updated Branches:
  refs/heads/master 1d531555e -> b3c247d3b


[AIRFLOW-1757] Add missing options to SparkSubmitOperator

Add 'exclude-packages' and 'repositories' as options
to SparkSubmitOperator, as they were missing.

Closes #2725 from kretes/AIRFLOW-1757-spark-new-options


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b3c247d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b3c247d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b3c247d3

Branch: refs/heads/master
Commit: b3c247d3bfd43a292f728608bc5aaba772f40f33
Parents: 1d53155
Author: Tomasz Bartczak <[email protected]>
Authored: Sat Oct 28 15:13:00 2017 +0200
Committer: Bolke de Bruin <[email protected]>
Committed: Sat Oct 28 15:13:07 2017 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/spark_submit_hook.py            | 14 ++++++++++++++
 airflow/contrib/operators/spark_submit_operator.py    | 12 +++++++++++-
 tests/contrib/hooks/test_spark_submit_hook.py         |  4 ++++
 tests/contrib/operators/test_spark_submit_operator.py |  6 ++++++
 4 files changed, 35 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3c247d3/airflow/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index 7f8e35e..4dc865b 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -40,6 +40,12 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
     :type jars: str
     :param java_class: the main class of the Java application
     :type java_class: str
+    :param packages: Comma-separated list of maven coordinates of jars to include on the driver and executor classpaths
+    :type packages: str
+    :param exclude_packages: Comma-separated list of maven coordinates of jars to exclude while resolving the dependencies provided in 'packages'
+    :type exclude_packages: str
+    :param repositories: Comma-separated list of additional remote repositories to search for the maven coordinates given with 'packages'
+    :type repositories: str
     :param total_executor_cores: (Standalone & Mesos only) Total cores for all 
executors (Default: all the available cores on the worker)
     :type total_executor_cores: int
     :param executor_cores: (Standalone & YARN only) Number of cores per 
executor (Default: 2)
@@ -69,6 +75,8 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
                  jars=None,
                  java_class=None,
                  packages=None,
+                 exclude_packages=None,
+                 repositories=None,
                  total_executor_cores=None,
                  executor_cores=None,
                  executor_memory=None,
@@ -86,6 +94,8 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
         self._jars = jars
         self._java_class = java_class
         self._packages = packages
+        self._exclude_packages = exclude_packages
+        self._repositories = repositories
         self._total_executor_cores = total_executor_cores
         self._executor_cores = executor_cores
         self._executor_memory = executor_memory
@@ -164,6 +174,10 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
             connection_cmd += ["--jars", self._jars]
         if self._packages:
             connection_cmd += ["--packages", self._packages]
+        if self._exclude_packages:
+            connection_cmd += ["--exclude-packages", self._exclude_packages]
+        if self._repositories:
+            connection_cmd += ["--repositories", self._repositories]
         if self._num_executors:
             connection_cmd += ["--num-executors", str(self._num_executors)]
         if self._total_executor_cores:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3c247d3/airflow/contrib/operators/spark_submit_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/spark_submit_operator.py b/airflow/contrib/operators/spark_submit_operator.py
index 277c55f..a7c9990 100644
--- a/airflow/contrib/operators/spark_submit_operator.py
+++ b/airflow/contrib/operators/spark_submit_operator.py
@@ -41,6 +41,10 @@ class SparkSubmitOperator(BaseOperator):
     :type java_class: str
     :param packages: Comma-separated list of maven coordinates of jars to 
include on the driver and executor classpaths
     :type packages: str
+    :param exclude_packages: Comma-separated list of maven coordinates of jars to exclude while resolving the dependencies provided in 'packages'
+    :type exclude_packages: str
+    :param repositories: Comma-separated list of additional remote repositories to search for the maven coordinates given with 'packages'
+    :type repositories: str
     :param total_executor_cores: (Standalone & Mesos only) Total cores for all 
executors (Default: all the available cores on the worker)
     :type total_executor_cores: int
     :param executor_cores: (Standalone & YARN only) Number of cores per 
executor (Default: 2)
@@ -62,7 +66,7 @@ class SparkSubmitOperator(BaseOperator):
     :param verbose: Whether to pass the verbose flag to spark-submit process 
for debugging
     :type verbose: bool
     """
-    template_fields = ('_name', '_application_args',)
+    template_fields = ('_name', '_application_args','_packages')
     ui_color = WEB_COLORS['LIGHTORANGE']
 
     @apply_defaults
@@ -75,6 +79,8 @@ class SparkSubmitOperator(BaseOperator):
                  jars=None,
                  java_class=None,
                  packages=None,
+                 exclude_packages=None,
+                 repositories=None,
                  total_executor_cores=None,
                  executor_cores=None,
                  executor_memory=None,
@@ -95,6 +101,8 @@ class SparkSubmitOperator(BaseOperator):
         self._jars = jars
         self._java_class = java_class
         self._packages = packages
+        self._exclude_packages = exclude_packages
+        self._repositories = repositories
         self._total_executor_cores = total_executor_cores
         self._executor_cores = executor_cores
         self._executor_memory = executor_memory
@@ -120,6 +128,8 @@ class SparkSubmitOperator(BaseOperator):
             jars=self._jars,
             java_class=self._java_class,
             packages=self._packages,
+            exclude_packages=self._exclude_packages,
+            repositories=self._repositories,
             total_executor_cores=self._total_executor_cores,
             executor_cores=self._executor_cores,
             executor_memory=self._executor_memory,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3c247d3/tests/contrib/hooks/test_spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_spark_submit_hook.py b/tests/contrib/hooks/test_spark_submit_hook.py
index be88897..5cb7132 100644
--- a/tests/contrib/hooks/test_spark_submit_hook.py
+++ b/tests/contrib/hooks/test_spark_submit_hook.py
@@ -35,6 +35,8 @@ class TestSparkSubmitHook(unittest.TestCase):
         'py_files': 'sample_library.py',
         'jars': 'parquet.jar',
         'packages': 'com.databricks:spark-avro_2.11:3.2.0',
+        'exclude_packages': 'org.bad.dependency:1.0.0',
+        'repositories': 'http://myrepo.org',
         'total_executor_cores': 4,
         'executor_cores': 4,
         'executor_memory': '22g',
@@ -115,6 +117,8 @@ class TestSparkSubmitHook(unittest.TestCase):
             '--py-files', 'sample_library.py',
             '--jars', 'parquet.jar',
             '--packages', 'com.databricks:spark-avro_2.11:3.2.0',
+            '--exclude-packages', 'org.bad.dependency:1.0.0',
+            '--repositories', 'http://myrepo.org',
             '--num-executors', '10',
             '--total-executor-cores', '4',
             '--executor-cores', '4',

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3c247d3/tests/contrib/operators/test_spark_submit_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_spark_submit_operator.py b/tests/contrib/operators/test_spark_submit_operator.py
index 33705e9..ce85a61 100644
--- a/tests/contrib/operators/test_spark_submit_operator.py
+++ b/tests/contrib/operators/test_spark_submit_operator.py
@@ -35,6 +35,8 @@ class TestSparkSubmitOperator(unittest.TestCase):
         'py_files': 'sample_library.py',
         'jars': 'parquet.jar',
         'packages': 'com.databricks:spark-avro_2.11:3.2.0',
+        'exclude_packages': 'org.bad.dependency:1.0.0',
+        'repositories': 'http://myrepo.org',
         'total_executor_cores':4,
         'executor_cores': 4,
         'executor_memory': '22g',
@@ -86,6 +88,8 @@ class TestSparkSubmitOperator(unittest.TestCase):
             'py_files': 'sample_library.py',
             'jars': 'parquet.jar',
             'packages': 'com.databricks:spark-avro_2.11:3.2.0',
+            'exclude_packages': 'org.bad.dependency:1.0.0',
+            'repositories': 'http://myrepo.org',
             'total_executor_cores': 4,
             'executor_cores': 4,
             'executor_memory': '22g',
@@ -114,6 +118,8 @@ class TestSparkSubmitOperator(unittest.TestCase):
         self.assertEqual(expected_dict['py_files'], operator._py_files)
         self.assertEqual(expected_dict['jars'], operator._jars)
         self.assertEqual(expected_dict['packages'], operator._packages)
+        self.assertEqual(expected_dict['exclude_packages'], 
operator._exclude_packages)
+        self.assertEqual(expected_dict['repositories'], operator._repositories)
         self.assertEqual(expected_dict['total_executor_cores'], 
operator._total_executor_cores)
         self.assertEqual(expected_dict['executor_cores'], 
operator._executor_cores)
         self.assertEqual(expected_dict['executor_memory'], 
operator._executor_memory)

Reply via email to