[AIRFLOW-1089] Add Spark application arguments. Allows arguments to be passed to the Spark application being submitted. For example:
- spark-submit --class foo.Bar foobar.jar arg1 arg2 - spark-submit app.py arg1 arg2 Closes #2229 from camshrun/sparkSubmitAppArgs Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/34f072a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/34f072a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/34f072a1 Branch: refs/heads/v1-8-test Commit: 34f072a1716350ec39464d73282127d08b83582c Parents: 156e90b Author: Stephan Werges <[email protected]> Authored: Tue Apr 25 11:28:31 2017 +0200 Committer: Maxime Beauchemin <[email protected]> Committed: Thu Jun 8 08:36:20 2017 -0700 ---------------------------------------------------------------------- airflow/contrib/hooks/spark_submit_hook.py | 9 +++++++++ airflow/contrib/operators/spark_submit_operator.py | 5 +++++ tests/contrib/hooks/spark_submit_hook.py | 16 +++++++++++++++- tests/contrib/operators/spark_submit_operator.py | 7 ++++++- 4 files changed, 35 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/34f072a1/airflow/contrib/hooks/spark_submit_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py index 59d28b5..e4ce797 100644 --- a/airflow/contrib/hooks/spark_submit_hook.py +++ b/airflow/contrib/hooks/spark_submit_hook.py @@ -56,6 +56,8 @@ class SparkSubmitHook(BaseHook): :type name: str :param num_executors: Number of executors to launch :type num_executors: int + :param application_args: Arguments for the application being submitted + :type application_args: list :param verbose: Whether to pass the verbose flag to spark-submit process for debugging :type verbose: bool """ @@ -74,6 +76,7 @@ class SparkSubmitHook(BaseHook): principal=None, name='default-name', 
num_executors=None, + application_args=None, verbose=False): self._conf = conf self._conn_id = conn_id @@ -88,6 +91,7 @@ class SparkSubmitHook(BaseHook): self._principal = principal self._name = name self._num_executors = num_executors + self._application_args = application_args self._verbose = verbose self._sp = None self._yarn_application_id = None @@ -183,6 +187,11 @@ class SparkSubmitHook(BaseHook): # The actual script to execute connection_cmd += [application] + # Append any application arguments + if self._application_args: + for arg in self._application_args: + connection_cmd += [arg] + logging.debug("Spark-Submit cmd: {}".format(connection_cmd)) return connection_cmd http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/34f072a1/airflow/contrib/operators/spark_submit_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/spark_submit_operator.py b/airflow/contrib/operators/spark_submit_operator.py index f62c395..2a7e3cf 100644 --- a/airflow/contrib/operators/spark_submit_operator.py +++ b/airflow/contrib/operators/spark_submit_operator.py @@ -56,6 +56,8 @@ class SparkSubmitOperator(BaseOperator): :type name: str :param num_executors: Number of executors to launch :type num_executors: int + :param application_args: Arguments for the application being submitted + :type application_args: list :param verbose: Whether to pass the verbose flag to spark-submit process for debugging :type verbose: bool """ @@ -76,6 +78,7 @@ class SparkSubmitOperator(BaseOperator): principal=None, name='airflow-spark', num_executors=None, + application_args=None, verbose=False, *args, **kwargs): @@ -93,6 +96,7 @@ class SparkSubmitOperator(BaseOperator): self._principal = principal self._name = name self._num_executors = num_executors + self._application_args = application_args self._verbose = verbose self._hook = None self._conn_id = conn_id @@ -115,6 +119,7 @@ class SparkSubmitOperator(BaseOperator): 
principal=self._principal, name=self._name, num_executors=self._num_executors, + application_args=self._application_args, verbose=self._verbose ) self._hook.submit(self._application) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/34f072a1/tests/contrib/hooks/spark_submit_hook.py ---------------------------------------------------------------------- diff --git a/tests/contrib/hooks/spark_submit_hook.py b/tests/contrib/hooks/spark_submit_hook.py index 8f514c2..c17ad28 100644 --- a/tests/contrib/hooks/spark_submit_hook.py +++ b/tests/contrib/hooks/spark_submit_hook.py @@ -39,7 +39,12 @@ class TestSparkSubmitHook(unittest.TestCase): 'num_executors': 10, 'verbose': True, 'driver_memory': '3g', - 'java_class': 'com.foo.bar.AppMain' + 'java_class': 'com.foo.bar.AppMain', + 'application_args': [ + '-f foo', + '--bar bar', + 'baz' + ] } def setUp(self): @@ -94,6 +99,15 @@ class TestSparkSubmitHook(unittest.TestCase): for k in self._config['conf']: assert "--conf {0}={1}".format(k, self._config['conf'][k]) in cmd + # Check the application arguments are there + for a in self._config['application_args']: + assert a in cmd + + # Check if application arguments are after the application + application_idx = cmd.find(self._spark_job_file) + for a in self._config['application_args']: + assert cmd.find(a) > application_idx + if self._config['verbose']: assert "--verbose" in cmd http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/34f072a1/tests/contrib/operators/spark_submit_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/spark_submit_operator.py b/tests/contrib/operators/spark_submit_operator.py index 4e2afb2..31bfcfa 100644 --- a/tests/contrib/operators/spark_submit_operator.py +++ b/tests/contrib/operators/spark_submit_operator.py @@ -39,7 +39,11 @@ class TestSparkSubmitOperator(unittest.TestCase): 'verbose': True, 'application': 'test_application.py', 'driver_memory': '3g', - 
'java_class': 'com.foo.bar.AppMain' + 'java_class': 'com.foo.bar.AppMain', + 'application_args': [ + '-f foo', + '--bar bar' + ] } def setUp(self): @@ -73,6 +77,7 @@ class TestSparkSubmitOperator(unittest.TestCase): self.assertEqual(self._config['verbose'], operator._verbose) self.assertEqual(self._config['java_class'], operator._java_class) self.assertEqual(self._config['driver_memory'], operator._driver_memory) + self.assertEqual(self._config['application_args'], operator._application_args)
