This is an automated email from the ASF dual-hosted git repository.

kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 58480dd8386 Fix common.ai example DAGs failing to parse without the 
sql extra (#68497)
58480dd8386 is described below

commit 58480dd8386b3c876bf3878143ef7658faced800
Author: Ehab Abdalla <[email protected]>
AuthorDate: Tue Jun 16 18:04:18 2026 -0400

    Fix common.ai example DAGs failing to parse without the sql extra (#68497)
    
    * Fix common.ai example DAGs failing to parse without the sql extra
    
    * style: apply ruff formatting
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../common/ai/example_dags/example_agent.py        | 162 ++++----
 .../ai/example_dags/example_agent_capabilities.py  |  57 +--
 .../ai/example_dags/example_agent_durable.py       | 102 ++---
 .../common/ai/example_dags/example_agent_skills.py |  49 ++-
 .../common/ai/example_dags/example_llm_sql.py      | 251 ++++++------
 .../ai/example_dags/example_llm_survey_agentic.py  | 241 +++++------
 .../ai/example_dags/example_llm_survey_analysis.py | 450 +++++++++++----------
 7 files changed, 670 insertions(+), 642 deletions(-)

diff --git 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent.py
 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent.py
index dfb058c6b92..ff2c6bcaf87 100644
--- 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent.py
+++ 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent.py
@@ -24,9 +24,13 @@ from pydantic import BaseModel
 
 from airflow.providers.common.ai.operators.agent import AgentOperator
 from airflow.providers.common.ai.toolsets.hook import HookToolset
-from airflow.providers.common.ai.toolsets.sql import SQLToolset
 from airflow.providers.common.compat.sdk import dag, task
 
+try:
+    from airflow.providers.common.ai.toolsets.sql import SQLToolset
+except Exception:
+    SQLToolset = None  # type: ignore[assignment,misc]
+
 
 # [START howto_decorator_agent_structured_output_class]
 # Pydantic output classes must be defined at module scope so downstream
@@ -48,29 +52,30 @@ class Analysis(BaseModel):
 
 
 # [START howto_operator_agent_sql]
-@dag(tags=["example"])
-def example_agent_operator_sql():
-    AgentOperator(
-        task_id="analyst",
-        prompt="What are the top 5 customers by order count?",
-        llm_conn_id="pydanticai_default",
-        system_prompt=(
-            "You are a SQL analyst. Use the available tools to explore "
-            "the schema and answer the question with data."
-        ),
-        toolsets=[
-            SQLToolset(
-                db_conn_id="postgres_default",
-                allowed_tables=["customers", "orders"],
-                max_rows=20,
-            )
-        ],
-    )
-
-
-# [END howto_operator_agent_sql]
-
-example_agent_operator_sql()
+if SQLToolset is not None:
+
+    @dag(tags=["example"])
+    def example_agent_operator_sql():
+        AgentOperator(
+            task_id="analyst",
+            prompt="What are the top 5 customers by order count?",
+            llm_conn_id="pydanticai_default",
+            system_prompt=(
+                "You are a SQL analyst. Use the available tools to explore "
+                "the schema and answer the question with data."
+            ),
+            toolsets=[
+                SQLToolset(
+                    db_conn_id="postgres_default",
+                    allowed_tables=["customers", "orders"],
+                    max_rows=20,
+                )
+            ],
+        )
+
+    # [END howto_operator_agent_sql]
+
+    example_agent_operator_sql()
 
 
 # ---------------------------------------------------------------------------
@@ -111,27 +116,28 @@ example_agent_operator_hook()
 
 
 # [START howto_decorator_agent]
-@dag(tags=["example"])
-def example_agent_decorator():
-    @task.agent(
-        llm_conn_id="pydanticai_default",
-        system_prompt="You are a data analyst. Use tools to answer questions.",
-        toolsets=[
-            SQLToolset(
-                db_conn_id="postgres_default",
-                allowed_tables=["orders"],
-            )
-        ],
-    )
-    def analyze(question: str):
-        return f"Answer this question about our orders data: {question}"
+if SQLToolset is not None:
 
-    analyze("What was our total revenue last month?")
+    @dag(tags=["example"])
+    def example_agent_decorator():
+        @task.agent(
+            llm_conn_id="pydanticai_default",
+            system_prompt="You are a data analyst. Use tools to answer 
questions.",
+            toolsets=[
+                SQLToolset(
+                    db_conn_id="postgres_default",
+                    allowed_tables=["orders"],
+                )
+            ],
+        )
+        def analyze(question: str):
+            return f"Answer this question about our orders data: {question}"
 
+        analyze("What was our total revenue last month?")
 
-# [END howto_decorator_agent]
+    # [END howto_decorator_agent]
 
-example_agent_decorator()
+    example_agent_decorator()
 
 
 # ---------------------------------------------------------------------------
@@ -140,23 +146,24 @@ example_agent_decorator()
 
 
 # [START howto_decorator_agent_structured]
-@dag(tags=["example"])
-def example_agent_structured_output():
-    @task.agent(
-        llm_conn_id="pydanticai_default",
-        system_prompt="You are a data analyst. Return structured results.",
-        output_type=Analysis,
-        toolsets=[SQLToolset(db_conn_id="postgres_default")],
-    )
-    def analyze(question: str):
-        return f"Analyze: {question}"
+if SQLToolset is not None:
 
-    analyze("What are the trending products this week?")
+    @dag(tags=["example"])
+    def example_agent_structured_output():
+        @task.agent(
+            llm_conn_id="pydanticai_default",
+            system_prompt="You are a data analyst. Return structured results.",
+            output_type=Analysis,
+            toolsets=[SQLToolset(db_conn_id="postgres_default")],
+        )
+        def analyze(question: str):
+            return f"Analyze: {question}"
 
+        analyze("What are the trending products this week?")
 
-# [END howto_decorator_agent_structured]
+    # [END howto_decorator_agent_structured]
 
-example_agent_structured_output()
+    example_agent_structured_output()
 
 
 # ---------------------------------------------------------------------------
@@ -165,29 +172,30 @@ example_agent_structured_output()
 
 
 # [START howto_agent_chain]
-@dag(tags=["example"])
-def example_agent_chain():
-    @task.agent(
-        llm_conn_id="pydanticai_default",
-        system_prompt="You are a SQL analyst.",
-        toolsets=[SQLToolset(db_conn_id="postgres_default", 
allowed_tables=["orders"])],
-    )
-    def investigate(question: str):
-        return f"Investigate: {question}"
-
-    @task
-    def send_report(analysis: str):
-        """Send the agent's analysis to a downstream system."""
-        print(f"Report: {analysis}")
-        return analysis
-
-    result = investigate("Summarize order trends for last quarter")
-    send_report(result)
-
-
-# [END howto_agent_chain]
-
-example_agent_chain()
+if SQLToolset is not None:
+
+    @dag(tags=["example"])
+    def example_agent_chain():
+        @task.agent(
+            llm_conn_id="pydanticai_default",
+            system_prompt="You are a SQL analyst.",
+            toolsets=[SQLToolset(db_conn_id="postgres_default", 
allowed_tables=["orders"])],
+        )
+        def investigate(question: str):
+            return f"Investigate: {question}"
+
+        @task
+        def send_report(analysis: str):
+            """Send the agent's analysis to a downstream system."""
+            print(f"Report: {analysis}")
+            return analysis
+
+        result = investigate("Summarize order trends for last quarter")
+        send_report(result)
+
+    # [END howto_agent_chain]
+
+    example_agent_chain()
 
 
 # ---------------------------------------------------------------------------
diff --git 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent_capabilities.py
 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent_capabilities.py
index 3d069acdf33..26549e70390 100644
--- 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent_capabilities.py
+++ 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent_capabilities.py
@@ -28,9 +28,13 @@ from __future__ import annotations
 from pydantic_ai.capabilities import Thinking, WebSearch
 
 from airflow.providers.common.ai.operators.agent import AgentOperator
-from airflow.providers.common.ai.toolsets.sql import SQLToolset
 from airflow.providers.common.compat.sdk import dag
 
+try:
+    from airflow.providers.common.ai.toolsets.sql import SQLToolset
+except Exception:
+    SQLToolset = None  # type: ignore[assignment,misc]
+
 # ---------------------------------------------------------------------------
 # 1. Thinking capability: enable model reasoning at a configurable effort level
 # ---------------------------------------------------------------------------
@@ -86,29 +90,30 @@ example_agent_capabilities_web_search()
 
 
 # [START howto_operator_agent_capabilities_composed]
-@dag(tags=["example"])
-def example_agent_capabilities_composed():
-    AgentOperator(
-        task_id="analyst",
-        prompt="Cross-reference our top customers with their recent public 
news. Think first.",
-        llm_conn_id="pydanticai_default",
-        system_prompt=(
-            "You are a sales analyst. Query the database for customers, then 
search the web "
-            "for recent news. Reason carefully about which leads to surface."
-        ),
-        toolsets=[
-            SQLToolset(
-                db_conn_id="postgres_default",
-                allowed_tables=["customers", "orders"],
-                max_rows=20,
+if SQLToolset is not None:
+
+    @dag(tags=["example"])
+    def example_agent_capabilities_composed():
+        AgentOperator(
+            task_id="analyst",
+            prompt="Cross-reference our top customers with their recent public 
news. Think first.",
+            llm_conn_id="pydanticai_default",
+            system_prompt=(
+                "You are a sales analyst. Query the database for customers, 
then search the web "
+                "for recent news. Reason carefully about which leads to 
surface."
             ),
-        ],
-        agent_params={
-            "capabilities": [Thinking(effort="medium"), WebSearch()],
-        },
-    )
-
-
-# [END howto_operator_agent_capabilities_composed]
-
-example_agent_capabilities_composed()
+            toolsets=[
+                SQLToolset(
+                    db_conn_id="postgres_default",
+                    allowed_tables=["customers", "orders"],
+                    max_rows=20,
+                ),
+            ],
+            agent_params={
+                "capabilities": [Thinking(effort="medium"), WebSearch()],
+            },
+        )
+
+    # [END howto_operator_agent_capabilities_composed]
+
+    example_agent_capabilities_composed()
diff --git 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent_durable.py
 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent_durable.py
index 6385c005918..b7d2449fc1b 100644
--- 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent_durable.py
+++ 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent_durable.py
@@ -21,40 +21,45 @@ from __future__ import annotations
 from datetime import timedelta
 
 from airflow.providers.common.ai.operators.agent import AgentOperator
-from airflow.providers.common.ai.toolsets.sql import SQLToolset
 from airflow.providers.common.compat.sdk import dag, task
 
+try:
+    from airflow.providers.common.ai.toolsets.sql import SQLToolset
+except Exception:
+    SQLToolset = None  # type: ignore[assignment,misc]
+
 # ---------------------------------------------------------------------------
 # 1. Durable AgentOperator: resumes from last model call on retry
 # ---------------------------------------------------------------------------
 
 
 # [START howto_operator_agent_durable]
-@dag(default_args={"retries": 3, "retry_delay": timedelta(seconds=30)}, 
tags=["example"])
-def example_agent_durable_operator():
-    """Agent with durable execution -- resumes from the last model call on 
retry."""
-    AgentOperator(
-        task_id="durable_analyst",
-        prompt="What are the top 5 customers by order count?",
-        llm_conn_id="pydanticai_default",
-        system_prompt=(
-            "You are a SQL analyst. Use the available tools to explore "
-            "the schema and answer the question with data."
-        ),
-        durable=True,
-        toolsets=[
-            SQLToolset(
-                db_conn_id="postgres_default",
-                allowed_tables=["customers", "orders"],
-                max_rows=20,
-            )
-        ],
-    )
-
-
-# [END howto_operator_agent_durable]
-
-example_agent_durable_operator()
+if SQLToolset is not None:
+
+    @dag(default_args={"retries": 3, "retry_delay": timedelta(seconds=30)}, 
tags=["example"])
+    def example_agent_durable_operator():
+        """Agent with durable execution -- resumes from the last model call on 
retry."""
+        AgentOperator(
+            task_id="durable_analyst",
+            prompt="What are the top 5 customers by order count?",
+            llm_conn_id="pydanticai_default",
+            system_prompt=(
+                "You are a SQL analyst. Use the available tools to explore "
+                "the schema and answer the question with data."
+            ),
+            durable=True,
+            toolsets=[
+                SQLToolset(
+                    db_conn_id="postgres_default",
+                    allowed_tables=["customers", "orders"],
+                    max_rows=20,
+                )
+            ],
+        )
+
+    # [END howto_operator_agent_durable]
+
+    example_agent_durable_operator()
 
 
 # ---------------------------------------------------------------------------
@@ -63,25 +68,26 @@ example_agent_durable_operator()
 
 
 # [START howto_decorator_agent_durable]
-@dag(default_args={"retries": 3, "retry_delay": timedelta(seconds=30)}, 
tags=["example"])
-def example_agent_durable_decorator():
-    @task.agent(
-        llm_conn_id="pydanticai_default",
-        system_prompt="You are a data analyst. Use tools to answer questions.",
-        durable=True,
-        toolsets=[
-            SQLToolset(
-                db_conn_id="postgres_default",
-                allowed_tables=["orders"],
-            )
-        ],
-    )
-    def analyze(question: str):
-        return f"Answer this question about our orders data: {question}"
-
-    analyze("What was our total revenue last month?")
-
-
-# [END howto_decorator_agent_durable]
-
-example_agent_durable_decorator()
+if SQLToolset is not None:
+
+    @dag(default_args={"retries": 3, "retry_delay": timedelta(seconds=30)}, 
tags=["example"])
+    def example_agent_durable_decorator():
+        @task.agent(
+            llm_conn_id="pydanticai_default",
+            system_prompt="You are a data analyst. Use tools to answer 
questions.",
+            durable=True,
+            toolsets=[
+                SQLToolset(
+                    db_conn_id="postgres_default",
+                    allowed_tables=["orders"],
+                )
+            ],
+        )
+        def analyze(question: str):
+            return f"Answer this question about our orders data: {question}"
+
+        analyze("What was our total revenue last month?")
+
+    # [END howto_decorator_agent_durable]
+
+    example_agent_durable_decorator()
diff --git 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent_skills.py
 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent_skills.py
index 4608446cf10..d7523e608a0 100644
--- 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent_skills.py
+++ 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent_skills.py
@@ -35,9 +35,13 @@ from pathlib import Path
 from airflow.providers.common.ai.operators.agent import AgentOperator
 from airflow.providers.common.ai.skills import GitSkills
 from airflow.providers.common.ai.toolsets.skills import AgentSkillsToolset
-from airflow.providers.common.ai.toolsets.sql import SQLToolset
 from airflow.providers.common.compat.sdk import dag
 
+try:
+    from airflow.providers.common.ai.toolsets.sql import SQLToolset
+except Exception:
+    SQLToolset = None  # type: ignore[assignment,misc]
+
 # Skills ship next to this DAG file; resolve relative to __file__ so the path
 # holds regardless of the dag-processor's working directory.
 SKILLS_DIR = Path(__file__).parent / "skills"
@@ -49,27 +53,28 @@ SKILLS_DIR = Path(__file__).parent / "skills"
 
 
 # [START howto_operator_agent_skills_local]
-@dag(tags=["example"])
-def example_agent_skills_local():
-    AgentOperator(
-        task_id="reporter",
-        prompt="How many orders did our top 5 customers place last month?",
-        llm_conn_id="pydanticai_default",
-        system_prompt="You are a data analyst. Consult your skills before 
writing SQL.",
-        toolsets=[
-            AgentSkillsToolset(sources=[str(SKILLS_DIR)]),
-            SQLToolset(
-                db_conn_id="postgres_default",
-                allowed_tables=["customers", "orders"],
-                max_rows=50,
-            ),
-        ],
-    )
-
-
-# [END howto_operator_agent_skills_local]
-
-example_agent_skills_local()
+if SQLToolset is not None:
+
+    @dag(tags=["example"])
+    def example_agent_skills_local():
+        AgentOperator(
+            task_id="reporter",
+            prompt="How many orders did our top 5 customers place last month?",
+            llm_conn_id="pydanticai_default",
+            system_prompt="You are a data analyst. Consult your skills before 
writing SQL.",
+            toolsets=[
+                AgentSkillsToolset(sources=[str(SKILLS_DIR)]),
+                SQLToolset(
+                    db_conn_id="postgres_default",
+                    allowed_tables=["customers", "orders"],
+                    max_rows=50,
+                ),
+            ],
+        )
+
+    # [END howto_operator_agent_skills_local]
+
+    example_agent_skills_local()
 
 
 # ---------------------------------------------------------------------------
diff --git 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_sql.py
 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_sql.py
index 4f5261fe7a7..3ff2f20cff8 100644
--- 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_sql.py
+++ 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_sql.py
@@ -18,135 +18,128 @@
 
 from __future__ import annotations
 
-from airflow.providers.common.ai.operators.llm_sql import LLMSQLQueryOperator
+try:
+    from airflow.providers.common.ai.operators.llm_sql import 
LLMSQLQueryOperator
+except Exception:
+    LLMSQLQueryOperator = None  # type: ignore[assignment,misc]
+
 from airflow.providers.common.compat.sdk import dag, task
 from airflow.providers.common.sql.config import DataSourceConfig
 
-
-# [START howto_operator_llm_sql_basic]
-@dag(tags=["example"])
-def example_llm_sql_basic():
-    LLMSQLQueryOperator(
-        task_id="generate_sql",
-        prompt="Find the top 10 customers by total revenue",
-        llm_conn_id="pydanticai_default",
-        schema_context=(
-            "Table: customers\n"
-            "Columns: id INT, name TEXT, email TEXT\n\n"
-            "Table: orders\n"
-            "Columns: id INT, customer_id INT, total DECIMAL, created_at 
TIMESTAMP"
-        ),
-    )
-
-
-# [END howto_operator_llm_sql_basic]
-
-example_llm_sql_basic()
-
-
-# [START howto_operator_llm_sql_schema]
-@dag(tags=["example"])
-def example_llm_sql_schema_introspection():
-    LLMSQLQueryOperator(
-        task_id="generate_sql",
-        prompt="Calculate monthly revenue for 2024",
-        llm_conn_id="pydanticai_default",
-        db_conn_id="postgres_default",
-        table_names=["orders", "customers"],
-        dialect="postgres",
-    )
-
-
-# [END howto_operator_llm_sql_schema]
-
-example_llm_sql_schema_introspection()
-
-
-# [START howto_decorator_llm_sql]
-@dag(tags=["example"])
-def example_llm_sql_decorator():
-    @task.llm_sql(
-        llm_conn_id="pydanticai_default",
-        schema_context="Table: users\nColumns: id INT, name TEXT, signup_date 
DATE",
-    )
-    def build_churn_query(ds=None):
-        return f"Find users who signed up before {ds} and have no orders"
-
-    build_churn_query()
-
-
-# [END howto_decorator_llm_sql]
-
-example_llm_sql_decorator()
-
-
-# [START howto_operator_llm_sql_expand]
-@dag(tags=["example"])
-def example_llm_sql_expand():
-    LLMSQLQueryOperator.partial(
-        task_id="generate_sql",
-        llm_conn_id="pydanticai_default",
-        schema_context=(
-            "Table: orders\nColumns: id INT, customer_id INT, total DECIMAL, 
created_at TIMESTAMP"
-        ),
-    ).expand(
-        prompt=[
-            "Total revenue by month",
-            "Top 10 customers by order count",
-            "Average order value by day of week",
-        ]
-    )
-
-
-# [END howto_operator_llm_sql_expand]
-
-example_llm_sql_expand()
-
-
-# [START howto_operator_llm_sql_with_object_storage]
-@dag(tags=["example"])
-def example_llm_sql_with_object_storage():
-    datasource_config = DataSourceConfig(
-        conn_id="aws_default",
-        table_name="sales_data",
-        uri="s3://my-bucket/data/sales/",
-        format="parquet",
-    )
-
-    LLMSQLQueryOperator(
-        task_id="generate_sql",
-        prompt="Find the top 5 products by total sales amount",
-        llm_conn_id="pydanticai_default",
-        datasource_config=datasource_config,
-    )
-
-
-# [END howto_operator_llm_sql_with_object_storage]
-
-example_llm_sql_with_object_storage()
-
-
-# [START howto_operator_llm_sql_approval]
-@dag(tags=["example"])
-def example_llm_sql_approval():
-    from datetime import timedelta
-
-    LLMSQLQueryOperator(
-        task_id="generate_sql_with_approval",
-        prompt="Find the top 10 customers by total revenue in the last 
quarter",
-        llm_conn_id="pydanticai_default",
-        schema_context=(
-            "Table: customers\n"
-            "Columns: id INT, name TEXT\n\n"
-            "Table: orders\n"
-            "Columns: id INT, customer_id INT, total DECIMAL, created_at 
TIMESTAMP"
-        ),
-        require_approval=True,
-        approval_timeout=timedelta(hours=1),
-        allow_modifications=True,
-    )
-
-
-# [END howto_operator_llm_sql_approval]
-
-example_llm_sql_approval()
+if LLMSQLQueryOperator is not None:
+    # [START howto_operator_llm_sql_basic]
+    @dag(tags=["example"])
+    def example_llm_sql_basic():
+        LLMSQLQueryOperator(
+            task_id="generate_sql",
+            prompt="Find the top 10 customers by total revenue",
+            llm_conn_id="pydanticai_default",
+            schema_context=(
+                "Table: customers\n"
+                "Columns: id INT, name TEXT, email TEXT\n\n"
+                "Table: orders\n"
+                "Columns: id INT, customer_id INT, total DECIMAL, created_at 
TIMESTAMP"
+            ),
+        )
+
+    # [END howto_operator_llm_sql_basic]
+
+    example_llm_sql_basic()
+
+    # [START howto_operator_llm_sql_schema]
+    @dag(tags=["example"])
+    def example_llm_sql_schema_introspection():
+        LLMSQLQueryOperator(
+            task_id="generate_sql",
+            prompt="Calculate monthly revenue for 2024",
+            llm_conn_id="pydanticai_default",
+            db_conn_id="postgres_default",
+            table_names=["orders", "customers"],
+            dialect="postgres",
+        )
+
+    # [END howto_operator_llm_sql_schema]
+
+    example_llm_sql_schema_introspection()
+
+    # [START howto_decorator_llm_sql]
+    @dag(tags=["example"])
+    def example_llm_sql_decorator():
+        @task.llm_sql(
+            llm_conn_id="pydanticai_default",
+            schema_context="Table: users\nColumns: id INT, name TEXT, 
signup_date DATE",
+        )
+        def build_churn_query(ds=None):
+            return f"Find users who signed up before {ds} and have no orders"
+
+        build_churn_query()
+
+    # [END howto_decorator_llm_sql]
+
+    example_llm_sql_decorator()
+
+    # [START howto_operator_llm_sql_expand]
+    @dag(tags=["example"])
+    def example_llm_sql_expand():
+        LLMSQLQueryOperator.partial(
+            task_id="generate_sql",
+            llm_conn_id="pydanticai_default",
+            schema_context=(
+                "Table: orders\nColumns: id INT, customer_id INT, total 
DECIMAL, created_at TIMESTAMP"
+            ),
+        ).expand(
+            prompt=[
+                "Total revenue by month",
+                "Top 10 customers by order count",
+                "Average order value by day of week",
+            ]
+        )
+
+    # [END howto_operator_llm_sql_expand]
+
+    example_llm_sql_expand()
+
+    # [START howto_operator_llm_sql_with_object_storage]
+    @dag(tags=["example"])
+    def example_llm_sql_with_object_storage():
+        datasource_config = DataSourceConfig(
+            conn_id="aws_default",
+            table_name="sales_data",
+            uri="s3://my-bucket/data/sales/",
+            format="parquet",
+        )
+
+        LLMSQLQueryOperator(
+            task_id="generate_sql",
+            prompt="Find the top 5 products by total sales amount",
+            llm_conn_id="pydanticai_default",
+            datasource_config=datasource_config,
+        )
+
+    # [END howto_operator_llm_sql_with_object_storage]
+
+    example_llm_sql_with_object_storage()
+
+    # [START howto_operator_llm_sql_approval]
+    @dag(tags=["example"])
+    def example_llm_sql_approval():
+        from datetime import timedelta
+
+        LLMSQLQueryOperator(
+            task_id="generate_sql_with_approval",
+            prompt="Find the top 10 customers by total revenue in the last 
quarter",
+            llm_conn_id="pydanticai_default",
+            schema_context=(
+                "Table: customers\n"
+                "Columns: id INT, name TEXT\n\n"
+                "Table: orders\n"
+                "Columns: id INT, customer_id INT, total DECIMAL, created_at 
TIMESTAMP"
+            ),
+            require_approval=True,
+            approval_timeout=timedelta(hours=1),
+            allow_modifications=True,
+        )
+
+    # [END howto_operator_llm_sql_approval]
+
+    example_llm_sql_approval()
diff --git 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_agentic.py
 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_agentic.py
index 9ac48bceb2c..78782d68302 100644
--- 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_agentic.py
+++ 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_agentic.py
@@ -55,6 +55,10 @@ Before running:
 1. Create an LLM connection named ``pydanticai_default`` (or the value of
    ``LLM_CONN_ID``) for your chosen model provider.
 2. Place the cleaned survey CSV at the path set by ``SURVEY_CSV_PATH``.
+
+This DAG needs the optional ``sql`` extra::
+
+    pip install "apache-airflow-providers-common-ai[sql]"
 """
 
 from __future__ import annotations
@@ -64,12 +68,16 @@ import json
 import os
 
 from airflow.providers.common.ai.operators.llm import LLMOperator
-from airflow.providers.common.ai.operators.llm_sql import LLMSQLQueryOperator
 from airflow.providers.common.compat.sdk import dag, task
 from airflow.providers.common.sql.config import DataSourceConfig
 from airflow.providers.common.sql.operators.analytics import AnalyticsOperator
 from airflow.providers.standard.operators.hitl import ApprovalOperator
 
+try:
+    from airflow.providers.common.ai.operators.llm_sql import 
LLMSQLQueryOperator
+except Exception:
+    LLMSQLQueryOperator = None  # type: ignore[assignment,misc]
+
 # ---------------------------------------------------------------------------
 # Configuration
 # ---------------------------------------------------------------------------
@@ -134,138 +142,137 @@ Focus on patterns and proportions rather than raw 
counts."""
 # DAG: Agentic multi-query synthesis
 # ---------------------------------------------------------------------------
 
-
-# [START example_llm_survey_agentic]
-@dag(tags=["example"])
-def example_llm_survey_agentic():
-    """
-    Fan-out across four survey dimensions, then synthesize into a single 
narrative.
-
-    Task graph::
-
-        decompose_question (@task)
-            → generate_sql  (LLMSQLQueryOperator ×4, via Dynamic Task Mapping)
-            → wrap_query    (@task ×4)
-            → run_query     (AnalyticsOperator ×4, via Dynamic Task Mapping)
-            → collect_results (@task)
-            → synthesize_answer (LLMOperator)
-            → result_confirmation (ApprovalOperator)
-    """
-
-    # ------------------------------------------------------------------
-    # Step 1: Decompose the high-level question into sub-questions,
-    # one per dimension.  Each string becomes one mapped task instance
-    # in the next step.
-    # ------------------------------------------------------------------
-    @task
-    def decompose_question() -> list[str]:
-        return [
-            """\
+if LLMSQLQueryOperator is not None:
+    # [START example_llm_survey_agentic]
+    @dag(tags=["example"])
+    def example_llm_survey_agentic():
+        """
+        Fan-out across four survey dimensions, then synthesize into a single 
narrative.
+
+        Task graph::
+
+            decompose_question (@task)
+                → generate_sql  (LLMSQLQueryOperator ×4, via Dynamic Task 
Mapping)
+                → wrap_query    (@task ×4)
+                → run_query     (AnalyticsOperator ×4, via Dynamic Task 
Mapping)
+                → collect_results (@task)
+                → synthesize_answer (LLMOperator)
+                → result_confirmation (ApprovalOperator)
+        """
+
+        # ------------------------------------------------------------------
+        # Step 1: Decompose the high-level question into sub-questions,
+        # one per dimension.  Each string becomes one mapped task instance
+        # in the next step.
+        # ------------------------------------------------------------------
+        @task
+        def decompose_question() -> list[str]:
+            return [
+                """\
 Among respondents who use AI/LLM tools to write Airflow code,
 what executor types (CeleryExecutor, KubernetesExecutor, LocalExecutor)
 are most commonly enabled? Count an executor as enabled only if the
 column value is clearly affirmative. Treat blank, NULL, and negative
 values as not enabled. Return the count per executor type.""",
-            """\
+                """\
 Among respondents who use AI/LLM tools to write Airflow code,
 how do they deploy Airflow? Return the count per deployment method.""",
-            """\
+                """\
 Among respondents who use AI/LLM tools to write Airflow code,
 which cloud providers are most commonly used for Airflow?
 Return the count per cloud provider.""",
-            """\
+                """\
 Among respondents who use AI/LLM tools to write Airflow code,
 what version of Airflow are they currently running?
 Return the count per version.""",
-        ]
-
-    sub_questions = decompose_question()
-
-    # ------------------------------------------------------------------
-    # Step 2: Generate SQL for each sub-question in parallel.
-    # LLMSQLQueryOperator is expanded over the sub-question list --
-    # four mapped instances, each translating one natural-language
-    # question into validated SQL.
-    # ------------------------------------------------------------------
-    generate_sql = LLMSQLQueryOperator.partial(
-        task_id="generate_sql",
-        llm_conn_id=LLM_CONN_ID,
-        datasource_config=survey_datasource,
-        schema_context=SURVEY_SCHEMA,
-        system_prompt=SQL_SYSTEM_PROMPT,
-    ).expand(prompt=sub_questions)
-
-    # ------------------------------------------------------------------
-    # Step 3: Wrap each SQL string into a single-element list.
-    # AnalyticsOperator expects queries: list[str]; this step bridges
-    # the scalar output of LLMSQLQueryOperator to that interface.
-    # ------------------------------------------------------------------
-    @task
-    def wrap_query(sql: str) -> list[str]:
-        return [sql]
-
-    wrapped_queries = wrap_query.expand(sql=generate_sql.output)
-
-    # ------------------------------------------------------------------
-    # Step 4: Execute each SQL against the survey CSV via DataFusion.
-    # Four mapped instances run in parallel.  If one fails, only that
-    # instance retries -- the other three hold their XCom results.
-    # ------------------------------------------------------------------
-    run_query = AnalyticsOperator.partial(
-        task_id="run_query",
-        datasource_configs=[survey_datasource],
-        result_output_format="json",
-    ).expand(queries=wrapped_queries)
-
-    # ------------------------------------------------------------------
-    # Step 5: Collect all four JSON results and label them by dimension.
-    # The default trigger rule (all_success) ensures synthesis only runs
-    # when the complete picture is available.
-    # ------------------------------------------------------------------
-    @task
-    def collect_results(results: list[str]) -> dict:
-        # Airflow preserves index order for mapped task outputs, so zip is 
safe here:
-        # results[i] corresponds to the mapped instance at index i, which 
matches
-        # the sub-question at DIMENSION_KEYS[i].
-        labeled: dict[str, list] = {}
-        for key, raw in zip(DIMENSION_KEYS, results):
-            items = json.loads(raw)
-            data = [row for item in items for row in item["data"]]
-            labeled[key] = data
-        return labeled
-
-    collected = collect_results(run_query.output)
-
-    # ------------------------------------------------------------------
-    # Step 6: Synthesize the four labeled result sets into a narrative.
-    # This is the second LLM call -- the first four generated SQL,
-    # this one interprets the results.  Inputs are fully visible in XCom.
-    # ------------------------------------------------------------------
-    synthesize_answer = LLMOperator(
-        task_id="synthesize_answer",
-        llm_conn_id=LLM_CONN_ID,
-        system_prompt=SYNTHESIS_SYSTEM_PROMPT,
-        prompt="""\
+            ]
+
+        sub_questions = decompose_question()
+
+        # ------------------------------------------------------------------
+        # Step 2: Generate SQL for each sub-question in parallel.
+        # LLMSQLQueryOperator is expanded over the sub-question list --
+        # four mapped instances, each translating one natural-language
+        # question into validated SQL.
+        # ------------------------------------------------------------------
+        generate_sql = LLMSQLQueryOperator.partial(
+            task_id="generate_sql",
+            llm_conn_id=LLM_CONN_ID,
+            datasource_config=survey_datasource,
+            schema_context=SURVEY_SCHEMA,
+            system_prompt=SQL_SYSTEM_PROMPT,
+        ).expand(prompt=sub_questions)
+
+        # ------------------------------------------------------------------
+        # Step 3: Wrap each SQL string into a single-element list.
+        # AnalyticsOperator expects queries: list[str]; this step bridges
+        # the scalar output of LLMSQLQueryOperator to that interface.
+        # ------------------------------------------------------------------
+        @task
+        def wrap_query(sql: str) -> list[str]:
+            return [sql]
+
+        wrapped_queries = wrap_query.expand(sql=generate_sql.output)
+
+        # ------------------------------------------------------------------
+        # Step 4: Execute each SQL against the survey CSV via DataFusion.
+        # Four mapped instances run in parallel.  If one fails, only that
+        # instance retries -- the other three hold their XCom results.
+        # ------------------------------------------------------------------
+        run_query = AnalyticsOperator.partial(
+            task_id="run_query",
+            datasource_configs=[survey_datasource],
+            result_output_format="json",
+        ).expand(queries=wrapped_queries)
+
+        # ------------------------------------------------------------------
+        # Step 5: Collect all four JSON results and label them by dimension.
+        # The default trigger rule (all_success) ensures synthesis only runs
+        # when the complete picture is available.
+        # ------------------------------------------------------------------
+        @task
+        def collect_results(results: list[str]) -> dict:
+            # Airflow preserves index order for mapped task outputs, so zip is 
safe here:
+            # results[i] corresponds to the mapped instance at index i, which 
matches
+            # the sub-question at DIMENSION_KEYS[i].
+            labeled: dict[str, list] = {}
+            for key, raw in zip(DIMENSION_KEYS, results):
+                items = json.loads(raw)
+                data = [row for item in items for row in item["data"]]
+                labeled[key] = data
+            return labeled
+
+        collected = collect_results(run_query.output)
+
+        # ------------------------------------------------------------------
+        # Step 6: Synthesize the four labeled result sets into a narrative.
+        # This is the second LLM call -- the first four generated SQL,
+        # this one interprets the results.  Inputs are fully visible in XCom.
+        # ------------------------------------------------------------------
+        synthesize_answer = LLMOperator(
+            task_id="synthesize_answer",
+            llm_conn_id=LLM_CONN_ID,
+            system_prompt=SYNTHESIS_SYSTEM_PROMPT,
+            prompt="""\
 Given these four independent survey query results about practitioners
 who use AI tools to write Airflow code, write a 2-3 sentence
 characterization of what a typical Airflow deployment looks like for
 this group.
 
 Results: {{ ti.xcom_pull(task_ids='collect_results') }}""",
-    )
-    collected >> synthesize_answer
-
-    # ------------------------------------------------------------------
-    # Step 7: Human reviews the synthesized narrative before the DAG ends.
-    # ------------------------------------------------------------------
-    result_confirmation = ApprovalOperator(  # noqa: F841
-        task_id="result_confirmation",
-        subject="Review the synthesized survey analysis",
-        body=synthesize_answer.output,
-        response_timeout=datetime.timedelta(hours=1),
-    )
-
-
-# [END example_llm_survey_agentic]
-
-example_llm_survey_agentic()
+        )
+        collected >> synthesize_answer
+
+        # ------------------------------------------------------------------
+        # Step 7: Human reviews the synthesized narrative before the DAG ends.
+        # ------------------------------------------------------------------
+        result_confirmation = ApprovalOperator(  # noqa: F841
+            task_id="result_confirmation",
+            subject="Review the synthesized survey analysis",
+            body=synthesize_answer.output,
+            response_timeout=datetime.timedelta(hours=1),
+        )
+
+    # [END example_llm_survey_agentic]
+
+    example_llm_survey_agentic()
diff --git 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_analysis.py
 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_analysis.py
index df28bb04516..1c239c9ab82 100644
--- 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_analysis.py
+++ 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_analysis.py
@@ -39,6 +39,10 @@ Before running either DAG:
    environment variable, or update ``SURVEY_CSV_PATH`` below.
    A cleaned copy of the 2025 survey CSV (duplicate columns renamed, embedded
    newlines removed) is required -- Apache DataFusion is strict about these.
+
+These DAGs need the optional ``sql`` extra::
+
+    pip install "apache-airflow-providers-common-ai[sql]"
 """
 
 from __future__ import annotations
@@ -49,7 +53,6 @@ import json
 import os
 
 from airflow.providers.common.ai.operators.llm_schema_compare import 
LLMSchemaCompareOperator
-from airflow.providers.common.ai.operators.llm_sql import LLMSQLQueryOperator
 from airflow.providers.common.compat.sdk import dag, task
 from airflow.providers.common.sql.config import DataSourceConfig
 from airflow.providers.common.sql.operators.analytics import AnalyticsOperator
@@ -57,6 +60,11 @@ from airflow.providers.http.operators.http import 
HttpOperator
 from airflow.providers.standard.operators.hitl import ApprovalOperator, 
HITLEntryOperator
 from airflow.sdk import Param
 
+try:
+    from airflow.providers.common.ai.operators.llm_sql import 
LLMSQLQueryOperator
+except Exception:
+    LLMSQLQueryOperator = None  # type: ignore[assignment,misc]
+
 # ---------------------------------------------------------------------------
 # Configuration
 # ---------------------------------------------------------------------------
@@ -143,227 +151,223 @@ reference_datasource = DataSourceConfig(
 # DAG 1: Interactive survey question example
 # ---------------------------------------------------------------------------
 
-
-# [START example_llm_survey_interactive]
-@dag(tags=["example"])
-def example_llm_survey_interactive():
-    """
-    Ask a natural language question about the survey with human review at each 
end.
-
-    Task graph::
-
-        prompt_confirmation (HITLEntryOperator)
-            → generate_sql (LLMSQLQueryOperator)
-            → run_query (AnalyticsOperator)
-            → extract_data (@task)
-            → result_confirmation (ApprovalOperator)
-
-    The first HITL step lets the analyst review and optionally reword the
-    question before it reaches the LLM.  The final HITL step presents the
-    query result for approval or rejection.
-    """
-
-    # ------------------------------------------------------------------
-    # Step 1: Prompt confirmation -- review or edit the question.
-    # ------------------------------------------------------------------
-    prompt_confirmation = HITLEntryOperator(
-        task_id="prompt_confirmation",
-        subject="Review the survey analysis question",
-        params={
-            "prompt": Param(
-                INTERACTIVE_PROMPT,
-                type="string",
-                description="The natural language question to answer via SQL",
-            )
-        },
-        response_timeout=datetime.timedelta(hours=1),
-    )
-
-    # ------------------------------------------------------------------
-    # Step 2: SQL generation -- LLM translates the confirmed question.
-    # ------------------------------------------------------------------
-    generate_sql = LLMSQLQueryOperator(
-        task_id="generate_sql",
-        prompt="{{ 
ti.xcom_pull(task_ids='prompt_confirmation')['params_input']['prompt'] }}",
-        llm_conn_id=LLM_CONN_ID,
-        datasource_config=survey_datasource,
-        schema_context=SURVEY_SCHEMA,
-    )
-
-    # ------------------------------------------------------------------
-    # Step 3: SQL execution via Apache DataFusion.
-    # ------------------------------------------------------------------
-    run_query = AnalyticsOperator(
-        task_id="run_query",
-        datasource_configs=[survey_datasource],
-        queries=["{{ ti.xcom_pull(task_ids='generate_sql') }}"],
-        result_output_format="json",
-    )
-
-    # ------------------------------------------------------------------
-    # Step 4: Extract data rows from the JSON result.
-    # AnalyticsOperator returns [{"query": "...", "data": [...]}, ...]
-    # This step strips the query field so only the rows reach the reviewer.
-    # ------------------------------------------------------------------
-    @task
-    def extract_data(raw: str) -> str:
-        results = json.loads(raw)
-        data = [row for item in results for row in item["data"]]
-        return json.dumps(data, indent=2)
-
-    result_data = extract_data(run_query.output)
-
-    # ------------------------------------------------------------------
-    # Step 5: Result confirmation -- approve or reject the query result.
-    # ------------------------------------------------------------------
-    result_confirmation = ApprovalOperator(  # noqa: F841
-        task_id="result_confirmation",
-        subject="Review the survey query result",
-        body=result_data,
-        response_timeout=datetime.timedelta(hours=1),
-    )
-
-    prompt_confirmation >> generate_sql >> run_query
-
-
-# [END example_llm_survey_interactive]
-
-example_llm_survey_interactive()
-
-
-# ---------------------------------------------------------------------------
-# DAG 2: Scheduled survey question example
-# ---------------------------------------------------------------------------
-
-
-# [START example_llm_survey_scheduled]
-@dag(schedule="@monthly", start_date=datetime.datetime(2025, 1, 1), 
catchup=False, tags=["example"])
-def example_llm_survey_scheduled():
-    """
-    Download, validate, query, and report on the survey CSV on a schedule.
-
-    Task graph::
-
-        download_survey (HttpOperator)
-            → prepare_csv (@task)
-            → check_schema (LLMSchemaCompareOperator)
-            → generate_sql (LLMSQLQueryOperator)
-            → run_query (AnalyticsOperator)
-            → extract_data (@task)
-            → send_result (@task)
-
-    No human review steps -- suitable for recurring reporting or dashboards.
-    Change ``schedule`` to any cron expression or Airflow timetable to adjust
-    the run frequency.
-
-    Prerequisites:
-
-    - HTTP connection ``airflow_website`` pointing at 
``https://airflow.apache.org``.
-    - Set ``SMTP_CONN_ID`` and ``NOTIFY_EMAIL`` environment variables to enable
-      email delivery of results; otherwise results are logged to the task log.
-    """
-    # ------------------------------------------------------------------
-    # Step 1: Download the survey CSV from the Airflow website.
-    # ------------------------------------------------------------------
-    download_survey = HttpOperator(
-        task_id="download_survey",
-        http_conn_id=AIRFLOW_WEBSITE_CONN_ID,
-        endpoint=SURVEY_CSV_ENDPOINT,
-        method="GET",
-        response_filter=lambda r: r.text,
-        log_response=False,
-    )
-
-    # ------------------------------------------------------------------
-    # Step 2: Write the downloaded CSV to disk and generate a reference
-    # schema file for the schema comparison step.
-    # ------------------------------------------------------------------
-    @task
-    def prepare_csv(csv_text: str) -> None:
-        os.makedirs(os.path.dirname(SURVEY_CSV_PATH), exist_ok=True)
-        with open(SURVEY_CSV_PATH, "w", encoding="utf-8") as f:
-            f.write(csv_text)
-
-        # Write a single-row reference CSV from the schema context so
-        # LLMSchemaCompareOperator has a structured baseline to compare 
against.
-        os.makedirs(os.path.dirname(REFERENCE_CSV_PATH), exist_ok=True)
-        columns = [line.split('"')[1] for line in 
SURVEY_SCHEMA.strip().splitlines() if '"' in line]
-        with open(REFERENCE_CSV_PATH, "w", newline="", encoding="utf-8") as 
ref:
-            csv_mod.writer(ref).writerow(columns)
-
-    csv_ready = prepare_csv(download_survey.output)
-
-    # ------------------------------------------------------------------
-    # Step 3: Validate the downloaded CSV schema against the reference.
-    # Raises if critical columns are missing or renamed.
-    # ------------------------------------------------------------------
-    check_schema = LLMSchemaCompareOperator(
-        task_id="check_schema",
-        prompt="""\
+if LLMSQLQueryOperator is not None:
+    # [START example_llm_survey_interactive]
+    @dag(tags=["example"])
+    def example_llm_survey_interactive():
+        """
+        Ask a natural language question about the survey with human review at 
each end.
+
+        Task graph::
+
+            prompt_confirmation (HITLEntryOperator)
+                → generate_sql (LLMSQLQueryOperator)
+                → run_query (AnalyticsOperator)
+                → extract_data (@task)
+                → result_confirmation (ApprovalOperator)
+
+        The first HITL step lets the analyst review and optionally reword the
+        question before it reaches the LLM.  The final HITL step presents the
+        query result for approval or rejection.
+        """
+
+        # ------------------------------------------------------------------
+        # Step 1: Prompt confirmation -- review or edit the question.
+        # ------------------------------------------------------------------
+        prompt_confirmation = HITLEntryOperator(
+            task_id="prompt_confirmation",
+            subject="Review the survey analysis question",
+            params={
+                "prompt": Param(
+                    INTERACTIVE_PROMPT,
+                    type="string",
+                    description="The natural language question to answer via 
SQL",
+                )
+            },
+            response_timeout=datetime.timedelta(hours=1),
+        )
+
+        # ------------------------------------------------------------------
+        # Step 2: SQL generation -- LLM translates the confirmed question.
+        # ------------------------------------------------------------------
+        generate_sql = LLMSQLQueryOperator(
+            task_id="generate_sql",
+            prompt="{{ 
ti.xcom_pull(task_ids='prompt_confirmation')['params_input']['prompt'] }}",
+            llm_conn_id=LLM_CONN_ID,
+            datasource_config=survey_datasource,
+            schema_context=SURVEY_SCHEMA,
+        )
+
+        # ------------------------------------------------------------------
+        # Step 3: SQL execution via Apache DataFusion.
+        # ------------------------------------------------------------------
+        run_query = AnalyticsOperator(
+            task_id="run_query",
+            datasource_configs=[survey_datasource],
+            queries=["{{ ti.xcom_pull(task_ids='generate_sql') }}"],
+            result_output_format="json",
+        )
+
+        # ------------------------------------------------------------------
+        # Step 4: Extract data rows from the JSON result.
+        # AnalyticsOperator returns [{"query": "...", "data": [...]}, ...]
+        # This step strips the query field so only the rows reach the reviewer.
+        # ------------------------------------------------------------------
+        @task
+        def extract_data(raw: str) -> str:
+            results = json.loads(raw)
+            data = [row for item in results for row in item["data"]]
+            return json.dumps(data, indent=2)
+
+        result_data = extract_data(run_query.output)
+
+        # ------------------------------------------------------------------
+        # Step 5: Result confirmation -- approve or reject the query result.
+        # ------------------------------------------------------------------
+        result_confirmation = ApprovalOperator(  # noqa: F841
+            task_id="result_confirmation",
+            subject="Review the survey query result",
+            body=result_data,
+            response_timeout=datetime.timedelta(hours=1),
+        )
+
+        prompt_confirmation >> generate_sql >> run_query
+
+    # [END example_llm_survey_interactive]
+
+    example_llm_survey_interactive()
+
+    # 
---------------------------------------------------------------------------
+    # DAG 2: Scheduled survey question example
+    # 
---------------------------------------------------------------------------
+
+    # [START example_llm_survey_scheduled]
+    @dag(schedule="@monthly", start_date=datetime.datetime(2025, 1, 1), 
catchup=False, tags=["example"])
+    def example_llm_survey_scheduled():
+        """
+        Download, validate, query, and report on the survey CSV on a schedule.
+
+        Task graph::
+
+            download_survey (HttpOperator)
+                → prepare_csv (@task)
+                → check_schema (LLMSchemaCompareOperator)
+                → generate_sql (LLMSQLQueryOperator)
+                → run_query (AnalyticsOperator)
+                → extract_data (@task)
+                → send_result (@task)
+
+        No human review steps -- suitable for recurring reporting or 
dashboards.
+        Change ``schedule`` to any cron expression or Airflow timetable to 
adjust
+        the run frequency.
+
+        Prerequisites:
+
+        - HTTP connection ``airflow_website`` pointing at 
``https://airflow.apache.org``.
+        - Set ``SMTP_CONN_ID`` and ``NOTIFY_EMAIL`` environment variables to 
enable
+          email delivery of results; otherwise results are logged to the task 
log.
+        """
+        # ------------------------------------------------------------------
+        # Step 1: Download the survey CSV from the Airflow website.
+        # ------------------------------------------------------------------
+        download_survey = HttpOperator(
+            task_id="download_survey",
+            http_conn_id=AIRFLOW_WEBSITE_CONN_ID,
+            endpoint=SURVEY_CSV_ENDPOINT,
+            method="GET",
+            response_filter=lambda r: r.text,
+            log_response=False,
+        )
+
+        # ------------------------------------------------------------------
+        # Step 2: Write the downloaded CSV to disk and generate a reference
+        # schema file for the schema comparison step.
+        # ------------------------------------------------------------------
+        @task
+        def prepare_csv(csv_text: str) -> None:
+            os.makedirs(os.path.dirname(SURVEY_CSV_PATH), exist_ok=True)
+            with open(SURVEY_CSV_PATH, "w", encoding="utf-8") as f:
+                f.write(csv_text)
+
+            # Write a single-row reference CSV from the schema context so
+            # LLMSchemaCompareOperator has a structured baseline to compare 
against.
+            os.makedirs(os.path.dirname(REFERENCE_CSV_PATH), exist_ok=True)
+            columns = [line.split('"')[1] for line in 
SURVEY_SCHEMA.strip().splitlines() if '"' in line]
+            with open(REFERENCE_CSV_PATH, "w", newline="", encoding="utf-8") 
as ref:
+                csv_mod.writer(ref).writerow(columns)
+
+        csv_ready = prepare_csv(download_survey.output)
+
+        # ------------------------------------------------------------------
+        # Step 3: Validate the downloaded CSV schema against the reference.
+        # Raises if critical columns are missing or renamed.
+        # ------------------------------------------------------------------
+        check_schema = LLMSchemaCompareOperator(
+            task_id="check_schema",
+            prompt="""\
 Compare the survey CSV schema against the reference schema.
 Flag any missing or renamed columns that would break the downstream SQL 
queries.""",
-        llm_conn_id=LLM_CONN_ID,
-        data_sources=[survey_datasource, reference_datasource],
-        context_strategy="basic",
-    )
-    csv_ready >> check_schema
-
-    # ------------------------------------------------------------------
-    # Step 4: SQL generation -- LLM translates the fixed question.
-    # ------------------------------------------------------------------
-    generate_sql = LLMSQLQueryOperator(
-        task_id="generate_sql",
-        prompt=SCHEDULED_PROMPT,
-        llm_conn_id=LLM_CONN_ID,
-        datasource_config=survey_datasource,
-        schema_context=SURVEY_SCHEMA,
-    )
-    check_schema >> generate_sql
-
-    # ------------------------------------------------------------------
-    # Step 5: SQL execution via Apache DataFusion.
-    # ------------------------------------------------------------------
-    run_query = AnalyticsOperator(
-        task_id="run_query",
-        datasource_configs=[survey_datasource],
-        queries=["{{ ti.xcom_pull(task_ids='generate_sql') }}"],
-        result_output_format="json",
-    )
-
-    # ------------------------------------------------------------------
-    # Step 6: Extract data rows from the JSON result.
-    # AnalyticsOperator returns [{"query": "...", "data": [...]}, ...]
-    # ------------------------------------------------------------------
-    @task
-    def extract_data(raw: str) -> str:
-        results = json.loads(raw)
-        data = [row for item in results for row in item["data"]]
-        return json.dumps(data, indent=2)
-
-    result_data = extract_data(run_query.output)
-
-    # ------------------------------------------------------------------
-    # Step 7: Send result via email if SMTP is configured, otherwise log.
-    # Set the SMTP_CONN_ID and NOTIFY_EMAIL environment variables to enable
-    # email delivery.
-    # ------------------------------------------------------------------
-    @task
-    def send_result(data: str) -> None:
-        if SMTP_CONN_ID and NOTIFY_EMAIL:
-            from airflow.providers.smtp.hooks.smtp import SmtpHook
-
-            with SmtpHook(smtp_conn_id=SMTP_CONN_ID) as hook:
-                hook.send_email_smtp(
-                    to=NOTIFY_EMAIL,
-                    subject=f"Airflow Survey Analysis: {SCHEDULED_PROMPT}",
-                    html_content=f"<pre>{data}</pre>",
-                )
-        else:
-            print(f"Survey analysis result:\n{data}")
-
-    generate_sql >> run_query >> result_data >> send_result(result_data)
-
-
-# [END example_llm_survey_scheduled]
-
-example_llm_survey_scheduled()
+            llm_conn_id=LLM_CONN_ID,
+            data_sources=[survey_datasource, reference_datasource],
+            context_strategy="basic",
+        )
+        csv_ready >> check_schema
+
+        # ------------------------------------------------------------------
+        # Step 4: SQL generation -- LLM translates the fixed question.
+        # ------------------------------------------------------------------
+        generate_sql = LLMSQLQueryOperator(
+            task_id="generate_sql",
+            prompt=SCHEDULED_PROMPT,
+            llm_conn_id=LLM_CONN_ID,
+            datasource_config=survey_datasource,
+            schema_context=SURVEY_SCHEMA,
+        )
+        check_schema >> generate_sql
+
+        # ------------------------------------------------------------------
+        # Step 5: SQL execution via Apache DataFusion.
+        # ------------------------------------------------------------------
+        run_query = AnalyticsOperator(
+            task_id="run_query",
+            datasource_configs=[survey_datasource],
+            queries=["{{ ti.xcom_pull(task_ids='generate_sql') }}"],
+            result_output_format="json",
+        )
+
+        # ------------------------------------------------------------------
+        # Step 6: Extract data rows from the JSON result.
+        # AnalyticsOperator returns [{"query": "...", "data": [...]}, ...]
+        # ------------------------------------------------------------------
+        @task
+        def extract_data(raw: str) -> str:
+            results = json.loads(raw)
+            data = [row for item in results for row in item["data"]]
+            return json.dumps(data, indent=2)
+
+        result_data = extract_data(run_query.output)
+
+        # ------------------------------------------------------------------
+        # Step 7: Send result via email if SMTP is configured, otherwise log.
+        # Set the SMTP_CONN_ID and NOTIFY_EMAIL environment variables to enable
+        # email delivery.
+        # ------------------------------------------------------------------
+        @task
+        def send_result(data: str) -> None:
+            if SMTP_CONN_ID and NOTIFY_EMAIL:
+                from airflow.providers.smtp.hooks.smtp import SmtpHook
+
+                with SmtpHook(smtp_conn_id=SMTP_CONN_ID) as hook:
+                    hook.send_email_smtp(
+                        to=NOTIFY_EMAIL,
+                        subject=f"Airflow Survey Analysis: {SCHEDULED_PROMPT}",
+                        html_content=f"<pre>{data}</pre>",
+                    )
+            else:
+                print(f"Survey analysis result:\n{data}")
+
+        generate_sql >> run_query >> result_data >> send_result(result_data)
+
+    # [END example_llm_survey_scheduled]
+
+    example_llm_survey_scheduled()

Reply via email to