pierrejeambrun commented on code in PR #44322:
URL: https://github.com/apache/airflow/pull/44322#discussion_r1883790732
##########
airflow/api_fastapi/common/exceptions.py:
##########
@@ -18,13 +18,18 @@
from __future__ import annotations
from abc import ABC, abstractmethod
-from typing import Generic, TypeVar
+from enum import Enum
+from typing import Generic, NamedTuple, TypeVar
+import re2 as re
from fastapi import HTTPException, Request, status
from sqlalchemy.exc import IntegrityError
T = TypeVar("T", bound=Exception)
+_ColumnsType = dict[str, str]
Review Comment:
Only used once, maybe the alias is not needed.
##########
tests/api_fastapi/common/test_exceptions.py:
##########
@@ -0,0 +1,200 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+from fastapi import HTTPException, status
+from sqlalchemy.exc import IntegrityError
+
+from airflow.api_fastapi.common.exceptions import _DatabaseDialect,
_UniqueConstraintErrorHandler
+from airflow.configuration import conf
+from airflow.models import DagRun, Pool, Variable
+from airflow.utils.session import provide_session
+from airflow.utils.state import DagRunState
+
+from tests_common.test_utils.db import clear_db_connections, clear_db_dags,
clear_db_pools, clear_db_runs
+
+pytestmark = pytest.mark.db_test
+
+CURRENT_DATABASE_DIALECT = conf.get_mandatory_value("database",
"sql_alchemy_conn").lower()
+TEST_POOL = "test_pool"
+TEST_VARIABLE_KEY = "test_key"
+EXPECTED_EXCEPTION_POOL = HTTPException(
+ status_code=status.HTTP_409_CONFLICT,
+ detail="Unique constraint violation: slot_pool with pool=test_pool already
exists",
+)
+
+EXPECTED_EXCEPTION_VARIABLE = HTTPException(
+ status_code=status.HTTP_409_CONFLICT,
+ detail="Unique constraint violation: variable with key=test_key already
exists",
+)
+EXPECTED_EXCEPTION_DAG_RUN = HTTPException(
+ status_code=status.HTTP_409_CONFLICT,
+ detail="Unique constraint violation: dag_run with dag_id=test_dag_id,
run_id=test_run_id already exists",
+)
+PYTEST_MARKS_DB_DIALECT = [
+ {
+ "condition":
CURRENT_DATABASE_DIALECT.startswith(_DatabaseDialect.MYSQL.value)
+ or
CURRENT_DATABASE_DIALECT.startswith(_DatabaseDialect.POSTGRES.value),
+ "reason": f"Test for {_DatabaseDialect.SQLITE.value} only",
+ },
+ {
+ "condition":
CURRENT_DATABASE_DIALECT.startswith(_DatabaseDialect.SQLITE.value)
+ or
CURRENT_DATABASE_DIALECT.startswith(_DatabaseDialect.POSTGRES.value),
+ "reason": f"Test for {_DatabaseDialect.MYSQL.value} only",
+ },
+ {
+ "condition":
CURRENT_DATABASE_DIALECT.startswith(_DatabaseDialect.SQLITE.value)
+ or CURRENT_DATABASE_DIALECT.startswith(_DatabaseDialect.MYSQL.value),
+ "reason": f"Test for {_DatabaseDialect.POSTGRES.value} only",
+ },
+]
+
+
+def generate_test_cases_parametrize(test_cases: list[str],
expected_exceptions: list[HTTPException]):
+ """Generate cross product of test cases for parametrize with different
database dialects."""
+ generated_test_cases = []
+ for test_case, expected_exception in zip(test_cases, expected_exceptions):
+ for mark in PYTEST_MARKS_DB_DIALECT:
+ generated_test_cases.append(
+ pytest.param(
+ test_case,
+ expected_exception,
+ id=f"{mark['reason']} - {test_case}",
+ marks=pytest.mark.skipif(**mark), # type: ignore
+ )
+ )
+ return generated_test_cases
+
+
+def get_unique_constraint_error_prefix():
+ """Get unique constraint error prefix based on current database dialect."""
+ if CURRENT_DATABASE_DIALECT.startswith(_DatabaseDialect.SQLITE.value):
+ return
_UniqueConstraintErrorHandler.unique_constraint_error_prefix_dict[_DatabaseDialect.SQLITE]
+ if CURRENT_DATABASE_DIALECT.startswith(_DatabaseDialect.MYSQL.value):
+ return
_UniqueConstraintErrorHandler.unique_constraint_error_prefix_dict[_DatabaseDialect.MYSQL]
+ if CURRENT_DATABASE_DIALECT.startswith(_DatabaseDialect.POSTGRES.value):
+ return
_UniqueConstraintErrorHandler.unique_constraint_error_prefix_dict[_DatabaseDialect.POSTGRES]
+ return ""
Review Comment:
I think this should explicitly crash? The dialect is unknown, which is not
expected.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]