ashb commented on code in PR #44899:
URL: https://github.com/apache/airflow/pull/44899#discussion_r1886933898
##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -92,35 +197,15 @@ def ti_update_state(
query = update(TI).where(TI.id == ti_id_str).values(data)
+ # TODO: Instead remove this payload from discriminator accepted by this
endpoint
if isinstance(ti_patch_payload, TIEnterRunningPayload):
- if previous_state != State.QUEUED:
- log.warning(
- "Can not start Task Instance ('%s') in invalid state: %s",
- ti_id_str,
- previous_state,
- )
-
- # TODO: Pass a RFC 9457 compliant error message in "detail" field
- # https://datatracker.ietf.org/doc/html/rfc9457
- # to provide more information about the error
- # FastAPI will automatically convert this to a JSON response
- # This might be added in FastAPI in
https://github.com/fastapi/fastapi/issues/10370
- raise HTTPException(
- status_code=status.HTTP_409_CONFLICT,
- detail={
- "reason": "invalid_state",
- "message": "TI was not in a state where it could be marked
as running",
- "previous_state": previous_state,
- },
- )
- log.info("Task with %s state started on %s ", previous_state,
ti_patch_payload.hostname)
- # Ensure there is no end date set.
- query = query.values(
- end_date=None,
- hostname=ti_patch_payload.hostname,
- unixname=ti_patch_payload.unixname,
- pid=ti_patch_payload.pid,
- state=State.RUNNING,
+ raise HTTPException(
+ status_code=status.HTTP_409_CONFLICT,
Review Comment:
No, the request is correct/valid, it's just that the state of the object
doesn't allow it to be processed.
##########
task_sdk/tests/conftest.py:
##########
@@ -128,3 +132,85 @@ def _disable_ol_plugin():
yield
airflow.plugins_manager.plugins = None
+
+
+class MakeTIContextCallable(Protocol):
+ def __call__(
+ self,
+ dag_id: str = ...,
+ run_id: str = ...,
+ logical_date: str | datetime = ...,
+ data_interval_start: str | datetime = ...,
+ data_interval_end: str | datetime = ...,
+ start_date: str | datetime = ...,
+ run_type: str = ...,
+ ) -> TIRunContext: ...
+
+
+class MakeTIContextDictCallable(Protocol):
+ def __call__(
+ self,
+ dag_id: str = ...,
+ run_id: str = ...,
+ logical_date: str = ...,
+ data_interval_start: str | datetime = ...,
+ data_interval_end: str | datetime = ...,
+ start_date: str | datetime = ...,
+ run_type: str = ...,
+ ) -> dict[str, Any]: ...
+
+
[email protected]
+def make_ti_context() -> MakeTIContextCallable:
+ """Factory for creating TIRunContext objects."""
+ from airflow.sdk.api.datamodels._generated import DagRun, TIRunContext
+
+ def _make_context(
+ dag_id: str = "test_dag",
+ run_id: str = "test_run",
+ logical_date: str | datetime = "2024-12-01T01:00:00Z",
+ data_interval_start: str | datetime = "2024-12-01T00:00:00Z",
+ data_interval_end: str | datetime = "2024-12-01T01:00:00Z",
+ start_date: str | datetime = "2024-12-01T01:00:00Z",
+ run_type: str = "manual",
+ ) -> TIRunContext:
+ return TIRunContext(
+ dag_run=DagRun(
+ dag_id=dag_id,
+ run_id=run_id,
+ logical_date=logical_date, # type: ignore
+ data_interval_start=data_interval_start, # type: ignore
+ data_interval_end=data_interval_end, # type: ignore
+ start_date=start_date, # type: ignore
+ run_type=run_type, # type: ignore
+ )
+ )
+
+ return _make_context
+
+
[email protected]
+def make_ti_context_dict(make_ti_context: MakeTIContextCallable) ->
MakeTIContextDictCallable:
+ """Factory for creating context dictionaries suited for API Server
response."""
+
+ def _make_context_dict(
+ dag_id: str = "test_dag",
+ run_id: str = "test_run",
+ logical_date: str | datetime = "2024-12-01T00:00:00Z",
+ data_interval_start: str | datetime = "2024-12-01T00:00:00Z",
+ data_interval_end: str | datetime = "2024-12-01T01:00:00Z",
+ start_date: str | datetime = "2024-12-01T00:00:00Z",
+ run_type: str = "manual",
+ ) -> dict[str, Any]:
+ context = make_ti_context(
+ dag_id=dag_id,
+ run_id=run_id,
+ logical_date=logical_date,
+ data_interval_start=data_interval_start,
+ data_interval_end=data_interval_end,
+ start_date=start_date,
+ run_type=run_type,
+ )
+ return context.model_dump(exclude_unset=True, mode="json")
+
+ return _make_context_dict
Review Comment:
Ah k
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]