This is an automated email from the ASF dual-hosted git repository.
kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new fddc572 Customizable page size limit in API (#9431)
fddc572 is described below
commit fddc5721c9b5015cd600eec85496c7fc4bd513a7
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Fri Jul 3 09:14:31 2020 +0100
Customizable page size limit in API (#9431)
Co-authored-by: Kamil Breguła <[email protected]>
---
.../api_connexion/endpoints/connection_endpoint.py | 7 ++--
.../api_connexion/endpoints/dag_run_endpoint.py | 3 +-
.../api_connexion/endpoints/event_log_endpoint.py | 4 +++
.../endpoints/import_error_endpoint.py | 11 +++---
airflow/api_connexion/endpoints/pool_endpoint.py | 9 ++---
.../api_connexion/endpoints/variable_endpoint.py | 4 +++
airflow/api_connexion/endpoints/xcom_endpoint.py | 15 ++++++---
airflow/api_connexion/openapi/v1.yaml | 1 -
airflow/api_connexion/parameters.py | 30 ++++++++++++-----
airflow/config_templates/config.yml | 16 +++++++++
airflow/config_templates/default_airflow.cfg | 9 +++++
docs/index.rst | 3 +-
docs/stable-rest-api/index.rst | 32 ++++++++++++++++++
.../endpoints/test_connection_endpoint.py | 31 +++++++++++++++--
.../endpoints/test_dag_run_endpoint.py | 14 +++++++-
.../endpoints/test_event_log_endpoint.py | 14 +++++++-
.../endpoints/test_import_error_endpoint.py | 36 +++++++++++++++++++-
.../api_connexion/endpoints/test_pool_endpoint.py | 26 ++++++++++++++-
.../endpoints/test_variable_endpoint.py | 11 +++++-
tests/api_connexion/test_parameters.py | 39 +++++++++++++++++++++-
20 files changed, 279 insertions(+), 36 deletions(-)
diff --git a/airflow/api_connexion/endpoints/connection_endpoint.py
b/airflow/api_connexion/endpoints/connection_endpoint.py
index 77aa7e5..628fedd 100644
--- a/airflow/api_connexion/endpoints/connection_endpoint.py
+++ b/airflow/api_connexion/endpoints/connection_endpoint.py
@@ -21,6 +21,7 @@ from marshmallow import ValidationError
from sqlalchemy import func
from airflow.api_connexion.exceptions import AlreadyExists, BadRequest,
NotFound
+from airflow.api_connexion.parameters import check_limit, format_parameters
from airflow.api_connexion.schemas.connection_schema import (
ConnectionCollection, connection_collection_item_schema,
connection_collection_schema, connection_schema,
)
@@ -33,8 +34,7 @@ def delete_connection(connection_id, session):
"""
Delete a connection entry
"""
- query = session.query(Connection).filter_by(conn_id=connection_id)
- connection = query.one_or_none()
+ connection =
session.query(Connection).filter_by(conn_id=connection_id).one_or_none()
if connection is None:
raise NotFound('Connection not found')
session.delete(connection)
@@ -52,6 +52,9 @@ def get_connection(connection_id, session):
return connection_collection_item_schema.dump(connection)
+@format_parameters({
+ 'limit': check_limit
+})
@provide_session
def get_connections(session, limit, offset=0):
"""
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py
b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index bf7907f..a83e293 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -18,7 +18,7 @@
from sqlalchemy import func
from airflow.api_connexion.exceptions import NotFound
-from airflow.api_connexion.parameters import format_datetime, format_parameters
+from airflow.api_connexion.parameters import check_limit, format_datetime,
format_parameters
from airflow.api_connexion.schemas.dag_run_schema import (
DAGRunCollection, dagrun_collection_schema, dagrun_schema,
)
@@ -52,6 +52,7 @@ def get_dag_run(dag_id, dag_run_id, session):
'execution_date_lte': format_datetime,
'end_date_gte': format_datetime,
'end_date_lte': format_datetime,
+ 'limit': check_limit
})
@provide_session
def get_dag_runs(session, dag_id, start_date_gte=None, start_date_lte=None,
diff --git a/airflow/api_connexion/endpoints/event_log_endpoint.py
b/airflow/api_connexion/endpoints/event_log_endpoint.py
index ca83384..ac2c21b 100644
--- a/airflow/api_connexion/endpoints/event_log_endpoint.py
+++ b/airflow/api_connexion/endpoints/event_log_endpoint.py
@@ -19,6 +19,7 @@
from sqlalchemy import func
from airflow.api_connexion.exceptions import NotFound
+from airflow.api_connexion.parameters import check_limit, format_parameters
from airflow.api_connexion.schemas.event_log_schema import (
EventLogCollection, event_log_collection_schema, event_log_schema,
)
@@ -37,6 +38,9 @@ def get_event_log(event_log_id, session):
return event_log_schema.dump(event_log)
+@format_parameters({
+ 'limit': check_limit
+})
@provide_session
def get_event_logs(session, limit, offset=None):
"""
diff --git a/airflow/api_connexion/endpoints/import_error_endpoint.py
b/airflow/api_connexion/endpoints/import_error_endpoint.py
index 05bd5e7..e054777 100644
--- a/airflow/api_connexion/endpoints/import_error_endpoint.py
+++ b/airflow/api_connexion/endpoints/import_error_endpoint.py
@@ -14,11 +14,11 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from flask import request
+
from sqlalchemy import func
-from airflow.api_connexion import parameters
from airflow.api_connexion.exceptions import NotFound
+from airflow.api_connexion.parameters import check_limit, format_parameters
from airflow.api_connexion.schemas.error_schema import (
ImportErrorCollection, import_error_collection_schema, import_error_schema,
)
@@ -38,13 +38,14 @@ def get_import_error(import_error_id, session):
return import_error_schema.dump(error)
+@format_parameters({
+ 'limit': check_limit
+})
@provide_session
-def get_import_errors(session):
+def get_import_errors(session, limit, offset=None):
"""
Get all import errors
"""
- offset = request.args.get(parameters.page_offset, 0)
- limit = min(int(request.args.get(parameters.page_limit, 100)), 100)
total_entries = session.query(func.count(ImportError.id)).scalar()
import_errors =
session.query(ImportError).order_by(ImportError.id).offset(offset).limit(limit).all()
diff --git a/airflow/api_connexion/endpoints/pool_endpoint.py
b/airflow/api_connexion/endpoints/pool_endpoint.py
index 54fbf18..fd69e72 100644
--- a/airflow/api_connexion/endpoints/pool_endpoint.py
+++ b/airflow/api_connexion/endpoints/pool_endpoint.py
@@ -19,8 +19,8 @@ from marshmallow import ValidationError
from sqlalchemy import func
from sqlalchemy.exc import IntegrityError
-from airflow.api_connexion import parameters
from airflow.api_connexion.exceptions import AlreadyExists, BadRequest,
NotFound
+from airflow.api_connexion.parameters import check_limit, format_parameters
from airflow.api_connexion.schemas.pool_schema import PoolCollection,
pool_collection_schema, pool_schema
from airflow.models.pool import Pool
from airflow.utils.session import provide_session
@@ -50,13 +50,14 @@ def get_pool(pool_name, session):
return pool_schema.dump(obj)
+@format_parameters({
+ 'limit': check_limit
+})
@provide_session
-def get_pools(session):
+def get_pools(session, limit, offset=None):
"""
Get all pools
"""
- offset = request.args.get(parameters.page_offset, 0)
- limit = min(int(request.args.get(parameters.page_limit, 100)), 100)
total_entries = session.query(func.count(Pool.id)).scalar()
pools =
session.query(Pool).order_by(Pool.id).offset(offset).limit(limit).all()
diff --git a/airflow/api_connexion/endpoints/variable_endpoint.py
b/airflow/api_connexion/endpoints/variable_endpoint.py
index dbad3e4..c77f38c 100644
--- a/airflow/api_connexion/endpoints/variable_endpoint.py
+++ b/airflow/api_connexion/endpoints/variable_endpoint.py
@@ -21,6 +21,7 @@ from marshmallow import ValidationError
from sqlalchemy import func
from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.api_connexion.parameters import check_limit, format_parameters
from airflow.api_connexion.schemas.variable_schema import
variable_collection_schema, variable_schema
from airflow.models import Variable
from airflow.utils.session import provide_session
@@ -46,6 +47,9 @@ def get_variable(variable_key: str) -> Response:
return variable_schema.dump({"key": variable_key, "val": var})
+@format_parameters({
+ 'limit': check_limit
+})
@provide_session
def get_variables(session, limit: Optional[int], offset: Optional[int] = None)
-> Response:
"""
diff --git a/airflow/api_connexion/endpoints/xcom_endpoint.py
b/airflow/api_connexion/endpoints/xcom_endpoint.py
index cd317ad..ff310c5 100644
--- a/airflow/api_connexion/endpoints/xcom_endpoint.py
+++ b/airflow/api_connexion/endpoints/xcom_endpoint.py
@@ -14,12 +14,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from flask import request
+from typing import Optional
+
from sqlalchemy import and_, func
from sqlalchemy.orm.session import Session
-from airflow.api_connexion import parameters
from airflow.api_connexion.exceptions import NotFound
+from airflow.api_connexion.parameters import check_limit, format_parameters
from airflow.api_connexion.schemas.xcom_schema import (
XComCollection, XComCollectionItemSchema, XComCollectionSchema,
xcom_collection_item_schema,
xcom_collection_schema,
@@ -35,18 +36,22 @@ def delete_xcom_entry():
raise NotImplementedError("Not implemented yet.")
+@format_parameters({
+ 'limit': check_limit
+})
@provide_session
def get_xcom_entries(
dag_id: str,
dag_run_id: str,
task_id: str,
- session: Session
+ session: Session,
+ limit: Optional[int],
+ offset: Optional[int] = None
) -> XComCollectionSchema:
"""
Get all XCom values
"""
- offset = request.args.get(parameters.page_offset, 0)
- limit = min(int(request.args.get(parameters.page_limit, 100)), 100)
+
query = session.query(XCom)
if dag_id != '~':
query = query.filter(XCom.dag_id == dag_id)
diff --git a/airflow/api_connexion/openapi/v1.yaml
b/airflow/api_connexion/openapi/v1.yaml
index bec7037..970b7d5 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -2160,7 +2160,6 @@ components:
required: false
schema:
type: integer
- minimum: 1
default: 100
description: The numbers of items to return.
diff --git a/airflow/api_connexion/parameters.py
b/airflow/api_connexion/parameters.py
index 5f90a77..6ee9be5 100644
--- a/airflow/api_connexion/parameters.py
+++ b/airflow/api_connexion/parameters.py
@@ -20,16 +20,9 @@ from typing import Callable, Dict
from pendulum.parsing import ParserError
from airflow.api_connexion.exceptions import BadRequest
+from airflow.configuration import conf
from airflow.utils import timezone
-# Page parameters
-page_offset = "offset"
-page_limit = "limit"
-
-# Database entity fields
-dag_id = "dag_id"
-pool_id = "pool_id"
-
def format_datetime(value: str):
"""
@@ -38,6 +31,7 @@ def format_datetime(value: str):
This should only be used within connection views because it raises 400
"""
+ value = value.strip()
if value[-1] != 'Z':
value = value.replace(" ", '+')
try:
@@ -48,6 +42,23 @@ def format_datetime(value: str):
)
+def check_limit(value: int):
+    """
+    Clamp a requested page limit: values above ``[api] maximum_page_limit`` are capped to it, 0 selects
+    ``[api] fallback_page_limit`` (never above the maximum) and negative values raise BadRequest.
+    """
+    max_val = conf.getint("api", "maximum_page_limit")  # user configured max page limit
+    fallback = conf.getint("api", "fallback_page_limit")
+
+    if value > max_val:
+        return max_val
+    if value == 0:
+        return min(fallback, max_val)  # a misconfigured fallback must not bypass the maximum
+    if value < 0:
+        raise BadRequest("Page limit must be a positive integer")
+    return value
+
+
def format_parameters(params_formatters: Dict[str, Callable[..., bool]]):
"""
Decorator factory that create decorator that convert parameters using
given formatters.
@@ -56,6 +67,7 @@ def format_parameters(params_formatters: Dict[str,
Callable[..., bool]]):
:param params_formatters: Map of key name and formatter function
"""
+
def format_parameters_decorator(func):
@wraps(func)
def wrapped_function(*args, **kwargs):
@@ -63,5 +75,7 @@ def format_parameters(params_formatters: Dict[str,
Callable[..., bool]]):
if key in kwargs:
kwargs[key] = formatter(kwargs[key])
return func(*args, **kwargs)
+
return wrapped_function
+
return format_parameters_decorator
diff --git a/airflow/config_templates/config.yml
b/airflow/config_templates/config.yml
index 6ea67dd..e706c57 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -581,6 +581,22 @@
type: string
example: ~
default: "airflow.api.auth.backend.deny_all"
+ - name: maximum_page_limit
+ description: |
+ Used to set the maximum page limit for API requests
+ version_added: ~
+ type: integer
+ example: ~
+ default: "100"
+ - name: fallback_page_limit
+ description: |
+ Used to set the default page limit when limit is zero. A default limit
+ of 100 is set in the OpenAPI spec. However, this particular default limit
+ only applies when the limit is set to zero (0) in an API request.
+ If no limit is supplied, the OpenAPI spec default is used.
+ type: integer
+ example: ~
+ default: "100"
- name: lineage
description: ~
options:
diff --git a/airflow/config_templates/default_airflow.cfg
b/airflow/config_templates/default_airflow.cfg
index a7ce2b2..6cd5033 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -316,6 +316,15 @@ fail_fast = False
# ("airflow.api.auth.backend.default" allows all requests for historic reasons)
auth_backend = airflow.api.auth.backend.deny_all
+# Used to set the maximum page limit for API requests
+maximum_page_limit = 100
+
+# Used to set the default page limit when limit is zero. A default limit
+# of 100 is set in the OpenAPI spec. However, this particular default limit
+# only applies when the limit is set to zero (0) in an API request.
+# If no limit is supplied, the OpenAPI spec default is used.
+fallback_page_limit = 100
+
[lineage]
# what lineage backend to use
backend =
diff --git a/docs/index.rst b/docs/index.rst
index d208e43..d3282dc 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -95,6 +95,7 @@ Content
kubernetes
lineage
dag-serialization
+ Using the REST API <stable-rest-api/index.rst>
changelog
best-practices
faq
@@ -108,5 +109,5 @@ Content
CLI <cli-ref>
Macros <macros-ref>
Python API <_api/index>
- REST API <rest-api-ref>
+ Experimental REST API <rest-api-ref>
Configurations <configurations-ref>
diff --git a/docs/stable-rest-api/index.rst b/docs/stable-rest-api/index.rst
new file mode 100644
index 0000000..77a7fec
--- /dev/null
+++ b/docs/stable-rest-api/index.rst
@@ -0,0 +1,32 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+REST API Documentation
+======================
+
+Airflow has a REST API that allows third-party applications to perform a wide range of operations.
+
+Page size limit
+---------------
+
+To protect against requests that may lead to application instability, the API limits the number of items returned in a single response.
+The default is 100 items, but you can change it using the ``maximum_page_limit`` option in the ``[api]``
+section of the ``airflow.cfg`` file.
+
+.. note::
+ For more information on setting the configuration, see
:doc:`../howto/set-config`
diff --git a/tests/api_connexion/endpoints/test_connection_endpoint.py
b/tests/api_connexion/endpoints/test_connection_endpoint.py
index 60dc7f3..12c1c05 100644
--- a/tests/api_connexion/endpoints/test_connection_endpoint.py
+++ b/tests/api_connexion/endpoints/test_connection_endpoint.py
@@ -21,6 +21,7 @@ from parameterized import parameterized
from airflow.models import Connection
from airflow.utils.session import create_session, provide_session
from airflow.www import app
+from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_connections
@@ -161,6 +162,7 @@ class TestGetConnections(TestConnectionEndpoint):
class TestGetConnectionsPagination(TestConnectionEndpoint):
+
@parameterized.expand(
[
("/api/v1/connections?limit=1", ['TEST_CONN_ID1']),
@@ -210,16 +212,39 @@ class
TestGetConnectionsPagination(TestConnectionEndpoint):
self.assertEqual(conn_ids, expected_conn_ids)
@provide_session
- def test_should_respect_page_size_limit(self, session):
+ def test_should_respect_page_size_limit_default(self, session):
+ connection_models = self._create_connections(200)
+ session.add_all(connection_models)
+ session.commit()
+
+ response = self.client.get("/api/v1/connections")
+ assert response.status_code == 200
+
+ self.assertEqual(response.json["total_entries"], 200)
+ self.assertEqual(len(response.json["connections"]), 100)
+
+ @provide_session
+ def test_limit_of_zero_should_return_default(self, session):
connection_models = self._create_connections(200)
session.add_all(connection_models)
session.commit()
- response = self.client.get("/api/v1/connections") # default limit is
100
+ response = self.client.get("/api/v1/connections?limit=0")
assert response.status_code == 200
self.assertEqual(response.json["total_entries"], 200)
- self.assertEqual(len(response.json["connections"]), 100) # default
+ self.assertEqual(len(response.json["connections"]), 100)
+
+ @provide_session
+ @conf_vars({("api", "maximum_page_limit"): "150"})
+ def test_should_return_conf_max_if_req_max_above_conf(self, session):
+ connection_models = self._create_connections(200)
+ session.add_all(connection_models)
+ session.commit()
+
+ response = self.client.get("/api/v1/connections?limit=180")
+ assert response.status_code == 200
+ self.assertEqual(len(response.json['connections']), 150)
def _create_connections(self, count):
return [Connection(
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 7aa552f..bc3cd6b 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -25,6 +25,7 @@ from airflow.utils import timezone
from airflow.utils.session import provide_session
from airflow.utils.types import DagRunType
from airflow.www import app
+from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_runs
@@ -235,6 +236,17 @@ class TestGetDagRunsPagination(TestDagRunEndpoint):
self.assertEqual(response.json["total_entries"], 200)
self.assertEqual(len(response.json["dag_runs"]), 100) # default is 100
+ @provide_session
+ @conf_vars({("api", "maximum_page_limit"): "150"})
+ def test_should_return_conf_max_if_req_max_above_conf(self, session):
+ dagrun_models = self._create_dag_runs(200)
+ session.add_all(dagrun_models)
+ session.commit()
+
+ response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns?limit=180")
+ assert response.status_code == 200
+ self.assertEqual(len(response.json['dag_runs']), 150)
+
def _create_dag_runs(self, count):
return [
DagRun(
@@ -261,7 +273,7 @@ class TestGetDagRunsPaginationFilters(TestDagRunEndpoint):
["TEST_START_EXEC_DAY_10", "TEST_START_EXEC_DAY_11"],
),
(
-
"api/v1/dags/TEST_DAG_ID/dagRuns?start_date_lte=2020-06-15T18:00:00+00:00"
+ "api/v1/dags/TEST_DAG_ID/dagRuns?start_date_lte=
2020-06-15T18:00:00+00:00"
"&start_date_gte=2020-06-12T18:00:00Z",
["TEST_START_EXEC_DAY_12", "TEST_START_EXEC_DAY_13",
"TEST_START_EXEC_DAY_14", "TEST_START_EXEC_DAY_15"],
diff --git a/tests/api_connexion/endpoints/test_event_log_endpoint.py
b/tests/api_connexion/endpoints/test_event_log_endpoint.py
index 22dea97..fd6a8bd 100644
--- a/tests/api_connexion/endpoints/test_event_log_endpoint.py
+++ b/tests/api_connexion/endpoints/test_event_log_endpoint.py
@@ -24,6 +24,7 @@ from airflow.operators.dummy_operator import DummyOperator
from airflow.utils import timezone
from airflow.utils.session import create_session, provide_session
from airflow.www import app
+from tests.test_utils.config import conf_vars
class TestEventLogEndpoint(unittest.TestCase):
@@ -189,7 +190,7 @@ class TestGetEventLogPagination(TestEventLogEndpoint):
self.assertEqual(events, expected_events)
@provide_session
- def test_should_respect_page_size_limit(self, session):
+ def test_should_respect_page_size_limit_default(self, session):
log_models = self._create_event_logs(200)
session.add_all(log_models)
session.commit()
@@ -200,6 +201,17 @@ class TestGetEventLogPagination(TestEventLogEndpoint):
self.assertEqual(response.json["total_entries"], 200)
self.assertEqual(len(response.json["event_logs"]), 100) # default 100
+ @provide_session
+ @conf_vars({("api", "maximum_page_limit"): "150"})
+ def test_should_return_conf_max_if_req_max_above_conf(self, session):
+ log_models = self._create_event_logs(200)
+ session.add_all(log_models)
+ session.commit()
+
+ response = self.client.get("/api/v1/eventLogs?limit=180")
+ assert response.status_code == 200
+ self.assertEqual(len(response.json['event_logs']), 150)
+
def _create_event_logs(self, count):
return [
Log(
diff --git a/tests/api_connexion/endpoints/test_import_error_endpoint.py
b/tests/api_connexion/endpoints/test_import_error_endpoint.py
index 1170adb..165e2df 100644
--- a/tests/api_connexion/endpoints/test_import_error_endpoint.py
+++ b/tests/api_connexion/endpoints/test_import_error_endpoint.py
@@ -22,6 +22,7 @@ from airflow.models.errors import ImportError # pylint:
disable=redefined-built
from airflow.utils import timezone
from airflow.utils.session import provide_session
from airflow.www import app
+from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_import_errors
@@ -132,7 +133,7 @@ class
TestGetImportErrorsEndpointPagination(TestBaseImportError):
[
# Limit test data
("/api/v1/importErrors?limit=1", ["/tmp/file_1.py"]),
- ("/api/v1/importErrors?limit=110", [f"/tmp/file_{i}.py" for i in
range(1, 101)]),
+ ("/api/v1/importErrors?limit=100", [f"/tmp/file_{i}.py" for i in
range(1, 101)]),
# Offset test data
("/api/v1/importErrors?offset=1", [f"/tmp/file_{i}.py" for i in
range(2, 102)]),
("/api/v1/importErrors?offset=3", [f"/tmp/file_{i}.py" for i in
range(4, 104)]),
@@ -160,3 +161,36 @@ class
TestGetImportErrorsEndpointPagination(TestBaseImportError):
pool["filename"] for pool in response.json["import_errors"]
]
self.assertEqual(import_ids, expected_import_error_ids)
+
+ @provide_session
+ def test_should_respect_page_size_limit_default(self, session):
+ import_errors = [
+ ImportError(
+ filename=f"/tmp/file_{i}.py",
+ stacktrace="Lorem ipsum",
+ timestamp=timezone.parse(self.timestamp, timezone="UTC"),
+ )
+ for i in range(1, 110)
+ ]
+ session.add_all(import_errors)
+ session.commit()
+ response = self.client.get("/api/v1/importErrors")
+ assert response.status_code == 200
+ self.assertEqual(len(response.json['import_errors']), 100)
+
+ @provide_session
+ @conf_vars({("api", "maximum_page_limit"): "150"})
+ def test_should_return_conf_max_if_req_max_above_conf(self, session):
+ import_errors = [
+ ImportError(
+ filename=f"/tmp/file_{i}.py",
+ stacktrace="Lorem ipsum",
+ timestamp=timezone.parse(self.timestamp, timezone="UTC"),
+ )
+ for i in range(200)
+ ]
+ session.add_all(import_errors)
+ session.commit()
+ response = self.client.get("/api/v1/importErrors?limit=180")
+ assert response.status_code == 200
+ self.assertEqual(len(response.json['import_errors']), 150)
diff --git a/tests/api_connexion/endpoints/test_pool_endpoint.py
b/tests/api_connexion/endpoints/test_pool_endpoint.py
index 97ede40..5f03654 100644
--- a/tests/api_connexion/endpoints/test_pool_endpoint.py
+++ b/tests/api_connexion/endpoints/test_pool_endpoint.py
@@ -21,6 +21,7 @@ from parameterized import parameterized
from airflow.models.pool import Pool
from airflow.utils.session import provide_session
from airflow.www import app
+from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_pools
@@ -86,7 +87,7 @@ class TestGetPoolsPagination(TestBasePoolEndpoints):
("/api/v1/pools?limit=1", ["default_pool"]),
# Limit and offset test data
(
- "/api/v1/pools?limit=120&offset=1",
+ "/api/v1/pools?limit=100&offset=1",
[f"test_pool{i}" for i in range(1, 101)],
),
("/api/v1/pools?limit=2&offset=1", ["test_pool1", "test_pool2"]),
@@ -108,6 +109,29 @@ class TestGetPoolsPagination(TestBasePoolEndpoints):
pool_ids = [pool["name"] for pool in response.json["pools"]]
self.assertEqual(pool_ids, expected_pool_ids)
+ @provide_session
+ def test_should_respect_page_size_limit_default(self, session):
+ pools = [Pool(pool=f"test_pool{i}", slots=1) for i in range(1, 121)]
+ session.add_all(pools)
+ session.commit()
+ result = session.query(Pool).count()
+ self.assertEqual(result, 121)
+ response = self.client.get("/api/v1/pools")
+ assert response.status_code == 200
+ self.assertEqual(len(response.json['pools']), 100)
+
+ @provide_session
+ @conf_vars({("api", "maximum_page_limit"): "150"})
+ def test_should_return_conf_max_if_req_max_above_conf(self, session):
+ pools = [Pool(pool=f"test_pool{i}", slots=1) for i in range(1, 200)]
+ session.add_all(pools)
+ session.commit()
+ result = session.query(Pool).count()
+ self.assertEqual(result, 200)
+ response = self.client.get("/api/v1/pools?limit=180")
+ assert response.status_code == 200
+ self.assertEqual(len(response.json['pools']), 150)
+
class TestGetPool(TestBasePoolEndpoints):
@provide_session
diff --git a/tests/api_connexion/endpoints/test_variable_endpoint.py
b/tests/api_connexion/endpoints/test_variable_endpoint.py
index ebccece..72d61fc 100644
--- a/tests/api_connexion/endpoints/test_variable_endpoint.py
+++ b/tests/api_connexion/endpoints/test_variable_endpoint.py
@@ -20,6 +20,7 @@ from parameterized import parameterized
from airflow.models import Variable
from airflow.www import app
+from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_variables
@@ -101,7 +102,7 @@ class TestGetVariables(TestVariableEndpoint):
assert response.status_code == 200
assert response.json == expected
- def test_should_honor_100_limit_default(self):
+ def test_should_respect_page_size_limit_default(self):
for i in range(101):
Variable.set(f"var{i}", i)
response = self.client.get("/api/v1/variables")
@@ -109,6 +110,14 @@ class TestGetVariables(TestVariableEndpoint):
assert response.json["total_entries"] == 101
assert len(response.json["variables"]) == 100
+ @conf_vars({("api", "maximum_page_limit"): "150"})
+ def test_should_return_conf_max_if_req_max_above_conf(self):
+ for i in range(200):
+ Variable.set(f"var{i}", i)
+ response = self.client.get("/api/v1/variables?limit=180")
+ assert response.status_code == 200
+ self.assertEqual(len(response.json['variables']), 150)
+
class TestPatchVariable(TestVariableEndpoint):
def test_should_update_variable(self):
diff --git a/tests/api_connexion/test_parameters.py
b/tests/api_connexion/test_parameters.py
index 31f9295..50f3f11 100644
--- a/tests/api_connexion/test_parameters.py
+++ b/tests/api_connexion/test_parameters.py
@@ -22,8 +22,9 @@ from pendulum import DateTime
from pendulum.tz.timezone import Timezone
from airflow.api_connexion.exceptions import BadRequest
-from airflow.api_connexion.parameters import format_datetime, format_parameters
+from airflow.api_connexion.parameters import check_limit, format_datetime,
format_parameters
from airflow.utils import timezone
+from tests.test_utils.config import conf_vars
class TestDateTimeParser(unittest.TestCase):
@@ -50,6 +51,34 @@ class TestDateTimeParser(unittest.TestCase):
format_datetime(invalid_datetime)
+class TestMaximumPagelimit(unittest.TestCase):
+
+ @conf_vars({("api", "maximum_page_limit"): "320"})
+ def test_maximum_limit_return_val(self):
+ limit = check_limit(300)
+ self.assertEqual(limit, 300)
+
+ @conf_vars({("api", "maximum_page_limit"): "320"})
+ def test_maximum_limit_returns_configured_if_limit_above_conf(self):
+ limit = check_limit(350)
+ self.assertEqual(limit, 320)
+
+ @conf_vars({("api", "maximum_page_limit"): "1000"})
+ def test_limit_returns_set_max_if_give_limit_is_exceeded(self):
+ limit = check_limit(1500)
+ self.assertEqual(limit, 1000)
+
+ @conf_vars({("api", "fallback_page_limit"): "100"})
+ def test_limit_of_zero_returns_default(self):
+ limit = check_limit(0)
+ self.assertEqual(limit, 100)
+
+ @conf_vars({("api", "maximum_page_limit"): "1500"})
+ def test_negative_limit_raises(self):
+ with self.assertRaises(BadRequest):
+ check_limit(-1)
+
+
class TestFormatParameters(unittest.TestCase):
def test_should_works_with_datetime_formatter(self):
@@ -67,3 +96,11 @@ class TestFormatParameters(unittest.TestCase):
decorated_endpoint = decorator(endpoint)
with self.assertRaises(BadRequest):
decorated_endpoint(param_a='XXXXX')
+
+ @conf_vars({("api", "maximum_page_limit"): "100"})
+ def test_should_work_with_limit(self):
+ decorator = format_parameters({"limit": check_limit})
+ endpoint = mock.MagicMock()
+ decorated_endpoint = decorator(endpoint)
+ decorated_endpoint(limit=89)
+ endpoint.assert_called_once_with(limit=89)