1fanwang opened a new issue, #66889:
URL: https://github.com/apache/airflow/issues/66889

   ### What happened
   
   Across most public REST API and execution API write endpoints, when a 
payload exceeds the database column's declared type the request returns a 
generic `500 Internal Server Error` with no actionable signal that payload size 
was the cause. The underlying `sqlalchemy.exc.DataError` (MySQL `1406 Data too 
long`, MySQL `1264 Out of range`, Postgres `value too long for type ...`, 
Postgres `numeric field overflow`) is wrapped in the generic 500 handler 
instead of being translated into a 4xx with a hint at the column / remediation. 
The same failure shape repeats on every length-capped column in the metadata DB.
   
   #66779 reported this for `DagRun.conf` specifically; this issue captures the 
broader class so it can be closed by a single PR instead of one boundary patch 
per column.
   
   ### Affected endpoints (public REST API)
   
   | Endpoint | Column(s) that can produce `DataError` | Type |
   |---|---|---|
   | `POST/PATCH /dags/{dag_id}/dagRuns` | `dag_run.conf` (`TEXT`), 
`dag_run.note.content` (`String(1000) / Text(1000)`) | JSON / text |
   | `PATCH /dagRuns/{run_id}` | `dag_run.note.content` | text |
   | `PATCH /dags/{dag_id}` | `dag.description` (`Text`), 
`dag.timetable_summary` (`Text`) | text |
   | `POST/PATCH /connections` | `connection.login` (`Text`), 
`connection.password`, `connection.extra` | text / JSON |
   | `POST/PATCH /variables` | `variable.val`, `variable.description` (`Text`) 
| text |
   | `POST/PATCH /pools` | `pool.description`, `pool.slots` (`Integer` 
overflow) | text / numeric |
   | `POST /xcoms` (UI / internal) | `xcom.value` | binary / text |
   | `PATCH /taskInstances/{ti_id}` | `task_instance.note`, 
`task_instance.rendered_map_index` (`String(250)`) | text |
   | `POST /backfills` | `backfill.dag_run_conf` (same shape as DagRun conf) | 
JSON / text |
   | `POST /assets/events` | `asset_event.extra` | JSON |
   | HITL endpoints | `hitl_detail.subject` (`Text`), `hitl_detail.body` 
(`Text`), `hitl_detail.options`, `hitl_detail.params` | text / JSON |
   
   ### Affected endpoints (execution API, worker → server)
   
   | Endpoint | Column(s) that can produce `DataError` |
   |---|---|
   | `PATCH /task-instances/{id}/state` | `task_instance.note`, 
`task_instance.rendered_map_index` |
   | `POST /xcoms/{id}` | `xcom.value` |
   | `PUT /assets/events` | `asset_event.extra` |
   | `PATCH /asset-events/{id}/...` | `asset_event.extra` |
   
   Any future write endpoint that touches a length-capped column has the same 
exposure today — there is no central translation, so each new column relies on 
its caller knowing the DB schema's caps.
   
   ### How to reproduce
   
   Two layers of evidence. First the literal MySQL error against a real 
