Repository: incubator-airflow

Updated Branches:
  refs/heads/master d3484a9b1 -> 71d8f132c

[AIRFLOW-1197] Fix SparkSubmitHook on_kill error

The on_kill method was buggy: the kill call used self.sp instead of
self._sp, and the YARN kill command was passed to subprocess.Popen as a
single string. Corrections have been made to the kill command and to the
method overall. Test coverage has been updated to reflect the changes.

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1f1b46dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1f1b46dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1f1b46dc

Branch: refs/heads/master
Commit: 1f1b46dc011d2988edb66cf9aeaec9dd30c1b0e8
Parents: d06ab68
Author: vfoucault <[email protected]>
Authored: Sun May 14 23:34:00 2017 +0200
Committer: vfoucault <[email protected]>
Committed: Mon May 22 23:43:46 2017 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/spark_submit_hook.py    |  9 ++--
 tests/contrib/hooks/test_spark_submit_hook.py | 57 ++++++++++++++++------
 2 files changed, 45 insertions(+), 21 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1f1b46dc/airflow/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index ae51959..f3d6e34 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -254,12 +254,11 @@ class SparkSubmitHook(BaseHook):

     def on_kill(self):
         if self._sp and self._sp.poll() is None:
-            logging.info('Sending kill signal to spark-submit')
-            self.sp.kill()
+            logging.info('Sending kill signal to {}'.format(self._connection['spark_binary']))
+            self._sp.kill()
             if self._yarn_application_id:
                 logging.info('Killing application on YARN')
-                yarn_kill = subprocess.Popen("yarn application -kill {0}".format(self._yarn_application_id),
-                                             stdout=subprocess.PIPE,
-                                             stderr=subprocess.PIPE)
+                kill_cmd = "yarn application -kill {0}".format(self._yarn_application_id).split()
+                yarn_kill = subprocess.Popen(kill_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

                 logging.info("YARN killed with return code: {0}".format(yarn_kill.wait()))
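A note on the kill_cmd change above: subprocess.Popen invoked without
shell=True expects an argv list, so on POSIX the old single-string form is
treated as one executable literally named "yarn application -kill ..." and
fails instead of killing the application. A minimal standalone sketch of the
corrected pattern, assuming the yarn CLI is on PATH and using a hypothetical
application id:

    import subprocess

    # Hypothetical application id, for illustration only.
    app_id = "application_1486558679801_1820"

    # Popen without shell=True needs an argv list; ".split()" turns the
    # command string into ['yarn', 'application', '-kill', app_id].
    kill_cmd = "yarn application -kill {0}".format(app_id).split()

    yarn_kill = subprocess.Popen(kill_cmd,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
    print("YARN kill returned: {0}".format(yarn_kill.wait()))
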
"deploy_mode": None, - "queue": u"root.default", + "queue": "root.default", "spark_home": None} self.assertEqual(connection, expected_spark_connection) self.assertEqual(dict_cmd["--master"], "yarn") @@ -195,7 +195,7 @@ class TestSparkSubmitHook(unittest.TestCase): # Then dict_cmd = self.cmd_args_to_dict(cmd) - expected_spark_connection = {"master": u"mesos://host:5050", + expected_spark_connection = {"master": "mesos://host:5050", "spark_binary": "spark-submit", "deploy_mode": None, "queue": None, @@ -213,10 +213,10 @@ class TestSparkSubmitHook(unittest.TestCase): # Then dict_cmd = self.cmd_args_to_dict(cmd) - expected_spark_connection = {"master": u"yarn://yarn-master", + expected_spark_connection = {"master": "yarn://yarn-master", "spark_binary": "spark-submit", - "deploy_mode": u"cluster", - "queue": u"root.etl", + "deploy_mode": "cluster", + "queue": "root.etl", "spark_home": None} self.assertEqual(connection, expected_spark_connection) self.assertEqual(dict_cmd["--master"], "yarn://yarn-master") @@ -232,11 +232,11 @@ class TestSparkSubmitHook(unittest.TestCase): cmd = hook._build_command(self._spark_job_file) # Then - expected_spark_connection = {"master": u"yarn://yarn-master", + expected_spark_connection = {"master": "yarn://yarn-master", "spark_binary": "spark-submit", "deploy_mode": None, "queue": None, - "spark_home": u"/opt/myspark"} + "spark_home": "/opt/myspark"} self.assertEqual(connection, expected_spark_connection) self.assertEqual(cmd[0], '/opt/myspark/bin/spark-submit') @@ -249,7 +249,7 @@ class TestSparkSubmitHook(unittest.TestCase): cmd = hook._build_command(self._spark_job_file) # Then - expected_spark_connection = {"master": u"yarn://yarn-master", + expected_spark_connection = {"master": "yarn://yarn-master", "spark_binary": "spark-submit", "deploy_mode": None, "queue": None, @@ -266,8 +266,8 @@ class TestSparkSubmitHook(unittest.TestCase): cmd = hook._build_command(self._spark_job_file) # Then - expected_spark_connection = {"master": u"yarn", - "spark_binary": u"custom-spark-submit", + expected_spark_connection = {"master": "yarn", + "spark_binary": "custom-spark-submit", "deploy_mode": None, "queue": None, "spark_home": None} @@ -283,11 +283,11 @@ class TestSparkSubmitHook(unittest.TestCase): cmd = hook._build_command(self._spark_job_file) # Then - expected_spark_connection = {"master": u"yarn", - "spark_binary": u"custom-spark-submit", + expected_spark_connection = {"master": "yarn", + "spark_binary": "custom-spark-submit", "deploy_mode": None, "queue": None, - "spark_home": u"/path/to/spark_home"} + "spark_home": "/path/to/spark_home"} self.assertEqual(connection, expected_spark_connection) self.assertEqual(cmd[0], '/path/to/spark_home/bin/custom-spark-submit') @@ -308,6 +308,31 @@ class TestSparkSubmitHook(unittest.TestCase): self.assertEqual(hook._yarn_application_id, 'application_1486558679801_1820') + @patch('subprocess.Popen') + def test_spark_process_on_kill(self, mock_popen): + # Given + mock_popen.return_value.stdout = StringIO(u'stdout') + mock_popen.return_value.stderr = StringIO(u'stderr') + mock_popen.return_value.returncode = 0 + mock_popen.return_value.poll.return_value = None + mock_popen.return_value.communicate.return_value = [StringIO(u'stderr\nstderr'), StringIO(u'stderr\nstderr')] + log_lines = [ + 'SPARK_MAJOR_VERSION is set to 2, using Spark2', + 'WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable', + 'WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.', + 'INFO Client: Requesting a new application from cluster with 10 NodeManagerapplication_1486558679801_1820s', + 'INFO Client: Submitting application application_1486558679801_1820 to ResourceManager' + ] + hook = SparkSubmitHook(conn_id='spark_yarn_cluster') + hook._process_log(log_lines) + hook.submit() + + # When + hook.on_kill() + + # Then + self.assertIn(call(['yarn', 'application', '-kill', 'application_1486558679801_1820'], stderr=-1, stdout=-1), mock_popen.mock_calls) + if __name__ == '__main__': unittest.main()
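
For readers of the new test: the expected call is rendered with stderr=-1 and
stdout=-1 because -1 is the integer value of subprocess.PIPE. Below is a
self-contained sketch of the same mock-and-assert pattern, written against
Python 3's unittest.mock (the hook test itself may import patch/call from the
mock package) and using a hypothetical test class name:

    import subprocess
    import unittest
    from unittest.mock import patch, call


    class TestYarnKillCall(unittest.TestCase):
        # Hypothetical, standalone class illustrating the assertion style
        # used in test_spark_process_on_kill above.

        @patch('subprocess.Popen')
        def test_popen_receives_argv_list(self, mock_popen):
            app_id = 'application_1486558679801_1820'
            kill_cmd = "yarn application -kill {0}".format(app_id).split()
            subprocess.Popen(kill_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

            # subprocess.PIPE == -1, which is why the recorded call in the
            # diff above shows stderr=-1 and stdout=-1.
            self.assertIn(
                call(['yarn', 'application', '-kill', app_id],
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE),
                mock_popen.mock_calls)


    if __name__ == '__main__':
        unittest.main()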
