This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f9eb55f9e0 Reload configuration for standalone dag file processor
(#35725)
f9eb55f9e0 is described below
commit f9eb55f9e0c6b8441233b2672e6c2669597dd02c
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sat Nov 18 16:23:24 2023 +0100
Reload configuration for standalone dag file processor (#35725)
The DAG file processor has a special configuration that is triggered
by the CONFIG_PROCESSOR_MANAGER_LOGGER variable.
The variable is set temporarily when the DAG file processor is started
as the scheduler's subprocess, and the settings are reloaded because
they are generally loaded while the airflow package is imported,
so we need to reload them with CONFIG_PROCESSOR_MANAGER_LOGGER
set. This did not happen when the standalone dag file processor started,
which led to the standalone DAG file processor not producing
logs in the "dag_processor_manager" folder.
---
airflow/cli/commands/dag_processor_command.py | 4 +--
airflow/dag_processing/manager.py | 46 ++++++++++++++-------------
2 files changed, 26 insertions(+), 24 deletions(-)
diff --git a/airflow/cli/commands/dag_processor_command.py
b/airflow/cli/commands/dag_processor_command.py
index 85bef2727d..592be93f3f 100644
--- a/airflow/cli/commands/dag_processor_command.py
+++ b/airflow/cli/commands/dag_processor_command.py
@@ -23,7 +23,7 @@ from typing import Any
from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.configuration import conf
-from airflow.dag_processing.manager import DagFileProcessorManager
+from airflow.dag_processing.manager import DagFileProcessorManager,
reload_configuration_for_dag_processing
from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
from airflow.jobs.job import Job, run_job
from airflow.utils import cli as cli_utils
@@ -36,7 +36,6 @@ def _create_dag_processor_job_runner(args: Any) ->
DagProcessorJobRunner:
"""Create DagFileProcessorProcess instance."""
processor_timeout_seconds: int = conf.getint("core",
"dag_file_processor_timeout")
processor_timeout = timedelta(seconds=processor_timeout_seconds)
-
return DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
@@ -62,6 +61,7 @@ def dag_processor(args):
job_runner = _create_dag_processor_job_runner(args)
+ reload_configuration_for_dag_processing()
run_command_with_daemon_option(
args=args,
process_name="dag-processor",
diff --git a/airflow/dag_processing/manager.py
b/airflow/dag_processing/manager.py
index ddf5e93f89..909425e569 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -227,28 +227,7 @@ class DagFileProcessorAgent(LoggingMixin,
MultiprocessingStartMethodMixin):
set_new_process_group()
setproctitle("airflow scheduler -- DagFileProcessorManager")
- # Reload configurations and settings to avoid collision with parent
process.
- # Because this process may need custom configurations that cannot be
shared,
- # e.g. RotatingFileHandler. And it can cause connection corruption if
we
- # do not recreate the SQLA connection pool.
- os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"] = "True"
- os.environ["AIRFLOW__LOGGING__COLORED_CONSOLE_LOG"] = "False"
- # Replicating the behavior of how logging module was loaded
- # in logging_config.py
-
- # TODO: This reloading should be removed when we fix our logging
behaviour
- # In case of "spawn" method of starting processes for multiprocessing,
reinitializing of the
- # SQLAlchemy engine causes extremely unexpected behaviour of messing
with objects already loaded
- # in a parent process (likely via resources shared in memory by the
ORM libraries).
- # This caused flaky tests in our CI for many months and has been
discovered while
- # iterating on https://github.com/apache/airflow/pull/19860
- # The issue that describes the problem and possible remediation is
- # at https://github.com/apache/airflow/issues/19934
-
-
importlib.reload(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit(".",
1)[0])) # type: ignore
- importlib.reload(airflow.settings)
- airflow.settings.initialize()
- del os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"]
+ reload_configuration_for_dag_processing()
processor_manager = DagFileProcessorManager(
dag_directory=dag_directory,
max_runs=max_runs,
@@ -1292,3 +1271,26 @@ class DagFileProcessorManager(LoggingMixin):
@property
def file_paths(self):
return self._file_paths
+
+
+def reload_configuration_for_dag_processing():
+ # Reload configurations and settings to avoid collision with parent
process.
+ # Because this process may need custom configurations that cannot be
shared,
+ # e.g. RotatingFileHandler. And it can cause connection corruption if we
+ # do not recreate the SQLA connection pool.
+ os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"] = "True"
+ os.environ["AIRFLOW__LOGGING__COLORED_CONSOLE_LOG"] = "False"
+ # Replicating the behavior of how logging module was loaded
+ # in logging_config.py
+ # TODO: This reloading should be removed when we fix our logging behaviour
+ # In case of "spawn" method of starting processes for multiprocessing,
reinitializing of the
+ # SQLAlchemy engine causes extremely unexpected behaviour of messing with
objects already loaded
+ # in a parent process (likely via resources shared in memory by the ORM
libraries).
+ # This caused flaky tests in our CI for many months and has been
discovered while
+ # iterating on https://github.com/apache/airflow/pull/19860
+ # The issue that describes the problem and possible remediation is
+ # at https://github.com/apache/airflow/issues/19934
+
importlib.reload(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit(".",
1)[0])) # type: ignore
+ importlib.reload(airflow.settings)
+ airflow.settings.initialize()
+ del os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"]