This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new dd2252dcc05 Revert "Call `get_current_version` less often in bundle refresh loop (#45999)" (#46037)
dd2252dcc05 is described below

commit dd2252dcc059cfe963bd6b2e87e55de390e18dcd
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sat Jan 25 10:46:33 2025 +0100

    Revert "Call `get_current_version` less often in bundle refresh loop (#45999)" (#46037)
    
    This reverts commit c600a95aaf2df80ca59889b22f741bc8289138e1.
---
 airflow/dag_processing/manager.py    | 28 ++++-----------
 tests/dag_processing/test_manager.py | 67 ++----------------------------------
 2 files changed, 8 insertions(+), 87 deletions(-)

diff --git a/airflow/dag_processing/manager.py 
b/airflow/dag_processing/manager.py
index 536113b3dd4..fd63f51a0d2 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -440,21 +440,11 @@ class DagFileProcessorManager:
                 elapsed_time_since_refresh = (
                     now - (bundle_model.last_refreshed or timezone.utc_epoch())
                 ).total_seconds()
-                if bundle.supports_versioning:
-                    # we will also check the version of the bundle to see if 
another DAG processor has seen
-                    # a new version
-                    pre_refresh_version = (
-                        self._bundle_versions.get(bundle.name) or 
bundle.get_current_version()
-                    )
-                    current_version_matches_db = pre_refresh_version == 
bundle_model.version
-                else:
-                    # With no versioning, it always "matches"
-                    current_version_matches_db = True
-
+                pre_refresh_version = bundle.get_current_version()
                 previously_seen = bundle.name in self._bundle_versions
                 if (
                     elapsed_time_since_refresh < bundle.refresh_interval
-                    and current_version_matches_db
+                    and bundle_model.version == pre_refresh_version
                     and previously_seen
                 ):
                     self.log.info("Not time to refresh %s", bundle.name)
@@ -468,17 +458,13 @@ class DagFileProcessorManager:
 
                 bundle_model.last_refreshed = now
 
+                version_after_refresh = bundle.get_current_version()
                 if bundle.supports_versioning:
                     # We can short-circuit the rest of this if (1) bundle was 
seen before by
                     # this dag processor and (2) the version of the bundle did 
not change
                     # after refreshing it
-                    version_after_refresh = bundle.get_current_version()
                     if previously_seen and pre_refresh_version == 
version_after_refresh:
-                        self.log.debug(
-                            "Bundle %s version not changed after refresh: %s",
-                            bundle.name,
-                            version_after_refresh,
-                        )
+                        self.log.debug("Bundle %s version not changed after 
refresh", bundle.name)
                         continue
 
                     bundle_model.version = version_after_refresh
@@ -486,10 +472,6 @@ class DagFileProcessorManager:
                     self.log.info(
                         "Version changed for %s, new version: %s", 
bundle.name, version_after_refresh
                     )
-                else:
-                    version_after_refresh = None
-
-            self._bundle_versions[bundle.name] = version_after_refresh
 
             bundle_file_paths = self._find_files_in_bundle(bundle)
 
@@ -502,6 +484,8 @@ class DagFileProcessorManager:
             self.deactivate_deleted_dags(bundle_file_paths)
             self.clear_nonexistent_import_errors()
 
+            self._bundle_versions[bundle.name] = bundle.get_current_version()
+
     def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[str]:
         """Refresh file paths from bundle dir."""
         # Build up a list of Python files that could contain DAGs
diff --git a/tests/dag_processing/test_manager.py 
b/tests/dag_processing/test_manager.py
index 9dfb2e8045f..055312b5856 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -62,7 +62,6 @@ from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.db import (
     clear_db_assets,
     clear_db_callbacks,
-    clear_db_dag_bundles,
     clear_db_dags,
     clear_db_import_errors,
     clear_db_runs,
@@ -94,7 +93,6 @@ class TestDagFileProcessorManager:
         clear_db_dags()
         clear_db_callbacks()
         clear_db_import_errors()
-        clear_db_dag_bundles()
 
     def teardown_class(self):
         clear_db_assets()
@@ -103,7 +101,6 @@ class TestDagFileProcessorManager:
         clear_db_dags()
         clear_db_callbacks()
         clear_db_import_errors()
-        clear_db_dag_bundles()
 
     def mock_processor(self) -> DagFileProcessorProcess:
         proc = MagicMock()
@@ -530,9 +527,7 @@ class TestDagFileProcessorManager:
             any_order=True,
         )
 
-    def test_refresh_dags_dir_doesnt_delete_zipped_dags(
-        self, tmp_path, testing_dag_bundle, configure_testing_dag_bundle
-    ):
+    def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmp_path, 
configure_testing_dag_bundle):
         """Test DagFileProcessorManager._refresh_dag_dir method"""
         dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
         zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
@@ -795,7 +790,7 @@ class TestDagFileProcessorManager:
     def test_bundles_versions_are_stored(self):
         config = [
             {
-                "name": "bundleone",
+                "name": "mybundle",
                 "classpath": 
"airflow.dag_processing.bundles.local.LocalDagBundle",
                 "kwargs": {"path": "/dev/null", "refresh_interval": 0},
             },
@@ -820,61 +815,3 @@ class TestDagFileProcessorManager:
         with create_session() as session:
             model = session.get(DagBundleModel, "bundleone")
             assert model.version == "123"
-
-    def test_non_versioned_bundle_get_version_not_called(self):
-        config = [
-            {
-                "name": "bundleone",
-                "classpath": 
"airflow.dag_processing.bundles.local.LocalDagBundle",
-                "kwargs": {"path": "/dev/null", "refresh_interval": 0},
-            },
-        ]
-
-        mybundle = MagicMock()
-        mybundle.name = "bundleone"
-        mybundle.refresh_interval = 0
-        mybundle.supports_versioning = False
-
-        with conf_vars({("dag_bundles", "config_list"): json.dumps(config)}):
-            DagBundlesManager().sync_bundles_to_db()
-            with mock.patch(
-                "airflow.dag_processing.bundles.manager.DagBundlesManager"
-            ) as mock_bundle_manager:
-                mock_bundle_manager.return_value._bundle_config = 
{"bundleone": None}
-                
mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle]
-                manager = DagFileProcessorManager(max_runs=1)
-                manager.run()
-
-        mybundle.get_current_version.assert_not_called()
-
-    def test_versioned_bundle_get_version_called_once(self):
-        """Make sure in a normal "warm" loop, get_current_version is called 
just once after refresha"""
-
-        config = [
-            {
-                "name": "bundleone",
-                "classpath": 
"airflow.dag_processing.bundles.local.LocalDagBundle",
-                "kwargs": {"path": "/dev/null", "refresh_interval": 0},
-            },
-        ]
-
-        mybundle = MagicMock()
-        mybundle.name = "bundleone"
-        mybundle.refresh_interval = 0
-        mybundle.supports_versioning = True
-        mybundle.get_current_version.return_value = "123"
-
-        with conf_vars({("dag_bundles", "config_list"): json.dumps(config)}):
-            DagBundlesManager().sync_bundles_to_db()
-            with mock.patch(
-                "airflow.dag_processing.bundles.manager.DagBundlesManager"
-            ) as mock_bundle_manager:
-                mock_bundle_manager.return_value._bundle_config = 
{"bundleone": None}
-                
mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle]
-                manager = DagFileProcessorManager(max_runs=1)
-                manager.run()  # run it once to warm up
-
-                # now run it again so we can check we only call 
get_current_version once
-                mybundle.get_current_version.reset_mock()
-                manager.run()
-                mybundle.get_current_version.assert_called_once()

Reply via email to