pierrejeambrun commented on code in PR #49164:
URL: https://github.com/apache/airflow/pull/49164#discussion_r2073122320
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -156,7 +157,7 @@ def patch_dag_run(
f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}`
was not found",
)
- dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+ dag: DAG = get_dag_from_dag_bag(request.app.state.dag_bag, dag_id)
Review Comment:
The typing is wrong; it should be `DAG | None`. (This occurs in multiple places and needs to
be updated everywhere.)
##########
airflow-core/src/airflow/api/common/utils.py:
##########
@@ -0,0 +1,26 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from fastapi import HTTPException, status
+
+if TYPE_CHECKING:
+ from airflow.models.dag import DAG
+
+
+def get_dag_from_dag_bag(dag_bag, dag_id: str) -> DAG:
Review Comment:
`dag_bag` can be injected from the request dependency, so you don't need to pass
it around.
##########
airflow-core/src/airflow/api/common/utils.py:
##########
@@ -0,0 +1,26 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from fastapi import HTTPException, status
+
+if TYPE_CHECKING:
+ from airflow.models.dag import DAG
+
+
+def get_dag_from_dag_bag(dag_bag, dag_id: str) -> DAG:
+ """
+ Retrieve a DAG from dag_bag with consistent error handling.
Review Comment:
The return type is wrong; it should be `DAG | None`.
##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py:
##########
@@ -1163,6 +1167,37 @@ def test_should_respond_200(self, test_client):
"note": None,
}
+ def test_materialize_asset_missing_dag_id_in_serialized_data(self,
test_client):
+ DAG1_ID = "test_dag_missing_dag_id"
+
+ test_dag = DAG(dag_id=DAG1_ID, start_date=datetime(2025, 4, 15),
schedule="@once")
+ EmptyOperator(task_id="test_task", dag=test_dag)
+ test_dag.sync_to_db()
+ SerializedDagModel.write_dag(test_dag, bundle_name=DAG1_ID)
+
+ with create_session() as session:
+ dag_model =
session.scalar(select(SerializedDagModel).where(SerializedDagModel.dag_id ==
DAG1_ID))
+ if not dag_model:
+ pytest.fail("Failed to find serialized DAG in database")
+ data = dag_model.data
+ del data["dag"]["dag_id"]
+ session.execute(
+ update(SerializedDagModel).where(SerializedDagModel.dag_id ==
DAG1_ID).values(_data=data)
+ )
+ session.commit()
+
+ with create_session() as session:
+ asset = AssetModel(name="test_asset", uri="s3://bucket/key/1")
+ session.add(asset)
+ session.flush()
+ asset_id = asset.id
+ session.add(TaskOutletAssetReference(dag_id=DAG1_ID,
task_id="test_task", asset_id=asset.id))
+ session.commit()
+
+ response = test_client.post(f"/assets/{asset_id}/materialize")
+ assert response.status_code == status.HTTP_400_BAD_REQUEST
+ assert "An unexpected error occurred" in response.json()["detail"]
Review Comment:
Please assert the full error message: `msg == response.json()["detail"]` (to
make sure that the DAG id and the entire message are correct).
##########
airflow-core/src/airflow/api/common/utils.py:
##########
@@ -0,0 +1,26 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from fastapi import HTTPException, status
+
+if TYPE_CHECKING:
+ from airflow.models.dag import DAG
+
+
+def get_dag_from_dag_bag(dag_bag, dag_id: str) -> DAG:
+ """
+ Retrieve a DAG from dag_bag with consistent error handling.
+
+ Raises:
+ HTTPException: with appropriate status code and message on error.
+ """
+ try:
+ return dag_bag.get_dag(dag_id)
+ except RuntimeError as err:
+ if "airflow dags reserialize" in str(err):
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail=f"An unexpected error occurred while trying to load DAG
'{dag_id}'.",
Review Comment:
```suggestion
detail=f"An unexpected error occurred while trying to
deserialize DAG '{dag_id}'.",
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]