vemikhaylov commented on code in PR #29608:
URL: https://github.com/apache/airflow/pull/29608#discussion_r1141132735
##########
airflow/models/taskinstance.py:
##########
@@ -2741,6 +2749,14 @@ def tg2(inp):
map_index_start = ancestor_map_index * further_count
return range(map_index_start, map_index_start + further_count)
+     def inject_xcoms(self, xcoms: list[dict[str, Any]]):
+         """
+         Inject upstream dependencies' output instead of reading
+         from the DB when testing a task individually.
+         """
Review Comment:
> Write the custom data to the real XCom table, and delete them afterwards.
Emit an error (and refuse to execute) if there is already existing data
matching the custom input in the table. This feels reasonable since it would
only be possible if you test a task in an existing DAG run but don’t want to
use the actual upstream XComs, which feels very niche and not a necessary
feature to me.
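For context, the quoted proposal could be sketched roughly as below. This is only an illustrative stand-in: the dict-based "XCom table", the helper name `run_with_injected_xcoms`, and `XComConflictError` are all invented for the sketch and do not come from the PR.

```python
class XComConflictError(Exception):
    """Raised when staged test data would collide with existing XCom rows."""


def run_with_injected_xcoms(xcom_table, injected, execute):
    """Stage custom XComs, refuse on conflict, execute, then clean up.

    ``xcom_table`` is a plain dict standing in for the real XCom table,
    keyed by ``(task_id, key)``.
    """
    keys = [(x["task_id"], x["key"]) for x in injected]

    # Emit an error (and refuse to execute) if matching data already exists.
    existing = [k for k in keys if k in xcom_table]
    if existing:
        raise XComConflictError(f"XCom rows already present: {existing}")

    # Write the custom data to the table.
    for x in injected:
        xcom_table[(x["task_id"], x["key"])] = x["value"]

    try:
        return execute()
    finally:
        # The clean-up step discussed below: a hard process exit would
        # skip this block and leave stale rows behind.
        for k in keys:
            xcom_table.pop(k, None)
```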
For this one I have a concern that, in the general case, the script may exit in
a tricky way and skip `finally` entirely
(https://stackoverflow.com/questions/49262379/does-finally-always-execute-in-python).
When we're dealing with a temporary DAG run, that's relatively fine, and it is
visible from the run id:
https://github.com/apache/airflow/blob/main/airflow/cli/commands/task_command.py#L74-L80
But for a normal DAG run it may leave the data in a strange state. Also, if
this is an ongoing DAG run then, if I'm not mistaken, there might be race
conditions like "write XCom in test command" -> "write the same XCom in a real
task executed in the DAG run" -> "delete XCom in test command (clean-up)".
What do you think?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]