jedcunningham commented on code in PR #45860:
URL: https://github.com/apache/airflow/pull/45860#discussion_r1924490231
##########
airflow/dag_processing/manager.py:
##########
@@ -388,18 +391,18 @@ def _fetch_callbacks(
def _add_callback_to_queue(self, request: CallbackRequest):
self.log.debug("Queuing %s CallbackRequest: %s",
type(request).__name__, request)
- self.log.warning("Callbacks are not implemented yet!")
- # TODO: AIP-66 make callbacks bundle aware
- return
- self._callback_to_execute[request.full_filepath].append(request)
- if request.full_filepath in self._file_path_queue:
- # Remove file paths matching request.full_filepath from
self._file_path_queue
+ bundle = DagBundlesManager().get_bundle(name=request.bundle_name,
version=request.bundle_version)
+ dag_absolute_path = os.fspath(Path(bundle.path, request.filepath))
+ file_info = DagFileInfo(path=dag_absolute_path,
bundle_name=request.bundle_name)
+ self._callback_to_execute[file_info].append(request)
+ if file_info in self._file_path_queue:
+ # Remove file paths matching request.filepath from
self._file_path_queue
# Since we are already going to use that filepath to run callback,
# there is no need to have same file path again in the queue
Review Comment:
There is a reason now: we are running the callback against an old bundle version,
so we should leave the existing entry in the queue so that the latest version of
the file is still parsed when its turn comes.
##########
airflow/models/dag.py:
##########
@@ -2212,15 +2213,14 @@ def safe_dag_id(self):
return self.dag_id.replace(".", "__dot__")
@property
- def relative_fileloc(self) -> pathlib.Path | None:
+ def relative_fileloc(self) -> pathlib.Path:
"""File location of the importable dag 'file' relative to the
configured DAGs folder."""
Review Comment:
The docstring needs updating. Longer term, `fileloc` should just be relative from
the start (see #45623).
##########
airflow/dag_processing/processor.py:
##########
@@ -66,17 +66,22 @@ def _parse_file_entrypoint():
log = structlog.get_logger(logger_name="task")
result = _parse_file(msg, log)
- comms_decoder.send_request(log, result)
+ if result:
+ comms_decoder.send_request(log, result)
-def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) ->
DagFileParsingResult:
+def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) ->
DagFileParsingResult | None:
# TODO: Set known_pool names on DagBag!
bag = DagBag(
dag_folder=msg.file,
include_examples=False,
safe_mode=True,
load_op_links=False,
)
+ if msg.callback_requests:
Review Comment:
Probably worth leaving a comment explaining why we are short-circuiting here.
##########
airflow/dag_processing/manager.py:
##########
@@ -647,10 +650,9 @@ def set_file_paths(self, new_file_paths:
list[DagFileInfo]):
self._file_path_queue = deque(x for x in self._file_path_queue if x in
new_file_paths)
Stats.gauge("dag_processing.file_path_queue_size",
len(self._file_path_queue))
- # TODO: AIP-66 make callbacks bundle aware
- # callback_paths_to_del = [x for x in self._callback_to_execute if x
not in new_file_paths]
- # for path_to_del in callback_paths_to_del:
- # del self._callback_to_execute[path_to_del]
+ callback_paths_to_del = [x for x in self._callback_to_execute if x not
in new_file_paths]
Review Comment:
We will need to be a little smarter about this: the file could still exist
in the old bundle version even though it no longer exists in the latest one.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]