dabla commented on code in PR #68299:
URL: https://github.com/apache/airflow/pull/68299#discussion_r3442270739
##########
task-sdk/src/airflow/sdk/bases/xcom.py:
##########
@@ -273,6 +349,70 @@ def get_one(
)
return None
+ @classmethod
+ async def aget_one(
+ cls,
+ *,
+ key: str,
+ dag_id: str,
+ task_id: str,
+ run_id: str,
+ map_index: int | None = None,
+ include_prior_dates: bool = False,
+ ) -> Any | None:
+ """
+ Retrieve an XCom value asynchronously, optionally meeting certain criteria.
+
+ This method returns "full" XCom values (i.e. uses ``deserialize_value``
+ from the XCom backend).
+
+ If there are no results, *None* is returned. If multiple XCom entries
+ match the criteria, an arbitrary one is returned.
+
+ .. seealso:: ``aget_value()`` is a convenience function if you already
+ have a structured TaskInstance or TaskInstanceKey object available.
+
+ :param run_id: Dag run ID for the task.
+ :param dag_id: Only pull XCom from this Dag. Pass *None* (default) to
+ remove the filter.
+ :param task_id: Only XCom from task with matching ID will be pulled.
+ Pass *None* (default) to remove the filter.
+ :param map_index: Only XCom from task with matching ID will be pulled.
+ Pass *None* (default) to remove the filter.
+ :param key: A key for the XCom. If provided, only XCom with matching
+ keys will be returned. Pass *None* (default) to remove the filter.
+ :param include_prior_dates: If *False* (default), only XCom from the
+ specified Dag run is returned. If *True*, the latest matching XCom is
+ returned regardless of the run it belongs to.
+ """
+ from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
+
+ msg = await SUPERVISOR_COMMS.asend(
+ GetXCom(
+ key=key,
+ dag_id=dag_id,
+ task_id=task_id,
+ run_id=run_id,
+ map_index=map_index,
+ include_prior_dates=include_prior_dates,
+ ),
+ )
+
+ if not isinstance(msg, XComResult):
+ raise TypeError(f"Expected XComResult, received: {type(msg)} {msg}")
+
+ if msg.value is not None:
+ return cls.deserialize_value(msg)
+ log.warning(
+ "No XCom value found; defaulting to None.",
+ key=key,
+ dag_id=dag_id,
+ task_id=task_id,
+ run_id=run_id,
+ map_index=map_index,
+ )
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]