ephraimbuddy commented on code in PR #63835:
URL: https://github.com/apache/airflow/pull/63835#discussion_r2987364451


##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -620,69 +666,72 @@ def _refresh_dag_bundles(self, known_files: dict[str, 
set[DagFileInfo]]):
                     self.log.exception("Error initializing bundle %s: %s", 
bundle.name, e)
                     continue
             # TODO: AIP-66 test to make sure we get a fresh record from the db 
and it's not cached
-            with create_session() as session:
-                bundle_model = session.get(DagBundleModel, bundle.name)
-                if bundle_model is None:
-                    self.log.warning("Bundle model not found for %s", 
bundle.name)
-                    continue
-                elapsed_time_since_refresh = (
-                    now - (bundle_model.last_refreshed or utc_epoch())
-                ).total_seconds()
-                if bundle.supports_versioning:
-                    # we will also check the version of the bundle to see if 
another DAG processor has seen
-                    # a new version
-                    pre_refresh_version = (
-                        self._bundle_versions.get(bundle.name) or 
bundle.get_current_version()
-                    )
-                    current_version_matches_db = pre_refresh_version == 
bundle_model.version
-                else:
-                    # With no versioning, it always "matches"
-                    current_version_matches_db = True
-
-                previously_seen = bundle.name in self._bundle_versions
-                if self.should_skip_refresh(
-                    bundle=bundle,
-                    elapsed_time_since_refresh=elapsed_time_since_refresh,
-                    current_version_matches_db=current_version_matches_db,
-                    previously_seen=previously_seen,
-                ):
-                    self.log.info("Not time to refresh bundle %s", bundle.name)
-                    continue
-
-                self.log.info("Refreshing bundle %s", bundle.name)
-
-                try:
-                    bundle.refresh()
-                    any_refreshed = True
-                except Exception:
-                    self.log.exception("Error refreshing bundle %s", 
bundle.name)
-                    continue
-
-                bundle_model.last_refreshed = now
-                self._force_refresh_bundles.discard(bundle.name)
+            try:
+                bundle_state = self.get_bundle_state(bundle.name)
+            except Exception:
+                self.log.exception("Error fetching state for bundle %s", 
bundle.name)
+                continue
+            if bundle_state is None:
+                self.log.warning("Bundle model not found for %s", bundle.name)
+                continue
+            elapsed_time_since_refresh = (now - (bundle_state.last_refreshed 
or utc_epoch())).total_seconds()
+            if bundle.supports_versioning:
+                # we will also check the version of the bundle to see if 
another DAG processor has seen
+                # a new version
+                pre_refresh_version = self._bundle_versions.get(bundle.name) 
or bundle.get_current_version()
+                current_version_matches_db = pre_refresh_version == 
bundle_state.version
+            else:
+                # With no versioning, it always "matches"
+                current_version_matches_db = True
+
+            previously_seen = bundle.name in self._bundle_versions
+            if self.should_skip_refresh(
+                bundle=bundle,
+                elapsed_time_since_refresh=elapsed_time_since_refresh,
+                current_version_matches_db=current_version_matches_db,
+                previously_seen=previously_seen,
+            ):
+                self.log.info("Not time to refresh bundle %s", bundle.name)
+                continue
 
-                if bundle.supports_versioning:
-                    # We can short-circuit the rest of this if (1) bundle was 
seen before by
-                    # this dag processor and (2) the version of the bundle did 
not change
-                    # after refreshing it
-                    version_after_refresh = bundle.get_current_version()
-                    if previously_seen and pre_refresh_version == 
version_after_refresh:
-                        self.log.debug(
-                            "Bundle %s version not changed after refresh: %s",
-                            bundle.name,
-                            version_after_refresh,
-                        )
-                        continue
+            self.log.info("Refreshing bundle %s", bundle.name)
 
-                    bundle_model.version = version_after_refresh
+            try:
+                bundle.refresh()
+                any_refreshed = True
+            except Exception:
+                self.log.exception("Error refreshing bundle %s", bundle.name)
+                continue
 
-                    self.log.info(
-                        "Version changed for %s, new version: %s", 
bundle.name, version_after_refresh
+            self._force_refresh_bundles.discard(bundle.name)
+
+            if bundle.supports_versioning:
+                # We can short-circuit the rest of this if (1) bundle was seen 
before by
+                # this dag processor and (2) the version of the bundle did not 
change
+                # after refreshing it
+                version_after_refresh = bundle.get_current_version()
+                if previously_seen and pre_refresh_version == 
version_after_refresh:
+                    self.log.debug(
+                        "Bundle %s version not changed after refresh: %s",
+                        bundle.name,
+                        version_after_refresh,
                     )
-                else:
-                    version_after_refresh = None
+                    try:
+                        self.update_bundle_state(bundle.name, 
last_refreshed=now, version=None)
+                    except Exception:
+                        self.log.exception("Error persisting state for bundle 
%s", bundle.name)
+                    continue

Review Comment:
   I have added an inline comment on the main path below to indicate that the 
asymmetry is intentional. The short-circuit path doesn't need the same 
treatment since version didn't change. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to