This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 930f9d247f5 Call `get_current_version` less often in bundle refresh 
loop (#46261)
930f9d247f5 is described below

commit 930f9d247f55406304f317f92e2d37ecc630e359
Author: Jed Cunningham <[email protected]>
AuthorDate: Mon Feb 3 15:13:44 2025 -0700

    Call `get_current_version` less often in bundle refresh loop (#46261)
    
    In the bundle refresh loop, we can call `get_current_version` a lot less
    often, as 1) we can skip it for bundles that do not support versioning
    and 2) for those that do, we already know the version from the last time
    we refreshed!
    
    Since this is a local call, this isn't a huge gain. But every little bit
    helps.
---
 airflow/dag_processing/manager.py    | 28 +++++++++++---
 tests/dag_processing/test_manager.py | 74 +++++++++++++++++++++++++++++++++---
 2 files changed, 91 insertions(+), 11 deletions(-)

diff --git a/airflow/dag_processing/manager.py 
b/airflow/dag_processing/manager.py
index b395b3d33ff..987e09bfc66 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -453,11 +453,21 @@ class DagFileProcessorManager:
                 elapsed_time_since_refresh = (
                     now - (bundle_model.last_refreshed or timezone.utc_epoch())
                 ).total_seconds()
-                pre_refresh_version = bundle.get_current_version()
+                if bundle.supports_versioning:
+                    # we will also check the version of the bundle to see if 
another DAG processor has seen
+                    # a new version
+                    pre_refresh_version = (
+                        self._bundle_versions.get(bundle.name) or 
bundle.get_current_version()
+                    )
+                    current_version_matches_db = pre_refresh_version == 
bundle_model.version
+                else:
+                    # With no versioning, it always "matches"
+                    current_version_matches_db = True
+
                 previously_seen = bundle.name in self._bundle_versions
                 if (
                     elapsed_time_since_refresh < bundle.refresh_interval
-                    and bundle_model.version == pre_refresh_version
+                    and current_version_matches_db
                     and previously_seen
                 ):
                     self.log.info("Not time to refresh %s", bundle.name)
@@ -471,13 +481,17 @@ class DagFileProcessorManager:
 
                 bundle_model.last_refreshed = now
 
-                version_after_refresh = bundle.get_current_version()
                 if bundle.supports_versioning:
                     # We can short-circuit the rest of this if (1) bundle was 
seen before by
                     # this dag processor and (2) the version of the bundle did 
not change
                     # after refreshing it
+                    version_after_refresh = bundle.get_current_version()
                     if previously_seen and pre_refresh_version == 
version_after_refresh:
-                        self.log.debug("Bundle %s version not changed after 
refresh", bundle.name)
+                        self.log.debug(
+                            "Bundle %s version not changed after refresh: %s",
+                            bundle.name,
+                            version_after_refresh,
+                        )
                         continue
 
                     bundle_model.version = version_after_refresh
@@ -485,6 +499,10 @@ class DagFileProcessorManager:
                     self.log.info(
                         "Version changed for %s, new version: %s", 
bundle.name, version_after_refresh
                     )
+                else:
+                    version_after_refresh = None
+
+            self._bundle_versions[bundle.name] = version_after_refresh
 
             found_files = [
                 DagFileInfo(rel_path=p, bundle_name=bundle.name, 
bundle_path=bundle.path)
@@ -503,8 +521,6 @@ class DagFileProcessorManager:
             self.deactivate_deleted_dags(active_files=found_files)
             self.clear_nonexistent_import_errors()
 
-            self._bundle_versions[bundle.name] = bundle.get_current_version()
-
     def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]:
         """Get relative paths for dag files from bundle dir."""
         # Build up a list of Python files that could contain DAGs
diff --git a/tests/dag_processing/test_manager.py 
b/tests/dag_processing/test_manager.py
index d2dacd35870..ebc3536c83f 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -62,6 +62,7 @@ from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.db import (
     clear_db_assets,
     clear_db_callbacks,
+    clear_db_dag_bundles,
     clear_db_dags,
     clear_db_import_errors,
     clear_db_runs,
@@ -93,6 +94,7 @@ class TestDagFileProcessorManager:
         clear_db_dags()
         clear_db_callbacks()
         clear_db_import_errors()
+        clear_db_dag_bundles()
 
     def teardown_class(self):
         clear_db_assets()
@@ -101,6 +103,7 @@ class TestDagFileProcessorManager:
         clear_db_dags()
         clear_db_callbacks()
         clear_db_import_errors()
+        clear_db_dag_bundles()
 
     def mock_processor(self) -> DagFileProcessorProcess:
         proc = MagicMock()
@@ -575,7 +578,9 @@ class TestDagFileProcessorManager:
             any_order=True,
         )
 
-    def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmp_path, 
configure_testing_dag_bundle):
+    def test_refresh_dags_dir_doesnt_delete_zipped_dags(
+        self, tmp_path, testing_dag_bundle, configure_testing_dag_bundle
+    ):
         """Test DagFileProcessorManager._refresh_dag_dir method"""
         dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
         zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
