pierrejeambrun commented on code in PR #51920:
URL: https://github.com/apache/airflow/pull/51920#discussion_r2182921964


##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py:
##########
@@ -1606,3 +1606,49 @@ def test_should_respond_200_with_null_logical_date(self, test_client):
             "conf": {},
             "note": None,
         }
+
+
+class TestWaitDagRun:
+    # The way we init async engine does not work well with FastAPI app init.
+    # Creating the engine implicitly creates an event loop, which Airflow does
+    # once for the entire process; creating the FastAPI app also does, but our
+    # test setup does it once for each test. I don't know how to properly fix
+    # this without rewriting how Airflow does db; re-configuring the db for each
+    # test at least makes the tests run correctly.
+    @pytest.fixture(autouse=True)
+    def reconfigure_async_db_engine(self):
+        from airflow.settings import _configure_async_session
+
+        _configure_async_session()
+
+    def test_should_respond_401(self, unauthenticated_test_client):
+        response = unauthenticated_test_client.get(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait?interval=1")
+        assert response.status_code == 401
+
+    def test_should_respond_403(self, unauthorized_test_client):
+        response = unauthorized_test_client.get(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait?interval=1")
+        assert response.status_code == 403
+
+    def test_should_respond_404(self, test_client):
+        response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/does-not-exist/wait?interval=1")
+        assert response.status_code == 404
+
+    def test_should_respond_422_without_interval_param(self, test_client):
+        response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait")
+        assert response.status_code == 422
+
+    @pytest.mark.parametrize(
+        "run_id, state",
+        [(DAG1_RUN1_ID, DAG1_RUN1_STATE), (DAG1_RUN2_ID, DAG1_RUN2_STATE)],
+    )
+    def test_should_respond_200_immediately_for_finished_run(self, test_client, run_id, state):
+        response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/{run_id}/wait?interval=100")
+        assert response.status_code == 200
+        data = response.json()
+        assert data == {"state": state}
+
+    def test_collect_task(self, test_client):
+        response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait?interval=100&collect=task_1")

Review Comment:
   same here



##########
airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml:
##########
@@ -2135,6 +2135,87 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
+  /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait:

Review Comment:
   ```suggestion
     /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/watch:
   ```
   
   Nit: To me it looks like we're watching the dag run state, and we stop
   watching once it has settled. But I'm good with wait too.



##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py:
##########
@@ -1606,3 +1606,49 @@ def test_should_respond_200_with_null_logical_date(self, test_client):
             "conf": {},
             "note": None,
         }
+
+
+class TestWaitDagRun:
+    # The way we init async engine does not work well with FastAPI app init.
+    # Creating the engine implicitly creates an event loop, which Airflow does
+    # once for the entire process; creating the FastAPI app also does, but our
+    # test setup does it once for each test. I don't know how to properly fix
+    # this without rewriting how Airflow does db; re-configuring the db for each
+    # test at least makes the tests run correctly.
+    @pytest.fixture(autouse=True)
+    def reconfigure_async_db_engine(self):
+        from airflow.settings import _configure_async_session
+
+        _configure_async_session()
+
+    def test_should_respond_401(self, unauthenticated_test_client):
+        response = unauthenticated_test_client.get(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait?interval=1")
+        assert response.status_code == 401
+
+    def test_should_respond_403(self, unauthorized_test_client):
+        response = unauthorized_test_client.get(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait?interval=1")
+        assert response.status_code == 403
+
+    def test_should_respond_404(self, test_client):
+        response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/does-not-exist/wait?interval=1")
+        assert response.status_code == 404
+
+    def test_should_respond_422_without_interval_param(self, test_client):
+        response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait")
+        assert response.status_code == 422
+
+    @pytest.mark.parametrize(
+        "run_id, state",
+        [(DAG1_RUN1_ID, DAG1_RUN1_STATE), (DAG1_RUN2_ID, DAG1_RUN2_STATE)],
+    )
+    def test_should_respond_200_immediately_for_finished_run(self, test_client, run_id, state):
+        response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/{run_id}/wait?interval=100")

Review Comment:
   Nit: Here and below, can you use the `params=...` argument and let the
   test_client do the URL encoding instead of doing it manually, please?
   
   It's easier to read, especially when there are multiple query params (so we
   tend to do that everywhere), and manipulating a dict is more convenient than
   a string (looking up variables, etc.).
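
A minimal sketch of the suggested change, using only the standard library so it runs on its own. The values of `DAG1_ID` and `DAG1_RUN1_ID` are hypothetical placeholders (the real constants live in the test module), and `urlencode` only stands in for the encoding the test client would perform; the client's exact escaping of special characters may differ.

```python
from urllib.parse import urlencode

# Hypothetical placeholder values; the real constants are defined in the test module.
DAG1_ID = "test_dag1"
DAG1_RUN1_ID = "dag_run_1"

path = f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait"

# Query parameters kept as a dict instead of being baked into the URL string.
params = {"interval": 100, "collect": "task_1"}

# In the test this would read (test_client being the existing fixture):
#     response = test_client.get(path, params=params)

# The hand-written URL from the diff and the encoded dict describe the same request.
manual_url = f"{path}?interval=100&collect=task_1"
encoded_url = f"{path}?{urlencode(params)}"
assert manual_url == encoded_url

# Values containing reserved characters are escaped for us, which a
# hand-built f-string would silently get wrong.
tricky_query = urlencode({"collect": "task 1&x"})
```

With the dict form, adding or changing a query parameter is a one-key edit, and values taken from variables do not need to be interpolated and escaped by hand.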



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
