Repository: incubator-airflow Updated Branches: refs/heads/master 48135ad25 -> 2e3f07ff9
[AIRFLOW-1160] Update Spark parameters for Mesos Closes #2265 from cameres/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2e3f07ff Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2e3f07ff Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2e3f07ff Branch: refs/heads/master Commit: 2e3f07ff9fb3a969285f4b3f412cd63d95c05457 Parents: 48135ad Author: Connor Ameres <[email protected]> Authored: Mon May 1 23:22:04 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Mon May 1 23:22:04 2017 +0200 ---------------------------------------------------------------------- airflow/contrib/hooks/spark_sql_hook.py | 8 +++++++- airflow/contrib/hooks/spark_submit_hook.py | 8 +++++++- airflow/contrib/operators/spark_sql_operator.py | 7 ++++++- airflow/contrib/operators/spark_submit_operator.py | 7 ++++++- tests/contrib/hooks/test_spark_submit_hook.py | 2 ++ tests/contrib/operators/test_spark_submit_operator.py | 2 ++ 6 files changed, 30 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2e3f07ff/airflow/contrib/hooks/spark_sql_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/spark_sql_hook.py b/airflow/contrib/hooks/spark_sql_hook.py index 724409c..725db21 100644 --- a/airflow/contrib/hooks/spark_sql_hook.py +++ b/airflow/contrib/hooks/spark_sql_hook.py @@ -31,7 +31,9 @@ class SparkSqlHook(BaseHook): :type conf: str (format: PROP=VALUE) :param conn_id: connection_id string :type conn_id: str - :param executor_cores: Number of cores per executor + :param total_executor_cores: (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker) + :type total_executor_cores: int + :param executor_cores: (Standalone & YARN 
only) Number of cores per executor (Default: 2) :type executor_cores: int :param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G) :type executor_memory: str @@ -52,6 +54,7 @@ class SparkSqlHook(BaseHook): sql, conf=None, conn_id='spark_sql_default', + total_executor_cores=None, executor_cores=None, executor_memory=None, keytab=None, @@ -64,6 +67,7 @@ class SparkSqlHook(BaseHook): self._sql = sql self._conf = conf self._conn = self.get_connection(conn_id) + self._total_executor_cores = total_executor_cores self._executor_cores = executor_cores self._executor_memory = executor_memory self._keytab = keytab @@ -89,6 +93,8 @@ class SparkSqlHook(BaseHook): if self._conf: for conf_el in self._conf.split(","): connection_cmd += ["--conf", conf_el] + if self._total_executor_cores: + connection_cmd += ["--total-executor-cores", str(self._total_executor_cores)] if self._executor_cores: connection_cmd += ["--executor-cores", str(self._executor_cores)] if self._executor_memory: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2e3f07ff/airflow/contrib/hooks/spark_submit_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py index e4ce797..c34538e 100644 --- a/airflow/contrib/hooks/spark_submit_hook.py +++ b/airflow/contrib/hooks/spark_submit_hook.py @@ -42,7 +42,9 @@ class SparkSubmitHook(BaseHook): :type jars: str :param java_class: the main class of the Java application :type java_class: str - :param executor_cores: Number of cores per executor (Default: 2) + :param total_executor_cores: (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker) + :type total_executor_cores: int + :param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2) :type executor_cores: int :param executor_memory: Memory per executor (e.g. 
1000M, 2G) (Default: 1G) :type executor_memory: str @@ -69,6 +71,7 @@ class SparkSubmitHook(BaseHook): py_files=None, jars=None, java_class=None, + total_executor_cores=None, executor_cores=None, executor_memory=None, driver_memory=None, @@ -84,6 +87,7 @@ class SparkSubmitHook(BaseHook): self._py_files = py_files self._jars = jars self._java_class = java_class + self._total_executor_cores = total_executor_cores self._executor_cores = executor_cores self._executor_memory = executor_memory self._driver_memory = driver_memory @@ -163,6 +167,8 @@ class SparkSubmitHook(BaseHook): connection_cmd += ["--jars", self._jars] if self._num_executors: connection_cmd += ["--num-executors", str(self._num_executors)] + if self._total_executor_cores: + connection_cmd += ["--total-executor-cores", str(self._total_executor_cores)] if self._executor_cores: connection_cmd += ["--executor-cores", str(self._executor_cores)] if self._executor_memory: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2e3f07ff/airflow/contrib/operators/spark_sql_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/spark_sql_operator.py b/airflow/contrib/operators/spark_sql_operator.py index 2bed535..246e808 100644 --- a/airflow/contrib/operators/spark_sql_operator.py +++ b/airflow/contrib/operators/spark_sql_operator.py @@ -26,7 +26,9 @@ class SparkSqlOperator(BaseOperator): :type conf: str (format: PROP=VALUE) :param conn_id: connection_id string :type conn_id: str - :param executor_cores: Number of cores per executor + :param total_executor_cores: (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker) + :type total_executor_cores: int + :param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2) :type executor_cores: int :param executor_memory: Memory per executor (e.g. 
1000M, 2G) (Default: 1G) :type executor_memory: str @@ -52,6 +54,7 @@ class SparkSqlOperator(BaseOperator): sql, conf=None, conn_id='spark_sql_default', + total_executor_cores=None, executor_cores=None, executor_memory=None, keytab=None, @@ -65,6 +68,7 @@ class SparkSqlOperator(BaseOperator): self._sql = sql self._conf = conf self._conn_id = conn_id + self._total_executor_cores = total_executor_cores self._executor_cores = executor_cores self._executor_memory = executor_memory self._keytab = keytab @@ -81,6 +85,7 @@ class SparkSqlOperator(BaseOperator): self._hook = SparkSqlHook(sql=self._sql, conf=self._conf, conn_id=self._conn_id, + total_executor_cores=self._total_executor_cores, executor_cores=self._executor_cores, executor_memory=self._executor_memory, keytab=self._keytab, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2e3f07ff/airflow/contrib/operators/spark_submit_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/spark_submit_operator.py b/airflow/contrib/operators/spark_submit_operator.py index 2a7e3cf..77aacd3 100644 --- a/airflow/contrib/operators/spark_submit_operator.py +++ b/airflow/contrib/operators/spark_submit_operator.py @@ -42,7 +42,9 @@ class SparkSubmitOperator(BaseOperator): :type jars: str :param java_class: the main class of the Java application :type java_class: str - :param executor_cores: Number of cores per executor (Default: 2) + :param total_executor_cores: (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker) + :type total_executor_cores: int + :param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2) :type executor_cores: int :param executor_memory: Memory per executor (e.g. 
1000M, 2G) (Default: 1G) :type executor_memory: str @@ -71,6 +73,7 @@ class SparkSubmitOperator(BaseOperator): py_files=None, jars=None, java_class=None, + total_executor_cores=None, executor_cores=None, executor_memory=None, driver_memory=None, @@ -89,6 +92,7 @@ class SparkSubmitOperator(BaseOperator): self._py_files = py_files self._jars = jars self._java_class = java_class + self._total_executor_cores = total_executor_cores self._executor_cores = executor_cores self._executor_memory = executor_memory self._driver_memory = driver_memory @@ -112,6 +116,7 @@ class SparkSubmitOperator(BaseOperator): py_files=self._py_files, jars=self._jars, java_class=self._java_class, + total_executor_cores=self._total_executor_cores, executor_cores=self._executor_cores, executor_memory=self._executor_memory, driver_memory=self._driver_memory, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2e3f07ff/tests/contrib/hooks/test_spark_submit_hook.py ---------------------------------------------------------------------- diff --git a/tests/contrib/hooks/test_spark_submit_hook.py b/tests/contrib/hooks/test_spark_submit_hook.py index 81916ad..e06d44c 100644 --- a/tests/contrib/hooks/test_spark_submit_hook.py +++ b/tests/contrib/hooks/test_spark_submit_hook.py @@ -34,6 +34,7 @@ class TestSparkSubmitHook(unittest.TestCase): 'files': 'hive-site.xml', 'py_files': 'sample_library.py', 'jars': 'parquet.jar', + 'total_executor_cores': 4, 'executor_cores': 4, 'executor_memory': '22g', 'keytab': 'privileged_user.keytab', @@ -94,6 +95,7 @@ class TestSparkSubmitHook(unittest.TestCase): assert "--files {}".format(self._config['files']) in cmd assert "--py-files {}".format(self._config['py_files']) in cmd assert "--jars {}".format(self._config['jars']) in cmd + assert "--total-executor-cores {}".format(self._config['total_executor_cores']) in cmd assert "--executor-cores {}".format(self._config['executor_cores']) in cmd assert "--executor-memory {}".format(self._config['executor_memory']) 
in cmd assert "--keytab {}".format(self._config['keytab']) in cmd http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2e3f07ff/tests/contrib/operators/test_spark_submit_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_spark_submit_operator.py b/tests/contrib/operators/test_spark_submit_operator.py index dd3d84b..6bed6a1 100644 --- a/tests/contrib/operators/test_spark_submit_operator.py +++ b/tests/contrib/operators/test_spark_submit_operator.py @@ -32,6 +32,7 @@ class TestSparkSubmitOperator(unittest.TestCase): 'files': 'hive-site.xml', 'py_files': 'sample_library.py', 'jars': 'parquet.jar', + 'total_executor_cores': 4, 'executor_cores': 4, 'executor_memory': '22g', 'keytab': 'privileged_user.keytab', @@ -75,6 +76,7 @@ class TestSparkSubmitOperator(unittest.TestCase): self.assertEqual(self._config['files'], operator._files) self.assertEqual(self._config['py_files'], operator._py_files) self.assertEqual(self._config['jars'], operator._jars) + self.assertEqual(self._config['total_executor_cores'], operator._total_executor_cores) self.assertEqual(self._config['executor_cores'], operator._executor_cores) self.assertEqual(self._config['executor_memory'], operator._executor_memory) self.assertEqual(self._config['keytab'], operator._keytab)
