amoghrajesh commented on code in PR #52876:
URL: https://github.com/apache/airflow/pull/52876#discussion_r2191599681


##########
airflow-core/src/airflow/models/dagbundle.py:
##########
@@ -40,6 +44,59 @@ class DagBundleModel(Base):
     active = Column(Boolean, default=True)
     version = Column(String(200), nullable=True)
     last_refreshed = Column(UtcDateTime, nullable=True)
+    url_template = Column(String(200), nullable=True)

Review Comment:
   nit: `signed_url_template` to be more explicit about what's stored?



##########
providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py:
##########
@@ -137,10 +137,20 @@ def refresh(self) -> None:
             )
 
     def view_url(self, version: str | None = None) -> str | None:
+        """
+        Return a URL for viewing the DAGs in S3. Currently, versioning is not supported.
+
+        This method is deprecated and will be removed in a future release. Use `view_url_template` instead.
+        """
+        return self.view_url_template()
+
+    def view_url_template(self) -> str | None:
         """Return a URL for viewing the DAGs in S3. Currently, versioning is not supported."""
         if self.version:
             raise AirflowException("S3 url with version is not supported")
-
+        if hasattr(self, "_view_url_template") and self._view_url_template:
+            # Backward compatibility for released Airflow versions
+            return self._view_url_template

Review Comment:
   I do not understand this compat. The `_view_url_template` is a new field, right? How is it present on older Airflow?
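Separately from whether the attribute can be missing at all, the `hasattr(...) and self._view_url_template` check collapses into a single `getattr` with a default. A minimal sketch, assuming a hypothetical `ExampleBundle` stand-in (not the real `S3DagBundle`) and `ValueError` in place of `AirflowException` so it runs without Airflow:

```python
class ExampleBundle:
    """Hypothetical stand-in for a bundle; not the real S3DagBundle."""

    def __init__(self, template=None, *, define_attr=True):
        self.version = None
        if define_attr:
            # Mimic a bundle whose constructor sets the private attribute.
            self._view_url_template = template

    def view_url_template(self):
        if self.version:
            # The PR raises AirflowException; ValueError keeps this self-contained.
            raise ValueError("S3 url with version is not supported")
        # One getattr call covers both "attribute never defined" and
        # "attribute defined but None/empty", like hasattr(...) and self._x.
        template = getattr(self, "_view_url_template", None)
        if template:
            return template
        # Hypothetical fallback; the real method goes on to build an S3 URL.
        return None
```

This only shows the check can be written more compactly; it does not answer when an older Airflow would ever set the attribute.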



##########
airflow-core/src/airflow/models/dagbundle.py:
##########
@@ -40,6 +44,59 @@ class DagBundleModel(Base):
     active = Column(Boolean, default=True)
     version = Column(String(200), nullable=True)
     last_refreshed = Column(UtcDateTime, nullable=True)
+    url_template = Column(String(200), nullable=True)
+    template_params = Column(JSONType, nullable=True)
 
-    def __init__(self, *, name: str):
+    def __init__(self, *, name: str, version: str | None = None):
+        super().__init__()
         self.name = name
+        self.version = version
+
+    def _unsign_url(self) -> str | None:
+        """
+        Unsign a URL token to get the original URL template.
+
+        :param signed_url: The signed URL token
+        :return: The original URL template or None if unsigning fails
+        """
+        try:
+            from itsdangerous import BadSignature, URLSafeSerializer
+
+            from airflow.configuration import conf
+
+            serializer = URLSafeSerializer(conf.get_mandatory_value("core", "fernet_key"))
+            payload = serializer.loads(self.url_template)
+            if isinstance(payload, dict) and "url" in payload and "bundle_name" in payload:
+                if payload["bundle_name"] == self.name:
+                    return payload["url"]
+
+            return None
+        except (BadSignature, Exception):
+            return None
+
+    def render_url(self, version: str | None = None) -> str | None:
+        """
+        Render the URL template with the given version and stored template parameters.
+
+        First unsigns the URL to get the original template, then formats it with
+        the provided version and any additional parameters.
+
+        :param version: The version to substitute in the template
+        :return: The rendered URL or None if no template is available
+        """
+        if not self.url_template:
+            return None
+
+        url_template = self._unsign_url()
+
+        if url_template is None:
+            url_template = self.url_template

Review Comment:
   If we fail to unsign, we get the signed one? Shouldn't we fall back to some default?
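One way to address this is to fail closed: return `None` (or a configured default) when unsigning fails, instead of the raw signed token, which is not a usable URL. A minimal sketch, assuming stdlib `hmac` in place of the PR's `itsdangerous.URLSafeSerializer` and a hard-coded secret in place of `[core] fernet_key`. `sign_url`, `unsign_url`, and this free-function `render_url` are hypothetical names; only the `{"url": ..., "bundle_name": ...}` payload shape mirrors `_unsign_url` above.

```python
from __future__ import annotations

import base64
import hashlib
import hmac
import json

# Assumption: a hard-coded key stands in for [core] fernet_key.
SECRET = b"hypothetical-secret"


def sign_url(url: str, bundle_name: str, secret: bytes = SECRET) -> str:
    """Hypothetical signer producing "<base64 payload>.<hex HMAC>" tokens."""
    payload = json.dumps({"url": url, "bundle_name": bundle_name}, sort_keys=True)
    body = base64.urlsafe_b64encode(payload.encode()).decode()
    sig = hmac.new(secret, body.encode(), hashlib.sha256).hexdigest()
    return f"{body}.{sig}"


def unsign_url(token: str, bundle_name: str, secret: bytes = SECRET) -> str | None:
    """Return the original template, or None if the token does not verify."""
    body, _, sig = token.rpartition(".")
    if not body:
        return None
    expected = hmac.new(secret, body.encode(), hashlib.sha256).hexdigest()
    # Compare as bytes so non-ASCII input cannot raise TypeError.
    if not hmac.compare_digest(sig.encode(), expected.encode()):
        return None
    try:
        payload = json.loads(base64.urlsafe_b64decode(body.encode()))
    except ValueError:  # covers binascii.Error and json.JSONDecodeError
        return None
    if isinstance(payload, dict) and payload.get("bundle_name") == bundle_name:
        return payload.get("url")
    return None


def render_url(
    token: str | None,
    bundle_name: str,
    version: str | None = None,
    template_params: dict | None = None,
) -> str | None:
    """Render a signed template; fail closed (None) instead of leaking the token."""
    if not token:
        return None
    template = unsign_url(token, bundle_name)
    if template is None:
        # Unsigning failed: the raw token is not a usable URL, so return None.
        return None
    params = dict(template_params or {})
    if version is not None:
        params["version"] = version
    try:
        return template.format(**params)
    except (KeyError, IndexError, ValueError):
        # Missing placeholder value or malformed template.
        return None
```

Returning `None` lets the UI simply hide the link; a configured default URL would slot in at the same branch.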



##########
airflow-core/src/airflow/dag_processing/bundles/manager.py:
##########
@@ -125,11 +175,56 @@ def parse_config(self) -> None:
     def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None:
         self.log.debug("Syncing DAG bundles to the database")
         stored = {b.name: b for b in session.query(DagBundleModel).all()}
+
         for name in self._bundle_config.keys():
             if bundle := stored.pop(name, None):
                 bundle.active = True
+                # Update URL template and parameters if they've changed
+                bundle_instance = self.get_bundle(name)
+                new_template = bundle_instance.view_url_template()
+                new_params = self._extract_template_params(bundle_instance)
+
+                # Validate and sign the URL before saving
+                if new_template:
+                    if not _is_safe_bundle_url(new_template):
+                        self.log.warning(
+                            "Bundle %s has unsafe URL template '%s', skipping URL update", name, new_template
+                        )
+                        new_template = None
+                    else:
+                        # Sign the URL for integrity verification
+                        new_template = _sign_bundle_url(new_template, name)
+                        self.log.debug("Signed URL template for bundle %s", name)

Review Comment:
   Can we extract this part into a function? It seems it's repeated below too.
   



-- 
This is an automated message from the Apache Git Service.