This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f9eb55f9e0 Reload configuration for standalone dag file processor 
(#35725)
f9eb55f9e0 is described below

commit f9eb55f9e0c6b8441233b2672e6c2669597dd02c
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sat Nov 18 16:23:24 2023 +0100

    Reload configuration for standalone dag file processor (#35725)
    
    The DAG file processor has a special configuration that is triggered
    by the CONFIG_PROCESSOR_MANAGER_LOGGER variable.
    
    The variable is set temporarily when the DAG file processor is started
    as the scheduler's subprocess, and the settings are reloaded because
    they are generally loaded while the airflow package is imported, so we
    need to reload them with CONFIG_PROCESSOR_MANAGER_LOGGER set. This did
    not happen when the standalone dag file processor started, which led
    to the standalone DAG file processor not producing logs in the
    "dag_processor_manager" folder.
---
 airflow/cli/commands/dag_processor_command.py |  4 +--
 airflow/dag_processing/manager.py             | 46 ++++++++++++++-------------
 2 files changed, 26 insertions(+), 24 deletions(-)

diff --git a/airflow/cli/commands/dag_processor_command.py 
b/airflow/cli/commands/dag_processor_command.py
index 85bef2727d..592be93f3f 100644
--- a/airflow/cli/commands/dag_processor_command.py
+++ b/airflow/cli/commands/dag_processor_command.py
@@ -23,7 +23,7 @@ from typing import Any
 
 from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
 from airflow.configuration import conf
-from airflow.dag_processing.manager import DagFileProcessorManager
+from airflow.dag_processing.manager import DagFileProcessorManager, 
reload_configuration_for_dag_processing
 from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
 from airflow.jobs.job import Job, run_job
 from airflow.utils import cli as cli_utils
@@ -36,7 +36,6 @@ def _create_dag_processor_job_runner(args: Any) -> 
DagProcessorJobRunner:
     """Create DagFileProcessorProcess instance."""
     processor_timeout_seconds: int = conf.getint("core", 
"dag_file_processor_timeout")
     processor_timeout = timedelta(seconds=processor_timeout_seconds)
-
     return DagProcessorJobRunner(
         job=Job(),
         processor=DagFileProcessorManager(
@@ -62,6 +61,7 @@ def dag_processor(args):
 
     job_runner = _create_dag_processor_job_runner(args)
 
+    reload_configuration_for_dag_processing()
     run_command_with_daemon_option(
         args=args,
         process_name="dag-processor",
diff --git a/airflow/dag_processing/manager.py 
b/airflow/dag_processing/manager.py
index ddf5e93f89..909425e569 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -227,28 +227,7 @@ class DagFileProcessorAgent(LoggingMixin, 
MultiprocessingStartMethodMixin):
         set_new_process_group()
 
         setproctitle("airflow scheduler -- DagFileProcessorManager")
-        # Reload configurations and settings to avoid collision with parent 
process.
-        # Because this process may need custom configurations that cannot be 
shared,
-        # e.g. RotatingFileHandler. And it can cause connection corruption if 
we
-        # do not recreate the SQLA connection pool.
-        os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"] = "True"
-        os.environ["AIRFLOW__LOGGING__COLORED_CONSOLE_LOG"] = "False"
-        # Replicating the behavior of how logging module was loaded
-        # in logging_config.py
-
-        # TODO: This reloading should be removed when we fix our logging 
behaviour
-        # In case of "spawn" method of starting processes for multiprocessing, 
reinitializing of the
-        # SQLAlchemy engine causes extremely unexpected behaviour of messing 
with objects already loaded
-        # in a parent process (likely via resources shared in memory by the 
ORM libraries).
-        # This caused flaky tests in our CI for many months and has been 
discovered while
-        # iterating on https://github.com/apache/airflow/pull/19860
-        # The issue that describes the problem and possible remediation is
-        # at https://github.com/apache/airflow/issues/19934
-
-        
importlib.reload(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit(".", 
1)[0]))  # type: ignore
-        importlib.reload(airflow.settings)
-        airflow.settings.initialize()
-        del os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"]
+        reload_configuration_for_dag_processing()
         processor_manager = DagFileProcessorManager(
             dag_directory=dag_directory,
             max_runs=max_runs,
@@ -1292,3 +1271,26 @@ class DagFileProcessorManager(LoggingMixin):
     @property
     def file_paths(self):
         return self._file_paths
+
+
+def reload_configuration_for_dag_processing():
+    # Reload configurations and settings to avoid collision with parent 
process.
+    # Because this process may need custom configurations that cannot be 
shared,
+    # e.g. RotatingFileHandler. And it can cause connection corruption if we
+    # do not recreate the SQLA connection pool.
+    os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"] = "True"
+    os.environ["AIRFLOW__LOGGING__COLORED_CONSOLE_LOG"] = "False"
+    # Replicating the behavior of how logging module was loaded
+    # in logging_config.py
+    # TODO: This reloading should be removed when we fix our logging behaviour
+    # In case of "spawn" method of starting processes for multiprocessing, 
reinitializing of the
+    # SQLAlchemy engine causes extremely unexpected behaviour of messing with 
objects already loaded
+    # in a parent process (likely via resources shared in memory by the ORM 
libraries).
+    # This caused flaky tests in our CI for many months and has been 
discovered while
+    # iterating on https://github.com/apache/airflow/pull/19860
+    # The issue that describes the problem and possible remediation is
+    # at https://github.com/apache/airflow/issues/19934
+    
importlib.reload(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit(".", 
1)[0]))  # type: ignore
+    importlib.reload(airflow.settings)
+    airflow.settings.initialize()
+    del os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"]

Reply via email to