This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 221cdd64056 Fix SQLToolset read-only mode bypass via data-modifying CTEs and SELECT INTO (#64173)
221cdd64056 is described below
commit 221cdd640567bc0f8fb9990b1f81258446308182
Author: Elad Kalif <[email protected]>
AuthorDate: Thu Mar 26 03:24:32 2026 +0200
Fix SQLToolset read-only mode bypass via data-modifying CTEs and SELECT INTO (#64173)
* Fix SQLToolset read-only mode bypass via data-modifying CTEs and SELECT INTO
* fixes
---
.../providers/common/ai/utils/sql_validation.py | 43 +++++++++++++++++
.../unit/common/ai/utils/test_sql_validation.py | 54 ++++++++++++++++++++++
2 files changed, 97 insertions(+)
diff --git a/providers/common/ai/src/airflow/providers/common/ai/utils/sql_validation.py b/providers/common/ai/src/airflow/providers/common/ai/utils/sql_validation.py
index 42d41dcfc7d..3315eff1dcc 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/utils/sql_validation.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/utils/sql_validation.py
@@ -39,6 +39,22 @@ DEFAULT_ALLOWED_TYPES: tuple[type[exp.Expr], ...] = (
exp.Except,
)
+# Denylist: expression types that mutate data or schema when found anywhere in the AST.
+# This catches data-modifying CTEs (e.g. WITH del AS (DELETE …) SELECT …),
+# SELECT INTO, and other constructs that bypass top-level type checks.
+# Note: exp.Command is sqlglot's fallback for any syntax it doesn't recognize.
+# Including it makes the denylist fail-closed (safer), but may block legitimate
+# vendor-specific SQL that sqlglot can't parse. Callers who need such syntax can
+# provide custom allowed_types to bypass the deep scan entirely.
+_DATA_MODIFYING_NODES: tuple[type[exp.Expr], ...] = (
+ exp.Insert,
+ exp.Update,
+ exp.Delete,
+ exp.Merge,
+ exp.Into,
+ exp.Command,
+)
+
class SQLSafetyError(Exception):
"""Generated SQL failed safety validation."""
@@ -97,4 +113,31 @@ def validate_sql(
f"Statement type '{type(stmt).__name__}' is not allowed. Allowed types: {allowed_names}"
)
+ # Deep scan: reject data-modifying nodes hidden inside otherwise-allowed statements
+ # (e.g. data-modifying CTEs, SELECT INTO). Only applies when using the default
+ # read-only allowlist — callers who provide custom allowed_types have explicitly
+ # opted into non-read-only operations.
+ if types is DEFAULT_ALLOWED_TYPES:
+ _check_for_data_modifying_nodes(parsed)
+
return parsed
+
+
+def _check_for_data_modifying_nodes(statements: list[exp.Expr]) -> None:
+ """
+ Walk the full AST of each statement and reject data-modifying expressions.
+
+ This catches bypass vectors like:
+ - Data-modifying CTEs: ``WITH d AS (DELETE FROM t RETURNING *) SELECT * FROM d``
+ - SELECT INTO: ``SELECT * INTO new_table FROM t``
+ - INSERT/UPDATE/DELETE hidden inside subqueries or CTEs
+
+ :raises SQLSafetyError: If any data-modifying node is found in the AST.
+ """
+ for stmt in statements:
+ for node in stmt.walk():
+ if isinstance(node, _DATA_MODIFYING_NODES):
+ raise SQLSafetyError(
+ f"Data-modifying operation '{type(node).__name__}' found inside statement. "
+ f"Only pure read operations are allowed in read-only mode."
+ )
diff --git a/providers/common/ai/tests/unit/common/ai/utils/test_sql_validation.py b/providers/common/ai/tests/unit/common/ai/utils/test_sql_validation.py
index 6c6b218cadb..fe27379aebe 100644
--- a/providers/common/ai/tests/unit/common/ai/utils/test_sql_validation.py
+++ b/providers/common/ai/tests/unit/common/ai/utils/test_sql_validation.py
@@ -159,3 +159,57 @@ class TestValidateSQLEdgeCases:
"""Trailing semicolon should not cause multi-statement error."""
result = validate_sql("SELECT 1;")
assert len(result) == 1
+
+
+class TestDataModifyingNodeDetection:
+ """Data-modifying operations hidden inside allowed statement types should be blocked."""
+
+ def test_cte_with_delete_blocked(self):
+ """DELETE inside a CTE bypasses top-level type check."""
+ with pytest.raises(SQLSafetyError, match="Data-modifying operation 'Delete'"):
+ validate_sql(
+ "WITH del AS (DELETE FROM users RETURNING *) SELECT * FROM del",
+ dialect="postgres",
+ )
+
+ def test_cte_with_insert_blocked(self):
+ """INSERT inside a CTE bypasses top-level type check."""
+ with pytest.raises(SQLSafetyError, match="Data-modifying operation 'Insert'"):
+ validate_sql(
+ "WITH ins AS (INSERT INTO users(name) VALUES ('x') RETURNING *) SELECT * FROM ins",
+ dialect="postgres",
+ )
+
+ def test_cte_with_update_blocked(self):
+ """UPDATE inside a CTE bypasses top-level type check."""
+ with pytest.raises(SQLSafetyError, match="Data-modifying operation 'Update'"):
+ validate_sql(
+ "WITH upd AS (UPDATE users SET name = 'x' RETURNING *) SELECT * FROM upd",
+ dialect="postgres",
+ )
+
+ def test_select_into_blocked(self):
+ """SELECT INTO creates a new table — should be blocked."""
+ with pytest.raises(SQLSafetyError, match="Data-modifying operation 'Into'"):
+ validate_sql("SELECT * INTO new_table FROM users")
+
+ def test_plain_cte_select_still_allowed(self):
+ """Normal read-only CTEs should not be affected."""
+ result = validate_sql("WITH t AS (SELECT id FROM users) SELECT * FROM t")
+ assert len(result) == 1
+
+ def test_nested_subquery_select_still_allowed(self):
+ """Subqueries that are pure reads should not be affected."""
+ result = validate_sql("SELECT * FROM users WHERE id IN (SELECT user_id FROM orders)")
+ assert len(result) == 1
+
+ def test_deep_scan_runs_with_explicit_default_types(self):
+ """Deep scan should also block when DEFAULT_ALLOWED_TYPES is passed explicitly."""
+ from airflow.providers.common.ai.utils.sql_validation import DEFAULT_ALLOWED_TYPES
+
+ with pytest.raises(SQLSafetyError, match="Data-modifying operation 'Delete'"):
+ validate_sql(
+ "WITH del AS (DELETE FROM users RETURNING *) SELECT * FROM del",
+ allowed_types=DEFAULT_ALLOWED_TYPES,
+ dialect="postgres",
+ )