Repository: incubator-airflow
Updated Branches:
  refs/heads/master d3484a9b1 -> 71d8f132c


[AIRFLOW-1197] : SparkSubmitHook on_kill error

The on_kill method was buggy. Corrections have been made to the kill
command and to the method as a whole.

Test coverage has been updated to reflect the changes.
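
One of the bugs was a simple attribute typo: the hook stores the running
spark-submit process as self._sp, but on_kill referenced self.sp, so any
kill attempt died with an AttributeError before YARN was ever reached. A
minimal sketch of that failure mode (the class name here is illustrative
only, not the real hook):

    # Sketch only: the real hook assigns the Popen handle to self._sp
    class Hook(object):
        def __init__(self):
            self._sp = None  # set when the job is submitted

    Hook().sp.kill()
    # AttributeError: 'Hook' object has no attribute 'sp'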


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1f1b46dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1f1b46dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1f1b46dc

Branch: refs/heads/master
Commit: 1f1b46dc011d2988edb66cf9aeaec9dd30c1b0e8
Parents: d06ab68
Author: vfoucault <[email protected]>
Authored: Sun May 14 23:34:00 2017 +0200
Committer: vfoucault <[email protected]>
Committed: Mon May 22 23:43:46 2017 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/spark_submit_hook.py    |  9 ++--
 tests/contrib/hooks/test_spark_submit_hook.py | 57 ++++++++++++++++------
 2 files changed, 45 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1f1b46dc/airflow/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index ae51959..f3d6e34 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -254,12 +254,11 @@ class SparkSubmitHook(BaseHook):
 
     def on_kill(self):
         if self._sp and self._sp.poll() is None:
-            logging.info('Sending kill signal to spark-submit')
-            self.sp.kill()
+            logging.info('Sending kill signal to {}'.format(self._connection['spark_binary']))
+            self._sp.kill()
 
             if self._yarn_application_id:
                 logging.info('Killing application on YARN')
-                yarn_kill = subprocess.Popen("yarn application -kill {0}".format(self._yarn_application_id),
-                                             stdout=subprocess.PIPE,
-                                             stderr=subprocess.PIPE)
+                kill_cmd = "yarn application -kill {0}".format(self._yarn_application_id).split()
+                yarn_kill = subprocess.Popen(kill_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                 logging.info("YARN killed with return code: {0}".format(yarn_kill.wait()))

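A note on the second pitfall fixed above: without shell=True,
subprocess.Popen treats a single string as the name of one executable, so
the old call searched for a program literally named "yarn application
-kill ..." and could never have worked. Splitting first yields the
argument list Popen expects. A minimal illustration (the application id
is the placeholder used in the tests below):

    import subprocess

    app_id = 'application_1486558679801_1820'

    # .split() produces the argv list that Popen runs without a shell
    kill_cmd = "yarn application -kill {0}".format(app_id).split()
    print(kill_cmd)
    # ['yarn', 'application', '-kill', 'application_1486558679801_1820']

    # Passing the unsplit string instead would raise FileNotFoundError on
    # Python 3 (OSError on Python 2), since no executable has that name:
    # subprocess.Popen("yarn application -kill {0}".format(app_id))
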
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1f1b46dc/tests/contrib/hooks/test_spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_spark_submit_hook.py b/tests/contrib/hooks/test_spark_submit_hook.py
index 80b5ce0..ef0857b 100644
--- a/tests/contrib/hooks/test_spark_submit_hook.py
+++ b/tests/contrib/hooks/test_spark_submit_hook.py
@@ -134,7 +134,7 @@ class TestSparkSubmitHook(unittest.TestCase):
         self.assertEquals(expected_build_cmd, cmd)
 
     @patch('subprocess.Popen')
-    def test_SparkProcess_runcmd(self, mock_popen):
+    def test_spark_process_runcmd(self, mock_popen):
         # Given
         mock_popen.return_value.stdout = StringIO(u'stdout')
         mock_popen.return_value.stderr = StringIO(u'stderr')
@@ -158,7 +158,7 @@ class TestSparkSubmitHook(unittest.TestCase):
 
         # Then
         dict_cmd = self.cmd_args_to_dict(cmd)
-        expected_spark_connection = {"master": u"yarn",
+        expected_spark_connection = {"master": "yarn",
                                      "spark_binary": "spark-submit",
                                      "deploy_mode": None,
                                      "queue": None,
@@ -176,10 +176,10 @@ class TestSparkSubmitHook(unittest.TestCase):
 
         # Then
         dict_cmd = self.cmd_args_to_dict(cmd)
-        expected_spark_connection = {"master": u"yarn",
+        expected_spark_connection = {"master": "yarn",
                                      "spark_binary": "spark-submit",
                                      "deploy_mode": None,
-                                     "queue": u"root.default",
+                                     "queue": "root.default",
                                      "spark_home": None}
         self.assertEqual(connection, expected_spark_connection)
         self.assertEqual(dict_cmd["--master"], "yarn")
@@ -195,7 +195,7 @@ class TestSparkSubmitHook(unittest.TestCase):
 
         # Then
         dict_cmd = self.cmd_args_to_dict(cmd)
-        expected_spark_connection = {"master": u"mesos://host:5050",
+        expected_spark_connection = {"master": "mesos://host:5050",
                                      "spark_binary": "spark-submit",
                                      "deploy_mode": None,
                                      "queue": None,
@@ -213,10 +213,10 @@ class TestSparkSubmitHook(unittest.TestCase):
 
         # Then
         dict_cmd = self.cmd_args_to_dict(cmd)
-        expected_spark_connection = {"master": u"yarn://yarn-master",
+        expected_spark_connection = {"master": "yarn://yarn-master",
                                      "spark_binary": "spark-submit",
-                                     "deploy_mode": u"cluster",
-                                     "queue": u"root.etl",
+                                     "deploy_mode": "cluster",
+                                     "queue": "root.etl",
                                      "spark_home": None}
         self.assertEqual(connection, expected_spark_connection)
         self.assertEqual(dict_cmd["--master"], "yarn://yarn-master")
@@ -232,11 +232,11 @@ class TestSparkSubmitHook(unittest.TestCase):
         cmd = hook._build_command(self._spark_job_file)
 
         # Then
-        expected_spark_connection = {"master": u"yarn://yarn-master",
+        expected_spark_connection = {"master": "yarn://yarn-master",
                                      "spark_binary": "spark-submit",
                                      "deploy_mode": None,
                                      "queue": None,
-                                     "spark_home": u"/opt/myspark"}
+                                     "spark_home": "/opt/myspark"}
         self.assertEqual(connection, expected_spark_connection)
         self.assertEqual(cmd[0], '/opt/myspark/bin/spark-submit')
 
@@ -249,7 +249,7 @@ class TestSparkSubmitHook(unittest.TestCase):
         cmd = hook._build_command(self._spark_job_file)
 
         # Then
-        expected_spark_connection = {"master": u"yarn://yarn-master",
+        expected_spark_connection = {"master": "yarn://yarn-master",
                                      "spark_binary": "spark-submit",
                                      "deploy_mode": None,
                                      "queue": None,
@@ -266,8 +266,8 @@ class TestSparkSubmitHook(unittest.TestCase):
         cmd = hook._build_command(self._spark_job_file)
 
         # Then
-        expected_spark_connection = {"master": u"yarn",
-                                     "spark_binary": u"custom-spark-submit",
+        expected_spark_connection = {"master": "yarn",
+                                     "spark_binary": "custom-spark-submit",
                                      "deploy_mode": None,
                                      "queue": None,
                                      "spark_home": None}
@@ -283,11 +283,11 @@ class TestSparkSubmitHook(unittest.TestCase):
         cmd = hook._build_command(self._spark_job_file)
 
         # Then
-        expected_spark_connection = {"master": u"yarn",
-                                     "spark_binary": u"custom-spark-submit",
+        expected_spark_connection = {"master": "yarn",
+                                     "spark_binary": "custom-spark-submit",
                                      "deploy_mode": None,
                                      "queue": None,
-                                     "spark_home": u"/path/to/spark_home"}
+                                     "spark_home": "/path/to/spark_home"}
         self.assertEqual(connection, expected_spark_connection)
         self.assertEqual(cmd[0], '/path/to/spark_home/bin/custom-spark-submit')
 
@@ -308,6 +308,31 @@ class TestSparkSubmitHook(unittest.TestCase):
 
         self.assertEqual(hook._yarn_application_id, 'application_1486558679801_1820')
 
+    @patch('subprocess.Popen')
+    def test_spark_process_on_kill(self, mock_popen):
+        # Given
+        mock_popen.return_value.stdout = StringIO(u'stdout')
+        mock_popen.return_value.stderr = StringIO(u'stderr')
+        mock_popen.return_value.returncode = 0
+        mock_popen.return_value.poll.return_value = None
+        mock_popen.return_value.communicate.return_value = [StringIO(u'stderr\nstderr'), StringIO(u'stderr\nstderr')]
+        log_lines = [
+            'SPARK_MAJOR_VERSION is set to 2, using Spark2',
+            'WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable',
+            'WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.',
+            'INFO Client: Requesting a new application from cluster with 10 NodeManagerapplication_1486558679801_1820s',
+            'INFO Client: Submitting application application_1486558679801_1820 to ResourceManager'
+        ]
+        hook = SparkSubmitHook(conn_id='spark_yarn_cluster')
+        hook._process_log(log_lines)
+        hook.submit()
+
+        # When
+        hook.on_kill()
+
+        # Then
+        self.assertIn(call(['yarn', 'application', '-kill', 'application_1486558679801_1820'], stderr=-1, stdout=-1), mock_popen.mock_calls)
+
 
 if __name__ == '__main__':
     unittest.main()

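A closing note on the new assertion: the stderr=-1 and stdout=-1 literals
in the expected call are simply the integer value of subprocess.PIPE, and
call is the mock library's call-record helper already used by this test.
An equivalent, arguably more explicit spelling would be:

    import subprocess
    from mock import call  # unittest.mock on Python 3

    expected = call(['yarn', 'application', '-kill',
                     'application_1486558679801_1820'],
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # subprocess.PIPE == -1, so this matches the recorded mock call exactly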