dstandish commented on code in PR #46503:
URL: https://github.com/apache/airflow/pull/46503#discussion_r1968493013
##########
airflow/dag_processing/bundles/base.py:
##########
@@ -18,12 +18,224 @@
from __future__ import annotations
import fcntl
+import logging
+import os
+import shutil
import tempfile
from abc import ABC, abstractmethod
from contextlib import contextmanager
+from dataclasses import dataclass, field
+from datetime import timedelta
+from fcntl import LOCK_SH, flock
+from operator import attrgetter
from pathlib import Path
+from typing import TYPE_CHECKING
+
+from pendulum.parsing import ParserError
+from sqlalchemy_utils.types.enriched_datetime.pendulum_datetime import pendulum
from airflow.configuration import conf
+from airflow.dag_processing.bundles.manager import DagBundlesManager
+
+if TYPE_CHECKING:
+ from pendulum import DateTime
+
log = logging.getLogger(__name__)

# Root folder under which each bundle gets a sub-folder of per-version tracking files.
STALE_BUNDLE_TRACKING_FOLDER = Path(tempfile.gettempdir()) / "airflow" / "dag_bundles" / "_tracking"

STALE_BUNDLE_CHECK_INTERVAL: int = conf.getint(
    "dag_processor",
    "stale_bundle_cleanup_interval",
    fallback=600,  # ten minutes
)
"""How frequently (in seconds) a worker should check for stale bundles."""
+
+
def get_bundle_storage_root_path():
    """
    Return the root folder under which DAG bundles are stored.

    Uses ``[dag_processor] dag_bundle_storage_path`` when it is set to a non-empty
    value; otherwise falls back to ``<system temp dir>/airflow/dag_bundles``.
    """
    configured_location = conf.get("dag_processor", "dag_bundle_storage_path", fallback=None)
    if not configured_location:
        return Path(tempfile.gettempdir(), "airflow", "dag_bundles")
    return Path(configured_location)
+
+
def get_bundle_tracking_dir(bundle_name: str) -> Path:
    """Return the folder that holds the usage tracking files of ``bundle_name``."""
    return STALE_BUNDLE_TRACKING_FOLDER.joinpath(bundle_name)
+
+
def get_bundle_tracking_file(bundle_name: str, version: str) -> Path:
    """Return the tracking file for one ``version`` of the bundle ``bundle_name``."""
    return get_bundle_tracking_dir(bundle_name=bundle_name) / version
+
+
def get_bundle_base_folder(bundle_type: str, bundle_name: str) -> Path:
    """Return ``<bundle storage root>/<bundle_type>/<bundle_name>``."""
    storage_root = get_bundle_storage_root_path()
    return storage_root.joinpath(bundle_type, bundle_name)
+
+
def get_bundle_versions_base_folder(bundle_type: str, bundle_name: str) -> Path:
    """Return the ``versions`` sub-folder of the bundle's base folder."""
    bundle_folder = get_bundle_base_folder(bundle_type=bundle_type, bundle_name=bundle_name)
    return bundle_folder.joinpath("versions")
+
+
def get_bundle_version_path(bundle_type: str, bundle_name: str, version: str) -> Path:
    """Return the folder that holds the copy of a single ``version`` of a bundle."""
    return get_bundle_versions_base_folder(
        bundle_type=bundle_type,
        bundle_name=bundle_name,
    ).joinpath(version)
+
+
@dataclass(eq=True, frozen=True)
class TrackedBundleVersionInfo:
    """
    One tracked bundle version, as seen by stale bundle cleanup.

    Instances are immutable and compare (and hash) by ``lock_file_path`` only.

    :meta private:
    """

    # Path of the tracking file; the only field used for equality and hashing.
    lock_file_path: Path
    # Bundle version this entry refers to; ignored by ``==``.
    version: str = field(compare=False)
    # Timestamp recorded for this version; ignored by ``==``.
    dt: DateTime = field(compare=False)
+
+
+class BundleUsageTrackingManager:
+ """
+ Utility helper for removing stale bundles.
+
+ :meta private:
+ """
+
+ def _parse_dt(self, val) -> DateTime | None:
+ try:
+ return pendulum.parse(val)
+ except ParserError:
+ return None
+
+ @staticmethod
+ def _remove_last_n(val: list[TrackedBundleVersionInfo]) ->
list[TrackedBundleVersionInfo]:
+ min_versions_to_keep = conf.getint(
+ section="dag_processor",
+ key="stale_bundle_cleanup_min_versions",
+ fallback=10,
+ )
+ return sorted(val, key=attrgetter("dt"),
reverse=True)[min_versions_to_keep:]
+
+ @staticmethod
+ def _remove_recent(val: list[TrackedBundleVersionInfo]) ->
list[TrackedBundleVersionInfo]:
+ age_threshold = conf.getint(
+ section="dag_processor",
+ key="stale_bundle_cleanup_age_threshold",
+ fallback=60 * 60 * 24,
+ )
+ ret = []
+ now = pendulum.now(tz=pendulum.UTC)
+ cutoff = now - timedelta(seconds=age_threshold)
+ for item in val:
+ if item.dt < cutoff:
+ ret.append(item)
+ return ret
+
+ def _find_all_tracking_files(self, bundle_name) ->
list[TrackedBundleVersionInfo] | None:
+ tracking_dir = get_bundle_tracking_dir(bundle_name=bundle_name)
+ found: list[TrackedBundleVersionInfo] = []
+ if not tracking_dir.exists():
+ log.debug("bundle usage tracking directory does not exist.
tracking_dir=%s", tracking_dir)
+ return None
+ for file in tracking_dir.iterdir():
+ log.debug("found bundle tracking file, path=%s", file)
+ version = file.name
+ dt_str = file.read_text()
+ dt = self._parse_dt(val=dt_str)
+ if not dt:
+ log.error(
+ "could not parse val as datetime bundle_name=%s val=%s
version=%s",
+ bundle_name,
+ dt_str,
+ version,
+ )
+ continue
+ found.append(TrackedBundleVersionInfo(lock_file_path=file,
version=version, dt=dt))
+ return found
+
+ @staticmethod
+ def _remove_stale_bundle(bundle_type: str, bundle_name: str, info:
TrackedBundleVersionInfo) -> None:
+ bundle_version_path = get_bundle_version_path(
+ bundle_type=bundle_type,
+ bundle_name=bundle_name,
+ version=info.version,
+ )
+
+ def log_info(msg):
+ log.info(
+ "%s bundle_name=%s bundle_version=%s bundle_path=%s
lock_file=%s",
+ msg,
+ bundle_name,
+ info.version,
+ bundle_version_path,
+ info.lock_file_path,
+ )
+
+ try:
+ log_info("removing stale bundle.")
+ with open(info.lock_file_path, "a") as f:
+ flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB) # exclusive lock, do
not wait
+ # remove the actual bundle copy
+ shutil.rmtree(bundle_version_path)
+ # remove the lock file
+ os.remove(info.lock_file_path)
+ except BlockingIOError:
+ log_info("could not obtain lock. stale bundle will not be
removed.")
+ return
+
+ def _find_candidates(self, found):
+ """Remove the recently used bundles."""
+ candidates = self._remove_last_n(found)
+ candidates = self._remove_recent(candidates)
Review Comment:
It just removes them from the candidates list (nothing is deleted on disk at this step) :) Yeah, I can make that clearer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]