Repository: incubator-airflow Updated Branches: refs/heads/master 194d1d6e5 -> 28aeed4aa
[AIRFLOW-1255] Fix SparkSubmitHook output deadlock Refactor the SparkSubmitHook output processing to avoid a deadlock. The prior implementation deadlocks if the stderr pipe buffer fills: 1. Airflow reads stdout until EOF before it starts reading stderr; 2. Once the stderr pipe buffer is full, Spark blocks on its next write to stderr and therefore never closes stdout; 3. Both processes now wait on each other forever: Airflow for EOF on stdout, Spark for stderr to be drained. The fix redirects stderr into stdout (stderr=subprocess.STDOUT), so a single reader drains all output before waiting for the exit code. Closes #2438 from asnare/fix/spark-submit Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/28aeed4a Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/28aeed4a Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/28aeed4a Branch: refs/heads/master Commit: 28aeed4aa62acb295eda1cf94a40f7f643b650fb Parents: 194d1d6 Author: Andrew Snare <[email protected]> Authored: Fri Jul 14 12:21:12 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Fri Jul 14 12:21:12 2017 +0200 ---------------------------------------------------------------------- airflow/contrib/hooks/spark_submit_hook.py | 19 +++++++------------ tests/contrib/hooks/test_spark_submit_hook.py | 8 +++----- 2 files changed, 10 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/28aeed4a/airflow/contrib/hooks/spark_submit_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py index 88d547b..14e297b 100644 --- a/airflow/contrib/hooks/spark_submit_hook.py +++ b/airflow/contrib/hooks/spark_submit_hook.py @@ -26,7 +26,7 @@ log = logging.getLogger(__name__) class SparkSubmitHook(BaseHook): """ This hook is a wrapper around the spark-submit binary to kick off a spark-submit job. 
- It requires that the "spark-submit" binary is in the PATH or the spark_home to be + It requires that the "spark-submit" binary is in the PATH or the spark_home to be supplied. :param conf: Arbitrary Spark configuration properties :type conf: dict @@ -211,21 +211,16 @@ class SparkSubmitHook(BaseHook): spark_submit_cmd = self._build_command(application) self._sp = subprocess.Popen(spark_submit_cmd, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + stderr=subprocess.STDOUT, **kwargs) - # Using two iterators here to support 'real-time' logging - sources = [self._sp.stdout, self._sp.stderr] + self._process_log(iter(self._sp.stdout.readline, b'')) + returncode = self._sp.wait() - for source in sources: - self._process_log(iter(source.readline, b'')) - - output, stderr = self._sp.communicate() - - if self._sp.returncode: + if returncode: raise AirflowException( - "Cannot execute: {}. Error code is: {}. Output: {}, Stderr: {}".format( - spark_submit_cmd, self._sp.returncode, output, stderr + "Cannot execute: {}. 
Error code is: {}.".format( + spark_submit_cmd, returncode ) ) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/28aeed4a/tests/contrib/hooks/test_spark_submit_hook.py ---------------------------------------------------------------------- diff --git a/tests/contrib/hooks/test_spark_submit_hook.py b/tests/contrib/hooks/test_spark_submit_hook.py index 98e959b..6b7da75 100644 --- a/tests/contrib/hooks/test_spark_submit_hook.py +++ b/tests/contrib/hooks/test_spark_submit_hook.py @@ -140,15 +140,14 @@ class TestSparkSubmitHook(unittest.TestCase): # Given mock_popen.return_value.stdout = StringIO(u'stdout') mock_popen.return_value.stderr = StringIO(u'stderr') - mock_popen.return_value.returncode = 0 - mock_popen.return_value.communicate.return_value = [StringIO(u'stdout\nstdout'), StringIO(u'stderr\nstderr')] + mock_popen.return_value.wait.return_value = 0 # When hook = SparkSubmitHook(conn_id='') hook.submit() # Then - self.assertEqual(mock_popen.mock_calls[0], call(['spark-submit', '--master', 'yarn', '--name', 'default-name', ''], stderr=-1, stdout=-1)) + self.assertEqual(mock_popen.mock_calls[0], call(['spark-submit', '--master', 'yarn', '--name', 'default-name', ''], stdout=-1, stderr=-2)) def test_resolve_connection_yarn_default(self): # Given @@ -315,9 +314,8 @@ class TestSparkSubmitHook(unittest.TestCase): # Given mock_popen.return_value.stdout = StringIO(u'stdout') mock_popen.return_value.stderr = StringIO(u'stderr') - mock_popen.return_value.returncode = 0 mock_popen.return_value.poll.return_value = None - mock_popen.return_value.communicate.return_value = [StringIO(u'stderr\nstderr'), StringIO(u'stderr\nstderr')] + mock_popen.return_value.wait.return_value = 0 log_lines = [ 'SPARK_MAJOR_VERSION is set to 2, using Spark2', 'WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable',
