kaxil commented on code in PR #66608:
URL: https://github.com/apache/airflow/pull/66608#discussion_r3321418146


##########
airflow-core/src/airflow/utils/log/callback_log_reader.py:
##########
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Reader for callback execution logs stored in remote or local storage."""
+
+from __future__ import annotations
+
+import logging
+import os
+from collections.abc import Generator
+from contextlib import suppress
+from pathlib import Path
+from typing import TYPE_CHECKING
+
+from airflow.configuration import conf
+from airflow.utils.log.file_task_handler import (
+    StructuredLogMessage,
+    _interleave_logs,
+    _stream_lines_by_chunk,
+)
+
+if TYPE_CHECKING:
+    from airflow._shared.logging.remote import LogSourceInfo, RawLogStream
+
+logger = logging.getLogger(__name__)
+
+
+def _get_callback_log_relative_path(dag_id: str, run_id: str, callback_id: str) -> str:
+    """
+    Construct the relative log path for a callback execution.
+
+    This must match the path format used in ExecuteCallback.make():
+        executor_callbacks/{dag_id}/{run_id}/{callback_id}
+    """
+    return f"executor_callbacks/{dag_id}/{run_id}/{callback_id}"
+
+
+def read_callback_log(

Review Comment:
   This whole module (170 lines) doesn't appear to be called anywhere on the 
branch. `read_callback_log` has no caller in the diff or the rest of the tree, 
and there's no test. Is it meant to wire into a core-API log endpoint in a 
follow-up? If so I'd pull it out of this PR until the consumer lands, since 
dead code tends to drift before it's ever used.



##########
task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py:
##########
@@ -305,6 +327,50 @@ def _handle_request(self, msg: CallbackToSupervisor, log: FilteringBoundLogger,
         self.send_msg(resp, request_id=req_id, error=None, **dump_opts)
 
 
+def _register_unusual_prefix_module(callback_path: str, bundle_path, _log) -> None:

Review Comment:
   This is nearly identical to `_ensure_bundle_module_registered` in 
`airflow-core/src/airflow/triggers/callback.py` (same mangled-name 
reconstruction, same `split("_", 3)`, same `SourceFileLoader` dance). That's 
the same duplication-drift problem we extracted the context builder to avoid. 
Worth a single shared helper both sides import. Neither copy has a test, and 
the `unusual_prefix_` triggerer-import mechanism is a sizable addition that 
isn't in the PR description. Can you describe what it fixes? It reads as a 
separate concern from the context-fetching change.
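
One possible shape for the shared helper, as a rough sketch only. It assumes the mangled module name has the form `unusual_prefix_<hash>_<module>` (which is what `split("_", 3)` implies), that the hash contains no underscores, and that the source file sits at the top level of the bundle. The function name `register_unusual_prefix_module` and its return value are hypothetical and not taken from either existing copy:

```python
import importlib.machinery
import importlib.util
import sys
from pathlib import Path

# Assumed prefix of the mangled module name.
UNUSUAL_PREFIX = "unusual_prefix_"


def register_unusual_prefix_module(callback_path: str, bundle_path: str) -> bool:
    """
    Register the mangled module behind ``callback_path`` in ``sys.modules``.

    Hypothetical helper: returns True if a module was newly registered, and
    False if nothing had to be (or could be) done.
    """
    # "unusual_prefix_<hash>_<module>.<attr>" -> "unusual_prefix_<hash>_<module>"
    module_name = callback_path.rsplit(".", 1)[0]
    if not module_name.startswith(UNUSUAL_PREFIX) or module_name in sys.modules:
        return False

    # ["unusual", "prefix", "<hash>", "<module>"]; maxsplit=3 keeps any
    # underscores in the original module name intact.
    parts = module_name.split("_", 3)
    if len(parts) != 4 or not parts[3]:
        return False

    # Assumption: the file lives directly under the bundle root.
    source = Path(bundle_path) / f"{parts[3]}.py"
    if not source.is_file():
        return False

    loader = importlib.machinery.SourceFileLoader(module_name, str(source))
    spec = importlib.util.spec_from_loader(module_name, loader)
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module
    try:
        loader.exec_module(module)
    except Exception:
        # Do not leave a half-initialised module behind.
        sys.modules.pop(module_name, None)
        raise
    return True
```

With a single helper like this, both the triggerer and the callback supervisor could import it, and one test module would cover the name reconstruction for both call sites.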



##########
airflow-core/src/airflow/models/deadline.py:
##########
@@ -215,46 +216,31 @@ def prune_deadlines(cls, *, session: Session, conditions: dict[Mapped, Any]) ->
 
     def handle_miss(self, session: Session):
         """Handle a missed deadline by queueing the callback."""
-
-        def get_simple_context():
-            from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse
-            from airflow.models import DagRun
-
-            # TODO: Use the TaskAPI from within Triggerer to fetch full context instead of sending this context
-            #  from the scheduler
-
-            # Fetch the DagRun from the database again to avoid errors when self.dagrun's relationship fields
-            # are not in the current session.
-            dagrun = session.get(DagRun, self.dagrun_id)
-
-            return {
-                "dag_run": DAGRunResponse.model_validate(dagrun).model_dump(mode="json"),
-                "deadline": {"id": self.id, "deadline_time": self.deadline_time},
-            }
+        # Routing identifiers: stored at the top level of callback.data so the triggerer/executor
+        # can locate the DagRun and build execution context *before* invoking the callback.
+        # These are NOT user payload — they are consumed by infrastructure (fetch_dag_run_for_callback,

Review Comment:
   `fetch_dag_run_for_callback` doesn't exist. The actual consumers are 
`_fetch_callback_dag_run_data` (triggerer, `triggerer_job_runner.py`) and 
`_fetch_and_build_context` (executor subprocess, this file's sibling 
supervisor). Worth fixing the name so the comment doesn't send the next reader 
looking for a function that isn't there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
