ephraimbuddy commented on code in PR #63835:
URL: https://github.com/apache/airflow/pull/63835#discussion_r2987368885
##########
airflow-core/tests/unit/dag_processing/test_manager.py:
##########
@@ -1667,3 +1669,217 @@ def test_stats_total_parse_time(self,
statsd_gauge_mock, tmp_path, configure_tes
dag_path.touch() # make the loop run faster
gauge_values.clear()
+
+ # --- get_bundle_state / update_bundle_state ---
+
+ def test_get_bundle_state_returns_none_for_missing_bundle(self):
+ manager = DagFileProcessorManager(max_runs=1)
+ assert manager.get_bundle_state("nonexistent_bundle") is None
+
+ def test_get_bundle_state_returns_correct_state(self, session):
+ bundle_name = "test_state_bundle"
+ refreshed_at = timezone.datetime(2024, 1, 15, 12, 0, 0)
+ model = DagBundleModel(name=bundle_name, version="v1")
+ model.last_refreshed = refreshed_at
+ session.add(model)
+ session.commit()
+
+ manager = DagFileProcessorManager(max_runs=1)
+ state = manager.get_bundle_state(bundle_name)
+
+ assert state == BundleState(last_refreshed=refreshed_at, version="v1")
+
+ def test_get_bundle_state_null_fields(self, session):
+ bundle_name = "test_null_state_bundle"
+ session.add(DagBundleModel(name=bundle_name))
+ session.commit()
+
+ manager = DagFileProcessorManager(max_runs=1)
+ state = manager.get_bundle_state(bundle_name)
+
+ assert state == BundleState(last_refreshed=None, version=None)
+
+ def test_update_bundle_state_sets_last_refreshed(self, session):
+ bundle_name = "test_update_bundle"
+ session.add(DagBundleModel(name=bundle_name))
+ session.commit()
+
+ refreshed_at = timezone.datetime(2024, 6, 1, 8, 0, 0)
+ manager = DagFileProcessorManager(max_runs=1)
+ manager.update_bundle_state(bundle_name, last_refreshed=refreshed_at,
version=None)
+
+ session.expire_all()
+ model = session.get(DagBundleModel, bundle_name)
+ assert model.last_refreshed == refreshed_at
+ assert model.version is None
+
+ def test_update_bundle_state_sets_version(self, session):
+ bundle_name = "test_update_version_bundle"
+ session.add(DagBundleModel(name=bundle_name))
+ session.commit()
+
+ refreshed_at = timezone.datetime(2024, 6, 1, 8, 0, 0)
+ manager = DagFileProcessorManager(max_runs=1)
+ manager.update_bundle_state(bundle_name, last_refreshed=refreshed_at,
version="abc123")
+
+ session.expire_all()
+ model = session.get(DagBundleModel, bundle_name)
+ assert model.last_refreshed == refreshed_at
+ assert model.version == "abc123"
+
+ def test_update_bundle_state_does_not_overwrite_version_when_none(self,
session):
+ bundle_name = "test_preserve_version_bundle"
+ session.add(DagBundleModel(name=bundle_name, version="keep_me"))
+ session.commit()
+
+ refreshed_at = timezone.datetime(2024, 6, 1, 8, 0, 0)
+ manager = DagFileProcessorManager(max_runs=1)
+ manager.update_bundle_state(bundle_name, last_refreshed=refreshed_at,
version=None)
+
+ session.expire_all()
+ model = session.get(DagBundleModel, bundle_name)
+ assert model.last_refreshed == refreshed_at
+ assert model.version == "keep_me"
+
+
+ def _make_refresh_bundle(self, *, supports_versioning=False,
current_version=None):
+ bundle = MagicMock(spec=BaseDagBundle)
+ bundle.name = "mock_bundle"
+ bundle.refresh_interval = 0
+ bundle.supports_versioning = supports_versioning
+ bundle.is_initialized = True
+ bundle.path = Path("/dev/null")
+ bundle.get_current_version.return_value = current_version
+ return bundle
+
+ def _refresh_with_mocked_state(self, manager, bundle, initial_state):
+ """Run _refresh_dag_bundles with get/update_bundle_state mocked out.
+
+ Returns the two MagicMock objects for post-call assertions. MagicMock
retains its
+ call records after the ``with`` block exits (un-patching only restores
the original
+ attribute; it does not clear the mock's recorded calls), so callers
can assert on
+ them normally after this method returns.
+ """
+ manager._dag_bundles = [bundle]
+ mock_get = mock.patch.object(manager, "get_bundle_state",
return_value=initial_state)
+ mock_update = mock.patch.object(manager, "update_bundle_state")
+ with (
+ mock_get as patched_get,
+ mock_update as patched_update,
+ mock.patch.object(manager, "_find_files_in_bundle",
return_value=[]),
+ mock.patch.object(manager, "deactivate_deleted_dags"),
+ mock.patch.object(manager, "clear_orphaned_import_errors"),
+ mock.patch.object(manager, "handle_removed_files"),
+ mock.patch.object(manager, "_resort_file_queue"),
+ mock.patch.object(manager, "_add_new_files_to_queue"),
+ ):
+ manager._refresh_dag_bundles({})
+ return patched_get, patched_update
+
+ def test_refresh_dag_bundles_non_versioned_calls_update_bundle_state(self):
+ """Non-versioned bundle: update_bundle_state called with
version=None."""
+ manager = DagFileProcessorManager(max_runs=1)
+ bundle = self._make_refresh_bundle(supports_versioning=False)
+
+ mock_get, mock_update = self._refresh_with_mocked_state(
+ manager, bundle, BundleState(last_refreshed=None, version=None)
+ )
+
+ mock_get.assert_called_once_with("mock_bundle")
+ mock_update.assert_called_once_with("mock_bundle",
last_refreshed=mock.ANY, version=None)
+ assert manager._bundle_versions["mock_bundle"] is None
+
+ def
test_refresh_dag_bundles_versioned_version_changed_calls_update_bundle_state(self):
+ """Versioned bundle with new version: update_bundle_state called with
the new version."""
+ manager = DagFileProcessorManager(max_runs=1)
+ bundle = self._make_refresh_bundle(supports_versioning=True,
current_version="v2")
+ # Pre-populate _bundle_versions so previously_seen=True and current DB
version differs
+ manager._bundle_versions["mock_bundle"] = "v1"
+
+ mock_get, mock_update = self._refresh_with_mocked_state(
+ manager, bundle, BundleState(last_refreshed=None, version="v1")
+ )
+
+ mock_get.assert_called_once_with("mock_bundle")
+ mock_update.assert_called_once_with("mock_bundle",
last_refreshed=mock.ANY, version="v2")
+ assert manager._bundle_versions["mock_bundle"] == "v2"
+
+ def
test_refresh_dag_bundles_versioned_version_unchanged_calls_update_bundle_state(self):
+ """Versioned bundle with unchanged version: update_bundle_state called
with version=None."""
+ manager = DagFileProcessorManager(max_runs=1)
+ bundle = self._make_refresh_bundle(supports_versioning=True,
current_version="v1")
+ # Pre-populate _bundle_versions so previously_seen=True and version
matches
+ manager._bundle_versions["mock_bundle"] = "v1"
+
+ mock_get, mock_update = self._refresh_with_mocked_state(
+ manager, bundle, BundleState(last_refreshed=None, version="v1")
+ )
+
+ mock_get.assert_called_once_with("mock_bundle")
+ # version=None because version did not change — last_refreshed still
updated
+ mock_update.assert_called_once_with("mock_bundle",
last_refreshed=mock.ANY, version=None)
+ # _bundle_versions NOT updated for unchanged-version early-continue
path
+ assert manager._bundle_versions["mock_bundle"] == "v1"
+
+ def
test_refresh_dag_bundles_versioned_first_seen_skips_short_circuit(self):
+ """Versioned bundle seen for the first time: short-circuit is skipped
even if versions match.
+
+ previously_seen=False means ``previously_seen and ...`` is False, so
the bundle always
+ goes through the full update path on first encounter regardless of
version equality.
+ """
+ manager = DagFileProcessorManager(max_runs=1)
+ # current_version matches what's already in the DB state
+ bundle = self._make_refresh_bundle(supports_versioning=True,
current_version="v1")
+ # _bundle_versions is empty → previously_seen=False
+
+ mock_get, mock_update = self._refresh_with_mocked_state(
+ manager, bundle, BundleState(last_refreshed=None, version="v1")
+ )
+
+ mock_get.assert_called_once_with("mock_bundle")
+ # full update called with the actual version, not short-circuited to
version=None
+ mock_update.assert_called_once_with("mock_bundle",
last_refreshed=mock.ANY, version="v1")
+ assert manager._bundle_versions["mock_bundle"] == "v1"
+
+ def test_refresh_dag_bundles_get_bundle_state_failure_skips_bundle(self):
+ """A failure in get_bundle_state() logs and skips the bundle without
aborting the loop."""
+ manager = DagFileProcessorManager(max_runs=1)
+ bundle = self._make_refresh_bundle()
+ manager._dag_bundles = [bundle]
+
+ with mock.patch.object(manager, "get_bundle_state",
side_effect=Exception("API error")):
+ manager._refresh_dag_bundles({})
+
+ bundle.refresh.assert_not_called()
+
+ def
test_refresh_dag_bundles_update_bundle_state_failure_still_scans_files(self):
Review Comment:
I have added a test for the version-unchanged path (test_refresh_dag_bundles_versioned_version_unchanged_calls_update_bundle_state).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]