This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4485393562 Add support for queued state in DagRun update endpoint.
(#23481)
4485393562 is described below
commit 4485393562ea4151a42f1be47bea11638b236001
Author: Karthikeyan Singaravelan <[email protected]>
AuthorDate: Mon May 9 17:55:48 2022 +0530
Add support for queued state in DagRun update endpoint. (#23481)
---
airflow/api_connexion/endpoints/dag_run_endpoint.py | 8 +++++++-
airflow/api_connexion/openapi/v1.yaml | 1 +
airflow/api_connexion/schemas/dag_run_schema.py | 6 +++++-
tests/api_connexion/endpoints/test_dag_run_endpoint.py | 12 +++++++-----
4 files changed, 20 insertions(+), 7 deletions(-)
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py
b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index a83ca223b0..a3f9bfde92 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -23,7 +23,11 @@ from marshmallow import ValidationError
from sqlalchemy import or_
from sqlalchemy.orm import Query, Session
-from airflow.api.common.mark_tasks import set_dag_run_state_to_failed,
set_dag_run_state_to_success
+from airflow.api.common.mark_tasks import (
+ set_dag_run_state_to_failed,
+ set_dag_run_state_to_queued,
+ set_dag_run_state_to_success,
+)
from airflow.api_connexion import security
from airflow.api_connexion.exceptions import AlreadyExists, BadRequest,
NotFound
from airflow.api_connexion.parameters import apply_sorting, check_limit,
format_datetime, format_parameters
@@ -308,6 +312,8 @@ def update_dag_run_state(*, dag_id: str, dag_run_id: str,
session: Session = NEW
dag = current_app.dag_bag.get_dag(dag_id)
if state == DagRunState.SUCCESS:
set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id,
commit=True)
+ elif state == DagRunState.QUEUED:
+ set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id,
commit=True)
else:
set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id,
commit=True)
dag_run = session.query(DagRun).get(dag_run.id)
diff --git a/airflow/api_connexion/openapi/v1.yaml
b/airflow/api_connexion/openapi/v1.yaml
index 9e99db032d..df19d12dda 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -2442,6 +2442,7 @@ components:
enum:
- success
- failed
+ - queued
DAGRunCollection:
type: object
diff --git a/airflow/api_connexion/schemas/dag_run_schema.py
b/airflow/api_connexion/schemas/dag_run_schema.py
index 44f6eda496..2933888b8c 100644
--- a/airflow/api_connexion/schemas/dag_run_schema.py
+++ b/airflow/api_connexion/schemas/dag_run_schema.py
@@ -112,7 +112,11 @@ class DAGRunSchema(SQLAlchemySchema):
class SetDagRunStateFormSchema(Schema):
"""Schema for handling the request of setting state of DAG run"""
- state = DagStateField(validate=validate.OneOf([DagRunState.SUCCESS.value,
DagRunState.FAILED.value]))
+ state = DagStateField(
+ validate=validate.OneOf(
+ [DagRunState.SUCCESS.value, DagRunState.FAILED.value,
DagRunState.QUEUED.value]
+ )
+ )
class DAGRunCollection(NamedTuple):
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index d2547587dd..f539839208 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -1271,7 +1271,7 @@ class TestPostDagRun(TestDagRunEndpoint):
class TestPatchDagRunState(TestDagRunEndpoint):
- @pytest.mark.parametrize("state", ["failed", "success"])
+ @pytest.mark.parametrize("state", ["failed", "success", "queued"])
@pytest.mark.parametrize("run_type", [state.value for state in DagRunType])
def test_should_respond_200(self, state, run_type, dag_maker, session):
dag_id = "TEST_DAG_ID"
@@ -1294,8 +1294,10 @@ class TestPatchDagRunState(TestDagRunEndpoint):
environ_overrides={"REMOTE_USER": "test"},
)
- ti.refresh_from_db()
- assert ti.state == state
+ if state != "queued":
+ ti.refresh_from_db()
+ assert ti.state == state
+
dr = session.query(DagRun).filter(DagRun.run_id == dr.run_id).first()
assert response.status_code == 200
assert response.json == {
@@ -1314,7 +1316,7 @@ class TestPatchDagRunState(TestDagRunEndpoint):
'run_type': run_type,
}
- @pytest.mark.parametrize('invalid_state', ["running", "queued"])
+ @pytest.mark.parametrize('invalid_state', ["running"])
@freeze_time(TestDagRunEndpoint.default_time)
def test_should_response_400_for_non_existing_dag_run_state(self,
invalid_state, dag_maker):
dag_id = "TEST_DAG_ID"
@@ -1332,7 +1334,7 @@ class TestPatchDagRunState(TestDagRunEndpoint):
)
assert response.status_code == 400
assert response.json == {
- 'detail': f"'{invalid_state}' is not one of ['success', 'failed']
- 'state'",
+ 'detail': f"'{invalid_state}' is not one of ['success', 'failed',
'queued'] - 'state'",
'status': 400,
'title': 'Bad Request',
'type': EXCEPTIONS_LINK_MAP[400],