kaxil commented on code in PR #65172:
URL: https://github.com/apache/airflow/pull/65172#discussion_r3081613952
##########
providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_analysis.py:
##########
@@ -0,0 +1,423 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+<<<<<<< HEAD

Review Comment:
   This file has unresolved merge conflicts -- there are 7 `<<<<<<< HEAD` / `=======` / `>>>>>>>` blocks throughout. It won't parse as Python until these are resolved.

##########
providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_agentic.py:
##########
@@ -0,0 +1,265 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Multi-query synthesis — an agentic survey analysis pattern.

Review Comment:
   Nit: em dashes render inconsistently in terminals and diff tools. Consider using `--` instead of the Unicode character here and on line 34. Same applies to the docstrings in the analysis file.

##########
providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_agentic.py:
##########
@@ -0,0 +1,265 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Multi-query synthesis — an agentic survey analysis pattern.
+
+Demonstrates how Dynamic Task Mapping turns a multi-dimensional research
+question into a fan-out / fan-in pipeline that is observable, retryable,
+and auditable at each step.
+
+**Question:** *"What does a typical Airflow deployment look like for
+practitioners who actively use AI tools in their workflow?"*
+
+This question cannot be answered with a single SQL query. It requires
+querying four independent dimensions — executor type, deployment method,
+cloud provider, and Airflow version — all filtered to respondents who use
+AI tools to write Airflow code. The results are then synthesized by a
+second LLM call into a single narrative characterization.
+
+``example_llm_survey_agentic`` (manual trigger):
+
+.. code-block:: text
+
+    decompose_question (@task)
+    → generate_sql (LLMSQLQueryOperator, mapped ×4)
+    → wrap_query (@task, mapped ×4)
+    → run_query (AnalyticsOperator, mapped ×4)
+    → collect_results (@task)
+    → synthesize_answer (LLMOperator)
+    → result_confirmation (ApprovalOperator)
+
+**What this makes visible that an agent harness hides:**
+
+* Each sub-query is a named, logged task instance — not a hidden tool call.
+* If the cloud-provider query fails, only that mapped instance retries;
+  the other three results are preserved in XCom.
+* The synthesis step's inputs are fully auditable XCom values — not an
+  opaque continuation of an LLM reasoning loop.
+
+Before running:
+
+1. Create an LLM connection named ``pydanticai_default`` (or the value of
+   ``LLM_CONN_ID``) for your chosen model provider.
+2. Place the cleaned survey CSV at the path set by ``SURVEY_CSV_PATH``.
+"""
+
+from __future__ import annotations
+
+import datetime
+import json
+import os
+
+from airflow.providers.common.ai.operators.llm import LLMOperator
+from airflow.providers.common.ai.operators.llm_sql import LLMSQLQueryOperator
+from airflow.providers.common.compat.sdk import dag, task
+from airflow.providers.common.sql.config import DataSourceConfig
+from airflow.providers.common.sql.operators.analytics import AnalyticsOperator
+from airflow.providers.standard.operators.hitl import ApprovalOperator
+
+# ---------------------------------------------------------------------------
+# Configuration
+# ---------------------------------------------------------------------------
+
+LLM_CONN_ID = "pydanticai_default"
+
+SURVEY_CSV_PATH = os.environ.get(
+    "SURVEY_CSV_PATH",
+    "/opt/airflow/data/airflow-user-survey-2025.csv",
+)
+SURVEY_CSV_URI = f"file://{SURVEY_CSV_PATH}"
+
+# Schema context for LLMSQLQueryOperator.
+# All column names must be quoted in SQL because they contain spaces and punctuation.
+SURVEY_SCHEMA = """
+Table: survey
+Key columns (quote all names in SQL):
+    "How important is Airflow to your business?" TEXT
+    "Which version of Airflow do you currently use?" TEXT
+    "CeleryExecutor" TEXT
+    "KubernetesExecutor" TEXT
+    "LocalExecutor" TEXT
+    "How do you deploy Airflow?" TEXT
+    "What best describes your current occupation?" TEXT
+    "What industry do you currently work in?" TEXT
+    "How many years of experience do you have with Airflow?" TEXT
+    "Which of the following is your company's primary cloud provider for Airflow?" TEXT
+    "How many people work at your company?" TEXT
+    "How many people at your company directly work on data?" TEXT
+    "How many people at your company use Airflow?" TEXT
+    "How likely are you to recommend Apache Airflow?" TEXT
+    "Are you using AI/LLM (ChatGPT/Cursor/Claude etc) to assist you in writing Airflow code?" TEXT
+"""
+
+survey_datasource = DataSourceConfig(
+    conn_id="",
+    table_name="survey",
+    uri=SURVEY_CSV_URI,
+    format="csv",
+)
+
+# Dimension labels — order must match the sub-questions returned by decompose_question.
+DIMENSION_KEYS = ["executor", "deployment", "cloud", "airflow_version"]
+
+SYNTHESIS_SYSTEM_PROMPT = (
+    "You are a data analyst summarizing survey results about Apache Airflow practitioners. "
+    "Write in plain, concise language suitable for a technical audience. "
+    "Focus on patterns and proportions rather than raw counts."
+)
+
+
+# ---------------------------------------------------------------------------
+# DAG: Agentic multi-query synthesis
+# ---------------------------------------------------------------------------
+
+
+# [START example_llm_survey_agentic]
+@dag(schedule=None)
+def example_llm_survey_agentic():
+    """
+    Fan-out across four survey dimensions, then synthesize into a single narrative.
+
+    Task graph::
+
+        decompose_question (@task)
+        → generate_sql (LLMSQLQueryOperator ×4, via Dynamic Task Mapping)
+        → wrap_query (@task ×4)
+        → run_query (AnalyticsOperator ×4, via Dynamic Task Mapping)
+        → collect_results (@task)
+        → synthesize_answer (LLMOperator)
+        → result_confirmation (ApprovalOperator)
+    """
+
+    # ------------------------------------------------------------------
+    # Step 1: Decompose the high-level question into sub-questions,
+    # one per dimension. Each string becomes one mapped task instance
+    # in the next step.
+    # ------------------------------------------------------------------
+    @task
+    def decompose_question() -> list[str]:
+        return [
+            (
+                "Among respondents who use AI/LLM tools to write Airflow code, "
+                "what executor types (CeleryExecutor, KubernetesExecutor, LocalExecutor) "
+                "are most commonly enabled? Return the count per executor type."
+            ),
+            (
+                "Among respondents who use AI/LLM tools to write Airflow code, "
+                "how do they deploy Airflow? Return the count per deployment method."
+            ),
+            (
+                "Among respondents who use AI/LLM tools to write Airflow code, "
+                "which cloud providers are most commonly used for Airflow? "
+                "Return the count per cloud provider."
+            ),
+            (
+                "Among respondents who use AI/LLM tools to write Airflow code, "
+                "what version of Airflow are they currently running? "
+                "Return the count per version."
+            ),
+        ]
+
+    sub_questions = decompose_question()
+
+    # ------------------------------------------------------------------
+    # Step 2: Generate SQL for each sub-question in parallel.
+    # LLMSQLQueryOperator is expanded over the sub-question list —
+    # four mapped instances, each translating one natural-language
+    # question into validated SQL.
+    # ------------------------------------------------------------------
+    generate_sql = LLMSQLQueryOperator.partial(
+        task_id="generate_sql",
+        llm_conn_id=LLM_CONN_ID,
+        datasource_config=survey_datasource,
+        schema_context=SURVEY_SCHEMA,
+    ).expand(prompt=sub_questions)
+
+    # ------------------------------------------------------------------
+    # Step 3: Wrap each SQL string into a single-element list.
+    # AnalyticsOperator expects queries: list[str]; this step bridges
+    # the scalar output of LLMSQLQueryOperator to that interface.
+    # ------------------------------------------------------------------
+    @task
+    def wrap_query(sql: str) -> list[str]:
+        return [sql]
+
+    wrapped_queries = wrap_query.expand(sql=generate_sql.output)
+
+    # ------------------------------------------------------------------
+    # Step 4: Execute each SQL against the survey CSV via DataFusion.
+    # Four mapped instances run in parallel. If one fails, only that
+    # instance retries — the other three hold their XCom results.
+    # ------------------------------------------------------------------
+    run_query = AnalyticsOperator.partial(
+        task_id="run_query",
+        datasource_configs=[survey_datasource],
+        result_output_format="json",
+    ).expand(queries=wrapped_queries)

+    # ------------------------------------------------------------------
+    # Step 5: Collect all four JSON results and label them by dimension.
+    # trigger_rule="all_success" ensures synthesis only runs when the
+    # complete picture is available.
+    # ------------------------------------------------------------------
+    @task(trigger_rule="all_success")
+    def collect_results(results: list[str]) -> dict:
+        # Airflow preserves index order for mapped task outputs, so zip is safe here:
+        # results[i] corresponds to the mapped instance at index i, which matches
+        # the sub-question at DIMENSION_KEYS[i].
+        labeled: dict[str, list] = {}
+        for key, raw in zip(DIMENSION_KEYS, results):

Review Comment:
   `all_success` is the default trigger rule, so this parameter doesn't add anything. Removing it reduces noise and avoids confusing readers into thinking this task has non-standard behavior.

##########
providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_analysis.py:
##########
@@ -0,0 +1,423 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+<<<<<<< HEAD
+Natural language analysis of a survey CSV — interactive and scheduled variants.
+
+Both DAGs query the `Airflow Community Survey 2025
+<https://airflow.apache.org/survey/>`__ CSV using
+:class:`~airflow.providers.common.ai.operators.llm_sql.LLMSQLQueryOperator`
+and :class:`~airflow.providers.common.sql.operators.analytics.AnalyticsOperator`.
+
+``example_llm_survey_interactive`` (five tasks, manual trigger)
+    Adds human-in-the-loop review at both ends of the pipeline:
+
+    1. **HITLEntryOperator** — human reviews and optionally edits the question.
+    2. **LLMSQLQueryOperator** — translates the confirmed question into SQL.
+    3. **AnalyticsOperator** — executes the SQL against the CSV via Apache DataFusion.
+    4. A ``@task`` function — extracts the data rows from the JSON payload.
+    5. **ApprovalOperator** — human approves or rejects the query result.
+
+``example_llm_survey_scheduled`` (three tasks, runs on a schedule)
+    Runs a fixed question end-to-end without human review — suitable for
+    recurring reporting or dashboards:
+
+    1. **LLMSQLQueryOperator** — translates the question into SQL.
+    2. **AnalyticsOperator** — executes the SQL against the CSV.
+    3. A ``@task`` function — extracts the data rows from the JSON payload.
+
+Before running either DAG:
+=======
+Interactive natural language analysis of a survey CSV.
+
+``example_llm_survey_interactive`` queries the `Airflow Community Survey 2025
+<https://airflow.apache.org/survey/>`__ CSV using a five-step pipeline:
+
+    1. **HITLEntryOperator** — human reviews and optionally edits the question.
+    2. **LLMSQLQueryOperator** — translates the confirmed question into SQL.
+    3. **AnalyticsOperator** — executes the SQL against the CSV via Apache
+       DataFusion and returns the results as JSON.
+    4. A ``@task`` function — extracts the data rows from the JSON payload.
+    5. **ApprovalOperator** — human approves or rejects the query result.
+
+Before running:
+>>>>>>> 6d60bff29b8b7c6916d49033f0956ae6ceff5ab3
+
+1. Create an LLM connection named ``pydanticai_default`` (or the value of
+   ``LLM_CONN_ID`` below) for your chosen model provider.
+2. Place the survey CSV at the path set by the ``SURVEY_CSV_PATH``
+   environment variable, or update ``SURVEY_CSV_PATH`` below.
+   A cleaned copy of the 2025 survey CSV (duplicate columns renamed, embedded
+   newlines removed) is required — Apache DataFusion is strict about these.
+"""
+
+from __future__ import annotations
+
+import datetime
+import json
+import os
+
+<<<<<<< HEAD
+from airflow.providers.common.ai.operators.llm_schema_compare import LLMSchemaCompareOperator
+=======
+>>>>>>> 6d60bff29b8b7c6916d49033f0956ae6ceff5ab3
+from airflow.providers.common.ai.operators.llm_sql import LLMSQLQueryOperator
+from airflow.providers.common.compat.sdk import dag, task
+from airflow.providers.common.sql.config import DataSourceConfig
+from airflow.providers.common.sql.operators.analytics import AnalyticsOperator
+from airflow.providers.standard.operators.hitl import ApprovalOperator, HITLEntryOperator
+from airflow.sdk import Param
+
+<<<<<<< HEAD
+try:
+    from airflow.providers.http.operators.http import HttpOperator
+
+    _has_http_provider = True
+except ImportError:
+    _has_http_provider = False
+
+=======
+>>>>>>> 6d60bff29b8b7c6916d49033f0956ae6ceff5ab3
+# ---------------------------------------------------------------------------
+# Configuration
+# ---------------------------------------------------------------------------
+
+# LLM provider connection (OpenAI, Anthropic, Vertex AI, etc.)
+LLM_CONN_ID = "pydanticai_default"
+
+<<<<<<< HEAD
+# HTTP connection pointing at https://airflow.apache.org (scheduled DAG only).
+# Create a connection with host=https://airflow.apache.org, no auth required.
+AIRFLOW_WEBSITE_CONN_ID = "airflow_website"
+
+# Endpoint path for the survey CSV download, relative to the HTTP connection base URL.
+SURVEY_CSV_ENDPOINT = "/survey/airflow-user-survey-2025.csv"
+
+=======
+>>>>>>> 6d60bff29b8b7c6916d49033f0956ae6ceff5ab3
+# Path to the survey CSV. Set the SURVEY_CSV_PATH environment variable to
+# override — no code change needed when moving between environments.
+SURVEY_CSV_PATH = os.environ.get(
+    "SURVEY_CSV_PATH",
+    "/opt/airflow/data/airflow-user-survey-2025.csv",
+)
+SURVEY_CSV_URI = f"file://{SURVEY_CSV_PATH}"
+
+<<<<<<< HEAD
+# Path where the reference schema CSV is written at runtime (scheduled DAG only).
+REFERENCE_CSV_PATH = os.environ.get(
+    "REFERENCE_CSV_PATH",
+    "/opt/airflow/data/airflow-user-survey-2025-reference.csv",
+)
+REFERENCE_CSV_URI = f"file://{REFERENCE_CSV_PATH}"
+
+# SMTP connection for the result notification step (scheduled DAG only).
+# Set to None to skip email and log the result instead.
+SMTP_CONN_ID = os.environ.get("SMTP_CONN_ID", None)
+NOTIFY_EMAIL = os.environ.get("NOTIFY_EMAIL", None)
+
+# Default question for the interactive DAG — the human can edit it in the first HITL step.
+INTERACTIVE_PROMPT = (
+    "How does AI tool usage for writing Airflow code compare between Airflow 3 users and Airflow 2 users?"
+)
+
+# Fixed question for the scheduled DAG — runs unattended on every trigger.
+SCHEDULED_PROMPT = "What is the breakdown of respondents by Airflow version currently in use?"
+=======
+# Default question — the human can edit it in the first HITL step.
+INTERACTIVE_PROMPT = "Which city had the highest number of respondents?"
+>>>>>>> 6d60bff29b8b7c6916d49033f0956ae6ceff5ab3
+
+# Schema context for LLMSQLQueryOperator.
+# Lists the analytically relevant columns from the 2025 survey CSV (168 total).
+# All column names must be quoted in SQL because they contain spaces and
+# punctuation.
+SURVEY_SCHEMA = """
+Table: survey
+Key columns (quote all names in SQL):
+    "How important is Airflow to your business?" TEXT
+    "Which version of Airflow do you currently use?" TEXT
+    "CeleryExecutor" TEXT
+    "KubernetesExecutor" TEXT
+    "LocalExecutor" TEXT
+    "How do you deploy Airflow?" TEXT
+    "What best describes your current occupation?" TEXT
+    "What industry do you currently work in?" TEXT
+    "What city do you currently reside in?" TEXT
+    "How many years of experience do you have with Airflow?" TEXT
+    "Which of the following is your company's primary cloud provider for Airflow?" TEXT
+    "How many people work at your company?" TEXT
+    "How many people at your company directly work on data?" TEXT
+    "How many people at your company use Airflow?" TEXT
+    "How likely are you to recommend Apache Airflow?" TEXT
+    "Are you using AI/LLM (ChatGPT/Cursor/Claude etc) to assist you in writing Airflow code?" TEXT
+"""
+
+survey_datasource = DataSourceConfig(
+    conn_id="",
+    table_name="survey",
+    uri=SURVEY_CSV_URI,
+    format="csv",
+)
+
+<<<<<<< HEAD
+reference_datasource = DataSourceConfig(
+    conn_id="",
+    table_name="survey_reference",
+    uri=REFERENCE_CSV_URI,
+    format="csv",
+)
+
+=======
+>>>>>>> 6d60bff29b8b7c6916d49033f0956ae6ceff5ab3
+
+# ---------------------------------------------------------------------------
+# DAG 1: Interactive survey question example
+# ---------------------------------------------------------------------------
+
+
+# [START example_llm_survey_interactive]
+@dag(schedule=None)

