This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-3-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit b6a2cd1aa34f69a36ea127e4f7f5ba87f4aca420 Author: Ash Berlin-Taylor <[email protected]> AuthorDate: Thu Aug 11 17:03:55 2022 +0100 Configurable umask to all daemonized processes. (#25664) We previously had this for just the `celery worker` subcommand, this PR extends it to anything that can run in daemon mode (cherry picked from commit bf14d14e272d24ee7a3f798c242593078359383e) --- airflow/cli/cli_parser.py | 1 - airflow/cli/commands/celery_command.py | 3 +++ airflow/cli/commands/dag_processor_command.py | 2 ++ airflow/cli/commands/kerberos_command.py | 1 + airflow/cli/commands/scheduler_command.py | 1 + airflow/cli/commands/triggerer_command.py | 1 + airflow/cli/commands/webserver_command.py | 1 + airflow/config_templates/config.yml | 22 +++++++++++++--------- airflow/config_templates/default_airflow.cfg | 13 ++++++++----- airflow/configuration.py | 10 ++++++++++ airflow/settings.py | 2 ++ tests/cli/commands/test_celery_command.py | 1 + tests/cli/commands/test_kerberos_command.py | 1 + 13 files changed, 44 insertions(+), 15 deletions(-) diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index d494aa0f06..ae71d2007a 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -667,7 +667,6 @@ ARG_CELERY_HOSTNAME = Arg( ARG_UMASK = Arg( ("-u", "--umask"), help="Set the umask of celery worker in daemon mode", - default=conf.get('celery', 'worker_umask'), ) ARG_WITHOUT_MINGLE = Arg( ("--without-mingle",), diff --git a/airflow/cli/commands/celery_command.py b/airflow/cli/commands/celery_command.py index affe032411..edb61c1b9d 100644 --- a/airflow/cli/commands/celery_command.py +++ b/airflow/cli/commands/celery_command.py @@ -72,6 +72,7 @@ def flower(args): pidfile=TimeoutPIDLockFile(pidfile, -1), stdout=stdout, stderr=stderr, + umask=int(settings.DAEMON_UMASK, 8), ) with ctx: celery_app.start(options) @@ -180,6 +181,8 @@ def worker(args): with open(stdout, 'w+') as stdout_handle, open(stderr, 'w+') as stderr_handle: if args.umask: 
umask = args.umask + else: + umask = conf.get('celery', 'worker_umask', fallback=settings.DAEMON_UMASK) ctx = daemon.DaemonContext( files_preserve=[handle], diff --git a/airflow/cli/commands/dag_processor_command.py b/airflow/cli/commands/dag_processor_command.py index d57e26510c..e6a27072cd 100644 --- a/airflow/cli/commands/dag_processor_command.py +++ b/airflow/cli/commands/dag_processor_command.py @@ -22,6 +22,7 @@ from datetime import timedelta import daemon from daemon.pidfile import TimeoutPIDLockFile +from airflow import settings from airflow.configuration import conf from airflow.dag_processing.manager import DagFileProcessorManager from airflow.utils import cli as cli_utils @@ -66,6 +67,7 @@ def dag_processor(args): files_preserve=[handle], stdout=stdout_handle, stderr=stderr_handle, + umask=int(settings.DAEMON_UMASK, 8), ) with ctx: try: diff --git a/airflow/cli/commands/kerberos_command.py b/airflow/cli/commands/kerberos_command.py index fea8743499..51acdf62df 100644 --- a/airflow/cli/commands/kerberos_command.py +++ b/airflow/cli/commands/kerberos_command.py @@ -39,6 +39,7 @@ def kerberos(args): pidfile=TimeoutPIDLockFile(pid, -1), stdout=stdout_handle, stderr=stderr_handle, + umask=int(settings.DAEMON_UMASK, 8), ) with ctx: diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py index bc6e983ee5..ab24dd21c7 100644 --- a/airflow/cli/commands/scheduler_command.py +++ b/airflow/cli/commands/scheduler_command.py @@ -65,6 +65,7 @@ def scheduler(args): files_preserve=[handle], stdout=stdout_handle, stderr=stderr_handle, + umask=int(settings.DAEMON_UMASK, 8), ) with ctx: _run_scheduler_job(args=args) diff --git a/airflow/cli/commands/triggerer_command.py b/airflow/cli/commands/triggerer_command.py index 82e7fde129..70f860034d 100644 --- a/airflow/cli/commands/triggerer_command.py +++ b/airflow/cli/commands/triggerer_command.py @@ -45,6 +45,7 @@ def triggerer(args): files_preserve=[handle], stdout=stdout_handle, 
stderr=stderr_handle, + umask=int(settings.DAEMON_UMASK, 8), ) with ctx: job.run() diff --git a/airflow/cli/commands/webserver_command.py b/airflow/cli/commands/webserver_command.py index c74513d23e..f1eeb8000f 100644 --- a/airflow/cli/commands/webserver_command.py +++ b/airflow/cli/commands/webserver_command.py @@ -455,6 +455,7 @@ def webserver(args): files_preserve=[handle], stdout=stdout, stderr=stderr, + umask=int(settings.DAEMON_UMASK, 8), ) with ctx: subprocess.Popen(run_args, close_fds=True) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 4ea090bcd9..9a97790ab7 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -366,6 +366,19 @@ example: ~ default: "1024" + - name: daemon_umask + description: | + The default umask to use for process when run in daemon mode (scheduler, worker, etc.) + + This controls the file-creation mode mask which determines the initial value of file permission bits + for newly created files. + + This value is treated as an octal-integer. + version_added: 2.3.4 + type: string + default: "0o077" + example: ~ + - name: database description: ~ options: @@ -1643,15 +1656,6 @@ type: boolean example: ~ default: "true" - - name: worker_umask - description: | - Umask that will be used when starting workers with the ``airflow celery worker`` - in daemon mode. This control the file-creation mode mask which determines the initial - value of file permission bits for newly created files. - version_added: 2.0.0 - type: string - example: ~ - default: "0o077" - name: broker_url description: | The Celery broker URL. 
Celery supports RabbitMQ, Redis and experimentally diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 86f9cf93fc..16f8d8a0d9 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -208,6 +208,14 @@ default_pool_task_slot_count = 128 # mapped tasks from clogging the scheduler. max_map_length = 1024 +# The default umask to use for process when run in daemon mode (scheduler, worker, etc.) +# +# This controls the file-creation mode mask which determines the initial value of file permission bits +# for newly created files. +# +# This value is treated as an octal-integer. +daemon_umask = 0o077 + [database] # The SqlAlchemy connection string to the metadata database. # SqlAlchemy supports many different database engines. @@ -829,11 +837,6 @@ worker_prefetch_multiplier = 1 # prevent this by setting this to false. However, with this disabled Flower won't work. worker_enable_remote_control = true -# Umask that will be used when starting workers with the ``airflow celery worker`` -# in daemon mode. This control the file-creation mode mask which determines the initial -# value of file permission bits for newly created files. -worker_umask = 0o077 - # The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally # a sqlalchemy database. Refer to the Celery documentation for more information. broker_url = redis://redis:6379/0 diff --git a/airflow/configuration.py b/airflow/configuration.py index 4f2caa186e..20dd3c13d9 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -517,7 +517,17 @@ class AirflowConfigParser(ConfigParser): raise ValueError(f"The value {section}/{key} should be set!") return value + @overload # type: ignore[override] + def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: # type: ignore[override] + + ... 
+ + @overload # type: ignore[override] def get(self, section: str, key: str, **kwargs) -> Optional[str]: # type: ignore[override] + + ... + + def get(self, section: str, key: str, **kwargs) -> Optional[str]: # type: ignore[override, misc] section = str(section).lower() key = str(key).lower() diff --git a/airflow/settings.py b/airflow/settings.py index 374960ab3e..be05589f94 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -640,3 +640,5 @@ DASHBOARD_UIALERTS: List["UIAlert"] = [] # Prefix used to identify tables holding data moved during migration. AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved" + +DAEMON_UMASK: str = conf.get('core', 'daemon_umask', fallback='0o077') diff --git a/tests/cli/commands/test_celery_command.py b/tests/cli/commands/test_celery_command.py index 14a6f52e9d..2bc9796cdd 100644 --- a/tests/cli/commands/test_celery_command.py +++ b/tests/cli/commands/test_celery_command.py @@ -354,6 +354,7 @@ class TestFlowerCommand(unittest.TestCase): pidfile=mock_pid_file.return_value, stderr=mock_open.return_value, stdout=mock_open.return_value, + umask=0o077, ), mock.call.DaemonContext().__enter__(), mock.call.DaemonContext().__exit__(None, None, None), diff --git a/tests/cli/commands/test_kerberos_command.py b/tests/cli/commands/test_kerberos_command.py index 007bd2f41b..855b0515df 100644 --- a/tests/cli/commands/test_kerberos_command.py +++ b/tests/cli/commands/test_kerberos_command.py @@ -75,6 +75,7 @@ class TestKerberosCommand(unittest.TestCase): pidfile=mock_pid_file.return_value, stderr=mock_open.return_value, stdout=mock_open.return_value, + umask=0o077, ), mock.call.DaemonContext().__enter__(), mock.call.DaemonContext().__exit__(None, None, None),
