ephraimbuddy commented on code in PR #63835:
URL: https://github.com/apache/airflow/pull/63835#discussion_r2987090004


##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -620,69 +666,72 @@ def _refresh_dag_bundles(self, known_files: dict[str, 
set[DagFileInfo]]):
                     self.log.exception("Error initializing bundle %s: %s", 
bundle.name, e)
                     continue
             # TODO: AIP-66 test to make sure we get a fresh record from the db 
and it's not cached
-            with create_session() as session:
-                bundle_model = session.get(DagBundleModel, bundle.name)
-                if bundle_model is None:
-                    self.log.warning("Bundle model not found for %s", 
bundle.name)
-                    continue
-                elapsed_time_since_refresh = (
-                    now - (bundle_model.last_refreshed or utc_epoch())
-                ).total_seconds()
-                if bundle.supports_versioning:
-                    # we will also check the version of the bundle to see if 
another DAG processor has seen
-                    # a new version
-                    pre_refresh_version = (
-                        self._bundle_versions.get(bundle.name) or 
bundle.get_current_version()
-                    )
-                    current_version_matches_db = pre_refresh_version == 
bundle_model.version
-                else:
-                    # With no versioning, it always "matches"
-                    current_version_matches_db = True
-
-                previously_seen = bundle.name in self._bundle_versions
-                if self.should_skip_refresh(
-                    bundle=bundle,
-                    elapsed_time_since_refresh=elapsed_time_since_refresh,
-                    current_version_matches_db=current_version_matches_db,
-                    previously_seen=previously_seen,
-                ):
-                    self.log.info("Not time to refresh bundle %s", bundle.name)
-                    continue
-
-                self.log.info("Refreshing bundle %s", bundle.name)
-
-                try:
-                    bundle.refresh()
-                    any_refreshed = True
-                except Exception:
-                    self.log.exception("Error refreshing bundle %s", 
bundle.name)
-                    continue
-
-                bundle_model.last_refreshed = now
-                self._force_refresh_bundles.discard(bundle.name)
+            try:
+                bundle_state = self.get_bundle_state(bundle.name)
+            except Exception:
+                self.log.exception("Error fetching state for bundle %s", 
bundle.name)
+                continue
+            if bundle_state is None:
+                self.log.warning("Bundle model not found for %s", bundle.name)
+                continue
+            elapsed_time_since_refresh = (now - (bundle_state.last_refreshed 
or utc_epoch())).total_seconds()
+            if bundle.supports_versioning:
+                # we will also check the version of the bundle to see if 
another DAG processor has seen
+                # a new version
+                pre_refresh_version = self._bundle_versions.get(bundle.name) 
or bundle.get_current_version()
+                current_version_matches_db = pre_refresh_version == 
bundle_state.version
+            else:
+                # With no versioning, it always "matches"
+                current_version_matches_db = True
+
+            previously_seen = bundle.name in self._bundle_versions
+            if self.should_skip_refresh(
+                bundle=bundle,
+                elapsed_time_since_refresh=elapsed_time_since_refresh,
+                current_version_matches_db=current_version_matches_db,
+                previously_seen=previously_seen,
+            ):
+                self.log.info("Not time to refresh bundle %s", bundle.name)
+                continue
 
-                if bundle.supports_versioning:
-                    # We can short-circuit the rest of this if (1) bundle was 
seen before by
-                    # this dag processor and (2) the version of the bundle did 
not change
-                    # after refreshing it
-                    version_after_refresh = bundle.get_current_version()
-                    if previously_seen and pre_refresh_version == 
version_after_refresh:
-                        self.log.debug(
-                            "Bundle %s version not changed after refresh: %s",
-                            bundle.name,
-                            version_after_refresh,
-                        )
-                        continue
+            self.log.info("Refreshing bundle %s", bundle.name)
 
-                    bundle_model.version = version_after_refresh
+            try:
+                bundle.refresh()
+                any_refreshed = True
+            except Exception:
+                self.log.exception("Error refreshing bundle %s", bundle.name)
+                continue
 
-                    self.log.info(
-                        "Version changed for %s, new version: %s", 
bundle.name, version_after_refresh
+            self._force_refresh_bundles.discard(bundle.name)
+
+            if bundle.supports_versioning:
+                # We can short-circuit the rest of this if (1) bundle was seen 
before by
+                # this dag processor and (2) the version of the bundle did not 
change
+                # after refreshing it
+                version_after_refresh = bundle.get_current_version()
+                if previously_seen and pre_refresh_version == 
version_after_refresh:
+                    self.log.debug(
+                        "Bundle %s version not changed after refresh: %s",
+                        bundle.name,
+                        version_after_refresh,
                     )
-                else:
-                    version_after_refresh = None
+                    try:
+                        self.update_bundle_state(bundle.name, 
last_refreshed=now, version=None)
+                    except Exception:
+                        self.log.exception("Error persisting state for bundle 
%s", bundle.name)
+                    continue
 
-            self._bundle_versions[bundle.name] = version_after_refresh
+                self.log.info("Version changed for %s, new version: %s", 
bundle.name, version_after_refresh)
+            else:
+                version_after_refresh = None
+
+            try:
+                self.update_bundle_state(bundle.name, last_refreshed=now, 
version=version_after_refresh)
+            except Exception:
+                self.log.exception("Error persisting state for bundle %s", 
bundle.name)
+            else:
+                self._bundle_versions[bundle.name] = version_after_refresh

Review Comment:
   This review comment is incorrect: `should_skip_refresh` uses AND logic, 
meaning that all four conditions must be true to skip. If 
`update_bundle_state` fails, `last_refreshed` is left stale in the DB, so 
`elapsed_time_since_refresh < bundle.refresh_interval` will be False on the 
next pass, which alone is enough to prevent skipping.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to