This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bf14d14e27 Configurable umask to all daemonized processes. (#25664)
bf14d14e27 is described below
commit bf14d14e272d24ee7a3f798c242593078359383e
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Thu Aug 11 17:03:55 2022 +0100
Configurable umask to all daemonized processes. (#25664)
We previously had this for just the `celery worker` subcommand, this PR
extends it to anything that can run in daemon mode
---
airflow/cli/cli_parser.py | 1 -
airflow/cli/commands/celery_command.py | 3 +++
airflow/cli/commands/dag_processor_command.py | 2 ++
airflow/cli/commands/kerberos_command.py | 1 +
airflow/cli/commands/scheduler_command.py | 1 +
airflow/cli/commands/triggerer_command.py | 1 +
airflow/cli/commands/webserver_command.py | 1 +
airflow/config_templates/config.yml | 22 +++++++++++++---------
airflow/config_templates/default_airflow.cfg | 13 ++++++++-----
airflow/configuration.py | 10 ++++++++++
airflow/settings.py | 2 ++
tests/cli/commands/test_celery_command.py | 1 +
tests/cli/commands/test_kerberos_command.py | 1 +
13 files changed, 44 insertions(+), 15 deletions(-)
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index e4be3e978d..4828a37bfc 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -671,7 +671,6 @@ ARG_CELERY_HOSTNAME = Arg(
ARG_UMASK = Arg(
("-u", "--umask"),
help="Set the umask of celery worker in daemon mode",
- default=conf.get('celery', 'worker_umask'),
)
ARG_WITHOUT_MINGLE = Arg(
("--without-mingle",),
diff --git a/airflow/cli/commands/celery_command.py
b/airflow/cli/commands/celery_command.py
index d19490caa7..c2deb16ff4 100644
--- a/airflow/cli/commands/celery_command.py
+++ b/airflow/cli/commands/celery_command.py
@@ -75,6 +75,7 @@ def flower(args):
pidfile=TimeoutPIDLockFile(pidfile, -1),
stdout=stdout,
stderr=stderr,
+ umask=int(settings.DAEMON_UMASK, 8),
)
with ctx:
celery_app.start(options)
@@ -183,6 +184,8 @@ def worker(args):
with open(stdout, 'a') as stdout_handle, open(stderr, 'a') as
stderr_handle:
if args.umask:
umask = args.umask
+ else:
+ umask = conf.get('celery', 'worker_umask',
fallback=settings.DAEMON_UMASK)
stdout_handle.truncate(0)
stderr_handle.truncate(0)
diff --git a/airflow/cli/commands/dag_processor_command.py
b/airflow/cli/commands/dag_processor_command.py
index 4c4e5b46eb..92f7f37ee1 100644
--- a/airflow/cli/commands/dag_processor_command.py
+++ b/airflow/cli/commands/dag_processor_command.py
@@ -22,6 +22,7 @@ from datetime import timedelta
import daemon
from daemon.pidfile import TimeoutPIDLockFile
+from airflow import settings
from airflow.configuration import conf
from airflow.dag_processing.manager import DagFileProcessorManager
from airflow.utils import cli as cli_utils
@@ -69,6 +70,7 @@ def dag_processor(args):
files_preserve=[handle],
stdout=stdout_handle,
stderr=stderr_handle,
+ umask=int(settings.DAEMON_UMASK, 8),
)
with ctx:
try:
diff --git a/airflow/cli/commands/kerberos_command.py
b/airflow/cli/commands/kerberos_command.py
index 3a33363df9..4ccd07092f 100644
--- a/airflow/cli/commands/kerberos_command.py
+++ b/airflow/cli/commands/kerberos_command.py
@@ -42,6 +42,7 @@ def kerberos(args):
pidfile=TimeoutPIDLockFile(pid, -1),
stdout=stdout_handle,
stderr=stderr_handle,
+ umask=int(settings.DAEMON_UMASK, 8),
)
with ctx:
diff --git a/airflow/cli/commands/scheduler_command.py
b/airflow/cli/commands/scheduler_command.py
index b6f35cff9e..22a185794f 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -74,6 +74,7 @@ def scheduler(args):
files_preserve=[handle],
stdout=stdout_handle,
stderr=stderr_handle,
+ umask=int(settings.DAEMON_UMASK, 8),
)
with ctx:
_run_scheduler_job(args=args)
diff --git a/airflow/cli/commands/triggerer_command.py
b/airflow/cli/commands/triggerer_command.py
index defe476a8f..70fbbda2fc 100644
--- a/airflow/cli/commands/triggerer_command.py
+++ b/airflow/cli/commands/triggerer_command.py
@@ -48,6 +48,7 @@ def triggerer(args):
files_preserve=[handle],
stdout=stdout_handle,
stderr=stderr_handle,
+ umask=int(settings.DAEMON_UMASK, 8),
)
with ctx:
job.run()
diff --git a/airflow/cli/commands/webserver_command.py
b/airflow/cli/commands/webserver_command.py
index da2ed32fc2..37145c0c81 100644
--- a/airflow/cli/commands/webserver_command.py
+++ b/airflow/cli/commands/webserver_command.py
@@ -458,6 +458,7 @@ def webserver(args):
files_preserve=[handle],
stdout=stdout,
stderr=stderr,
+ umask=int(settings.DAEMON_UMASK, 8),
)
with ctx:
subprocess.Popen(run_args, close_fds=True)
diff --git a/airflow/config_templates/config.yml
b/airflow/config_templates/config.yml
index cbb8081371..c3970dc018 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -374,6 +374,19 @@
example: ~
default: "1024"
+ - name: daemon_umask
+ description: |
+ The default umask to use for processes when run in daemon mode
(scheduler, worker, etc.)
+
+ This controls the file-creation mode mask which determines the initial
value of file permission bits
+ for newly created files.
+
+ This value is treated as an octal integer.
+ version_added: 2.3.4
+ type: string
+ default: "0o077"
+ example: ~
+
- name: database
description: ~
options:
@@ -1652,15 +1665,6 @@
type: boolean
example: ~
default: "true"
- - name: worker_umask
- description: |
- Umask that will be used when starting workers with the ``airflow
celery worker``
- in daemon mode. This control the file-creation mode mask which
determines the initial
- value of file permission bits for newly created files.
- version_added: 2.0.0
- type: string
- example: ~
- default: "0o077"
- name: broker_url
description: |
The Celery broker URL. Celery supports RabbitMQ, Redis and
experimentally
diff --git a/airflow/config_templates/default_airflow.cfg
b/airflow/config_templates/default_airflow.cfg
index 34046f4cb4..90fa6e0df3 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -212,6 +212,14 @@ default_pool_task_slot_count = 128
# mapped tasks from clogging the scheduler.
max_map_length = 1024
+# The default umask to use for processes when run in daemon mode (scheduler,
worker, etc.)
+#
+# This controls the file-creation mode mask which determines the initial value
of file permission bits
+# for newly created files.
+#
+# This value is treated as an octal integer.
+daemon_umask = 0o077
+
[database]
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines.
@@ -834,11 +842,6 @@ worker_prefetch_multiplier = 1
# prevent this by setting this to false. However, with this disabled Flower
won't work.
worker_enable_remote_control = true
-# Umask that will be used when starting workers with the ``airflow celery
worker``
-# in daemon mode. This control the file-creation mode mask which determines
the initial
-# value of file permission bits for newly created files.
-worker_umask = 0o077
-
# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
information.
broker_url = redis://redis:6379/0
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 5e84de6ea4..61d2260e5c 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -522,7 +522,17 @@ class AirflowConfigParser(ConfigParser):
raise ValueError(f"The value {section}/{key} should be set!")
return value
+ @overload # type: ignore[override]
+ def get(self, section: str, key: str, fallback: str = ..., **kwargs) ->
str: # type: ignore[override]
+
+ ...
+
+ @overload # type: ignore[override]
def get(self, section: str, key: str, **kwargs) -> Optional[str]: # type:
ignore[override]
+
+ ...
+
+ def get(self, section: str, key: str, **kwargs) -> Optional[str]: # type:
ignore[override, misc]
section = str(section).lower()
key = str(key).lower()
diff --git a/airflow/settings.py b/airflow/settings.py
index bea68ec8cf..beaed3b459 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -640,3 +640,5 @@ DASHBOARD_UIALERTS: List["UIAlert"] = []
# Prefix used to identify tables holding data moved during migration.
AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"
+
+DAEMON_UMASK: str = conf.get('core', 'daemon_umask', fallback='0o077')
diff --git a/tests/cli/commands/test_celery_command.py
b/tests/cli/commands/test_celery_command.py
index 304d7a7c70..cf430903e3 100644
--- a/tests/cli/commands/test_celery_command.py
+++ b/tests/cli/commands/test_celery_command.py
@@ -354,6 +354,7 @@ class TestFlowerCommand(unittest.TestCase):
pidfile=mock_pid_file.return_value,
stderr=mock_open.return_value,
stdout=mock_open.return_value,
+ umask=0o077,
),
mock.call.DaemonContext().__enter__(),
mock.call.DaemonContext().__exit__(None, None, None),
diff --git a/tests/cli/commands/test_kerberos_command.py
b/tests/cli/commands/test_kerberos_command.py
index 63b14cd9a8..b03cdfd20c 100644
--- a/tests/cli/commands/test_kerberos_command.py
+++ b/tests/cli/commands/test_kerberos_command.py
@@ -75,6 +75,7 @@ class TestKerberosCommand(unittest.TestCase):
pidfile=mock_pid_file.return_value,
stderr=mock_open.return_value,
stdout=mock_open.return_value,
+ umask=0o077,
),
mock.call.DaemonContext().__enter__(),
mock.call.DaemonContext().__exit__(None, None, None),