This is an automated email from the ASF dual-hosted git repository. vikramkoka pushed a commit to branch aip-99-blog-posts in repository https://gitbox.apache.org/repos/asf/airflow-site.git
commit c1ca856c99150b0d5fcc208c6d9b7d49a65e3621 Author: Vikram Koka <[email protected]> AuthorDate: Mon Apr 13 17:25:42 2026 -0700 Blog posts to go with the common.ai provider There are two blog posts to go along with the common.ai provider release. The first highlights the use case of the common.ai provider with a relatively straight forward scenario based on the Airflow survey data. The second builds on it with a more complex agentic workflow which covers typical agentic patterns achievable through Airflow using this provider. --- .../en/blog/ai-survey-analysis-pipelines/index.md | 345 +++++++++++++++++++++ .../blog/blog-agentic-workloads-airflow-3/index.md | 336 ++++++++++++++++++++ 2 files changed, 681 insertions(+) diff --git a/landing-pages/site/content/en/blog/ai-survey-analysis-pipelines/index.md b/landing-pages/site/content/en/blog/ai-survey-analysis-pipelines/index.md new file mode 100644 index 0000000000..b705baf79a --- /dev/null +++ b/landing-pages/site/content/en/blog/ai-survey-analysis-pipelines/index.md @@ -0,0 +1,345 @@ +--- +title: "Ask Your Survey Anything: Building AI Analysis Pipelines with Airflow 3" +linkTitle: "AI Survey Analysis Pipelines with Airflow 3" +authors: + - name: "Vikram Koka" + github: "vikramkoka" + linkedin: "vikramkoka" +description: "A walkthrough of two natural language analysis pipelines over the 2025 Airflow Community Survey — an interactive human-in-the-loop version and a fully automated scheduled version — using operators from the common.ai and common.sql providers." +tags: [Community, Tutorial] +date: "2026-04-XX" +images: ["/blog/ai-survey-analysis-pipelines/images/survey-pipeline-dag.png"] +--- + +The [2025 Airflow Community Survey](https://airflow.apache.org/survey/) collected responses +from nearly 6,000 practitioners across 168 questions. You can open a spreadsheet and filter, +or write SQL by hand. 
But what if you could just ask a question and have Airflow figure out +the query, run it, and bring the result back for your approval? + +This post builds two pipelines that do exactly that, using the +[`apache-airflow-providers-common-ai`](https://airflow.apache.org/blog/common-ai-provider/) +provider released with Airflow 3. + +The first pipeline is **interactive**: a human reviews the question before it reaches the LLM +and approves the result before the DAG finishes. The second is **scheduled**: it downloads +fresh survey data, validates the schema, runs the query unattended, and emails the result. + +If you haven't seen the common.ai provider overview yet, start there for a tour of all the +operators. This post goes deep on a concrete end-to-end example. + + +## Two Pipelines, One Example File + +Both DAGs live in +[`example_llm_survey_analysis.py`](https://github.com/apache/airflow/tree/main/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_analysis.py) +and share the same schema context and datasource configuration. + +**`example_llm_survey_interactive`** — trigger manually, review at both ends: + +``` +prompt_confirmation → generate_sql → run_query → extract_data → result_confirmation +(HITLEntryOperator) (LLMSQLQuery) (Analytics) (@task) (ApprovalOperator) +``` + +**`example_llm_survey_scheduled`** — runs `@monthly`, no human in the loop: + +``` +download_survey → prepare_csv → check_schema → generate_sql → run_query → extract_data → send_result +(HttpOperator) (@task) (LLMSchema (LLMSQLQuery) (Analytics) (@task) (@task / Email) + Compare) +``` + + +## The Data + +The [Airflow Community Survey 2025](https://airflow.apache.org/survey/) CSV has 5,856 rows +and 168 columns covering everything from Airflow version and executor type to cloud provider, +company size, and AI tool usage. 
A few highlights from the data: + +- **3,320** respondents identify as Data Engineers +- **2,032** use AWS as their primary cloud provider for Airflow +- **1,445** are already running Airflow 3 +- **1,351** say they *often* use AI tools to write Airflow code + +Those last two numbers together are part of why this example exists: the people most likely +to use this pipeline are already using Airflow 3 and already using AI in their workflow. + +> **Data prep note:** Apache DataFusion is strict about CSV schemas. The raw survey export +> has 22 duplicate `"Other"` column names and some free-text cells with embedded newlines. +> Both need cleaning before DataFusion will parse the file. The interactive DAG assumes a +> cleaned copy at the path set by the `SURVEY_CSV_PATH` environment variable. The scheduled +> DAG downloads the file at runtime and the `prepare_csv` step handles writing it to disk. + + +## The Interactive Pipeline + +Five tasks. No external services beyond your LLM provider and a local copy of the CSV. + +| Step | Operator | What happens | +|---|---|---| +| 1 | `HITLEntryOperator` | DAG pauses. Human reviews and optionally edits the question. | +| 2 | `LLMSQLQueryOperator` | LLM translates the confirmed question into SQL, validated by sqlglot. | +| 3 | `AnalyticsOperator` | Apache DataFusion executes the SQL against the local CSV. | +| 4 | `@task extract_data` | Strips the query from the JSON result — reviewer sees only data rows. | +| 5 | `ApprovalOperator` | DAG pauses again. Human approves or rejects the result. | + +The LLM and DataFusion steps run unattended. The human shows up at step 1 to confirm the +question and at step 5 to sign off on the answer. Everything in between is automated. 
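
Before the DAG code, it helps to see the shape of the data that moves between steps 3 and 4. A standalone sketch, assuming the `AnalyticsOperator` JSON result wraps each query's rows alongside the SQL that produced them (the second row's count is illustrative, not a real survey number):

```python
import json

# Assumed AnalyticsOperator result shape: one item per executed query,
# carrying both the SQL string and the rows it returned.
raw = json.dumps([
    {
        "query": "SELECT airflow_version, COUNT(*) AS respondents FROM survey GROUP BY 1",
        "data": [
            {"airflow_version": "3.x", "respondents": 1445},
            {"airflow_version": "2.x", "respondents": 2890},
        ],
    }
])

def extract_data(raw: str) -> str:
    """Mirror of the DAG's extract_data task: drop the query strings,
    keep only the flattened data rows for the human reviewer."""
    results = json.loads(raw)
    data = [row for item in results for row in item["data"]]
    return json.dumps(data, indent=2)

rows = json.loads(extract_data(raw))
# rows holds only the data dicts -- no SQL reaches the approval dialog
```

The point of the extraction is visible in the output: the reviewer at step 5 sees rows, not the query that produced them.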
+ +```python +@dag(schedule=None) +def example_llm_survey_interactive(): + + prompt_confirmation = HITLEntryOperator( + task_id="prompt_confirmation", + subject="Review the survey analysis question", + params={ + "prompt": Param( + "How does AI tool usage for writing Airflow code compare between Airflow 3 users and Airflow 2 users?", + type="string", + description="The natural language question to answer via SQL", + ) + }, + response_timeout=datetime.timedelta(hours=1), + ) + + generate_sql = LLMSQLQueryOperator( + task_id="generate_sql", + prompt="{{ ti.xcom_pull(task_ids='prompt_confirmation')['params_input']['prompt'] }}", + llm_conn_id="pydanticai_default", + datasource_config=survey_datasource, + schema_context=SURVEY_SCHEMA, + ) + + run_query = AnalyticsOperator( + task_id="run_query", + datasource_configs=[survey_datasource], + queries=["{{ ti.xcom_pull(task_ids='generate_sql') }}"], + result_output_format="json", + ) + + @task + def extract_data(raw: str) -> str: + results = json.loads(raw) + data = [row for item in results for row in item["data"]] + return json.dumps(data, indent=2) + + result_data = extract_data(run_query.output) + + result_confirmation = ApprovalOperator( + task_id="result_confirmation", + subject="Review the survey query result", + body="{{ ti.xcom_pull(task_ids='extract_data') }}", + response_timeout=datetime.timedelta(hours=1), + ) + + prompt_confirmation >> generate_sql >> run_query >> result_data >> result_confirmation +``` + + +## Walking Through a Run + +**Step 1 — Prompt confirmation.** Trigger the DAG and navigate to the HITL review tab. +The default question appears in an editable field. Change it to anything the schema supports, +or leave it as-is and confirm. + +> *"How does AI tool usage for writing Airflow code compare between Airflow 3 users and Airflow 2 users?"* + +**Step 2 — SQL generation.** `LLMSQLQueryOperator` receives the confirmed question, constructs +a system prompt from `SURVEY_SCHEMA`, and calls the LLM. 
It returns validated SQL — sqlglot +parses the output and rejects anything that isn't a `SELECT`. The generated query goes to XCom. + +```sql +SELECT + "Which version of Airflow do you currently use?" AS airflow_version, + "Are you using AI/LLM (ChatGPT/Cursor/Claude etc) to assist you in writing Airflow code?" AS ai_usage, + COUNT(*) AS respondents +FROM survey +WHERE "Which version of Airflow do you currently use?" IS NOT NULL + AND "Are you using AI/LLM (ChatGPT/Cursor/Claude etc) to assist you in writing Airflow code?" IS NOT NULL +GROUP BY airflow_version, ai_usage +ORDER BY airflow_version, respondents DESC +``` + +**Step 3 — DataFusion execution.** `AnalyticsOperator` loads the CSV into a DataFusion +`SessionContext`, registers it as the `survey` table, and executes the SQL in-process. +No database server, no network call. The 5,856-row CSV runs in under a second. + +**Step 4 — Extract data.** The raw JSON from `AnalyticsOperator` includes the original +query string alongside the result rows. This `@task` strips the query so the reviewer +isn't looking at SQL when they should be looking at data. + +**Step 5 — Result confirmation.** The data rows appear in the Airflow UI approval dialog. +The analyst reads the result, clicks Approve (or Reject if something looks off), and the +DAG completes. + + +## The Scheduled Pipeline + +The scheduled variant adds three capabilities the interactive version intentionally omits: +data acquisition, schema validation, and result delivery. It runs `@monthly` (configurable) +with no human steps. + +| Step | Operator | What happens | +|---|---|---| +| 1 | `HttpOperator` | Downloads the survey CSV from `airflow.apache.org`. | +| 2 | `@task prepare_csv` | Writes the CSV to disk and generates a reference schema file from `SURVEY_SCHEMA`. | +| 3 | `LLMSchemaCompareOperator` | LLM compares the downloaded CSV schema against the reference. Raises if critical columns are missing or renamed. 
| +| 4 | `LLMSQLQueryOperator` | Translates the fixed question into validated SQL. | +| 5 | `AnalyticsOperator` | Executes the SQL via DataFusion. | +| 6 | `@task extract_data` | Extracts data rows from the JSON result. | +| 7 | `@task send_result` | Sends the result via `EmailOperator` if `SMTP_CONN_ID` and `NOTIFY_EMAIL` are set, otherwise logs to the task log. | + +The schema check at step 3 is worth calling out. `LLMSchemaCompareOperator` compares the +live download against a reference file derived from `SURVEY_SCHEMA`. If the survey format +changes between runs — a renamed column, a dropped field — the operator catches it before +any SQL runs, rather than failing silently mid-pipeline with a cryptic DataFusion error. + +```python +@dag(schedule="@monthly", start_date=None) +def example_llm_survey_scheduled(): + + from airflow.providers.http.operators.http import HttpOperator + + download_survey = HttpOperator( + task_id="download_survey", + http_conn_id="airflow_website", + endpoint="/survey/airflow-user-survey-2025.csv", + method="GET", + response_filter=lambda r: r.text, + log_response=False, + ) + + @task + def prepare_csv(csv_text: str) -> None: + import csv as csv_mod, os + os.makedirs(os.path.dirname(SURVEY_CSV_PATH), exist_ok=True) + with open(SURVEY_CSV_PATH, "w", encoding="utf-8") as f: + f.write(csv_text) + columns = [line.split('"')[1] for line in SURVEY_SCHEMA.strip().splitlines() if '"' in line] + with open(REFERENCE_CSV_PATH, "w", newline="", encoding="utf-8") as ref: + csv_mod.writer(ref).writerow(columns) + + csv_ready = prepare_csv(download_survey.output) + + check_schema = LLMSchemaCompareOperator( + task_id="check_schema", + prompt="Compare the survey CSV schema against the reference. 
Flag missing or renamed columns.", + llm_conn_id="pydanticai_default", + data_sources=[survey_datasource, reference_datasource], + context_strategy="basic", + ) + csv_ready >> check_schema + + generate_sql = LLMSQLQueryOperator( + task_id="generate_sql", + prompt=SCHEDULED_PROMPT, + llm_conn_id="pydanticai_default", + datasource_config=survey_datasource, + schema_context=SURVEY_SCHEMA, + ) + check_schema >> generate_sql + + run_query = AnalyticsOperator( + task_id="run_query", + datasource_configs=[survey_datasource], + queries=["{{ ti.xcom_pull(task_ids='generate_sql') }}"], + result_output_format="json", + ) + + @task + def extract_data(raw: str) -> str: + results = json.loads(raw) + data = [row for item in results for row in item["data"]] + return json.dumps(data, indent=2) + + result_data = extract_data(run_query.output) + + @task + def send_result(data: str) -> None: + if SMTP_CONN_ID and NOTIFY_EMAIL: + from airflow.providers.smtp.operators.smtp import EmailOperator + EmailOperator( + task_id="_send_email", + smtp_conn_id=SMTP_CONN_ID, + to=NOTIFY_EMAIL, + subject=f"Airflow Survey Analysis: {SCHEDULED_PROMPT}", + html_content=f"<pre>{data}</pre>", + ).execute({}) + else: + print(f"Survey analysis result:\n{data}") + + generate_sql >> run_query >> result_data >> send_result(result_data) +``` + + +## Connecting Your LLM + +Both DAGs use `llm_conn_id="pydanticai_default"`. Create a connection in the Airflow UI: + +| Provider | Connection type | Required fields | +|---|---|---| +| OpenAI | `pydanticai` | Password: API key. Extra: `{"model": "openai:gpt-4o"}` | +| Anthropic | `pydanticai` | Password: API key. 
Extra: `{"model": "anthropic:claude-haiku-4-5-20251001"}` | +| Google Vertex | `pydanticai-vertex` | Extra: `{"model": "google-vertex:gemini-2.0-flash", "project": "...", "vertexai": true}` | +| AWS Bedrock | `pydanticai-bedrock` | Extra: `{"model": "bedrock:us.anthropic.claude-opus-4-5", "region_name": "us-east-1"}` | + +Switch providers by changing the connection — neither DAG requires any code changes. + +For the scheduled DAG, also create an HTTP connection named `airflow_website` with host +`https://airflow.apache.org` (no auth required), and optionally set the `SMTP_CONN_ID` +and `NOTIFY_EMAIL` environment variables to enable email delivery. + + +## What This Shows + +Four capabilities come together across these two pipelines that haven't been easy to combine before: + +**Natural language as the interface.** Neither pipeline requires the analyst to write SQL. +`LLMSQLQueryOperator` handles schema awareness, column quoting, and query structure. The +`SURVEY_SCHEMA` context is the only thing it needs. + +**In-process query execution.** `AnalyticsOperator` runs Apache DataFusion inside the Airflow +worker. There's no database to configure, no connection to manage for the data itself. Point +it at a file URI and it runs. + +**Schema-aware data validation.** `LLMSchemaCompareOperator` uses an LLM to compare schemas +and surface structural changes in plain language — not a column count diff, but an explanation +of what changed and why it matters for downstream queries. It turns a silent mid-pipeline +failure into an early, actionable error. + +**Human oversight without blocking automation.** The `HITLEntryOperator` and `ApprovalOperator` +are standard Airflow operators from `airflow.providers.standard.operators.hitl`. They have no +AI imports — they just pause the DAG and wait. The interactive pipeline uses them at both ends; +the scheduled pipeline skips them entirely. Adding or removing human review requires no changes +to the LLM or DataFusion steps. 
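
One operational note before trying it: the `pydanticai_default` connection described above doesn't have to be created through the UI. Airflow also resolves connections from `AIRFLOW_CONN_<CONN_ID>` environment variables, so a sketch in the JSON connection format (connection type and model string taken from the table above; the API key is a placeholder) might look like:

```shell
# Hypothetical env-var equivalent of the UI connection from the table above.
# Airflow maps AIRFLOW_CONN_PYDANTICAI_DEFAULT to conn_id "pydanticai_default".
export AIRFLOW_CONN_PYDANTICAI_DEFAULT='{
  "conn_type": "pydanticai",
  "password": "<your-api-key>",
  "extra": {"model": "anthropic:claude-haiku-4-5-20251001"}
}'
```

The JSON connection format has been supported since Airflow 2.3, so this works anywhere the UI-created connection does.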
+ + +## Try It + +Both DAGs are in the `common.ai` provider example DAGs: +[`example_llm_survey_analysis.py`](https://github.com/apache/airflow/tree/main/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_analysis.py). + +```bash +pip install 'apache-airflow-providers-common-ai' \ + 'apache-airflow-providers-common-sql[datafusion]' \ + 'apache-airflow-providers-http' \ + 'apache-airflow-providers-smtp' # optional, for email delivery +``` + +Requires Apache Airflow 3.0+. `apache-airflow-providers-standard` (which provides +`HITLEntryOperator` and `ApprovalOperator`) ships with Airflow 3 and does not need +a separate install. + +For the interactive DAG: set `SURVEY_CSV_PATH` to your local copy of the survey CSV, create +a `pydanticai_default` connection, and trigger `example_llm_survey_interactive`. + +For the scheduled DAG: create the `airflow_website` HTTP connection, set `SMTP_CONN_ID` and +`NOTIFY_EMAIL` if you want email delivery, and trigger `example_llm_survey_scheduled`. + +To go further, the follow-on post [Agentic Workloads on Airflow 3](https://airflow.apache.org/blog/agentic-workloads-airflow-3/) +extends this example into a multi-query synthesis pattern — answering questions that require +querying several dimensions in parallel and synthesizing the results with a second LLM call. + +Questions, feedback, and survey queries that stumped the LLM are all welcome on +[Airflow Slack](https://s.apache.org/airflow-slack) in `#airflow-ai`. 
diff --git a/landing-pages/site/content/en/blog/blog-agentic-workloads-airflow-3/index.md b/landing-pages/site/content/en/blog/blog-agentic-workloads-airflow-3/index.md new file mode 100644 index 0000000000..54110b9d01 --- /dev/null +++ b/landing-pages/site/content/en/blog/blog-agentic-workloads-airflow-3/index.md @@ -0,0 +1,336 @@ +--- +title: "Agentic Workloads on Airflow: Observable, Retryable, and Auditable by Design" +linkTitle: "Agentic Workloads on Airflow 3" +authors: + - name: "Vikram Koka" + github: "vikramkoka" + linkedin: "vikramkoka" +description: "How Dynamic Task Mapping and the common.ai provider turn a multi-dimensional research question into a fan-out/fan-in pipeline where every LLM call is a named, logged, independently retryable task — not a hidden step inside a reasoning loop." +tags: [Community, Tutorial] +date: "2026-04-XX" +images: ["/blog/agentic-workloads-airflow-3/images/agentic-fanout-dag.png"] +--- + +A question like "How does AI tool usage vary across Airflow versions?" has a natural SQL shape: one cross-tabulation, one result. A question like "What does a typical Airflow deployment look like for practitioners who are actively using AI in their workflow?" does not. It requires querying executor type, deployment method, cloud provider, and Airflow version independently, each filtered to the same respondent group, then synthesizing the results into a coherent picture. No single query r [...] + +This is where Airflow's agentic pattern begins: not when you add an LLM to a workflow, but when the structure of the work itself depends on running multiple LLM calls whose outputs feed a synthesis step. This post builds that pattern using the [2025 Airflow Community Survey](https://airflow.apache.org/survey/) data set and the [`apache-airflow-providers-common-ai`](https://airflow.apache.org/blog/common-ai-provider/) provider released with Airflow 3. 
+ +If you haven't read the [introductory survey analysis post](https://airflow.apache.org/blog/ai-survey-analysis-pipelines/) yet, start there for a walkthrough of the single-query interactive and scheduled pipelines. This post picks up where that one ends. + + +## The Agentic Gap in the Single-Query Pattern + +The interactive and scheduled survey DAGs from the introductory post each do one thing: translate a natural language question into SQL, execute it against the CSV, and return the result. The LLM is involved once. The structure of the pipeline does not change based on what that LLM call returns. + +That is not a limitation to fix — it is the right design for that class of question. For a large fraction of production AI workflows, a single well-structured LLM call with good context is sufficient and preferable. + +The pattern becomes agentic when two things are true simultaneously: + +1. The question requires querying multiple independent dimensions +2. The synthesis step — the thing that produces the final answer — depends on *all* of those results + +In an agent harness framework, this would be handled inside a reasoning loop: the LLM decides to call a tool, receives a result, calls another tool, accumulates context, and eventually produces a synthesis. Each tool call is invisible to any outside observer. If one tool call fails, the loop either retries internally or fails entirely. + +In Airflow, the same logic takes a different shape. Each sub-query becomes a named task. The fan-out is Dynamic Task Mapping. The synthesis is a named task with its inputs in XCom. Every step is observable, independently retryable, and logged. + + +## The DAG: `example_llm_survey_agentic` + +The full DAG is in +[`example_llm_survey_agentic.py`](https://github.com/apache/airflow/tree/main/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_agentic.py). 
+ +**Question:** *"What does a typical Airflow deployment look like for practitioners who actively use AI tools in their workflow?"* + +**Task graph:** + +``` +decompose_question → generate_sql (×4) → wrap_query (×4) → run_query (×4) + (@task) (LLMSQLQuery, (@task, (Analytics, + mapped) mapped) mapped) + ↓ + collect_results + (@task) + ↓ + synthesize_answer + (LLMOperator) + ↓ + result_confirmation + (ApprovalOperator) +``` + +Seven tasks. Four of them run in parallel. Two LLM calls total — one for SQL generation (four instances), one for synthesis. One human review at the end. + +| Step | Operator | What happens | +|---|---|---| +| 1 | `@task decompose_question` | Returns a list of four sub-questions, one per dimension. | +| 2 | `LLMSQLQueryOperator` (mapped ×4) | Each sub-question becomes one SQL query, translated and validated in parallel. | +| 3 | `@task wrap_query` (mapped ×4) | Wraps each SQL string into a single-element list for `AnalyticsOperator`. | +| 4 | `AnalyticsOperator` (mapped ×4) | Apache DataFusion executes all four queries in parallel against the local CSV. | +| 5 | `@task collect_results` | Gathers the four JSON results and labels each by dimension. | +| 6 | `LLMOperator` | Reads all four labeled result sets and writes a narrative characterization. | +| 7 | `ApprovalOperator` | Human reviews the synthesized narrative before the DAG completes. | + + +## Decomposing the Question + +`decompose_question` is a plain `@task` that returns the list of sub-questions. In this example, the list is static — the four dimensions are hardcoded as strings: + +```python +@task +def decompose_question() -> list[str]: + return [ + ( + "Among respondents who use AI/LLM tools to write Airflow code, " + "what executor types (CeleryExecutor, KubernetesExecutor, LocalExecutor) " + "are most commonly enabled? Return the count per executor type." + ), + ( + "Among respondents who use AI/LLM tools to write Airflow code, " + "how do they deploy Airflow? 
Return the count per deployment method." + ), + ( + "Among respondents who use AI/LLM tools to write Airflow code, " + "which cloud providers are most commonly used for Airflow? " + "Return the count per cloud provider." + ), + ( + "Among respondents who use AI/LLM tools to write Airflow code, " + "what version of Airflow are they currently running? " + "Return the count per version." + ), + ] + +sub_questions = decompose_question() +``` + +The output of this task — a list of four strings — becomes the input to the `expand()` call on `LLMSQLQueryOperator` in the next step. Airflow creates one mapped task instance per list element. + +> **Why keep this static?** A dynamic version — where the LLM itself decomposes the high-level question into sub-questions at runtime — is possible and more agentic. It adds an LLM call before any SQL runs, which introduces latency and a failure point early in the graph. For a first example, static decomposition is clearer. The dynamic variant is a follow-on pattern. + + +## SQL Generation: Mapping Over Sub-Questions + +`LLMSQLQueryOperator.partial().expand()` creates one mapped task instance per sub-question. All four run in parallel, each translating one natural language question into validated SQL against the survey schema: + +```python +generate_sql = LLMSQLQueryOperator.partial( + task_id="generate_sql", + llm_conn_id=LLM_CONN_ID, + datasource_config=survey_datasource, + schema_context=SURVEY_SCHEMA, +).expand(prompt=sub_questions) +``` + +In the Airflow UI, this renders as four task instances: `generate_sql[0]`, `generate_sql[1]`, `generate_sql[2]`, `generate_sql[3]`. Each has its own log, retry counter, and XCom entry. This is what an agent harness's parallel tool calls look like when they are made explicit. + +Each instance returns a single SQL string. `LLMSQLQueryOperator` validates the output with sqlglot before returning it — anything that is not a `SELECT` statement is rejected. 
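
That guard is easy to picture. Here is a deliberately simplified stand-in for the validation step — string-level checks only, not the operator's actual sqlglot logic, which parses the full statement tree:

```python
import re

def reject_non_select(sql: str) -> str:
    """Simplified stand-in for the operator's sqlglot check: accept a single
    SELECT/WITH statement, reject everything else. (sqlglot does this
    properly by parsing the SQL rather than scanning keywords.)"""
    stmt = sql.strip().rstrip(";").strip()
    if ";" in stmt:
        raise ValueError("multiple statements are not allowed")
    first_word = stmt.split(None, 1)[0].upper() if stmt else ""
    if first_word not in {"SELECT", "WITH"}:
        raise ValueError(f"only SELECT queries are allowed, got {first_word!r}")
    if re.search(r"\b(INSERT|UPDATE|DELETE|DROP|ALTER|CREATE|TRUNCATE)\b", stmt, re.I):
        raise ValueError("DML/DDL keywords are not allowed")
    return stmt
```

The practical payoff in the mapped setting: if one sub-question produces invalid SQL, only that `generate_sql[i]` instance fails validation, and only it needs a retry — the other three mapped instances are unaffected.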
+ + +## The `wrap_query` Bridge + +`AnalyticsOperator` expects `queries: list[str]` — a list because it can run multiple queries in one execution. `LLMSQLQueryOperator` returns a single `str`. A small `@task` bridges the interface: + +```python +@task +def wrap_query(sql: str) -> list[str]: + return [sql] + +wrapped_queries = wrap_query.expand(sql=generate_sql.output) +``` + +This step is an implementation detail, not a conceptual one. Four mapped instances of `wrap_query` run in parallel, each converting one SQL string into a one-element list. The result is four `list[str]` values that `AnalyticsOperator` can consume directly. + + +## Parallel Execution via DataFusion + +```python +run_query = AnalyticsOperator.partial( + task_id="run_query", + datasource_configs=[survey_datasource], + result_output_format="json", +).expand(queries=wrapped_queries) +``` + +Four mapped instances of `AnalyticsOperator` run in parallel. Each loads the survey CSV into an Apache DataFusion `SessionContext` in-process and executes its SQL against it. No database server, no shared state between instances. + +This is where independent retry earns its value. If the cloud provider query returns a DataFusion error due to a null value in that column, only `run_query[2]` fails. `run_query[0]`, `run_query[1]`, and `run_query[3]` have already succeeded and their results are in XCom. When `run_query[2]` is cleared and retried, the other three results are preserved. + + +## Collecting and Labeling Results + +`collect_results` gathers all four outputs from `run_query` — Airflow passes the list of mapped outputs directly — and labels each one by dimension key: + +```python +# DIMENSION_KEYS = ["executor", "deployment", "cloud", "airflow_version"] +# Order must match the sub-questions returned by decompose_question. +# Airflow preserves mapped task output ordering by index, so this zip is safe. 
+ +@task(trigger_rule="all_success") +def collect_results(results: list[str]) -> dict: + labeled: dict[str, list] = {} + for key, raw in zip(DIMENSION_KEYS, results): + items = json.loads(raw) + data = [row for item in items for row in item["data"]] + labeled[key] = data + return labeled + +collected = collect_results(run_query.output) +``` + +The output is a dict like: + +```json +{ + "executor": [{"KubernetesExecutor": "Yes", "count": 847}, ...], + "deployment": [{"How do you deploy Airflow?": "Managed Cloud Service", "count": 1203}, ...], + "cloud": [{"primary_cloud": "AWS", "count": 891}, ...], + "airflow_version": [{"version": "3.x", "count": 412}, ...] +} +``` + +All four result sets in one XCom entry. This is the input to the synthesis step. + + +## Synthesis: The Second LLM Call + +`LLMOperator` takes the collected results and produces a narrative. This is the synthesis step — the part of the pipeline that could not exist without all four sub-queries having completed first: + +```python +synthesize_answer = LLMOperator( + task_id="synthesize_answer", + llm_conn_id=LLM_CONN_ID, + system_prompt=( + "You are a data analyst summarizing survey results about Apache Airflow practitioners. " + "Write in plain, concise language suitable for a technical audience. " + "Focus on patterns and proportions rather than raw counts." + ), + prompt=( + "Given these four independent survey query results about practitioners " + "who use AI tools to write Airflow code, write a 2-3 sentence " + "characterization of what a typical Airflow deployment looks like for " + "this group.\n\n" + "Results: {{ ti.xcom_pull(task_ids='collect_results') }}" + ), +) +collected >> synthesize_answer +``` + +`prompt` is a template field, so `{{ ti.xcom_pull(task_ids='collect_results') }}` renders the full dict at execution time. `system_prompt` maps to the PydanticAI agent's `instructions` parameter, so the framing instruction carries into every token the model generates. 
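
A plain-Python sketch of what that rendering amounts to — the collected dict is substituted into the prompt text before the single synthesis call. This mimics the Jinja substitution for illustration; it is not the operator's internals, and the dict is abbreviated from the example output shown earlier:

```python
import json

# Abbreviated collect_results output, shaped like the example above.
collected = {
    "executor": [{"KubernetesExecutor": "Yes", "count": 847}],
    "deployment": [{"How do you deploy Airflow?": "Managed Cloud Service", "count": 1203}],
    "cloud": [{"primary_cloud": "AWS", "count": 891}],
    "airflow_version": [{"version": "3.x", "count": 412}],
}

template = (
    "Given these four independent survey query results about practitioners "
    "who use AI tools to write Airflow code, write a 2-3 sentence "
    "characterization of what a typical Airflow deployment looks like for "
    "this group.\n\nResults: {results}"
)

# Roughly what the {{ ti.xcom_pull(task_ids='collect_results') }} template
# field expands to at execution time:
rendered = template.format(results=json.dumps(collected, indent=2))
```

Everything the synthesis model sees is in that one rendered string — which is also why the `collect_results` XCom entry is a complete audit artifact for the final answer.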
+ +The output — a 2-3 sentence characterization — goes to XCom and then to the final approval step. + +> **Inline HITL alternative:** `LLMOperator` supports `require_approval=True` and `allow_modifications=True` as constructor parameters, via `LLMApprovalMixin`. Setting these eliminates the separate `ApprovalOperator` task and lets the reviewer edit the synthesized narrative directly before approving. Whether to use inline approval or a separate `ApprovalOperator` is a design choice; both produce the same result. + + +## Walking Through a Run + +**Step 1 — Decompose.** Trigger the DAG. `decompose_question` completes in milliseconds and returns the four sub-question strings. + +**Steps 2–4 — Fan-out.** Twelve mapped task instances run: four `generate_sql`, four `wrap_query`, four `run_query`. In the Airflow UI, these appear as three rows of four parallel task instances. Each SQL generation call goes to the LLM; each DataFusion execution runs in-process against the CSV. + +<!-- SCREENSHOT PLACEHOLDER: Airflow UI Grid view showing the fan-out — three rows of four mapped instances (generate_sql[0..3], wrap_query[0..3], run_query[0..3]) all in success state, converging to collect_results → synthesize_answer → result_confirmation. Capture after a full successful run. --> + +A representative generated query for the executor dimension: + +```sql +SELECT + CASE WHEN "CeleryExecutor" = 'Yes' THEN 'CeleryExecutor' END AS executor_type, + COUNT(*) AS count +FROM survey +WHERE "Are you using AI/LLM (ChatGPT/Cursor/Claude etc) to assist you in writing Airflow code?" != 'No, I don''t use AI to write Airflow code' + AND "CeleryExecutor" IS NOT NULL +GROUP BY executor_type + +UNION ALL + +SELECT + CASE WHEN "KubernetesExecutor" = 'Yes' THEN 'KubernetesExecutor' END, + COUNT(*) +FROM survey +WHERE "Are you using AI/LLM (ChatGPT/Cursor/Claude etc) to assist you in writing Airflow code?" 
!= 'No, I don''t use AI to write Airflow code' + AND "KubernetesExecutor" IS NOT NULL +GROUP BY 1 +-- ... and so on +``` + +**Step 5 — Collect.** `collect_results` assembles the four result sets into a labeled dict. + +**Step 6 — Synthesize.** `LLMOperator` calls the LLM once with all four result sets as context. A representative output: + +> "Among practitioners who actively use AI tools to write Airflow code, the majority (61%) deploy on a managed cloud service or cloud-native setup, with AWS as the primary cloud provider (38%). KubernetesExecutor is the dominant choice (54%), and this group is adopting Airflow 3.x at a notably higher rate than the survey population as a whole (29% vs. 21% overall)." + +**Step 7 — Review.** The `ApprovalOperator` presents the narrative in the Airflow UI. Approve to complete the DAG; reject to fail it and trigger a retry from the synthesis step if desired. + + +## What the DAG Topology Makes Explicit + +The core difference between this pattern and the equivalent agent harness implementation is not the output — it is what is auditable after the run. + +| What's happening | In an agent harness | In this DAG | +|---|---|---| +| Sub-query: executor distribution | LLM internal tool call, no external artifact | Task `generate_sql[0]` — SQL in XCom, full log | +| Sub-query: cloud provider | LLM internal tool call | Task `generate_sql[2]` — SQL in XCom, full log | +| Parallel execution | Concurrent or sequential, implementation-dependent | Explicit mapped instances, each on its own worker | +| cloud_provider query fails | Entire run restarts from the top, or fails | Only `run_query[2]` retries; other three results preserved | +| Synthesis inputs | Accumulated context in the LLM's reasoning loop | `collect_results` XCom entry — the exact dict the LLM received | +| Why did it characterize it that way? 
| No artifact | `synthesize_answer` XCom: input dict and output string both stored | + +Each `generate_sql[i]` task log contains the prompt the LLM received, the SQL it returned, and the validation result from sqlglot. Each `run_query[i]` log contains the DataFusion execution details and the row count returned. The synthesis step's XCom entry contains the exact dict that was passed as context. + +This is the same information an agent harness has internally — the difference is that Airflow surfaces it as first-class task artifacts, accessible from the Airflow UI without instrumenting or patching the reasoning loop. + + +## Connecting Your LLM + +Both `LLMSQLQueryOperator` and `LLMOperator` use `llm_conn_id="pydanticai_default"`. The same connection table from the introductory post applies: + +| Provider | Connection type | Required fields | +|---|---|---| +| OpenAI | `pydanticai` | Password: API key. Extra: `{"model": "openai:gpt-4o"}` | +| Anthropic | `pydanticai` | Password: API key. Extra: `{"model": "anthropic:claude-haiku-4-5-20251001"}` | +| Google Vertex | `pydanticai-vertex` | Extra: `{"model": "google-vertex:gemini-2.0-flash", "project": "...", "vertexai": true}` | +| AWS Bedrock | `pydanticai-bedrock` | Extra: `{"model": "bedrock:us.anthropic.claude-opus-4-5", "region_name": "us-east-1"}` | + +One connection serves both operators. The synthesis step and the SQL generation steps can use different connections if you want a stronger model for synthesis and a faster one for the SQL generation pass — set `model_id` on the `LLMOperator` to override the connection's default. + + +## The Multi-Agent Pattern Hidden in Plain Sight + +This DAG was not designed around multi-agent frameworks, but it accidentally implements one of the most common separation-of-concerns patterns in that space: the **SQL Architect / Critic / Narrator** triad. 

In agent harness frameworks, these three roles are typically implemented as distinct agent instances that coordinate through an internal routing layer. The underlying rationale is that mixing generation, evaluation, and communication into a single agent produces outputs that are mediocre at all three jobs. Separating them forces each role to reason only about what it is responsible for.

The survey DAG lands in the same place through a different path: the task boundary enforces the separation.

**SQL Architect → `generate_sql[0..3]` (`LLMSQLQueryOperator`).**
Each mapped instance receives one natural language sub-question and produces one SQL query. Schema context is passed as a system-level framing, not as part of the user prompt, so the model reasons about structure before generating syntax. The Architect role is strict: produce a valid `SELECT` statement or fail.

**Critic → two layers.**
The first layer is embedded in `LLMSQLQueryOperator`: sqlglot parses and validates the generated SQL before the task returns. This is a syntax-level Critic — it rejects anything that is not a `SELECT`. The second and fuller layer is the `LLMBranchOperator` pattern from Pattern 2 in this series: an explicit task that evaluates result quality and decides whether the finding is reportable, needs a drill-down, or warrants a pivot to a different hypothesis. That task does what the Critic does in an agent framework, but as an explicit, logged step in the graph.

**Narrator → `synthesize_answer` (`LLMOperator`).**
Receives the labeled result sets from all four dimensions and produces a plain-language characterization. The Narrator's role is bounded by design: it receives structured data rows, not the intermediate SQL or any reasoning artifacts, and its system prompt constrains it to communication — "focus on patterns and proportions rather than raw counts." The role separation is enforced by what is in XCom, not by agent routing logic.

One genuine structural difference remains.
In a multi-agent system, the Critic can loop back to the Architect with feedback — "this query has a NULL handling problem, try again" — and the cycle runs until the output meets a quality bar. Airflow DAGs are acyclic. The Critic either raises an exception and triggers a task-level retry of the Architect instance (automatic but blunt), or routes to an alternative path via `LLMBranchOperator` (explicit and auditable, but the alternative path must be defined in the DAG ahead of time).

That acyclicity is a deliberate tradeoff: it is also what makes the DAG's execution fully auditable and its failure modes predictable. The feedback loop pattern — and the open question of how far it can be supported within a structured workflow model — is part of what Airflow's roadmap is actively working through.

---

## Try It

The DAG is in the `common.ai` provider example DAGs:
[`example_llm_survey_agentic.py`](https://github.com/apache/airflow/tree/main/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_agentic.py).

```bash
pip install 'apache-airflow-providers-common-ai' \
    'apache-airflow-providers-common-sql[datafusion]'
```

Requires Apache Airflow 3.0+.

Set `SURVEY_CSV_PATH` to your local cleaned copy of the survey CSV, create a `pydanticai_default` connection, and trigger `example_llm_survey_agentic`.

The Airflow UI will show the four parallel `generate_sql` and `run_query` instances fanning out and converging to `collect_results`. That visual is the clearest way to see what distinguishes the agentic pattern from a single-query run.

Questions, results, and sub-questions that surprised the LLM are welcome on [Airflow Slack](https://s.apache.org/airflow-slack) in `#airflow-ai`.
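One last setup detail for the connection step: Airflow also accepts JSON-serialized connections through its standard `AIRFLOW_CONN_<CONN_ID>` environment variable mechanism, so `pydanticai_default` can be defined without touching the UI. A minimal sketch, borrowing the `conn_type` and model string from the Anthropic row of the table above; the API key value is a placeholder:

```shell
# Hypothetical sketch: define pydanticai_default as a JSON-serialized
# connection via Airflow's AIRFLOW_CONN_* environment variable mechanism.
# Replace the password placeholder with your real API key.
export AIRFLOW_CONN_PYDANTICAI_DEFAULT='{
  "conn_type": "pydanticai",
  "password": "sk-REPLACE-ME",
  "extra": {"model": "anthropic:claude-haiku-4-5-20251001"}
}'
```

Environment-variable connections are resolved at task runtime and never written to the metadata database, which is convenient for keeping API keys out of the UI.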