@@ -869,14 +874,14 @@ class TestDagFileProcessorManager:
     def test_bundles_versions_are_stored(self, session):
         config = [
             {
-                "name": "mybundle",
+                "name": "bundleone",
                 "classpath": 
"airflow.dag_processing.bundles.local.LocalDagBundle",
                 "kwargs": {"path": "/dev/null", "refresh_interval": 0},
             },
         ]
 
         mybundle = MagicMock()
-        mybundle.name = "mybundle"
+        mybundle.name = "bundleone"
         mybundle.path = "/dev/null"
         mybundle.refresh_interval = 0
         mybundle.supports_versioning = True
@@ -885,11 +890,70 @@ class TestDagFileProcessorManager:
         with conf_vars({("dag_processor", "dag_bundle_config_list"): 
json.dumps(config)}):
             DagBundlesManager().sync_bundles_to_db()
             with 
mock.patch("airflow.dag_processing.manager.DagBundlesManager") as 
mock_bundle_manager:
-                mock_bundle_manager.return_value._bundle_config = {"mybundle": 
None}
+                mock_bundle_manager.return_value._bundle_config = 
{"bundleone": None}
                 
mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle]
                 manager = DagFileProcessorManager(max_runs=1)
                 manager.run()
 
         with create_session() as session:
-            model = session.get(DagBundleModel, "mybundle")
+            model = session.get(DagBundleModel, "bundleone")
             assert model.version == "123"
+
+    def test_non_versioned_bundle_get_version_not_called(self):
+        config = [
+            {
+                "name": "bundleone",
+                "classpath": 
"airflow.dag_processing.bundles.local.LocalDagBundle",
+                "kwargs": {"path": "/dev/null", "refresh_interval": 0},
+            },
+        ]
+
+        bundleone = MagicMock()
+        bundleone.name = "bundleone"
+        bundleone.refresh_interval = 0
+        bundleone.supports_versioning = False
+        bundleone.path = Path("/dev/null")
+
+        with conf_vars({("dag_processor", "dag_bundle_config_list"): 
json.dumps(config)}):
+            DagBundlesManager().sync_bundles_to_db()
+            with 
mock.patch("airflow.dag_processing.manager.DagBundlesManager") as 
mock_bundle_manager:
+                mock_bundle_manager.return_value._bundle_config = 
{"bundleone": None}
+                
mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [bundleone]
+                manager = DagFileProcessorManager(max_runs=1)
+                manager.run()
+
+        bundleone.refresh.assert_called_once()
+        bundleone.get_current_version.assert_not_called()
+
+    def test_versioned_bundle_get_version_called_once(self):
+        """Make sure in a normal "warm" loop, get_current_version is called 
just once after refresh"""
+
+        config = [
+            {
+                "name": "bundleone",
+                "classpath": 
"airflow.dag_processing.bundles.local.LocalDagBundle",
+                "kwargs": {"path": "/dev/null", "refresh_interval": 0},
+            },
+        ]
+
+        bundleone = MagicMock()
+        bundleone.name = "bundleone"
+        bundleone.refresh_interval = 0
+        bundleone.supports_versioning = True
+        bundleone.get_current_version.return_value = "123"
+        bundleone.path = Path("/dev/null")
+
+        with conf_vars({("dag_processor", "dag_bundle_config_list"): 
json.dumps(config)}):
+            DagBundlesManager().sync_bundles_to_db()
+            with 
mock.patch("airflow.dag_processing.manager.DagBundlesManager") as 
mock_bundle_manager:
+                mock_bundle_manager.return_value._bundle_config = 
{"bundleone": None}
+                
mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [bundleone]
+                manager = DagFileProcessorManager(max_runs=1)
+                manager.run()  # run it once to warm up
+
+                # now run it again so we can check we only call 
get_current_version once
+                bundleone.refresh.reset_mock()
+                bundleone.get_current_version.reset_mock()
+                manager.run()
+                bundleone.refresh.assert_called_once()
+                bundleone.get_current_version.assert_called_once()

Reply via email to