This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new dd2252dcc05 Revert "Call `get_current_version` less often in bundle refresh loop (#45999)" (#46037)
dd2252dcc05 is described below
commit dd2252dcc059cfe963bd6b2e87e55de390e18dcd
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sat Jan 25 10:46:33 2025 +0100
Revert "Call `get_current_version` less often in bundle refresh loop (#45999)" (#46037)
This reverts commit c600a95aaf2df80ca59889b22f741bc8289138e1.
---
airflow/dag_processing/manager.py | 28 ++++-----------
tests/dag_processing/test_manager.py | 67 ++----------------------------------
2 files changed, 8 insertions(+), 87 deletions(-)
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 536113b3dd4..fd63f51a0d2 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -440,21 +440,11 @@ class DagFileProcessorManager:
elapsed_time_since_refresh = (
now - (bundle_model.last_refreshed or timezone.utc_epoch())
).total_seconds()
- if bundle.supports_versioning:
- # we will also check the version of the bundle to see if another DAG processor has seen
- # a new version
- pre_refresh_version = (
- self._bundle_versions.get(bundle.name) or
bundle.get_current_version()
- )
- current_version_matches_db = pre_refresh_version ==
bundle_model.version
- else:
- # With no versioning, it always "matches"
- current_version_matches_db = True
-
+ pre_refresh_version = bundle.get_current_version()
previously_seen = bundle.name in self._bundle_versions
if (
elapsed_time_since_refresh < bundle.refresh_interval
- and current_version_matches_db
+ and bundle_model.version == pre_refresh_version
and previously_seen
):
self.log.info("Not time to refresh %s", bundle.name)
@@ -468,17 +458,13 @@ class DagFileProcessorManager:
bundle_model.last_refreshed = now
+ version_after_refresh = bundle.get_current_version()
if bundle.supports_versioning:
# We can short-circuit the rest of this if (1) bundle was seen before by
# this dag processor and (2) the version of the bundle did not change
# after refreshing it
- version_after_refresh = bundle.get_current_version()
if previously_seen and pre_refresh_version ==
version_after_refresh:
- self.log.debug(
- "Bundle %s version not changed after refresh: %s",
- bundle.name,
- version_after_refresh,
- )
+ self.log.debug("Bundle %s version not changed after refresh", bundle.name)
continue
bundle_model.version = version_after_refresh
@@ -486,10 +472,6 @@ class DagFileProcessorManager:
self.log.info(
"Version changed for %s, new version: %s",
bundle.name, version_after_refresh
)
- else:
- version_after_refresh = None
-
- self._bundle_versions[bundle.name] = version_after_refresh
bundle_file_paths = self._find_files_in_bundle(bundle)
@@ -502,6 +484,8 @@ class DagFileProcessorManager:
self.deactivate_deleted_dags(bundle_file_paths)
self.clear_nonexistent_import_errors()
+ self._bundle_versions[bundle.name] = bundle.get_current_version()
+
def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[str]:
"""Refresh file paths from bundle dir."""
# Build up a list of Python files that could contain DAGs
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 9dfb2e8045f..055312b5856 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -62,7 +62,6 @@ from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import (
clear_db_assets,
clear_db_callbacks,
- clear_db_dag_bundles,
clear_db_dags,
clear_db_import_errors,
clear_db_runs,
@@ -94,7 +93,6 @@ class TestDagFileProcessorManager:
clear_db_dags()
clear_db_callbacks()
clear_db_import_errors()
- clear_db_dag_bundles()
def teardown_class(self):
clear_db_assets()
@@ -103,7 +101,6 @@ class TestDagFileProcessorManager:
clear_db_dags()
clear_db_callbacks()
clear_db_import_errors()
- clear_db_dag_bundles()
def mock_processor(self) -> DagFileProcessorProcess:
proc = MagicMock()
@@ -530,9 +527,7 @@ class TestDagFileProcessorManager:
any_order=True,
)
- def test_refresh_dags_dir_doesnt_delete_zipped_dags(
- self, tmp_path, testing_dag_bundle, configure_testing_dag_bundle
- ):
+ def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmp_path, configure_testing_dag_bundle):
"""Test DagFileProcessorManager._refresh_dag_dir method"""
dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
@@ -795,7 +790,7 @@ class TestDagFileProcessorManager:
def test_bundles_versions_are_stored(self):
config = [
{
- "name": "bundleone",
+ "name": "mybundle",
"classpath":
"airflow.dag_processing.bundles.local.LocalDagBundle",
"kwargs": {"path": "/dev/null", "refresh_interval": 0},
},
@@ -820,61 +815,3 @@ class TestDagFileProcessorManager:
with create_session() as session:
model = session.get(DagBundleModel, "bundleone")
assert model.version == "123"
-
- def test_non_versioned_bundle_get_version_not_called(self):
- config = [
- {
- "name": "bundleone",
- "classpath":
"airflow.dag_processing.bundles.local.LocalDagBundle",
- "kwargs": {"path": "/dev/null", "refresh_interval": 0},
- },
- ]
-
- mybundle = MagicMock()
- mybundle.name = "bundleone"
- mybundle.refresh_interval = 0
- mybundle.supports_versioning = False
-
- with conf_vars({("dag_bundles", "config_list"): json.dumps(config)}):
- DagBundlesManager().sync_bundles_to_db()
- with mock.patch(
- "airflow.dag_processing.bundles.manager.DagBundlesManager"
- ) as mock_bundle_manager:
- mock_bundle_manager.return_value._bundle_config =
{"bundleone": None}
-
mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle]
- manager = DagFileProcessorManager(max_runs=1)
- manager.run()
-
- mybundle.get_current_version.assert_not_called()
-
- def test_versioned_bundle_get_version_called_once(self):
- """Make sure in a normal "warm" loop, get_current_version is called
just once after refresha"""
-
- config = [
- {
- "name": "bundleone",
- "classpath":
"airflow.dag_processing.bundles.local.LocalDagBundle",
- "kwargs": {"path": "/dev/null", "refresh_interval": 0},
- },
- ]
-
- mybundle = MagicMock()
- mybundle.name = "bundleone"
- mybundle.refresh_interval = 0
- mybundle.supports_versioning = True
- mybundle.get_current_version.return_value = "123"
-
- with conf_vars({("dag_bundles", "config_list"): json.dumps(config)}):
- DagBundlesManager().sync_bundles_to_db()
- with mock.patch(
- "airflow.dag_processing.bundles.manager.DagBundlesManager"
- ) as mock_bundle_manager:
- mock_bundle_manager.return_value._bundle_config =
{"bundleone": None}
-
mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle]
- manager = DagFileProcessorManager(max_runs=1)
- manager.run() # run it once to warm up
-
- # now run it again so we can check we only call
get_current_version once
- mybundle.get_current_version.reset_mock()
- manager.run()
- mybundle.get_current_version.assert_called_once()