This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 053d903 `func.sum` may returns `Decimal` that break rest APIs
(#15585)
053d903 is described below
commit 053d903816464f699876109b50390636bf617eff
Author: suiting-young <[email protected]>
AuthorDate: Thu Apr 29 22:26:39 2021 +0800
`func.sum` may returns `Decimal` that break rest APIs (#15585)
`sqlalchemy.func.sum` has a known *"issue"*: it **may** return a
`Decimal` value (_in my case, on MySQL 5.7_).
This causes a problem when calling the [REST
APIs](https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#tag/Pool).
E.g.
```http
GET /airflow/api/v1/pools?limit=100
...
TypeError: Object of type 'Decimal' is not JSON serializable
```
---
airflow/models/pool.py | 15 +++++++++------
1 file changed, 9 insertions(+), 6 deletions(-)
diff --git a/airflow/models/pool.py b/airflow/models/pool.py
index 131559d..feade77 100644
--- a/airflow/models/pool.py
+++ b/airflow/models/pool.py
@@ -161,12 +161,13 @@ class Pool(Base):
"""
from airflow.models.taskinstance import TaskInstance # Avoid circular
import
- return (
+ return int(
session.query(func.sum(TaskInstance.pool_slots))
.filter(TaskInstance.pool == self.pool)
.filter(TaskInstance.state.in_(list(EXECUTION_STATES)))
.scalar()
- ) or 0
+ or 0
+ )
@provide_session
def running_slots(self, session: Session):
@@ -178,12 +179,13 @@ class Pool(Base):
"""
from airflow.models.taskinstance import TaskInstance # Avoid circular
import
- return (
+ return int(
session.query(func.sum(TaskInstance.pool_slots))
.filter(TaskInstance.pool == self.pool)
.filter(TaskInstance.state == State.RUNNING)
.scalar()
- ) or 0
+ or 0
+ )
@provide_session
def queued_slots(self, session: Session):
@@ -195,12 +197,13 @@ class Pool(Base):
"""
from airflow.models.taskinstance import TaskInstance # Avoid circular
import
- return (
+ return int(
session.query(func.sum(TaskInstance.pool_slots))
.filter(TaskInstance.pool == self.pool)
.filter(TaskInstance.state == State.QUEUED)
.scalar()
- ) or 0
+ or 0
+ )
@provide_session
def open_slots(self, session: Session) -> float: