jason810496 commented on code in PR #66911:
URL: https://github.com/apache/airflow/pull/66911#discussion_r3265080004
##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -869,22 +869,104 @@ def trigger(
run_after=run_after,
)
+ dag_run_exists_before_trigger = self._dag_run_exists(dag_id=dag_id,
run_id=run_id)
Review Comment:
Then we could use `self.get_count` directly, since it is definitely safe to
retry.
##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -869,22 +869,104 @@ def trigger(
run_after=run_after,
)
+ dag_run_exists_before_trigger = self._dag_run_exists(dag_id=dag_id,
run_id=run_id)
+ if dag_run_exists_before_trigger is True:
+ if reset_dag_run:
+ log.info("Dag Run already exists; Resetting Dag Run.",
dag_id=dag_id, run_id=run_id)
+ return self.clear(run_id=run_id, dag_id=dag_id)
Review Comment:
We should also add idempotency handling for `self.clear` as a follow-up:
make sure the DagRun really exists before making the `clear` API call.
##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -869,22 +869,104 @@ def trigger(
run_after=run_after,
)
+ dag_run_exists_before_trigger = self._dag_run_exists(dag_id=dag_id,
run_id=run_id)
+ if dag_run_exists_before_trigger is True:
+ if reset_dag_run:
+ log.info("Dag Run already exists; Resetting Dag Run.",
dag_id=dag_id, run_id=run_id)
+ return self.clear(run_id=run_id, dag_id=dag_id)
+ log.info("Dag Run already exists!", dag_id=dag_id, run_id=run_id)
+ return ErrorResponse(error=ErrorType.DAGRUN_ALREADY_EXISTS)
+
try:
- self.client.post(
- f"dag-runs/{dag_id}/{run_id}",
content=body.model_dump_json(exclude_defaults=True)
+ self.client._request_without_retry(
+ "POST", f"dag-runs/{dag_id}/{run_id}",
content=body.model_dump_json(exclude_defaults=True)
)
+ except (httpx.ReadError, httpx.ReadTimeout, httpx.RemoteProtocolError):
+ if (
+ dag_run_exists_before_trigger is False
+ and self._dag_run_exists(dag_id=dag_id, run_id=run_id,
retry=True) is True
+ ):
+ log.info(
+ "Dag Run exists after ambiguous trigger response; treating
trigger as successful.",
+ dag_id=dag_id,
+ run_id=run_id,
+ )
+ return OKResponse(ok=True)
+ raise
except ServerResponseError as e:
if e.response.status_code == HTTPStatus.CONFLICT:
if reset_dag_run:
- log.info("Dag Run already exists; Resetting Dag Run.",
dag_id=dag_id, run_id=run_id)
+ log.info(
+ "Dag Run already exists after trigger attempt;
Resetting Dag Run.",
+ detail=e.detail,
+ dag_id=dag_id,
+ run_id=run_id,
+ )
return self.clear(run_id=run_id, dag_id=dag_id)
-
- log.info("Dag Run already exists!", detail=e.detail,
dag_id=dag_id, run_id=run_id)
+ log.info(
+ "Dag Run already exists after trigger attempt.",
+ detail=e.detail,
+ dag_id=dag_id,
+ run_id=run_id,
+ )
return ErrorResponse(error=ErrorType.DAGRUN_ALREADY_EXISTS)
raise
return OKResponse(ok=True)
+ def _dag_run_exists(self, dag_id: str, run_id: str, *, retry: bool =
False) -> bool | None:
+ """Return whether the Dag run exists, or None when the detail endpoint
is unavailable."""
+ if self.client._dry_run:
+ return None
+
+ try:
+ if retry:
+ self.client.get(f"dag-runs/{dag_id}/{run_id}")
+ else:
+ self.client._request_without_retry("GET",
f"dag-runs/{dag_id}/{run_id}")
+ except httpx.RequestError:
+ return None
+ except ServerResponseError as e:
+ if e.response.status_code == HTTPStatus.NOT_FOUND:
+ return False
+ if e.response.status_code == HTTPStatus.METHOD_NOT_ALLOWED:
+ # Older execution API servers may not support the Dag run
detail endpoint yet.
+ return None
+ if (
+ e.response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY
+ and run_id == "previous"
+ and self._is_legacy_previous_dag_run_route_response(e.response)
+ ):
+ return None
+ if e.response.status_code >= HTTPStatus.INTERNAL_SERVER_ERROR:
+ return None
+ raise
+ except httpx.HTTPStatusError as e:
+ if e.response.status_code == HTTPStatus.NOT_FOUND:
+ return False
+ if e.response.status_code == HTTPStatus.METHOD_NOT_ALLOWED:
+ return None
+ if e.response.status_code >= HTTPStatus.INTERNAL_SERVER_ERROR:
+ return None
+ raise
+ return True
Review Comment:
I would prefer to use `get_dr_count` (passing the run_id) to determine
whether the DagRun exists.
We could document the historical context here, so that we don't need to
handle the complex compatibility logic here,
since `get_dr_count` has been available since the first Airflow 3.0 release.
https://github.com/apache/airflow/blob/v3-0-stable/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py#L160-L181
##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -1125,6 +1208,19 @@ def _update_auth(self, response: httpx.Response):
log.debug("Execution API issued us a refreshed Task token")
self.auth = BearerAuth(new_token)
+ @staticmethod
+ def _ensure_json_content_type(kwargs: dict[str, Any]) -> None:
+ # Set content type as convenience if not already set
+ if kwargs.get("content", None) is not None and "content-type" not in (
+ kwargs.get("headers", {}) or {}
+ ):
Review Comment:
We should make this a staticmethod on the `Client` class in
`task-sdk/src/airflow/sdk/api/client.py`.
It duplicates
https://github.com/apache/airflow/blob/4498582dd1ee553c403db5119ddadd7151d6fde1/task-sdk/src/airflow/sdk/api/client.py#L1138-L1142
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]