vemikhaylov commented on code in PR #29608:
URL: https://github.com/apache/airflow/pull/29608#discussion_r1141132735


##########
airflow/models/taskinstance.py:
##########
@@ -2741,6 +2749,14 @@ def tg2(inp):
         map_index_start = ancestor_map_index * further_count
         return range(map_index_start, map_index_start + further_count)
 
+    def inject_xcoms(self, xcoms: list[dict[str, Any]]):
+        """
+        To inject upstream dependencies' output instead of trying
+        to read from DB when testing a task individually.
+        """

Review Comment:
   > Write the custom data to the real XCom table, and delete them afterwards. 
Emit an error (and refuse to execute) if there is already existing data 
matching the custom input in the table. This feels reasonable since it would 
only be possible if you test a task in an existing DAG run but don’t want to 
use the actual upstream XComs, which feels very niche and not a necessary 
feature to me.
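
   To make the quoted proposal concrete, here is a minimal sketch under stated assumptions: a plain dict stands in for the XCom table, and `injected_xcoms` is a hypothetical helper (not the PR's `inject_xcoms` and not an Airflow API). It refuses to run on conflicting keys, writes the custom XComs, and deletes them in a `finally` block.

```python
from contextlib import contextmanager
from typing import Any, Iterator

# Hypothetical key shape: (dag_id, run_id, task_id, map_index, key).
XComKey = tuple[str, str, str, int, str]


@contextmanager
def injected_xcoms(table: dict[XComKey, Any], xcoms: dict[XComKey, Any]) -> Iterator[None]:
    """Temporarily write custom XComs into `table` (a stand-in for the XCom table)."""
    # Refuse to execute if any injected key already exists in the table.
    conflicts = table.keys() & xcoms.keys()
    if conflicts:
        raise RuntimeError(f"refusing to run, XComs already exist: {sorted(conflicts)}")
    table.update(xcoms)
    try:
        yield
    finally:
        # Clean up only the rows this helper wrote.
        for key in xcoms:
            table.pop(key, None)


if __name__ == "__main__":
    table: dict[XComKey, Any] = {}
    key: XComKey = ("my_dag", "manual_run", "upstream", -1, "return_value")
    with injected_xcoms(table, {key: 42}):
        print(table)  # the injected value is visible while the test task runs
    print(table)  # empty again after cleanup
```

   The cleanup relies entirely on the `finally` clause running, which is exactly the assumption questioned in the reply.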
   
   My concern with this approach is that, in general, the script may exit abruptly 
enough to skip even the `finally` block 
(https://stackoverflow.com/questions/49262379/does-finally-always-execute-in-python).
 With a temporary DAG run that's relatively fine, because any leftover data is 
identifiable from the run ID:
   
   
https://github.com/apache/airflow/blob/main/airflow/cli/commands/task_command.py#L74-L80
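
   As a quick illustration that `finally` is not guaranteed to run, the sketch below starts a child Python process that exits via `os._exit()`, which terminates the process immediately without unwinding the stack, so its `finally` clause never executes. The child script and its messages are purely illustrative; a SIGKILL (for example from an OOM killer) would skip the cleanup in the same way.

```python
import subprocess
import sys
import textwrap

# Illustrative child script: "writes" data, then hard-exits before cleanup.
CHILD = textwrap.dedent("""
    import os
    try:
        print("wrote temporary XComs", flush=True)
        os._exit(1)  # hard exit: the finally block below never runs
    finally:
        print("cleanup", flush=True)
""")


def run_child() -> str:
    """Run the child script and return whatever it printed to stdout."""
    result = subprocess.run([sys.executable, "-c", CHILD], capture_output=True, text=True)
    return result.stdout


if __name__ == "__main__":
    print(run_child())  # only "wrote temporary XComs"; "cleanup" is never printed
```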
   
   But for a normal DAG run, it may leave the data in an inconsistent state. 
Also, if this is an ongoing DAG run, then (if I'm not mistaken) there may be 
race conditions such as "write XCom in test command" -> "write the same XCom in 
a real task executed in the DAG run" -> "delete XCom in test command (clean 
up)", which would silently remove the real task's output.
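
   The interleaving above can be replayed deterministically with a dict standing in for the XCom table. This is a hypothetical sketch, not Airflow's storage API: the DAG, run, and task names are made up, and step 2 assumes that pushing an XCom with the same identifying columns overwrites the existing row.

```python
from typing import Any

# Hypothetical stand-in for the XCom table, keyed by its identifying columns:
# (dag_id, run_id, task_id, map_index, key). Not Airflow's real storage API.
XComKey = tuple[str, str, str, int, str]


def interleaving() -> dict[XComKey, Any]:
    table: dict[XComKey, Any] = {}
    key: XComKey = ("my_dag", "scheduled_run", "upstream", -1, "return_value")

    # 1. The test command injects fake upstream output into the real DAG run.
    table[key] = {"fake": True}
    # 2. The real upstream task in the same run pushes its output, overwriting the row.
    table[key] = {"fake": False}
    # 3. The test command "cleans up" the row it believes it wrote.
    table.pop(key, None)
    return table


if __name__ == "__main__":
    print(interleaving())  # {} -- the real task's XCom is gone
```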
   
   What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
