kaxil commented on code in PR #63358:
URL: https://github.com/apache/airflow/pull/63358#discussion_r2920847228


##########
providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py:
##########
@@ -164,6 +181,9 @@ class DbApiHook(BaseHook):
     connector: ConnectorProtocol | None = None
     # Override with db-specific query to check connection
     _test_connection_sql = "select 1"
+    # Exception types that indicate the model should retry the query (e.g. undefined column).

Review Comment:
   `RETRYABLE_ERRORS` and `get_retry_exceptions()` are only consumed by 
`SQLToolset` in `common/ai`. Adding AI-specific API surface to `DbApiHook` — 
the base for 25+ database hooks — may confuse future contributors into thinking 
this is a general retry mechanism.
   
   Worth considering whether this mapping belongs in the `SQLToolset` layer 
instead (e.g., a dict from hook type → exception tuple, or `isinstance` 
checking at call time). That keeps `DbApiHook` focused on database concerns and 
avoids coupling it to the AI toolset contract.
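   A minimal sketch of that toolset-level mapping, keyed on hook class name so `SQLToolset` never has to import every provider's hook module. The dict, helper name, and entries are illustrative, not part of the PR:

   ```python
   import sqlite3

   # Hypothetical SQLToolset-level registry: hook class name -> retryable
   # exception types. The hook names match real Airflow hooks, but this
   # mapping lives outside DbApiHook and is purely illustrative.
   _RETRYABLE_BY_HOOK: dict[str, tuple[type[Exception], ...]] = {
       "SqliteHook": (sqlite3.OperationalError,),
   }


   def get_retryable_errors(hook: object) -> tuple[type[Exception], ...]:
       """Return the retryable exception types for a hook, defaulting to none."""
       return _RETRYABLE_BY_HOOK.get(type(hook).__name__, ())
   ```

   An `isinstance` check at call time would work too; the name-based lookup just keeps the AI layer free of hard imports on 25+ providers.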



##########
providers/sqlite/src/airflow/providers/sqlite/hooks/sqlite.py:
##########
@@ -27,6 +27,8 @@ class SqliteHook(DbApiHook):
     """Interact with SQLite."""
 
     conn_name_attr = "sqlite_conn_id"
+    # Exceptions that indicate the llm model should retry with a corrected query (e.g. no such column).
+    RETRYABLE_ERRORS: tuple[type[Exception], ...] = (sqlite3.OperationalError, sqlite3.ProgrammingError)

Review Comment:
   `sqlite3.OperationalError` includes "database is locked", "disk I/O error", 
and "unable to open database file". `sqlite3.ProgrammingError` includes "Cannot 
operate on a closed database" and "You can only execute one statement at a 
time". Neither is fixable by rewriting the query.
   
   Unfortunately `sqlite3` doesn't have fine-grained subclasses like psycopg2, 
so you'd need message-based filtering at the `SQLToolset` level (e.g., checking 
for "no such column" / "no such table" in the message), or just accept some 
false-positive retries with `OperationalError` alone and drop 
`ProgrammingError`.
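   One possible shape for that message-based filter at the `SQLToolset` level; the helper name and substring list are assumptions, though the substrings mirror sqlite3's actual error text:

   ```python
   import sqlite3

   # Retry only when the SQL itself is fixable; "database is locked",
   # disk I/O errors, and the like fall through to the caller.
   _FIXABLE_SQLITE_MESSAGES = ("no such column", "no such table", "syntax error")


   def is_retryable_sqlite_error(exc: Exception) -> bool:
       """True only for OperationalErrors whose message points at a bad query."""
       if not isinstance(exc, sqlite3.OperationalError):
           return False
       message = str(exc).lower()
       return any(fragment in message for fragment in _FIXABLE_SQLITE_MESSAGES)
   ```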



##########
providers/postgres/src/airflow/providers/postgres/hooks/postgres.py:
##########
@@ -51,9 +52,20 @@
     USE_PSYCOPG3 = False
 
 if USE_PSYCOPG3:
+    from psycopg.errors import (
+        OperationalError as Psycopg3OperationalError,
+        ProgrammingError as Psycopg3ProgrammingError,
+    )
     from psycopg.rows import dict_row, namedtuple_row
     from psycopg.types.json import register_default_adapters
 
+_POSTGRES_RETRYABLE_ERRORS: tuple[type[Exception], ...] = (
+    psycopg2.errors.UndefinedColumn,
+    psycopg2.errors.UndefinedTable,
+)

Review Comment:
   `Psycopg3OperationalError` covers connection failures, timeouts, 
out-of-memory, and disk I/O errors — none of which can be fixed by rewriting a 
query. For psycopg2 you correctly picked the narrow 
`UndefinedColumn`/`UndefinedTable` subclasses.
   
   The psycopg3 equivalents would be `psycopg.errors.UndefinedColumn` and 
`psycopg.errors.UndefinedTable` (same error hierarchy, different package). That 
keeps the psycopg3 list as tight as the psycopg2 one.
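   A sketch of the tightened psycopg3 tuple; the `try/except` guard is added here only so the snippet also runs where psycopg (v3) isn't installed, mirroring the provider's optional-psycopg3 pattern:

   ```python
   # Narrow psycopg3 equivalents of the psycopg2 UndefinedColumn/UndefinedTable
   # pair; guarded so this sketch still imports when psycopg (v3) is absent.
   try:
       from psycopg.errors import UndefinedColumn, UndefinedTable

       _POSTGRES3_RETRYABLE_ERRORS: tuple[type[Exception], ...] = (UndefinedColumn, UndefinedTable)
   except ImportError:
       _POSTGRES3_RETRYABLE_ERRORS = ()
   ```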



##########
providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py:
##########
@@ -41,6 +42,22 @@
     ArgumentError = Exception  # type: ignore[misc,assignment]
     NoSuchModuleError = Exception  # type: ignore[misc,assignment]
 
+if importlib.util.find_spec("sqlalchemy") is not None:

Review Comment:
   This `importlib.util.find_spec` check is unnecessary — `common-sql` declares 
`sqlalchemy>=2.0` as a hard dependency in its `pyproject.toml`, so SQLAlchemy 
is always installed when this module is loaded. On top of that, SQLAlchemy is 
already imported with a `try/except ImportError` at lines 34-42 just above.
   
   You could drop `import importlib.util` entirely and use the same `try/except 
ImportError` pattern that's already established a few lines up:
   
   ```python
   try:
       from sqlalchemy.exc import (
           ProgrammingError as _SQLAlchemyProgrammingError,
       )
       _DEFAULT_SQLALCHEMY_RETRYABLE = (_SQLAlchemyProgrammingError,)
   except ImportError:
       _DEFAULT_SQLALCHEMY_RETRYABLE = ()
   ```
   
   (Though given it's a hard dep, even the `try/except` is defensive — a bare 
import would be fine too.)



##########
providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py:
##########
@@ -41,6 +42,22 @@
     ArgumentError = Exception  # type: ignore[misc,assignment]
     NoSuchModuleError = Exception  # type: ignore[misc,assignment]
 
+if importlib.util.find_spec("sqlalchemy") is not None:
+    try:
+        from sqlalchemy.exc import (
+            IntegrityError as _SQLAlchemyIntegrityError,
+            ProgrammingError as _SQLAlchemyProgrammingError,
+        )
+
+        _DEFAULT_SQLALCHEMY_RETRYABLE: tuple[type[Exception], ...] = (
+            _SQLAlchemyProgrammingError,
+            _SQLAlchemyIntegrityError,

Review Comment:
   `IntegrityError` covers duplicate key, FK violations, and check constraint 
failures — none of which can be fixed by rewriting a query. The LLM would need 
to change the *data*, not the SQL. Including it here means constraint 
violations get surfaced as "retry with corrected query", wasting the retry 
attempt.
   
   I'd drop `IntegrityError` and keep only `ProgrammingError`.



##########
providers/common/ai/tests/unit/common/ai/toolsets/test_sql.py:
##########
@@ -165,6 +166,42 @@ def test_allows_writes_when_enabled(self):
         data = json.loads(result)
         assert "rows" in data
 
+    def test_raises_model_retry_when_query_fails_with_retryable_error(self):
+        """When the query fails with a retryable error, raise ModelRetry so the model retries."""
+        ts = SQLToolset("pg_default")
+        ts._hook = _make_mock_db_hook()
+        ts._hook.get_records.side_effect = Exception(
+            'column "nonexistent" does not exist\nLINE 1: SELECT id, nonexistent FROM users'
+        )
+        ts._hook.get_retry_exceptions.return_value = (Exception,)

Review Comment:
   Both tests set `get_retry_exceptions.return_value = (Exception,)`, making 
*all* exceptions retryable. That covers the happy path but misses the more 
interesting edge cases:
   
   1. A non-retryable exception (e.g., `RuntimeError`) propagates without being 
wrapped in `ModelRetry`.
   2. The `else` branch where `get_retry_exceptions()` returns an empty tuple — 
the exception should propagate as-is.
   3. A hook that doesn't have `get_retry_exceptions` at all (the `hasattr` 
guard).
   
   Those would give more confidence that the retry logic doesn't accidentally 
swallow unrelated errors.
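   The three cases can be exercised without the real toolset. The sketch below runs against a local stand-in for the PR's `_query` error handling; `ModelRetry` here is a stub, not pydantic-ai's class, and the mock hooks replace `_make_mock_db_hook()`:

   ```python
   from unittest import mock


   class ModelRetry(Exception):
       """Local stub standing in for the real ModelRetry."""


   def query_with_retry(hook, sql):
       """Minimal stand-in for SQLToolset._query's retry handling."""
       try:
           return hook.get_records(sql)
       except Exception as e:
           retryable = hook.get_retry_exceptions() if hasattr(hook, "get_retry_exceptions") else ()
           if retryable and isinstance(e, retryable):
               raise ModelRetry(f"error: {e!s}") from e
           raise


   def raised_type(hook):
       try:
           query_with_retry(hook, "SELECT 1")
       except Exception as e:
           return type(e)
       return None


   # 1. Non-retryable exception propagates without ModelRetry wrapping.
   hook = mock.Mock()
   hook.get_records.side_effect = RuntimeError("boom")
   hook.get_retry_exceptions.return_value = (ValueError,)
   assert raised_type(hook) is RuntimeError

   # 2. Empty retryable tuple: the exception propagates as-is.
   hook.get_records.side_effect = ValueError("bad column")
   hook.get_retry_exceptions.return_value = ()
   assert raised_type(hook) is ValueError

   # 3. Hook without get_retry_exceptions at all (the hasattr guard).
   bare_hook = mock.Mock(spec=["get_records"])
   bare_hook.get_records.side_effect = ValueError("bad column")
   assert raised_type(bare_hook) is ValueError
   ```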



##########
providers/common/ai/src/airflow/providers/common/ai/toolsets/sql.py:
##########
@@ -204,7 +205,16 @@ def _query(self, sql: str) -> str:
             _validate_sql(sql)
 
         hook = self._get_db_hook()
-        rows = hook.get_records(sql)
+        retryable = hook.get_retry_exceptions() if hasattr(hook, 
"get_retry_exceptions") else ()

Review Comment:
   This can be simplified — the branching duplicates the `get_records` call:
   
   ```python
   try:
       rows = hook.get_records(sql)
   except Exception as e:
       retryable = hook.get_retry_exceptions() if hasattr(hook, "get_retry_exceptions") else ()
       if retryable and isinstance(e, retryable):
           raise ModelRetry(
               f"error: {e!s}, Use get_schema and list_tables tools for more details."
           ) from e
       raise
   ```
   
   One call to `get_records`, no branching on the happy path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
