This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 71c26276bc  Ensure the KPO runs pod mutation hooks correctly (#31173)
71c26276bc is described below

commit 71c26276bcd3ddd5377d620e6b8baef30b72eaa0
Author: Jed Cunningham <[email protected]>
AuthorDate: Tue May 9 17:59:54 2023 -0500

     Ensure the KPO runs pod mutation hooks correctly (#31173)
    
    If you run a task via the CLI with `cfg_path`, we also run
    `configure_vars` again so that the globals built from the config file
    can be updated. However, we were also initializing pluggy in that
    function, which effectively removed any user-provided hooks.
    
    We will move the initializing of pluggy out of that function, making it
    safe to run again by the task command.
    
    This primarily impacted KPOs, as they wouldn't have been passed through any `pod_mutation_hooks`.
---
 airflow/settings.py | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/airflow/settings.py b/airflow/settings.py
index d9a47c4a2d..8f335e024c 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -173,6 +173,14 @@ def get_dagbag_import_timeout(dag_file_path: str):
     return POLICY_PLUGIN_MANAGER.hook.get_dagbag_import_timeout(dag_file_path=dag_file_path)
 
 
+def configure_policy_plugin_manager():
+    global POLICY_PLUGIN_MANAGER
+
+    POLICY_PLUGIN_MANAGER = pluggy.PluginManager(policies.local_settings_hookspec.project_name)
+    POLICY_PLUGIN_MANAGER.add_hookspecs(policies)
+    POLICY_PLUGIN_MANAGER.register(policies.DefaultPolicy)
+
+
 def load_policy_plugins(pm: pluggy.PluginManager):
     # We can't log duration etc  here, as logging hasn't yet been configured!
     pm.load_setuptools_entrypoints("airflow.policy")
@@ -184,7 +192,6 @@ def configure_vars():
     global DAGS_FOLDER
     global PLUGINS_FOLDER
     global DONOT_MODIFY_HANDLERS
-    global POLICY_PLUGIN_MANAGER
     SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN")
     DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))
 
@@ -197,10 +204,6 @@ def configure_vars():
     # The handlers are restored after the task completes execution.
     DONOT_MODIFY_HANDLERS = conf.getboolean("logging", "donot_modify_handlers", fallback=False)
 
-    POLICY_PLUGIN_MANAGER = pluggy.PluginManager(policies.local_settings_hookspec.project_name)
-    POLICY_PLUGIN_MANAGER.add_hookspecs(policies)
-    POLICY_PLUGIN_MANAGER.register(policies.DefaultPolicy)
-
 
 def configure_orm(disable_connection_pool=False, pool_class=None):
     """Configure ORM using SQLAlchemy."""
@@ -512,6 +515,7 @@ def initialize():
     """Initialize Airflow with all the settings from this file."""
     configure_vars()
     prepare_syspath()
+    configure_policy_plugin_manager()
     # Load policy plugins _before_ importing airflow_local_settings, as Pluggy uses LIFO and we want anything
     # in airflow_local_settings to take precendec
     load_policy_plugins(POLICY_PLUGIN_MANAGER)

Reply via email to