This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 36c5c111ec Permitting airflow kerberos to run in different modes
(#35146)
36c5c111ec is described below
commit 36c5c111ec00075db30fab7c67ac1b6900e144dc
Author: Amogh Desai <[email protected]>
AuthorDate: Wed Oct 25 19:14:18 2023 +0530
Permitting airflow kerberos to run in different modes (#35146)
---
airflow/cli/cli_config.py | 4 ++++
airflow/cli/commands/kerberos_command.py | 7 ++++++-
airflow/security/kerberos.py | 25 ++++++++++++++++++++---
docs/apache-airflow/security/kerberos.rst | 23 +++++++++++++++++++++
tests/cli/commands/test_kerberos_command.py | 31 +++++++++++++++++++++++++++--
5 files changed, 84 insertions(+), 6 deletions(-)
diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index 11d83debb0..f828a63ee1 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -561,6 +561,9 @@ ARG_VAR_ACTION_ON_EXISTING_KEY = Arg(
# kerberos
ARG_PRINCIPAL = Arg(("principal",), help="kerberos principal", nargs="?")
ARG_KEYTAB = Arg(("-k", "--keytab"), help="keytab", nargs="?",
default=conf.get("kerberos", "keytab"))
+ARG_KERBEROS_ONE_TIME_MODE = Arg(
+ ("-o", "--one-time"), help="Run airflow kerberos one time instead of
forever", action="store_true"
+)
# run
ARG_INTERACTIVE = Arg(
("-N", "--interactive"),
@@ -1889,6 +1892,7 @@ core_commands: list[CLICommand] = [
ARG_KEYTAB,
ARG_PID,
ARG_DAEMON,
+ ARG_KERBEROS_ONE_TIME_MODE,
ARG_STDOUT,
ARG_STDERR,
ARG_LOG_FILE,
diff --git a/airflow/cli/commands/kerberos_command.py
b/airflow/cli/commands/kerberos_command.py
index 8d33e7f8ef..dd7bb1aee3 100644
--- a/airflow/cli/commands/kerberos_command.py
+++ b/airflow/cli/commands/kerberos_command.py
@@ -20,6 +20,7 @@ from __future__ import annotations
from airflow import settings
from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.security import kerberos as krb
+from airflow.security.kerberos import KerberosMode
from airflow.utils import cli as cli_utils
from airflow.utils.providers_configuration_loader import
providers_configuration_loaded
@@ -30,8 +31,12 @@ def kerberos(args):
"""Start a kerberos ticket renewer."""
print(settings.HEADER)
+ mode = KerberosMode.STANDARD
+ if args.one_time:
+ mode = KerberosMode.ONE_TIME
+
run_command_with_daemon_option(
args=args,
process_name="kerberos",
- callback=lambda: krb.run(principal=args.principal, keytab=args.keytab),
+ callback=lambda: krb.run(principal=args.principal, keytab=args.keytab,
mode=mode),
)
diff --git a/airflow/security/kerberos.py b/airflow/security/kerberos.py
index 69afc5d793..2729a07f1a 100644
--- a/airflow/security/kerberos.py
+++ b/airflow/security/kerberos.py
@@ -17,6 +17,8 @@
# under the License.
from __future__ import annotations
+from enum import Enum
+
# Licensed to Cloudera, Inc. under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -47,6 +49,17 @@ NEED_KRB181_WORKAROUND: bool | None = None
log = logging.getLogger(__name__)
+class KerberosMode(Enum):
+ """
+ Defines modes for running airflow kerberos.
+
+ ``STANDARD`` keeps renewing the ticket in an endless loop, while ``ONE_TIME`` renews it once and returns.
+ """
+
+ STANDARD = "standard"
+ ONE_TIME = "one-time"
+
+
def get_kerberos_principle(principal: str | None) -> str:
"""Retrieve Kerberos principal. Fallback to principal from Airflow
configuration if not provided."""
return principal or conf.get_mandatory_value("kerberos",
"principal").replace("_HOST", get_hostname())
@@ -176,18 +189,24 @@ def detect_conf_var() -> bool:
return b"X-CACHECONF:" in file.read()
-def run(principal: str | None, keytab: str):
+def run(principal: str | None, keytab: str, mode: KerberosMode =
KerberosMode.STANDARD):
"""
Run the kerberos renewer.
:param principal: principal name
:param keytab: keytab file
+ :param mode: mode to run the airflow kerberos in
:return: None
"""
if not keytab:
log.warning("Keytab renewer not starting, no keytab configured")
sys.exit(0)
- while True:
+ log.info("Using airflow kerberos with mode: %s", mode.value)
+
+ if mode == KerberosMode.STANDARD:
+ while True:
+ renew_from_kt(principal, keytab)
+ time.sleep(conf.getint("kerberos", "reinit_frequency"))
+ elif mode == KerberosMode.ONE_TIME:
renew_from_kt(principal, keytab)
- time.sleep(conf.getint("kerberos", "reinit_frequency"))
diff --git a/docs/apache-airflow/security/kerberos.rst
b/docs/apache-airflow/security/kerberos.rst
index 345a8c8484..c51997d752 100644
--- a/docs/apache-airflow/security/kerberos.rst
+++ b/docs/apache-airflow/security/kerberos.rst
@@ -100,6 +100,29 @@ Launch the ticket renewer by
# run ticket renewer
airflow kerberos
+To support more advanced deployment models, ``airflow kerberos`` can run in either standard or one-time mode.
+Standard mode is used unless you pass the ``--one-time`` flag when running ``airflow kerberos``.
+
+a) standard: The ``airflow kerberos`` command runs endlessly. The ticket renewer renews the ticket,
+sleeps for ``reinit_frequency`` seconds (set in the ``[kerberos]`` configuration section), and then repeats.
+b) one-time: The ``airflow kerberos`` command runs once and exits. In case of failure, the main task won't spin up.
+
+The default mode is standard.
+
+Example usages:
+
+For standard mode:
+
+.. code-block:: bash
+
+ airflow kerberos
+
+For one time mode:
+
+.. code-block:: bash
+
+ airflow kerberos --one-time
+
Hadoop
^^^^^^
diff --git a/tests/cli/commands/test_kerberos_command.py
b/tests/cli/commands/test_kerberos_command.py
index 14eb1676bd..8835d9d287 100644
--- a/tests/cli/commands/test_kerberos_command.py
+++ b/tests/cli/commands/test_kerberos_command.py
@@ -20,6 +20,7 @@ from unittest import mock
from airflow.cli import cli_parser
from airflow.cli.commands import kerberos_command
+from airflow.security.kerberos import KerberosMode
from tests.test_utils.config import conf_vars
@@ -34,7 +35,9 @@ class TestKerberosCommand:
args = self.parser.parse_args(["kerberos", "PRINCIPAL", "--keytab",
"/tmp/airflow.keytab"])
kerberos_command.kerberos(args)
- mock_krb.run.assert_called_once_with(keytab="/tmp/airflow.keytab",
principal="PRINCIPAL")
+ mock_krb.run.assert_called_once_with(
+ keytab="/tmp/airflow.keytab", principal="PRINCIPAL",
mode=KerberosMode.STANDARD
+ )
@mock.patch("airflow.cli.commands.daemon_utils.TimeoutPIDLockFile")
@mock.patch("airflow.cli.commands.daemon_utils.setup_locations")
@@ -69,7 +72,9 @@ class TestKerberosCommand:
with mock.patch("airflow.cli.commands.daemon_utils.open", mock_open):
kerberos_command.kerberos(args)
- mock_krb.run.assert_called_once_with(keytab="/tmp/airflow.keytab",
principal="PRINCIPAL")
+ mock_krb.run.assert_called_once_with(
+ keytab="/tmp/airflow.keytab", principal="PRINCIPAL",
mode=KerberosMode.STANDARD
+ )
assert mock_daemon.mock_calls[:3] == [
mock.call.DaemonContext(
pidfile=mock_pid_file.return_value,
@@ -100,3 +105,25 @@ class TestKerberosCommand:
mock.call().__exit__(None, None, None),
mock.call().__exit__(None, None, None),
]
+
+ @mock.patch("airflow.cli.commands.kerberos_command.krb")
+ @conf_vars({("core", "executor"): "CeleryExecutor"})
+ def test_run_command_with_mode_standard(self, mock_krb):
+ args = self.parser.parse_args(["kerberos", "PRINCIPAL", "--keytab",
"/tmp/airflow.keytab"])
+
+ kerberos_command.kerberos(args)
+ mock_krb.run.assert_called_once_with(
+ keytab="/tmp/airflow.keytab", principal="PRINCIPAL",
mode=KerberosMode.STANDARD
+ )
+
+ @mock.patch("airflow.cli.commands.kerberos_command.krb")
+ @conf_vars({("core", "executor"): "CeleryExecutor"})
+ def test_run_command_with_mode_one_time(self, mock_krb):
+ args = self.parser.parse_args(
+ ["kerberos", "PRINCIPAL", "--keytab", "/tmp/airflow.keytab",
"--one-time"]
+ )
+
+ kerberos_command.kerberos(args)
+ mock_krb.run.assert_called_once_with(
+ keytab="/tmp/airflow.keytab", principal="PRINCIPAL",
mode=KerberosMode.ONE_TIME
+ )