container; then the HTTP response a caller sees.
   
   ```bash
   docker run --rm -d --name mysql-66888 \
       -e MYSQL_ROOT_PASSWORD=test -e MYSQL_DATABASE=airflow_test \
       -p 3309:3306 mysql:8.0
   ```
   
   Repro script:
   
   ```python
   """
   Reproduce the generic 500 + show the response transition with the
   sqlalchemy.exc.DataError translation proposed in #66888.
   
   Step 1 fires the real MySQL DataError against a TEXT column. Step 2
   drives the same DataError through FastAPI TestClient with and without
   _DataErrorHandler registered.
   """
   
   from __future__ import annotations
   
   import json
   
   from fastapi import FastAPI
   from fastapi.testclient import TestClient
   from sqlalchemy import create_engine, text
   from sqlalchemy.exc import DataError
   
   MYSQL_URL = "mysql+pymysql://root:[email protected]:3309/airflow_test"
   PAYLOAD_BYTES = 70_000
   
   
   def setup_mysql() -> None:
       engine = create_engine(MYSQL_URL)
       with engine.begin() as conn:
           conn.execute(text("DROP TABLE IF EXISTS dag_run"))
           conn.execute(
               text(
                   """
                   CREATE TABLE dag_run (
                       id INT PRIMARY KEY AUTO_INCREMENT,
                       dag_id VARCHAR(50),
                       run_id VARCHAR(50),
                       conf TEXT
                   ) ENGINE=InnoDB
                   """
               )
           )
       engine.dispose()
   
   
   def capture_real_dataerror() -> DataError:
       engine = create_engine(MYSQL_URL)
       payload = json.dumps({"k": "x" * PAYLOAD_BYTES})
       try:
           with engine.begin() as conn:
               conn.execute(
                   text("INSERT INTO dag_run (dag_id, run_id, conf) VALUES (:d, 
:r, :c)"),
                   {"d": "example_bash_operator", "r": "manual__test", "c": 
payload},
               )
       except DataError as exc:
           return exc
       finally:
           engine.dispose()
       raise RuntimeError("expected DataError, got none")
   
   
   def show_response(label: str, app: FastAPI) -> None:
       resp = TestClient(app, raise_server_exceptions=False).post("/dagRuns")
       print(f"  {label}: HTTP {resp.status_code}")
       body = resp.text
       print(f"  body: {body[:280]}{'...' if len(body) > 280 else ''}")
   
   
   def main() -> None:
       print("=== Step 1: real MySQL DataError (oversized conf into TEXT 
column) ===")
       setup_mysql()
       real_exc = capture_real_dataerror()
       print(f"  exception class: 
{type(real_exc).__module__}.{type(real_exc).__name__}")
       print(f"  orig:            {real_exc.orig}")
       print()
   
       print("=== Step 2: drive the same DataError through FastAPI TestClient 
===")
       print()
       print("--- 2a: today's behavior (no DataError handler registered) ---")
       app_main = FastAPI()
   
       @app_main.post("/dagRuns")
       def _trigger_main():
           raise real_exc
   
       show_response("main         ", app_main)
       print()
       print("--- 2b: with the translation proposed in #66888 ---")
       from airflow.api_fastapi.common.exceptions import _DataErrorHandler
   
       app_fix = FastAPI()
       handler = _DataErrorHandler()
       app_fix.add_exception_handler(handler.exception_cls, 
handler.exception_handler)
   
       @app_fix.post("/dagRuns")
       def _trigger_fix():
           raise real_exc
   
       show_response("#66888       ", app_fix)
   
   
   if __name__ == "__main__":
       main()
   ```
   
   ### Current behaviour (on `main`)
   
   ```
   === Step 1: real MySQL DataError (oversized conf into TEXT column) ===
     exception class: sqlalchemy.exc.DataError
     orig:            (1406, "Data too long for column 'conf' at row 1")
   
   === Step 2: drive the same DataError through FastAPI TestClient ===
   
   --- 2a: today's behavior (no DataError handler registered) ---
     main         : HTTP 500
     body: Internal Server Error
   
   --- 2b: with the translation proposed in #66888 ---
   ImportError: cannot import name '_DataErrorHandler' from 
'airflow.api_fastapi.common.exceptions'
   ```
   
   ### Expected behaviour (with #66888 applied)
   
   ```
   === Step 1: real MySQL DataError (oversized conf into TEXT column) ===
     exception class: sqlalchemy.exc.DataError
     orig:            (1406, "Data too long for column 'conf' at row 1")
   
   === Step 2: drive the same DataError through FastAPI TestClient ===
   
   --- 2a: today's behavior (no DataError handler registered) ---
     main         : HTTP 500
     body: Internal Server Error
   
   --- 2b: with the translation proposed in #66888 ---
     #66888       : HTTP 413
     body: {"detail":{"reason":"Payload exceeded database column 
limit","orig_error":"(1406, \"Data too long for column 'conf' at row 
1\")","message":"Database rejected the payload. Reduce the field size, or your 
operator may widen the column type (e.g. MEDIUMTEXT / LONGTEXT on MySQL)."}}
   ```
   
   `Data too long` / `too large` / `too big` errors map to `413 Content Too 
Large`; other `DataError` shapes (out-of-range, numeric overflow) map to `422 
Unprocessable Entity`. The `orig_error` field carries MySQL's verbatim message 
so the caller can see the column name and SQL error code.
   
   ### Tests
   
   
`airflow-core/tests/unit/api_fastapi/common/test_exceptions.py::TestDataErrorHandler`
 covers the five dialect-error shapes and an end-to-end dispatch test 
confirming the FastAPI app returns the translated response on `DataError`:
   
   ```
   
TestDataErrorHandler::test_dataerror_translates_to_actionable_http_response[mysql-1406-data-too-long]
 PASSED
   
TestDataErrorHandler::test_dataerror_translates_to_actionable_http_response[postgres-value-too-long]
 PASSED
   
TestDataErrorHandler::test_dataerror_translates_to_actionable_http_response[sqlite-blob-too-big]
 PASSED
   
TestDataErrorHandler::test_dataerror_translates_to_actionable_http_response[mysql-1264-out-of-range]
 PASSED
   
TestDataErrorHandler::test_dataerror_translates_to_actionable_http_response[postgres-numeric-field-overflow]
 PASSED
   TestDataErrorHandler::test_dataerror_dispatched_through_fastapi_app PASSED
   ```
   
   ### Out of scope
   
   `IntegrityError` (unique violations, FK violations, NOT NULL violations) is 
a separate exception class with its own response semantics (typically 409 / 422 
/ 400). Unique-constraint translation is already handled by 
`_UniqueConstraintErrorHandler`; the rest is a natural follow-up if maintainers 
like the `DataError` shape proposed here.
   
   ### Operating System
   
   Reproduced on Linux / macOS against MySQL 8.0 in Docker. Backend-agnostic — 
Postgres deployments produce a different verbatim message but the same 
SQLAlchemy `DataError` shape.
   
   ### Versions of Apache Airflow Providers
   
   n/a — failure is on the API server / metadata DB path.
   
   ### Deployment
   
   Other Docker-based
   
   ### Deployment details
   
   Reproduced against MySQL 8.0 default `innodb_default_row_format` with 
`TEXT`-typed columns. Postgres deployments avoid the `1406` family but hit the 
same `DataError` shape on `VARCHAR(N)` overruns and numeric overflows.
   
   ### Anything else?
   
   Fix in #66888 is a single FastAPI exception handler registered on both the 
public REST API and the task-execution API. Every existing and future write 
endpoint inherits the translation automatically.
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to