This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 71c26276bc Ensure the KPO runs pod mutation hooks correctly (#31173)
71c26276bc is described below
commit 71c26276bcd3ddd5377d620e6b8baef30b72eaa0
Author: Jed Cunningham <[email protected]>
AuthorDate: Tue May 9 17:59:54 2023 -0500
Ensure the KPO runs pod mutation hooks correctly (#31173)
If you run a task via the cli with `cfg_path`, we also run
`configure_vars` again so that the globals built from the config file
can be updated. However, we were also initializing pluggy in that function,
which effectively removed any user-provided hooks.
This change moves the initialization of pluggy out of that function, making it
safe for the task command to run that function again.
This primarily impacted KPOs (KubernetesPodOperator tasks), as their pods
wouldn't have been passed through any `pod_mutation_hooks`.
---
airflow/settings.py | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git a/airflow/settings.py b/airflow/settings.py
index d9a47c4a2d..8f335e024c 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -173,6 +173,14 @@ def get_dagbag_import_timeout(dag_file_path: str):
return
POLICY_PLUGIN_MANAGER.hook.get_dagbag_import_timeout(dag_file_path=dag_file_path)
+def configure_policy_plugin_manager():
+ global POLICY_PLUGIN_MANAGER
+
+ POLICY_PLUGIN_MANAGER =
pluggy.PluginManager(policies.local_settings_hookspec.project_name)
+ POLICY_PLUGIN_MANAGER.add_hookspecs(policies)
+ POLICY_PLUGIN_MANAGER.register(policies.DefaultPolicy)
+
+
def load_policy_plugins(pm: pluggy.PluginManager):
# We can't log duration etc here, as logging hasn't yet been configured!
pm.load_setuptools_entrypoints("airflow.policy")
@@ -184,7 +192,6 @@ def configure_vars():
global DAGS_FOLDER
global PLUGINS_FOLDER
global DONOT_MODIFY_HANDLERS
- global POLICY_PLUGIN_MANAGER
SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN")
DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))
@@ -197,10 +204,6 @@ def configure_vars():
# The handlers are restored after the task completes execution.
DONOT_MODIFY_HANDLERS = conf.getboolean("logging",
"donot_modify_handlers", fallback=False)
- POLICY_PLUGIN_MANAGER =
pluggy.PluginManager(policies.local_settings_hookspec.project_name)
- POLICY_PLUGIN_MANAGER.add_hookspecs(policies)
- POLICY_PLUGIN_MANAGER.register(policies.DefaultPolicy)
-
def configure_orm(disable_connection_pool=False, pool_class=None):
"""Configure ORM using SQLAlchemy."""
@@ -512,6 +515,7 @@ def initialize():
"""Initialize Airflow with all the settings from this file."""
configure_vars()
prepare_syspath()
+ configure_policy_plugin_manager()
# Load policy plugins _before_ importing airflow_local_settings, as Pluggy
uses LIFO and we want anything
# in airflow_local_settings to take precedence
load_policy_plugins(POLICY_PLUGIN_MANAGER)