This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5719d195c4a AIP-84 Refactor SortParm (#44345)
5719d195c4a is described below
commit 5719d195c4a4e9e3c4908d548586950a0f137d2c
Author: LIU ZHE YOU <[email protected]>
AuthorDate: Wed Nov 27 18:25:49 2024 +0800
AIP-84 Refactor SortParm (#44345)
* Refactor get_connections
* Allow Column type for `to_replace` parameter
* Refactor get_dags
* Refactor get_import_errors
* Refactor SortParam, get_dag_runs
* Fix default ordering when directly using SortParam
- related: https://github.com/apache/airflow/pull/44393
* Fix get_list_dag_runs_batch
---
airflow/api_fastapi/common/parameters.py | 28 +++++++++-------------
.../core_api/routes/public/connections.py | 4 +++-
.../api_fastapi/core_api/routes/public/dag_run.py | 6 +++--
airflow/api_fastapi/core_api/routes/public/dags.py | 4 +++-
.../core_api/routes/public/import_error.py | 2 +-
5 files changed, 22 insertions(+), 22 deletions(-)
diff --git a/airflow/api_fastapi/common/parameters.py
b/airflow/api_fastapi/common/parameters.py
index 23817e92006..3390a455cc7 100644
--- a/airflow/api_fastapi/common/parameters.py
+++ b/airflow/api_fastapi/common/parameters.py
@@ -40,12 +40,11 @@ from sqlalchemy.inspection import inspect
from airflow.api_connexion.endpoints.task_instance_endpoint import
_convert_ti_states
from airflow.jobs.job import Job
-from airflow.models import Base, Connection
+from airflow.models import Base
from airflow.models.asset import AssetEvent, AssetModel,
DagScheduleAssetReference, TaskOutletAssetReference
from airflow.models.dag import DagModel, DagTag
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning, DagWarningType
-from airflow.models.errors import ParseImportError
from airflow.models.taskinstance import TaskInstance
from airflow.typing_compat import Self
from airflow.utils import timezone
@@ -218,16 +217,8 @@ class _DagDisplayNamePatternSearch(_SearchParam):
class SortParam(BaseParam[str]):
"""Order result by the attribute."""
- attr_mapping = {
- "last_run_state": DagRun.state,
- "last_run_start_date": DagRun.start_date,
- "connection_id": Connection.conn_id,
- "import_error_id": ParseImportError.id,
- "dag_run_id": DagRun.run_id,
- }
-
def __init__(
- self, allowed_attrs: list[str], model: Base, to_replace: dict[str,
str] | None = None
+ self, allowed_attrs: list[str], model: Base, to_replace: dict[str, str
| Column] | None = None
) -> None:
super().__init__()
self.allowed_attrs = allowed_attrs
@@ -242,19 +233,22 @@ class SortParam(BaseParam[str]):
self.value = self.get_primary_key_string()
lstriped_orderby = self.value.lstrip("-")
+ column: Column | None = None
if self.to_replace:
- lstriped_orderby = self.to_replace.get(lstriped_orderby,
lstriped_orderby)
+ replacement = self.to_replace.get(lstriped_orderby,
lstriped_orderby)
+ if isinstance(replacement, str):
+ lstriped_orderby = replacement
+ else:
+ column = replacement
- if self.allowed_attrs and lstriped_orderby not in self.allowed_attrs:
+ if (self.allowed_attrs and lstriped_orderby not in self.allowed_attrs)
and column is None:
raise HTTPException(
400,
f"Ordering with '{lstriped_orderby}' is disallowed or "
f"the attribute does not exist on the model",
)
-
- column: Column = self.attr_mapping.get(lstriped_orderby, None) or
getattr(
- self.model, lstriped_orderby
- )
+ if column is None:
+ column = getattr(self.model, lstriped_orderby)
# MySQL does not support `nullslast`, and True/False ordering depends
on the
# database implementation.
diff --git a/airflow/api_fastapi/core_api/routes/public/connections.py
b/airflow/api_fastapi/core_api/routes/public/connections.py
index edfece1333b..9eb14bb9c40 100644
--- a/airflow/api_fastapi/core_api/routes/public/connections.py
+++ b/airflow/api_fastapi/core_api/routes/public/connections.py
@@ -92,7 +92,9 @@ def get_connections(
SortParam,
Depends(
SortParam(
- ["connection_id", "conn_type", "description", "host", "port",
"id"], Connection
+ ["conn_id", "conn_type", "description", "host", "port", "id"],
+ Connection,
+ {"connection_id": "conn_id"},
).dynamic_depends()
),
],
diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py
b/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 1e95f75273c..fab34d80bde 100644
--- a/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -266,8 +266,8 @@ def get_dag_runs(
"id",
"state",
"dag_id",
+ "run_id",
"logical_date",
- "dag_run_id",
"start_date",
"end_date",
"updated_at",
@@ -275,6 +275,7 @@ def get_dag_runs(
"conf",
],
DagRun,
+ {"dag_run_id": "run_id"},
).dynamic_depends(default="id")
),
],
@@ -401,7 +402,7 @@ def get_list_dag_runs_batch(
"state",
"dag_id",
"logical_date",
- "dag_run_id",
+ "run_id",
"start_date",
"end_date",
"updated_at",
@@ -409,6 +410,7 @@ def get_list_dag_runs_batch(
"conf",
],
DagRun,
+ {"dag_run_id": "run_id"},
).set_value(body.order_by)
base_query = select(DagRun)
diff --git a/airflow/api_fastapi/core_api/routes/public/dags.py
b/airflow/api_fastapi/core_api/routes/public/dags.py
index 6a099b9b614..4cc2000a2e3 100644
--- a/airflow/api_fastapi/core_api/routes/public/dags.py
+++ b/airflow/api_fastapi/core_api/routes/public/dags.py
@@ -54,6 +54,7 @@ from airflow.api_fastapi.core_api.datamodels.dags import (
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
from airflow.exceptions import AirflowException, DagNotFound
from airflow.models import DAG, DagModel, DagTag
+from airflow.models.dagrun import DagRun
dags_router = AirflowRouter(tags=["DAG"], prefix="/dags")
@@ -73,8 +74,9 @@ def get_dags(
SortParam,
Depends(
SortParam(
- ["dag_id", "dag_display_name", "next_dagrun",
"last_run_state", "last_run_start_date"],
+ ["dag_id", "dag_display_name", "next_dagrun", "state",
"start_date"],
DagModel,
+ {"last_run_state": DagRun.state, "last_run_start_date":
DagRun.start_date},
).dynamic_depends()
),
],
diff --git a/airflow/api_fastapi/core_api/routes/public/import_error.py
b/airflow/api_fastapi/core_api/routes/public/import_error.py
index 6090676b560..26e0d0858bf 100644
--- a/airflow/api_fastapi/core_api/routes/public/import_error.py
+++ b/airflow/api_fastapi/core_api/routes/public/import_error.py
@@ -73,12 +73,12 @@ def get_import_errors(
SortParam(
[
"id",
- "import_error_id",
"timestamp",
"filename",
"stacktrace",
],
ParseImportError,
+ {"import_error_id": "id"},
).dynamic_depends()
),
],