kaxil commented on code in PR #63358:
URL: https://github.com/apache/airflow/pull/63358#discussion_r2920847228
##########
providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py:
##########
@@ -164,6 +181,9 @@ class DbApiHook(BaseHook):
    connector: ConnectorProtocol | None = None
    # Override with db-specific query to check connection
    _test_connection_sql = "select 1"
+    # Exception types that indicate the model should retry the query (e.g. undefined column).
Review Comment:
`RETRYABLE_ERRORS` and `get_retry_exceptions()` are only consumed by
`SQLToolset` in `common/ai`. Adding AI-specific API surface to `DbApiHook` —
the base for 25+ database hooks — may confuse future contributors into thinking
this is a general retry mechanism.
Worth considering whether this mapping belongs in the `SQLToolset` layer
instead (e.g., a dict from hook type → exception tuple, or `isinstance`
checking at call time). That keeps `DbApiHook` focused on database concerns and
avoids coupling it to the AI toolset contract.
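For illustration, the toolset-side mapping could look like the sketch below. The hook-class-name keys and the `retryable_errors_for` helper are hypothetical, not existing Airflow APIs; loaders stay lazy so no DB driver is imported at module load:

```python
# Sketch: keep retryable-error knowledge in the toolset layer instead of
# DbApiHook. Hook names and retryable_errors_for are illustrative only.
from collections.abc import Callable


def _sqlite_retryable() -> tuple[type[Exception], ...]:
    import sqlite3

    return (sqlite3.OperationalError,)


def _postgres_retryable() -> tuple[type[Exception], ...]:
    try:
        from psycopg2 import errors

        return (errors.UndefinedColumn, errors.UndefinedTable)
    except ImportError:
        return ()


# Map hook class name -> lazy loader of its retryable exception tuple.
_RETRYABLE_BY_HOOK: dict[str, Callable[[], tuple[type[Exception], ...]]] = {
    "SqliteHook": _sqlite_retryable,
    "PostgresHook": _postgres_retryable,
}


def retryable_errors_for(hook: object) -> tuple[type[Exception], ...]:
    """Resolve the retryable tuple for a hook; unknown hooks get ()."""
    loader = _RETRYABLE_BY_HOOK.get(type(hook).__name__)
    return loader() if loader else ()
```

Unknown hooks simply resolve to an empty tuple, so the toolset degrades to "never retry" without touching `DbApiHook` at all.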
##########
providers/sqlite/src/airflow/providers/sqlite/hooks/sqlite.py:
##########
@@ -27,6 +27,8 @@ class SqliteHook(DbApiHook):
"""Interact with SQLite."""
conn_name_attr = "sqlite_conn_id"
+ # Exceptions that indicate the llm model should retry with a corrected
query (e.g. no such column).
+ RETRYABLE_ERRORS: tuple[type[Exception], ...] = (sqlite3.OperationalError,
sqlite3.ProgrammingError)
Review Comment:
`sqlite3.OperationalError` includes "database is locked", "disk I/O error",
and "unable to open database file". `sqlite3.ProgrammingError` includes "Cannot
operate on a closed database" and "You can only execute one statement at a
time". Neither is fixable by rewriting the query.
Unfortunately `sqlite3` doesn't have fine-grained subclasses like psycopg2,
so you'd need message-based filtering at the `SQLToolset` level (e.g., checking
for "no such column" / "no such table" in the message), or just accept some
false-positive retries with `OperationalError` alone and drop
`ProgrammingError`.
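A message-based filter could be as small as the sketch below. The `is_query_fixable` helper is hypothetical; the marker substrings are SQLite's standard error messages:

```python
# Sketch: message-based classification for sqlite3, which lacks
# fine-grained exception subclasses. is_query_fixable is illustrative.
import sqlite3

_QUERY_FIXABLE_MARKERS = ("no such column", "no such table", "syntax error")


def is_query_fixable(exc: Exception) -> bool:
    """True only when the error looks fixable by rewriting the SQL."""
    if not isinstance(exc, sqlite3.OperationalError):
        return False
    message = str(exc).lower()
    return any(marker in message for marker in _QUERY_FIXABLE_MARKERS)
```

"database is locked" and friends are also `OperationalError`, but they fail the substring check, so they propagate instead of burning a retry.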
##########
providers/postgres/src/airflow/providers/postgres/hooks/postgres.py:
##########
@@ -51,9 +52,20 @@
USE_PSYCOPG3 = False
if USE_PSYCOPG3:
+    from psycopg.errors import (
+        OperationalError as Psycopg3OperationalError,
+        ProgrammingError as Psycopg3ProgrammingError,
+    )
    from psycopg.rows import dict_row, namedtuple_row
    from psycopg.types.json import register_default_adapters

+_POSTGRES_RETRYABLE_ERRORS: tuple[type[Exception], ...] = (
+    psycopg2.errors.UndefinedColumn,
+    psycopg2.errors.UndefinedTable,
+)
Review Comment:
`Psycopg3OperationalError` covers connection failures, timeouts,
out-of-memory, and disk I/O errors — none of which can be fixed by rewriting a
query. For psycopg2 you correctly picked the narrow
`UndefinedColumn`/`UndefinedTable` subclasses.
The psycopg3 equivalents would be `psycopg.errors.UndefinedColumn` and
`psycopg.errors.UndefinedTable` (same error hierarchy, different package). That
keeps the psycopg3 list as tight as the psycopg2 one.
##########
providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py:
##########
@@ -41,6 +42,22 @@
ArgumentError = Exception # type: ignore[misc,assignment]
NoSuchModuleError = Exception # type: ignore[misc,assignment]
+if importlib.util.find_spec("sqlalchemy") is not None:
Review Comment:
This `importlib.util.find_spec` check is unnecessary — `common-sql` declares
`sqlalchemy>=2.0` as a hard dependency in its `pyproject.toml`, so SQLAlchemy
is always installed when this module is loaded. On top of that, SQLAlchemy is
already imported with a `try/except ImportError` at lines 34-42 just above.
You could drop `import importlib.util` entirely and use the same `try/except
ImportError` pattern that's already established a few lines up:
```python
try:
    from sqlalchemy.exc import (
        ProgrammingError as _SQLAlchemyProgrammingError,
    )

    _DEFAULT_SQLALCHEMY_RETRYABLE = (_SQLAlchemyProgrammingError,)
except ImportError:
    _DEFAULT_SQLALCHEMY_RETRYABLE = ()
```
(Though given it's a hard dep, even the `try/except` is defensive — a bare
import would be fine too.)
##########
providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py:
##########
@@ -41,6 +42,22 @@
ArgumentError = Exception # type: ignore[misc,assignment]
NoSuchModuleError = Exception # type: ignore[misc,assignment]
+if importlib.util.find_spec("sqlalchemy") is not None:
+    try:
+        from sqlalchemy.exc import (
+            IntegrityError as _SQLAlchemyIntegrityError,
+            ProgrammingError as _SQLAlchemyProgrammingError,
+        )
+
+        _DEFAULT_SQLALCHEMY_RETRYABLE: tuple[type[Exception], ...] = (
+            _SQLAlchemyProgrammingError,
+            _SQLAlchemyIntegrityError,
Review Comment:
`IntegrityError` covers duplicate key, FK violations, and check constraint
failures — none of which can be fixed by rewriting a query. The LLM would need
to change the *data*, not the SQL. Including it here means constraint
violations get surfaced as "retry with corrected query", wasting the retry
attempt.
I'd drop `IntegrityError` and keep only `ProgrammingError`.
##########
providers/common/ai/tests/unit/common/ai/toolsets/test_sql.py:
##########
@@ -165,6 +166,42 @@ def test_allows_writes_when_enabled(self):
        data = json.loads(result)
        assert "rows" in data

+    def test_raises_model_retry_when_query_fails_with_retryable_error(self):
+        """When the query fails with a retryable error, raise ModelRetry so the model retries."""
+        ts = SQLToolset("pg_default")
+        ts._hook = _make_mock_db_hook()
+        ts._hook.get_records.side_effect = Exception(
+            'column "nonexistent" does not exist\nLINE 1: SELECT id, nonexistent FROM users'
+        )
+        ts._hook.get_retry_exceptions.return_value = (Exception,)
Review Comment:
Both tests set `get_retry_exceptions.return_value = (Exception,)`, making
*all* exceptions retryable. That covers the happy path but misses the more
interesting edge cases:
1. A non-retryable exception (e.g., `RuntimeError`) propagates without being
wrapped in `ModelRetry`.
2. The `else` branch where `get_retry_exceptions()` returns an empty tuple —
the exception should propagate as-is.
3. A hook that doesn't have `get_retry_exceptions` at all (the `hasattr`
guard).
Those would give more confidence that the retry logic doesn't accidentally
swallow unrelated errors.
##########
providers/common/ai/src/airflow/providers/common/ai/toolsets/sql.py:
##########
@@ -204,7 +205,16 @@ def _query(self, sql: str) -> str:
        _validate_sql(sql)
        hook = self._get_db_hook()
-        rows = hook.get_records(sql)
+        retryable = hook.get_retry_exceptions() if hasattr(hook, "get_retry_exceptions") else ()
Review Comment:
This can be simplified — the branching duplicates the `get_records` call:
```python
try:
    rows = hook.get_records(sql)
except Exception as e:
    retryable = hook.get_retry_exceptions() if hasattr(hook, "get_retry_exceptions") else ()
    if retryable and isinstance(e, retryable):
        raise ModelRetry(
            f"error: {e!s}, Use get_schema and list_tables tools for more details."
        ) from e
    raise
```
One call to `get_records`, no branching on the happy path.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]