Repository: incubator-airflow Updated Branches: refs/heads/master 9df0ac64c -> 32750601a
[AIRFLOW-1562] Spark-sql logging contains deadlock Logging in SparkSqlOperator does not work as intended. Spark-sql internally redirects all logs to stdout (including stderr), which causes the current two-iterator logging to get stuck on the stderr pipe. This situation can lead to a deadlock because stderr can grow too big and start to block until it is consumed, which only happens when the process ends, so the process stalls. Closes #2563 from Fokko/AIRFLOW-1562-Spark-sql- loggin-contains-deadlock Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/32750601 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/32750601 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/32750601 Branch: refs/heads/master Commit: 32750601ad0a422283613bf7fccff8eb5407bc9c Parents: 9df0ac6 Author: Fokko Driesprong <[email protected]> Authored: Wed Sep 6 13:37:31 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Wed Sep 6 13:37:31 2017 +0200 ---------------------------------------------------------------------- airflow/contrib/hooks/spark_sql_hook.py | 33 ++++----- airflow/contrib/operators/spark_sql_operator.py | 3 + .../operators/test_spark_sql_operator.py | 73 ++++++++++++++++++++ 3 files changed, 93 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32750601/airflow/contrib/hooks/spark_sql_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/spark_sql_hook.py b/airflow/contrib/hooks/spark_sql_hook.py index 8d73f60..d7bef7b 100644 --- a/airflow/contrib/hooks/spark_sql_hook.py +++ b/airflow/contrib/hooks/spark_sql_hook.py @@ -58,6 +58,7 @@ class SparkSqlHook(BaseHook): executor_cores=None, executor_memory=None, keytab=None, + principal=None, 
master='yarn', name='default-name', num_executors=None, @@ -71,6 +72,7 @@ class SparkSqlHook(BaseHook): self._executor_cores = executor_cores self._executor_memory = executor_memory self._keytab = keytab + self._principal = principal self._master = master self._name = name self._num_executors = num_executors @@ -101,6 +103,8 @@ class SparkSqlHook(BaseHook): connection_cmd += ["--executor-memory", self._executor_memory] if self._keytab: connection_cmd += ["--keytab", self._keytab] + if self._principal: + connection_cmd += ["--principal", self._principal] if self._num_executors: connection_cmd += ["--num-executors", str(self._num_executors)] if self._sql: @@ -130,25 +134,22 @@ class SparkSqlHook(BaseHook): :param cmd: command to remotely execute :param kwargs: extra arguments to Popen (see subprocess.Popen) """ - prefixed_cmd = self._prepare_command(cmd) - self._sp = subprocess.Popen(prefixed_cmd, + spark_sql_cmd = self._prepare_command(cmd) + self._sp = subprocess.Popen(spark_sql_cmd, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + stderr=subprocess.STDOUT, **kwargs) - # using two iterators here to support 'real-time' logging - for line in iter(self._sp.stdout.readline, b''): - line = line.decode('utf-8').strip() - logging.info(line) - for line in iter(self._sp.stderr.readline, b''): - line = line.decode('utf-8').strip() - logging.info(line) - output, stderr = self._sp.communicate() - if self._sp.returncode: - raise AirflowException("Cannot execute {} on {}. Error code is: " - "{}. Output: {}, Stderr: {}" - .format(cmd, self._conn.host, - self._sp.returncode, output, stderr)) + self._process_log(iter(self._sp.stdout.readline, b'')) + + returncode = self._sp.wait() + + if returncode: + raise AirflowException( + "Cannot execute {} on {}. 
Process exit code: {}.".format( + cmd, self._conn.host, returncode + ) + ) def kill(self): if self._sp and self._sp.poll() is None: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32750601/airflow/contrib/operators/spark_sql_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/spark_sql_operator.py b/airflow/contrib/operators/spark_sql_operator.py index 246e808..f6cba59 100644 --- a/airflow/contrib/operators/spark_sql_operator.py +++ b/airflow/contrib/operators/spark_sql_operator.py @@ -58,6 +58,7 @@ class SparkSqlOperator(BaseOperator): executor_cores=None, executor_memory=None, keytab=None, + principal=None, master='yarn', name='default-name', num_executors=None, @@ -72,6 +73,7 @@ class SparkSqlOperator(BaseOperator): self._executor_cores = executor_cores self._executor_memory = executor_memory self._keytab = keytab + self._principal = principal self._master = master self._name = name self._num_executors = num_executors @@ -89,6 +91,7 @@ class SparkSqlOperator(BaseOperator): executor_cores=self._executor_cores, executor_memory=self._executor_memory, keytab=self._keytab, + principal=self._principal, name=self._name, num_executors=self._num_executors, master=self._master, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32750601/tests/contrib/operators/test_spark_sql_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_spark_sql_operator.py b/tests/contrib/operators/test_spark_sql_operator.py new file mode 100644 index 0000000..7e71ce6 --- /dev/null +++ b/tests/contrib/operators/test_spark_sql_operator.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import datetime +import unittest + +from airflow import DAG, configuration +from airflow.contrib.operators.spark_sql_operator import SparkSqlOperator + +DEFAULT_DATE = datetime.datetime(2017, 1, 1) + + +class TestSparkSqlOperator(unittest.TestCase): + + _config = { + 'sql': 'SELECT 22', + 'conn_id': 'spark_special_conn_id', + 'total_executor_cores': 4, + 'executor_cores': 4, + 'executor_memory': '22g', + 'keytab': 'privileged_user.keytab', + 'principal': 'user/[email protected]', + 'master': 'yarn-client', + 'name': 'special-application-name', + 'num_executors': 8, + 'yarn_queue': 'special-queue' + } + + def setUp(self): + configuration.load_test_config() + args = { + 'owner': 'airflow', + 'start_date': DEFAULT_DATE + } + self.dag = DAG('test_dag_id', default_args=args) + + def test_execute(self): + # Given / When + operator = SparkSqlOperator( + task_id='spark_sql_job', + dag=self.dag, + **self._config + ) + + self.assertEqual(self._config['sql'], operator._sql) + self.assertEqual(self._config['conn_id'], operator._conn_id) + self.assertEqual(self._config['total_executor_cores'], operator._total_executor_cores) + self.assertEqual(self._config['executor_cores'], operator._executor_cores) + self.assertEqual(self._config['executor_memory'], operator._executor_memory) + self.assertEqual(self._config['keytab'], operator._keytab) + self.assertEqual(self._config['principal'], operator._principal) + self.assertEqual(self._config['executor_memory'], operator._executor_memory) + self.assertEqual(self._config['keytab'], operator._keytab) + 
self.assertEqual(self._config['principal'], operator._principal) + self.assertEqual(self._config['master'], operator._master) + self.assertEqual(self._config['name'], operator._name) + self.assertEqual(self._config['num_executors'], operator._num_executors) + self.assertEqual(self._config['yarn_queue'], operator._yarn_queue) + +if __name__ == '__main__': + unittest.main()
