Repository: incubator-airflow Updated Branches: refs/heads/v1-9-test f6810c9b4 -> 4e06ee554
[AIRFLOW-1757] Add missing options to SparkSubmitOperator add 'exclude-packages' and 'repositories' as options to SparkSubmitOperator as they were missing Closes #2725 from kretes/AIRFLOW-1757-spark-new-options (cherry picked from commit b3c247d3bfd43a292f728608bc5aaba772f40f33) Signed-off-by: Bolke de Bruin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4e06ee55 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4e06ee55 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4e06ee55 Branch: refs/heads/v1-9-test Commit: 4e06ee554291798fabfd3c8c6ff3943219162c7b Parents: f6810c9 Author: Tomasz Bartczak <[email protected]> Authored: Sat Oct 28 15:13:00 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Sat Oct 28 15:13:49 2017 +0200 ---------------------------------------------------------------------- airflow/contrib/hooks/spark_submit_hook.py | 14 ++++++++++++++ airflow/contrib/operators/spark_submit_operator.py | 12 +++++++++++- tests/contrib/hooks/test_spark_submit_hook.py | 4 ++++ tests/contrib/operators/test_spark_submit_operator.py | 6 ++++++ 4 files changed, 35 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4e06ee55/airflow/contrib/hooks/spark_submit_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py index 7f8e35e..4dc865b 100644 --- a/airflow/contrib/hooks/spark_submit_hook.py +++ b/airflow/contrib/hooks/spark_submit_hook.py @@ -40,6 +40,12 @@ class SparkSubmitHook(BaseHook, LoggingMixin): :type jars: str :param java_class: the main class of the Java application :type java_class: str + :param packages: Comma-separated list of maven coordinates of jars to 
include on the driver and executor classpaths + :type packages: str + :param exclude_packages: Comma-separated list of maven coordinates of jars to exclude while resolving the dependencies provided in 'packages' + :type exclude_packages: str + :param repositories: Comma-separated list of additional remote repositories to search for the maven coordinates given with 'packages' + :type repositories: str :param total_executor_cores: (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker) :type total_executor_cores: int :param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2) @@ -69,6 +75,8 @@ class SparkSubmitHook(BaseHook, LoggingMixin): jars=None, java_class=None, packages=None, + exclude_packages=None, + repositories=None, total_executor_cores=None, executor_cores=None, executor_memory=None, @@ -86,6 +94,8 @@ class SparkSubmitHook(BaseHook, LoggingMixin): self._jars = jars self._java_class = java_class self._packages = packages + self._exclude_packages = exclude_packages + self._repositories = repositories self._total_executor_cores = total_executor_cores self._executor_cores = executor_cores self._executor_memory = executor_memory @@ -164,6 +174,10 @@ class SparkSubmitHook(BaseHook, LoggingMixin): connection_cmd += ["--jars", self._jars] if self._packages: connection_cmd += ["--packages", self._packages] + if self._exclude_packages: + connection_cmd += ["--exclude-packages", self._exclude_packages] + if self._repositories: + connection_cmd += ["--repositories", self._repositories] if self._num_executors: connection_cmd += ["--num-executors", str(self._num_executors)] if self._total_executor_cores: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4e06ee55/airflow/contrib/operators/spark_submit_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/spark_submit_operator.py 
b/airflow/contrib/operators/spark_submit_operator.py index 277c55f..a7c9990 100644 --- a/airflow/contrib/operators/spark_submit_operator.py +++ b/airflow/contrib/operators/spark_submit_operator.py @@ -41,6 +41,10 @@ class SparkSubmitOperator(BaseOperator): :type java_class: str :param packages: Comma-separated list of maven coordinates of jars to include on the driver and executor classpaths :type packages: str + :param exclude_packages: Comma-separated list of maven coordinates of jars to exclude while resolving the dependencies provided in 'packages' + :type exclude_packages: str + :param repositories: Comma-separated list of additional remote repositories to search for the maven coordinates given with 'packages' + :type repositories: str :param total_executor_cores: (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker) :type total_executor_cores: int :param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2) @@ -62,7 +66,7 @@ class SparkSubmitOperator(BaseOperator): :param verbose: Whether to pass the verbose flag to spark-submit process for debugging :type verbose: bool """ - template_fields = ('_name', '_application_args',) + template_fields = ('_name', '_application_args','_packages') ui_color = WEB_COLORS['LIGHTORANGE'] @apply_defaults @@ -75,6 +79,8 @@ class SparkSubmitOperator(BaseOperator): jars=None, java_class=None, packages=None, + exclude_packages=None, + repositories=None, total_executor_cores=None, executor_cores=None, executor_memory=None, @@ -95,6 +101,8 @@ class SparkSubmitOperator(BaseOperator): self._jars = jars self._java_class = java_class self._packages = packages + self._exclude_packages = exclude_packages + self._repositories = repositories self._total_executor_cores = total_executor_cores self._executor_cores = executor_cores self._executor_memory = executor_memory @@ -120,6 +128,8 @@ class SparkSubmitOperator(BaseOperator): jars=self._jars, 
java_class=self._java_class, packages=self._packages, + exclude_packages=self._exclude_packages, + repositories=self._repositories, total_executor_cores=self._total_executor_cores, executor_cores=self._executor_cores, executor_memory=self._executor_memory, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4e06ee55/tests/contrib/hooks/test_spark_submit_hook.py ---------------------------------------------------------------------- diff --git a/tests/contrib/hooks/test_spark_submit_hook.py b/tests/contrib/hooks/test_spark_submit_hook.py index be88897..5cb7132 100644 --- a/tests/contrib/hooks/test_spark_submit_hook.py +++ b/tests/contrib/hooks/test_spark_submit_hook.py @@ -35,6 +35,8 @@ class TestSparkSubmitHook(unittest.TestCase): 'py_files': 'sample_library.py', 'jars': 'parquet.jar', 'packages': 'com.databricks:spark-avro_2.11:3.2.0', + 'exclude_packages': 'org.bad.dependency:1.0.0', + 'repositories': 'http://myrepo.org', 'total_executor_cores': 4, 'executor_cores': 4, 'executor_memory': '22g', @@ -115,6 +117,8 @@ class TestSparkSubmitHook(unittest.TestCase): '--py-files', 'sample_library.py', '--jars', 'parquet.jar', '--packages', 'com.databricks:spark-avro_2.11:3.2.0', + '--exclude-packages', 'org.bad.dependency:1.0.0', + '--repositories', 'http://myrepo.org', '--num-executors', '10', '--total-executor-cores', '4', '--executor-cores', '4', http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4e06ee55/tests/contrib/operators/test_spark_submit_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_spark_submit_operator.py b/tests/contrib/operators/test_spark_submit_operator.py index 33705e9..ce85a61 100644 --- a/tests/contrib/operators/test_spark_submit_operator.py +++ b/tests/contrib/operators/test_spark_submit_operator.py @@ -35,6 +35,8 @@ class TestSparkSubmitOperator(unittest.TestCase): 'py_files': 'sample_library.py', 'jars': 'parquet.jar', 'packages': 
'com.databricks:spark-avro_2.11:3.2.0', + 'exclude_packages': 'org.bad.dependency:1.0.0', + 'repositories': 'http://myrepo.org', 'total_executor_cores':4, 'executor_cores': 4, 'executor_memory': '22g', @@ -86,6 +88,8 @@ class TestSparkSubmitOperator(unittest.TestCase): 'py_files': 'sample_library.py', 'jars': 'parquet.jar', 'packages': 'com.databricks:spark-avro_2.11:3.2.0', + 'exclude_packages': 'org.bad.dependency:1.0.0', + 'repositories': 'http://myrepo.org', 'total_executor_cores': 4, 'executor_cores': 4, 'executor_memory': '22g', @@ -114,6 +118,8 @@ class TestSparkSubmitOperator(unittest.TestCase): self.assertEqual(expected_dict['py_files'], operator._py_files) self.assertEqual(expected_dict['jars'], operator._jars) self.assertEqual(expected_dict['packages'], operator._packages) + self.assertEqual(expected_dict['exclude_packages'], operator._exclude_packages) + self.assertEqual(expected_dict['repositories'], operator._repositories) self.assertEqual(expected_dict['total_executor_cores'], operator._total_executor_cores) self.assertEqual(expected_dict['executor_cores'], operator._executor_cores) self.assertEqual(expected_dict['executor_memory'], operator._executor_memory)