Review Comment:
   Same as the agentic file -- `@dag` without `schedule=None` is sufficient since `None` is the default.

##########
providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_agentic.py:
##########
@@ -0,0 +1,265 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Multi-query synthesis — an agentic survey analysis pattern.
+
+Demonstrates how Dynamic Task Mapping turns a multi-dimensional research
+question into a fan-out / fan-in pipeline that is observable, retryable,
+and auditable at each step.
+
+**Question:** *"What does a typical Airflow deployment look like for
+practitioners who actively use AI tools in their workflow?"*
+
+This question cannot be answered with a single SQL query. It requires
+querying four independent dimensions — executor type, deployment method,
+cloud provider, and Airflow version — all filtered to respondents who use
+AI tools to write Airflow code. The results are then synthesized by a
+second LLM call into a single narrative characterization.
+
+``example_llm_survey_agentic`` (manual trigger):
+
+.. code-block:: text
+
+    decompose_question (@task)
+    → generate_sql (LLMSQLQueryOperator, mapped ×4)
+    → wrap_query (@task, mapped ×4)
+    → run_query (AnalyticsOperator, mapped ×4)
+    → collect_results (@task)
+    → synthesize_answer (LLMOperator)
+    → result_confirmation (ApprovalOperator)
+
+**What this makes visible that an agent harness hides:**
+
+* Each sub-query is a named, logged task instance — not a hidden tool call.
+* If the cloud-provider query fails, only that mapped instance retries;
+  the other three results are preserved in XCom.
+* The synthesis step's inputs are fully auditable XCom values — not an
+  opaque continuation of an LLM reasoning loop.
+
+Before running:
+
+1. Create an LLM connection named ``pydanticai_default`` (or the value of
+   ``LLM_CONN_ID``) for your chosen model provider.
+2. Place the cleaned survey CSV at the path set by ``SURVEY_CSV_PATH``.
+"""
+
+from __future__ import annotations
+
+import datetime
+import json
+import os
+
+from airflow.providers.common.ai.operators.llm import LLMOperator
+from airflow.providers.common.ai.operators.llm_sql import LLMSQLQueryOperator
+from airflow.providers.common.compat.sdk import dag, task
+from airflow.providers.common.sql.config import DataSourceConfig
+from airflow.providers.common.sql.operators.analytics import AnalyticsOperator
+from airflow.providers.standard.operators.hitl import ApprovalOperator
+
+# ---------------------------------------------------------------------------
+# Configuration
+# ---------------------------------------------------------------------------
+
+LLM_CONN_ID = "pydanticai_default"
+
+SURVEY_CSV_PATH = os.environ.get(
+    "SURVEY_CSV_PATH",
+    "/opt/airflow/data/airflow-user-survey-2025.csv",
+)
+SURVEY_CSV_URI = f"file://{SURVEY_CSV_PATH}"
+
+# Schema context for LLMSQLQueryOperator.
+# All column names must be quoted in SQL because they contain spaces and punctuation.
+SURVEY_SCHEMA = """
+Table: survey
+Key columns (quote all names in SQL):
+    "How important is Airflow to your business?" TEXT
+    "Which version of Airflow do you currently use?" TEXT
+    "CeleryExecutor" TEXT
+    "KubernetesExecutor" TEXT
+    "LocalExecutor" TEXT
+    "How do you deploy Airflow?" TEXT
+    "What best describes your current occupation?" TEXT
+    "What industry do you currently work in?" TEXT
+    "How many years of experience do you have with Airflow?" TEXT
+    "Which of the following is your company's primary cloud provider for Airflow?" TEXT
+    "How many people work at your company?" TEXT
+    "How many people at your company directly work on data?" TEXT
+    "How many people at your company use Airflow?" TEXT
+    "How likely are you to recommend Apache Airflow?" TEXT
+    "Are you using AI/LLM (ChatGPT/Cursor/Claude etc) to assist you in writing Airflow code?" TEXT
+"""
+
+survey_datasource = DataSourceConfig(
+    conn_id="",
+    table_name="survey",
+    uri=SURVEY_CSV_URI,
+    format="csv",
+)
+
+# Dimension labels — order must match the sub-questions returned by decompose_question.
+DIMENSION_KEYS = ["executor", "deployment", "cloud", "airflow_version"]
+
+SYNTHESIS_SYSTEM_PROMPT = (
+    "You are a data analyst summarizing survey results about Apache Airflow practitioners. "
+    "Write in plain, concise language suitable for a technical audience. "
+    "Focus on patterns and proportions rather than raw counts."
+)
+
+
+# ---------------------------------------------------------------------------
+# DAG: Agentic multi-query synthesis
+# ---------------------------------------------------------------------------
+
+
+# [START example_llm_survey_agentic]
+@dag(schedule=None)
+def example_llm_survey_agentic():
+    """
+    Fan-out across four survey dimensions, then synthesize into a single narrative.
+
+    Task graph::
+

Review Comment:
   `schedule=None` is the default, so `@dag` alone is equivalent and a bit cleaner.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
