uranusjr commented on code in PR #29608:
URL: https://github.com/apache/airflow/pull/29608#discussion_r1132142637
##########
airflow/models/taskinstance.py:
##########
@@ -2741,6 +2749,14 @@ def tg2(inp):
map_index_start = ancestor_map_index * further_count
return range(map_index_start, map_index_start + further_count)
+ def inject_xcoms(self, xcoms: list[dict[str, Any]]):
+ """
+ To inject upstream dependencies' output instead of trying
+ to read from DB when testing a task individually.
+ """
Review Comment:
> Can the users use XCom directly without the API?
They can, via `XCom.get_one` or `XCom.get_many`. That is why I feel the only
fully reliable approach is to actually write the injected data to the
database, where it is available directly through the XCom interface.
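A toy illustration of why keeping injected XComs only in memory is not enough. This is a sketch under simplifying assumptions: `xcom_table`, `get_one` and `FakeTaskInstance` are hypothetical stand-ins (a plain dict plays the database), and `get_one` only loosely mirrors `XCom.get_one`; it is not its real signature.

```python
from typing import Any, Optional

# Hypothetical stand-in for the XCom table: (dag_id, run_id, task_id, key) -> value
XComKey = tuple[str, str, str, str]
xcom_table: dict[XComKey, Any] = {}


def get_one(dag_id: str, run_id: str, task_id: str, key: str = "return_value") -> Optional[Any]:
    """Hypothetical direct lookup: it reads only the table, never the task instance."""
    return xcom_table.get((dag_id, run_id, task_id, key))


class FakeTaskInstance:
    """Hypothetical task instance that keeps injected XComs in memory only."""

    def __init__(self) -> None:
        self.injected: dict[XComKey, Any] = {}

    def inject_xcoms(self, xcoms: dict[XComKey, Any]) -> None:
        self.injected.update(xcoms)


ti = FakeTaskInstance()
key = ("my_dag", "test_run", "upstream", "return_value")
ti.inject_xcoms({key: 42})
print(get_one(*key))  # None: a direct read never sees the in-memory injection

xcom_table[key] = 42  # write to the "table" instead
print(get_one(*key))  # 42
```

Any task code that bypasses the task instance and reads XComs itself only sees what is in storage, which is the case for writing to the database.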
We already do something similar for DagRun (if you specify a DagRun that
does not actually exist, we create a temporary one in the database for the
`test` run and delete it afterwards), so I can think of a couple of
approaches:
1. Write the custom XCom data to a separate table and make `XCom` read from
it. Delete the data after the test run.
2. Write the custom data to the real XCom table and delete it afterwards.
Emit an error (and refuse to execute) if the table already contains data
matching the custom input. This restriction seems reasonable, since a conflict
is only possible if you test a task _in an existing DAG run but don’t want to
use the actual upstream XComs_, which feels very niche and not a necessary
feature to me.
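Approach 2 could look roughly like the following sketch. It assumes an in-memory `sqlite3` table as a stand-in for the real XCom table; the simplified schema, `injected_xcoms` and `XComConflictError` are hypothetical names, not Airflow APIs.

```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator

# Hypothetical, simplified XCom table; the real Airflow schema has more columns.
SCHEMA = """
CREATE TABLE xcom (
    dag_id TEXT, run_id TEXT, task_id TEXT, map_index INTEGER, key TEXT, value TEXT,
    PRIMARY KEY (dag_id, run_id, task_id, map_index, key)
)
"""
KEY_FILTER = "dag_id=? AND run_id=? AND task_id=? AND map_index=? AND key=?"

Row = tuple[str, str, str, int, str, str]


class XComConflictError(RuntimeError):
    """Raised when custom input would shadow XComs that already exist."""


@contextmanager
def injected_xcoms(conn: sqlite3.Connection, rows: list[Row]) -> Iterator[None]:
    """Write custom XComs for a test run and delete them afterwards (approach 2)."""
    keys = [row[:5] for row in rows]  # primary-key part of each row
    for k in keys:
        if conn.execute(f"SELECT 1 FROM xcom WHERE {KEY_FILTER}", k).fetchone():
            # Refuse to execute rather than overwrite real upstream XComs.
            raise XComConflictError(f"XCom already exists for {k}")
    with conn:  # commit all inserts as one transaction
        conn.executemany("INSERT INTO xcom VALUES (?, ?, ?, ?, ?, ?)", rows)
    try:
        yield
    finally:
        # Remove only the rows this test run created, even if the task failed.
        with conn:
            conn.executemany(f"DELETE FROM xcom WHERE {KEY_FILTER}", keys)


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
row = ("my_dag", "test_run", "upstream", -1, "return_value", "42")  # -1: unmapped task
with injected_xcoms(conn, [row]):
    print(conn.execute("SELECT value FROM xcom").fetchall())  # [('42',)]
print(conn.execute("SELECT COUNT(*) FROM xcom").fetchone())  # (0,)
```

Because the conflict check runs before anything is written, a refused run leaves existing XComs untouched.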
--
This is an automated message from the Apache Git Service.