This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 13a827d  Ensure Kerberos token is valid in SparkSubmitOperator before running `yarn kill` (#9044)
13a827d is described below

commit 13a827d80fef738e25f30ea20c095ad4dbd401f6
Author: Aneesh Joseph <[email protected]>
AuthorDate: Thu Jul 9 15:09:16 2020 +0530

    Ensure Kerberos token is valid in SparkSubmitOperator before running `yarn kill` (#9044)
    
    do a kinit before yarn kill if keytab and principal are provided
---
 .../providers/apache/spark/hooks/spark_submit.py   | 16 ++++++++++++---
 airflow/security/kerberos.py                       | 16 +++++++++++----
 .../apache/spark/hooks/test_spark_submit.py        | 23 ++++++++++++++++++++--
 3 files changed, 46 insertions(+), 9 deletions(-)

diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py b/airflow/providers/apache/spark/hooks/spark_submit.py
index d94be2c..f7cf839 100644
--- a/airflow/providers/apache/spark/hooks/spark_submit.py
+++ b/airflow/providers/apache/spark/hooks/spark_submit.py
@@ -21,8 +21,10 @@ import re
 import subprocess
 import time
 
+from airflow.configuration import conf as airflow_conf
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
+from airflow.security.kerberos import renew_from_kt
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 try:
@@ -617,15 +619,23 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
             self._submit_sp.kill()
 
             if self._yarn_application_id:
-                self.log.info('Killing application %s on YARN', self._yarn_application_id)
-
                 kill_cmd = "yarn application -kill {}" \
                     .format(self._yarn_application_id).split()
+                env = None
+                if self._keytab is not None and self._principal is not None:
+                    # we are ignoring renewal failures from renew_from_kt
+                    # here as the failure could just be due to a non-renewable ticket,
+                    # we still attempt to kill the yarn application
+                    renew_from_kt(self._principal, self._keytab, exit_on_fail=False)
+                    env = os.environ.copy()
+                    env["KRB5CCNAME"] = airflow_conf.get('kerberos', 'ccache')
+
                 yarn_kill = subprocess.Popen(kill_cmd,
+                                             env=env,
                                              stdout=subprocess.PIPE,
                                              stderr=subprocess.PIPE)
 
-                self.log.info("YARN killed with return code: %s", 
yarn_kill.wait())
+                self.log.info("YARN app killed with return code: %s", 
yarn_kill.wait())
 
             if self._kubernetes_driver_pod:
                 self.log.info('Killing pod %s on Kubernetes', self._kubernetes_driver_pod)
diff --git a/airflow/security/kerberos.py b/airflow/security/kerberos.py
index 2886b61..72aff08 100644
--- a/airflow/security/kerberos.py
+++ b/airflow/security/kerberos.py
@@ -46,7 +46,7 @@ NEED_KRB181_WORKAROUND = None  # type: Optional[bool]
 log = logging.getLogger(__name__)
 
 
-def renew_from_kt(principal: str, keytab: str):
+def renew_from_kt(principal: str, keytab: str, exit_on_fail: bool = True):
     """
     Renew kerberos token from keytab
 
@@ -86,7 +86,10 @@ def renew_from_kt(principal: str, keytab: str):
             "\n".join(subp.stdout.readlines() if subp.stdout else []),
             "\n".join(subp.stderr.readlines() if subp.stderr else [])
         )
-        sys.exit(subp.returncode)
+        if exit_on_fail:
+            sys.exit(subp.returncode)
+        else:
+            return subp.returncode
 
     global NEED_KRB181_WORKAROUND  # pylint: disable=global-statement
     if NEED_KRB181_WORKAROUND is None:
@@ -95,7 +98,12 @@ def renew_from_kt(principal: str, keytab: str):
         # (From: HUE-640). Kerberos clock have seconds level granularity. Make sure we
         # renew the ticket after the initial valid time.
         time.sleep(1.5)
-        perform_krb181_workaround(principal)
+        ret = perform_krb181_workaround(principal)
+        if exit_on_fail and ret != 0:
+            sys.exit(ret)
+        else:
+            return ret
+    return 0
 
 
 def perform_krb181_workaround(principal: str):
@@ -127,7 +135,7 @@ def perform_krb181_workaround(principal: str):
             "configuration, and the ticket renewal policy (maxrenewlife) for 
the '%s' and `krbtgt' "
             "principals.", princ, ccache, princ
         )
-        sys.exit(ret)
+    return ret
 
 
 def detect_conf_var() -> bool:
diff --git a/tests/providers/apache/spark/hooks/test_spark_submit.py b/tests/providers/apache/spark/hooks/test_spark_submit.py
index df22a61..570e54c 100644
--- a/tests/providers/apache/spark/hooks/test_spark_submit.py
+++ b/tests/providers/apache/spark/hooks/test_spark_submit.py
@@ -17,6 +17,7 @@
 # under the License.
 
 import io
+import os
 import unittest
 from unittest.mock import call, patch
 
@@ -651,8 +652,9 @@ class TestSparkSubmitHook(unittest.TestCase):
 
         self.assertEqual(hook._driver_status, 'RUNNING')
 
+    @patch('airflow.providers.apache.spark.hooks.spark_submit.renew_from_kt')
     @patch('airflow.providers.apache.spark.hooks.spark_submit.subprocess.Popen')
-    def test_yarn_process_on_kill(self, mock_popen):
+    def test_yarn_process_on_kill(self, mock_popen, mock_renew_from_kt):
         # Given
         mock_popen.return_value.stdout = io.StringIO('stdout')
         mock_popen.return_value.stderr = io.StringIO('stderr')
@@ -679,7 +681,24 @@ class TestSparkSubmitHook(unittest.TestCase):
         # Then
         self.assertIn(call(['yarn', 'application', '-kill',
                             'application_1486558679801_1820'],
-                           stderr=-1, stdout=-1),
+                      env=None, stderr=-1, stdout=-1),
+                      mock_popen.mock_calls)
+        # resetting the mock to test kill with keytab & principal
+        mock_popen.reset_mock()
+        # Given
+        hook = SparkSubmitHook(conn_id='spark_yarn_cluster', keytab='privileged_user.keytab',
+                               principal='user/[email protected]')
+        hook._process_spark_submit_log(log_lines)
+        hook.submit()
+
+        # When
+        hook.on_kill()
+        # Then
+        expected_env = os.environ.copy()
+        expected_env["KRB5CCNAME"] = '/tmp/airflow_krb5_ccache'
+        self.assertIn(call(['yarn', 'application', '-kill',
+                            'application_1486558679801_1820'],
+                      env=expected_env, stderr=-1, stdout=-1),
                       mock_popen.mock_calls)
 
     def test_standalone_cluster_process_on_kill(self):

Reply via email to