vemikhaylov commented on code in PR #29608:
URL: https://github.com/apache/airflow/pull/29608#discussion_r1117955516


##########
airflow/models/taskinstance.py:
##########
@@ -2741,6 +2749,14 @@ def tg2(inp):
         map_index_start = ancestor_map_index * further_count
         return range(map_index_start, map_index_start + further_count)
 
+    def inject_xcoms(self, xcoms: list[dict[str, Any]]):
+        """
+        To inject upstream dependencies' output instead of trying
+        to read from DB when testing a task individually.
+        """

Review Comment:
   To be honest, I'm not quite sure about this approach, so I'd like to ask for 
feedback. One potential risk and one open question about the current approach:
   
   1. Users of the framework may start to rely on it as a public API, while we 
may want to change how it works at some point in the future. Is this a valid 
concern?
   
   2. Can users work with `XCom` directly, bypassing the APIs exposed by 
`TaskInstance`? Or are the models not available in DAGs, so that handling the 
`XCom` interactions at the `TaskInstance` layer covers all the scenarios?
   
   --
   
   Alternative options, which I thought of:
   
   1. Pass `xcoms` down the call stack, e.g. `ti.run(injected_xcoms=xcoms)`. 
Extending the `ti.run` signature didn't look very attractive to me, though it 
may be clearer and more explicit than a preliminary "injection". On the other 
hand, injection is similar to what we already do with `params`: 
https://github.com/apache/airflow/blob/4e3b5ae7248c2327864f64b25dc7a5bd7705430c/airflow/cli/commands/task_command.py#L590
   
   2. Have a module with a global variable, which we could initialise in 
`task_command` and then read from in either `TaskInstance.xcom_pull` or 
`BaseXCom.get_one` / `.get_many`. The downside is global state.
   
   3. Monkeypatch `ti.xcom_pull` (dirty?)
   
   4. Implement a custom `XCom` backend to be used in `xcom.py`, which would 
read the injected data from memory. 
   
   5. If we can run everything inside a single transaction that is rolled back 
at the end of the command execution, could we just "put" the corresponding 
XComs in the "DB" before running the task?
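   
   To make option 3 concrete, here is a rough sketch of what the 
monkeypatching could look like. `StandInTaskInstance` and 
`run_with_patched_xcoms` are hypothetical names used only for illustration 
(they are not Airflow code), and the `(task_id, key)` lookup is an assumed 
shape for the injected data:

```python
from unittest import mock


class StandInTaskInstance:
    """Hypothetical stand-in for TaskInstance, for illustration only."""

    def xcom_pull(self, task_ids, key="return_value"):
        # The real method would read from the metadata DB.
        raise RuntimeError("would query the metadata DB")


def run_with_patched_xcoms(ti, xcoms, task_fn):
    """Run task_fn(ti) while ti.xcom_pull is served from the xcoms mapping."""

    def fake_pull(task_ids, key="return_value"):
        # Assumed shape: xcoms maps (task_id, key) -> value.
        return xcoms[(task_ids, key)]

    # The patch is undone when the block exits, so the original method
    # is back in place after the task has run.
    with mock.patch.object(ti, "xcom_pull", side_effect=fake_pull):
        return task_fn(ti)
```

   The patch stays local to the command, but the task code then silently 
depends on `xcom_pull` keeping its current signature, which is part of why it 
feels dirty to me.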
   
   --
   
   Since I wasn't sure about the feasibility and validity of some of the 
approaches above (especially 4 and 5), I implemented the most straightforward 
one, similar to what we already have (the `params` reference above), and am 
looking for feedback.
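
   For reference, the injection idea boils down to something like the sketch 
below. This is a simplified stand-in, not the code in this PR: 
`FakeTaskInstance`, `_injected_xcoms` and `_read_from_db` are hypothetical 
names, and the `task_id` / `key` / `value` fields of each dict are an assumed 
shape, since the signature only says `list[dict[str, Any]]`:

```python
from __future__ import annotations

from typing import Any


class FakeTaskInstance:
    """Hypothetical stand-in for TaskInstance, for illustration only."""

    def __init__(self) -> None:
        # Maps (task_id, key) -> value for XComs supplied by the caller.
        self._injected_xcoms: dict[tuple[str, str], Any] = {}

    def inject_xcoms(self, xcoms: list[dict[str, Any]]) -> None:
        # Assumed dict shape: {"task_id": ..., "key": ..., "value": ...}.
        for xcom in xcoms:
            key = xcom.get("key", "return_value")
            self._injected_xcoms[(xcom["task_id"], key)] = xcom["value"]

    def xcom_pull(self, task_id: str, key: str = "return_value") -> Any:
        # Prefer injected values; fall back to the DB only when nothing
        # was injected for this (task_id, key) pair.
        if (task_id, key) in self._injected_xcoms:
            return self._injected_xcoms[(task_id, key)]
        return self._read_from_db(task_id, key)

    def _read_from_db(self, task_id: str, key: str) -> Any:
        # Placeholder for the real lookup, which this sketch does not model.
        raise LookupError(f"no XCom for task_id={task_id!r}, key={key!r}")
```

   The caller would invoke `inject_xcoms` once before running the task, the 
same way `params` are set on the task beforehand.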


