Repository: incubator-airflow
Updated Branches:
  refs/heads/master 831f8d504 -> e5b914789
[AIRFLOW-1089] Add Spark application arguments Allows arguments to be passed to the Spark application being submitted. For example: - spark-submit --class foo.Bar foobar.jar arg1 arg2 - spark-submit app.py arg1 arg2 Closes #2229 from camshrun/sparkSubmitAppArgs Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e5b91478 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e5b91478 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e5b91478 Branch: refs/heads/master Commit: e5b9147894b0d47bf36f1c2570d765b16c1c2506 Parents: 831f8d5 Author: Stephan Werges <[email protected]> Authored: Tue Apr 25 11:28:31 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Tue Apr 25 11:28:31 2017 +0200 ---------------------------------------------------------------------- airflow/contrib/hooks/spark_submit_hook.py | 9 +++++++++ airflow/contrib/operators/spark_submit_operator.py | 5 +++++ tests/contrib/hooks/test_spark_submit_hook.py | 16 +++++++++++++++- .../contrib/operators/test_spark_submit_operator.py | 7 ++++++- 4 files changed, 35 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e5b91478/airflow/contrib/hooks/spark_submit_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py index 59d28b5..e4ce797 100644 --- a/airflow/contrib/hooks/spark_submit_hook.py +++ b/airflow/contrib/hooks/spark_submit_hook.py @@ -56,6 +56,8 @@ class SparkSubmitHook(BaseHook): :type name: str :param num_executors: Number of executors to launch :type num_executors: int + :param application_args: Arguments for the application being submitted + :type application_args: list :param verbose: Whether to pass the verbose flag to 
spark-submit process for debugging :type verbose: bool """ @@ -74,6 +76,7 @@ class SparkSubmitHook(BaseHook): principal=None, name='default-name', num_executors=None, + application_args=None, verbose=False): self._conf = conf self._conn_id = conn_id @@ -88,6 +91,7 @@ class SparkSubmitHook(BaseHook): self._principal = principal self._name = name self._num_executors = num_executors + self._application_args = application_args self._verbose = verbose self._sp = None self._yarn_application_id = None @@ -183,6 +187,11 @@ class SparkSubmitHook(BaseHook): # The actual script to execute connection_cmd += [application] + # Append any application arguments + if self._application_args: + for arg in self._application_args: + connection_cmd += [arg] + logging.debug("Spark-Submit cmd: {}".format(connection_cmd)) return connection_cmd http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e5b91478/airflow/contrib/operators/spark_submit_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/spark_submit_operator.py b/airflow/contrib/operators/spark_submit_operator.py index f62c395..2a7e3cf 100644 --- a/airflow/contrib/operators/spark_submit_operator.py +++ b/airflow/contrib/operators/spark_submit_operator.py @@ -56,6 +56,8 @@ class SparkSubmitOperator(BaseOperator): :type name: str :param num_executors: Number of executors to launch :type num_executors: int + :param application_args: Arguments for the application being submitted + :type application_args: list :param verbose: Whether to pass the verbose flag to spark-submit process for debugging :type verbose: bool """ @@ -76,6 +78,7 @@ class SparkSubmitOperator(BaseOperator): principal=None, name='airflow-spark', num_executors=None, + application_args=None, verbose=False, *args, **kwargs): @@ -93,6 +96,7 @@ class SparkSubmitOperator(BaseOperator): self._principal = principal self._name = name self._num_executors = num_executors + self._application_args = 
application_args self._verbose = verbose self._hook = None self._conn_id = conn_id @@ -115,6 +119,7 @@ class SparkSubmitOperator(BaseOperator): principal=self._principal, name=self._name, num_executors=self._num_executors, + application_args=self._application_args, verbose=self._verbose ) self._hook.submit(self._application) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e5b91478/tests/contrib/hooks/test_spark_submit_hook.py ---------------------------------------------------------------------- diff --git a/tests/contrib/hooks/test_spark_submit_hook.py b/tests/contrib/hooks/test_spark_submit_hook.py index 24315fa..81916ad 100644 --- a/tests/contrib/hooks/test_spark_submit_hook.py +++ b/tests/contrib/hooks/test_spark_submit_hook.py @@ -42,7 +42,12 @@ class TestSparkSubmitHook(unittest.TestCase): 'num_executors': 10, 'verbose': True, 'driver_memory': '3g', - 'java_class': 'com.foo.bar.AppMain' + 'java_class': 'com.foo.bar.AppMain', + 'application_args': [ + '-f foo', + '--bar bar', + 'baz' + ] } def setUp(self): @@ -102,6 +107,15 @@ class TestSparkSubmitHook(unittest.TestCase): for k in self._config['conf']: assert "--conf {0}={1}".format(k, self._config['conf'][k]) in cmd + # Check the application arguments are there + for a in self._config['application_args']: + assert a in cmd + + # Check if application arguments are after the application + application_idx = cmd.find(self._spark_job_file) + for a in self._config['application_args']: + assert cmd.find(a) > application_idx + if self._config['verbose']: assert "--verbose" in cmd http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e5b91478/tests/contrib/operators/test_spark_submit_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_spark_submit_operator.py b/tests/contrib/operators/test_spark_submit_operator.py index 3c11dbb..dd3d84b 100644 --- a/tests/contrib/operators/test_spark_submit_operator.py +++ 
b/tests/contrib/operators/test_spark_submit_operator.py @@ -41,7 +41,11 @@ class TestSparkSubmitOperator(unittest.TestCase): 'verbose': True, 'application': 'test_application.py', 'driver_memory': '3g', - 'java_class': 'com.foo.bar.AppMain' + 'java_class': 'com.foo.bar.AppMain', + 'application_args': [ + '-f foo', + '--bar bar' + ] } def setUp(self): @@ -80,6 +84,7 @@ class TestSparkSubmitOperator(unittest.TestCase): self.assertEqual(self._config['verbose'], operator._verbose) self.assertEqual(self._config['java_class'], operator._java_class) self.assertEqual(self._config['driver_memory'], operator._driver_memory) + self.assertEqual(self._config['application_args'], operator._application_args)
