This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-6-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 805d22b373ab26e220d093d5a5aaf3c4fe200611
Author: Jed Cunningham <[email protected]>
AuthorDate: Tue May 9 17:59:54 2023 -0500

    Ensure the KPO runs pod mutation hooks correctly (#31173)

    If you run a task via the cli with `cfg_path`, we also run
    `configure_vars` again so that the globals built from the config file can
    be updated. We were also initializing pluggy in that function, however,
    which was effectively removing any user provided hooks.

    We will move the initializing of pluggy out of that function, making it
    safe to run again by the task command.

    This primarily impacted KPOs, as they wouldn't have been passed through
    any `pod_mutation_hooks`.

    (cherry picked from commit 71c26276bcd3ddd5377d620e6b8baef30b72eaa0)
---
 airflow/settings.py | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/airflow/settings.py b/airflow/settings.py
index 19e8972e94..67d614f2bb 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -173,6 +173,14 @@ def get_dagbag_import_timeout(dag_file_path: str):
     return POLICY_PLUGIN_MANAGER.hook.get_dagbag_import_timeout(dag_file_path=dag_file_path)
 
 
+def configure_policy_plugin_manager():
+    global POLICY_PLUGIN_MANAGER
+
+    POLICY_PLUGIN_MANAGER = pluggy.PluginManager(policies.local_settings_hookspec.project_name)
+    POLICY_PLUGIN_MANAGER.add_hookspecs(policies)
+    POLICY_PLUGIN_MANAGER.register(policies.DefaultPolicy)
+
+
 def load_policy_plugins(pm: pluggy.PluginManager):
     # We can't log duration etc here, as logging hasn't yet been configured!
     pm.load_setuptools_entrypoints("airflow.policy")
@@ -184,7 +192,6 @@ def configure_vars():
     global DAGS_FOLDER
     global PLUGINS_FOLDER
     global DONOT_MODIFY_HANDLERS
-    global POLICY_PLUGIN_MANAGER
 
     SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN")
     DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))
@@ -197,10 +204,6 @@ def configure_vars():
     # The handlers are restored after the task completes execution.
     DONOT_MODIFY_HANDLERS = conf.getboolean("logging", "donot_modify_handlers", fallback=False)
 
-    POLICY_PLUGIN_MANAGER = pluggy.PluginManager(policies.local_settings_hookspec.project_name)
-    POLICY_PLUGIN_MANAGER.add_hookspecs(policies)
-    POLICY_PLUGIN_MANAGER.register(policies.DefaultPolicy)
-
 
 def configure_orm(disable_connection_pool=False, pool_class=None):
     """Configure ORM using SQLAlchemy."""
@@ -512,6 +515,7 @@ def initialize():
     """Initialize Airflow with all the settings from this file."""
     configure_vars()
     prepare_syspath()
+    configure_policy_plugin_manager()
     # Load policy plugins _before_ importing airflow_local_settings, as Pluggy uses LIFO and we want anything
     # in airflow_local_settings to take precendec
     load_policy_plugins(POLICY_PLUGIN_MANAGER)
