ashb commented on code in PR #44899:
URL: https://github.com/apache/airflow/pull/44899#discussion_r1883834318


##########
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##########
@@ -139,3 +142,34 @@ class TaskInstance(BaseModel):
 
 """Schema for setting RTIF for a task instance."""
 RTIFPayload = RootModel[dict[str, str]]
+
+
+class DagRun(BaseModel):
+    """Schema for DagRun model with minimal required fields needed for Runtime."""
+
+    # TODO: `dag_id` and `run_id` are duplicated from TaskInstance
+    #   See if we can avoid sending these fields from API server and instead
+    #   use the TaskInstance data to get the DAG run information in the client (Task Execution Interface).
+    dag_id: str
+    run_id: str
+
+    logical_date: UtcDateTime
+    data_interval_start: UtcDateTime | None
+    data_interval_end: UtcDateTime | None
+    start_date: UtcDateTime
+    end_date: UtcDateTime | None
+    run_type: DagRunType
+    conf: Annotated[dict[str, Any], Field(default_factory=dict)]
+
+
+class TIRunContext(BaseModel):
+    """Response schema for TaskInstance run context."""
+
+    dag_run: DagRun

Review Comment:
   I would have put this on the TI object, not in here, but potato potato



##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -48,6 +51,108 @@
 log = logging.getLogger(__name__)
 
 
[email protected](
+    "/{task_instance_id}/run",

Review Comment:
   Maybe 
   ```suggestion
       "/{task_instance_id}/start",
   ```



##########
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##########
@@ -139,3 +142,34 @@ class TaskInstance(BaseModel):
 
 """Schema for setting RTIF for a task instance."""
 RTIFPayload = RootModel[dict[str, str]]
+
+
+class DagRun(BaseModel):
+    """Schema for DagRun model with minimal required fields needed for Runtime."""
+
+    # TODO: `dag_id` and `run_id` are duplicated from TaskInstance
+    #   See if we can avoid sending these fields from API server and instead
+    #   use the TaskInstance data to get the DAG run information in the client (Task Execution Interface).

Review Comment:
   Yes, or vice-versa.



##########
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -48,9 +48,13 @@ class RuntimeTaskInstance(TaskInstance):
     model_config = ConfigDict(arbitrary_types_allowed=True)
 
     task: BaseOperator
+    _ti_context_from_server: TIRunContext | None = None
+    """The Task Instance context from the API server, if any."""

Review Comment:
   Let's exclude this from the repr, I think.
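
A minimal sketch of what excluding a field from the repr looks like, using stdlib dataclasses as a stand-in rather than the Pydantic model under review. The class name `RuntimeTaskInstanceSketch` and the public field name `ti_context_from_server` are hypothetical and chosen only for illustration. For a regular Pydantic model field, the analogous setting would presumably be `Field(repr=False)`.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class RuntimeTaskInstanceSketch:
    """Hypothetical stand-in for the real RuntimeTaskInstance."""

    task_id: str
    # repr=False keeps the (potentially large) server context out of repr()
    # and log output; the attribute itself remains fully accessible.
    ti_context_from_server: Optional[dict[str, Any]] = field(default=None, repr=False)


ti = RuntimeTaskInstanceSketch(
    task_id="extract",
    ti_context_from_server={"dag_run": {"run_id": "manual__1"}},
)
print(repr(ti))
```

The payload is still reachable as `ti.ti_context_from_server`; only the printed representation omits it.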



##########
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##########
@@ -139,3 +142,34 @@ class TaskInstance(BaseModel):
 
 """Schema for setting RTIF for a task instance."""
 RTIFPayload = RootModel[dict[str, str]]
+
+
+class DagRun(BaseModel):
+    """Schema for DagRun model with minimal required fields needed for Runtime."""
+
+    # TODO: `dag_id` and `run_id` are duplicated from TaskInstance
+    #   See if we can avoid sending these fields from API server and instead
+    #   use the TaskInstance data to get the DAG run information in the client (Task Execution Interface).
+    dag_id: str
+    run_id: str
+
+    logical_date: UtcDateTime
+    data_interval_start: UtcDateTime | None
+    data_interval_end: UtcDateTime | None
+    start_date: UtcDateTime
+    end_date: UtcDateTime | None
+    run_type: DagRunType
+    conf: Annotated[dict[str, Any], Field(default_factory=dict)]
+
+
+class TIRunContext(BaseModel):
+    """Response schema for TaskInstance run context."""
+
+    dag_run: DagRun

Review Comment:
   Oh, except it's not available/required in the TI start context (which is
   just the uuid and the 4/5-tuple). Right.



##########
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -77,15 +75,36 @@ def get_template_context(self):
             # "task_instance_key_str": f"{task.dag_id}__{task.task_id}__{ds_nodash}",
             # "test_mode": task_instance.test_mode,
             # "triggering_asset_events": lazy_object_proxy.Proxy(get_triggering_events),
-            # "ts": ts,
-            # "ts_nodash": ts_nodash,
-            # "ts_nodash_with_tz": ts_nodash_with_tz,
             # "var": {
             #     "json": VariableAccessor(deserialize_json=True),
             #     "value": VariableAccessor(deserialize_json=False),
             # },
             # "conn": ConnectionAccessor(),
         }
+        if self._ti_context_from_server:
+            dag_run = self._ti_context_from_server.dag_run
+
+            logical_date = dag_run.logical_date
+            ds = logical_date.strftime("%Y-%m-%d")
+            ds_nodash = ds.replace("-", "")
+            ts = logical_date.isoformat()
+            ts_nodash = logical_date.strftime("%Y%m%dT%H%M%S")
+            ts_nodash_with_tz = ts.replace("-", "").replace(":", "")

Review Comment:
   Nit/future work: make these lazy
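
One way the lazy variant could look, as a rough sketch only: a read-only mapping that runs each formatter on first access and caches the result. `LazyContext` and `date_context` are hypothetical names and not part of the Task SDK. The formatting expressions mirror the ones in the diff above, except that `ds_nodash` is formatted directly instead of being derived from `ds`.

```python
from collections.abc import Iterator, Mapping
from datetime import datetime, timezone
from typing import Any, Callable


class LazyContext(Mapping[str, Any]):
    """Hypothetical mapping that computes each value on first access."""

    def __init__(self, factories: dict[str, Callable[[], Any]]) -> None:
        self._factories = factories
        self._cache: dict[str, Any] = {}

    def __getitem__(self, key: str) -> Any:
        if key not in self._cache:
            # Unknown keys raise KeyError here, as expected of a Mapping.
            self._cache[key] = self._factories[key]()
        return self._cache[key]

    def __iter__(self) -> Iterator[str]:
        return iter(self._factories)

    def __len__(self) -> int:
        return len(self._factories)


def date_context(logical_date: datetime) -> LazyContext:
    """Build the date-derived template values without computing them eagerly."""

    def ts() -> str:
        return logical_date.isoformat()

    return LazyContext(
        {
            "ds": lambda: logical_date.strftime("%Y-%m-%d"),
            "ds_nodash": lambda: logical_date.strftime("%Y%m%d"),
            "ts": ts,
            "ts_nodash": lambda: logical_date.strftime("%Y%m%dT%H%M%S"),
            "ts_nodash_with_tz": lambda: ts().replace("-", "").replace(":", ""),
        }
    )


ctx = date_context(datetime(2024, 12, 13, 10, 30, tzinfo=timezone.utc))
print(ctx["ds"])                 # 2024-12-13
print(ctx["ts_nodash_with_tz"])  # 20241213T103000+0000
```

Nothing is formatted until a template actually asks for a key, so tasks that never touch `ts` or `ds` pay no cost for them.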



