uranusjr commented on code in PR #45486:
URL: https://github.com/apache/airflow/pull/45486#discussion_r1908225295
##########
task_sdk/src/airflow/sdk/definitions/contextmanager.py:
##########
@@ -27,10 +28,47 @@
T = TypeVar("T")
-__all__ = [
- "DagContext",
- "TaskGroupContext",
-]
+__all__ = ["DagContext", "TaskGroupContext", "get_current_context"]
+
+# This is a global variable that stores the current Task context.
+# It is used to push the Context dictionary when Task starts execution
+# and it is used to retrieve the current context in PythonOperator or Taskflow API via
+# the `get_current_context` function.
+_CURRENT_CONTEXT: list[Mapping[str, Any]] = []
+
+
+def get_current_context() -> Mapping[str, Any]:
+ """
+ Retrieve the execution context dictionary without altering user method's signature.
+
+ This is the simplest method of retrieving the execution context dictionary.
+
+ **Old style:**
+
+ .. code:: python
+
+ def my_task(**context):
+ ti = context["ti"]
+
+ **New style:**
+
+ .. code:: python
+
+ from airflow.providers.standard.operators.python import get_current_context
+
+
+ def my_task():
+ context = get_current_context()
+ ti = context["ti"]
+
+ Current context will only have value if this method was called after an operator
+ was starting to execute.
+ """
+ if not _CURRENT_CONTEXT:
+ raise RuntimeError(
+ "Current context was requested but no context was found! Are you running within an airflow task?"
Review Comment:
```suggestion
"Current context was requested but no context was found! Are you running within an Airflow task?"
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]