This is an automated email from the ASF dual-hosted git repository.

gopidesu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3bec5d61818 AIP-99: Add LLMFileAnalysisOperator and @task.llm_file_analysis (#64077)
3bec5d61818 is described below

commit 3bec5d61818bb49d3d706ef99ca71507c466ae6a
Author: GPK <[email protected]>
AuthorDate: Thu Mar 26 17:21:30 2026 +0000

    AIP-99: Add LLMFileAnalysisOperator and @task.llm_file_analysis (#64077)
    
    * Add LLMFileAnalysisOperator and @task.llm_file_analysis to the common-ai provider
    
    # Conflicts:
    #       uv.lock
    
    * Fix mypy issues
    
    * Update utils
    
    * Update return model
    
    * Fix spells
    
    * fix up read
    
    * document prefix lookup operation
---
 docs/spelling_wordlist.txt                         |   2 +
 providers/common/ai/docs/operators/index.rst       |  12 +-
 .../common/ai/docs/operators/llm_file_analysis.rst | 141 +++++
 providers/common/ai/provider.yaml                  |   4 +
 providers/common/ai/pyproject.toml                 |   8 +
 .../common/ai/decorators/llm_file_analysis.py      |  96 +++
 .../ai/example_dags/example_llm_file_analysis.py   | 133 ++++
 .../src/airflow/providers/common/ai/exceptions.py  |  16 +
 .../providers/common/ai/get_provider_info.py       |   6 +
 .../common/ai/operators/llm_file_analysis.py       | 165 +++++
 .../providers/common/ai/utils/file_analysis.py     | 670 +++++++++++++++++++++
 .../unit/common/ai/assets/__init__.py}             |   7 -
 .../unit/common/ai/assets/airflow-3-task-sdk.png   | Bin 0 -> 126152 bytes
 .../common/ai/decorators/test_llm_file_analysis.py | 118 ++++
 .../common/ai/operators/test_llm_file_analysis.py  | 303 ++++++++++
 .../unit/common/ai/utils/test_file_analysis.py     | 525 ++++++++++++++++
 uv.lock                                            |  12 +-
 17 files changed, 2208 insertions(+), 10 deletions(-)

diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index bee00082c44..df7bea5521c 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -256,6 +256,7 @@ cncf
 cnf
 cnt
 codebase
+codec
 codecov
 codepoints
 Colour
@@ -1156,6 +1157,7 @@ passwd
 pathlib
 pathType
 Paxos
+PDFs
 PEM
 Pem
 pem
diff --git a/providers/common/ai/docs/operators/index.rst b/providers/common/ai/docs/operators/index.rst
index e961931925a..89ba5d15e6c 100644
--- a/providers/common/ai/docs/operators/index.rst
+++ b/providers/common/ai/docs/operators/index.rst
@@ -21,7 +21,7 @@ Common AI Operators
 Choosing the right operator
 ---------------------------
 
-The common-ai provider ships four operators (and matching ``@task`` decorators). Use this table
+The common-ai provider ships five operators (and matching ``@task`` decorators). Use this table
 to pick the one that fits your use case:
 
 .. list-table::
@@ -34,6 +34,9 @@ to pick the one that fits your use case:
    * - Single prompt → text or structured output
      - :class:`~airflow.providers.common.ai.operators.llm.LLMOperator`
      - ``@task.llm``
+   * - Analyze files, prefixes, images, or PDFs with one prompt
+     - :class:`~airflow.providers.common.ai.operators.llm_file_analysis.LLMFileAnalysisOperator`
+     - ``@task.llm_file_analysis``
    * - LLM picks which downstream task runs
      - :class:`~airflow.providers.common.ai.operators.llm_branch.LLMBranchOperator`
      - ``@task.llm_branch``
@@ -46,7 +49,12 @@ to pick the one that fits your use case:
 
 **LLMOperator / @task.llm** — stateless, single-turn calls. Use this for classification,
 summarization, extraction, or any prompt that produces one response. Supports structured output
-via a ``response_format`` Pydantic model.
+via an ``output_type`` Pydantic model.
+
+**LLMFileAnalysisOperator / @task.llm_file_analysis** — stateless, single-turn file analysis.
+Use this when the prompt should reason over file contents or multimodal attachments already chosen
+by the DAG author. The operator resolves files via ``ObjectStoragePath`` and keeps the interaction
+read-only.
 
 **AgentOperator / @task.agent** — multi-turn tool-calling loop. The model decides which tools to
 invoke and when to stop. Use this when the LLM needs to take actions (query databases, call APIs,
diff --git a/providers/common/ai/docs/operators/llm_file_analysis.rst b/providers/common/ai/docs/operators/llm_file_analysis.rst
new file mode 100644
index 00000000000..17d49593b3c
--- /dev/null
+++ b/providers/common/ai/docs/operators/llm_file_analysis.rst
@@ -0,0 +1,141 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/operator:llm_file_analysis:
+
+``LLMFileAnalysisOperator`` & ``@task.llm_file_analysis``
+=========================================================
+
+Use :class:`~airflow.providers.common.ai.operators.llm_file_analysis.LLMFileAnalysisOperator`
+or the ``@task.llm_file_analysis`` decorator to analyze files from object storage
+or local storage with a single prompt.
+
+The operator resolves ``file_path`` through
+:class:`~airflow.providers.common.compat.sdk.ObjectStoragePath`, reads supported
+formats in a read-only manner, injects file metadata and normalized content into
+the prompt, and optionally attaches images or PDFs as multimodal inputs.
+
+.. seealso::
+    :ref:`Connection configuration <howto/connection:pydanticai>`
+
+Basic Usage
+-----------
+
+Analyze a text-like file or prefix with one prompt:
+
+.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm_file_analysis.py
+    :language: python
+    :start-after: [START howto_operator_llm_file_analysis_basic]
+    :end-before: [END howto_operator_llm_file_analysis_basic]
+
+Directory / Prefix Analysis
+---------------------------
+
+Use a directory or object-storage prefix when you want the operator to analyze
+multiple files in one request. ``max_files`` bounds how many resolved files are
+included in the request, while the size and text limits keep the request safe:
+
+.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm_file_analysis.py
+    :language: python
+    :start-after: [START howto_operator_llm_file_analysis_prefix]
+    :end-before: [END howto_operator_llm_file_analysis_prefix]
+
+.. note::
+
+    Prefix resolution enumerates objects under the supplied path and checks each
+    candidate to find files before ``max_files`` is applied. For very large
+    object-store prefixes, prefer a more specific path or a narrower prefix to
+    avoid expensive listing and stat calls.
+
+Multimodal Analysis
+-------------------
+
+Set ``multi_modal=True`` for PNG/JPG/PDF inputs so they are sent as binary
+attachments to a vision-capable model:
+
+.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm_file_analysis.py
+    :language: python
+    :start-after: [START howto_operator_llm_file_analysis_multimodal]
+    :end-before: [END howto_operator_llm_file_analysis_multimodal]
+
+Structured Output
+-----------------
+
+Set ``output_type`` to a Pydantic ``BaseModel`` when you want a typed response
+back from the LLM instead of a plain string:
+
+.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm_file_analysis.py
+    :language: python
+    :start-after: [START howto_operator_llm_file_analysis_structured]
+    :end-before: [END howto_operator_llm_file_analysis_structured]
+
+TaskFlow Decorator
+------------------
+
+The ``@task.llm_file_analysis`` decorator wraps the operator. The function
+returns the prompt string; file settings are passed to the decorator:
+
+.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm_file_analysis.py
+    :language: python
+    :start-after: [START howto_decorator_llm_file_analysis]
+    :end-before: [END howto_decorator_llm_file_analysis]
+
+Parameters
+----------
+
+- ``prompt``: The analysis request to send to the LLM (operator) or the return
+  value of the decorated function (decorator).
+- ``llm_conn_id``: Airflow connection ID for the LLM provider.
+- ``file_path``: File or prefix to analyze.
+- ``file_conn_id``: Optional connection ID for the storage backend. Overrides a
+  connection embedded in ``file_path``.
+- ``multi_modal``: Allow PNG/JPG/PDF inputs as binary attachments. Default ``False``.
+- ``max_files``: Maximum number of files included from a prefix. Extra files are
+  omitted and noted in the prompt. Default ``20``.
+- ``max_file_size_bytes``: Maximum size of any single input file. Default ``5 MiB``.
+- ``max_total_size_bytes``: Maximum cumulative size across all resolved files.
+  Default ``20 MiB``.
+- ``max_text_chars``: Maximum normalized text context sent to the LLM after
+  sampling and truncation. Default ``100000``.
+- ``sample_rows``: Maximum number of sampled rows or records included for CSV,
+  Parquet, and Avro inputs. This controls structural preview depth, while
+  ``max_file_size_bytes`` and ``max_total_size_bytes`` are byte-level read
+  guards and ``max_text_chars`` is the final prompt-text budget. Default ``10``.
+- ``model_id``: Model identifier (e.g. ``"openai:gpt-5"``). Overrides the
+  connection's extra field.
+- ``system_prompt``: System-level instructions appended to the operator's
+  built-in read-only guidance.
+- ``output_type``: Expected output type (default: ``str``). Set to a Pydantic
+  ``BaseModel`` for structured output.
+- ``agent_params``: Additional keyword arguments passed to the pydantic-ai
+  ``Agent`` constructor (e.g. ``retries``, ``model_settings``).
+
+Supported Formats
+-----------------
+
+- Text-like: ``.log``, ``.json``, ``.csv``, ``.parquet``, ``.avro``
+- Multimodal: ``.png``, ``.jpg``, ``.jpeg``, ``.pdf`` when ``multi_modal=True``
+- Gzip-compressed text inputs are supported for ``.log.gz``, ``.json.gz``, and
+  ``.csv.gz``.
+- Gzip is not supported for ``.parquet``, ``.avro``, image, or PDF inputs.
+
+Parquet and Avro readers require their corresponding optional extras:
+
+.. code-block:: bash
+
+    pip install apache-airflow-providers-common-ai[parquet]
+    pip install apache-airflow-providers-common-ai[avro]
diff --git a/providers/common/ai/provider.yaml b/providers/common/ai/provider.yaml
index 43a98af32a3..e267e2ec800 100644
--- a/providers/common/ai/provider.yaml
+++ b/providers/common/ai/provider.yaml
@@ -35,6 +35,7 @@ integrations:
     how-to-guide:
       - /docs/apache-airflow-providers-common-ai/operators/agent.rst
       - /docs/apache-airflow-providers-common-ai/operators/llm.rst
+      - /docs/apache-airflow-providers-common-ai/operators/llm_file_analysis.rst
       - /docs/apache-airflow-providers-common-ai/operators/llm_branch.rst
       - /docs/apache-airflow-providers-common-ai/operators/llm_sql.rst
       - /docs/apache-airflow-providers-common-ai/operators/llm_schema_compare.rst
@@ -288,6 +289,7 @@ operators:
     python-modules:
       - airflow.providers.common.ai.operators.agent
       - airflow.providers.common.ai.operators.llm
+      - airflow.providers.common.ai.operators.llm_file_analysis
       - airflow.providers.common.ai.operators.llm_branch
       - airflow.providers.common.ai.operators.llm_sql
       - airflow.providers.common.ai.operators.llm_schema_compare
@@ -297,6 +299,8 @@ task-decorators:
     name: agent
   - class-name: airflow.providers.common.ai.decorators.llm.llm_task
     name: llm
+  - class-name: airflow.providers.common.ai.decorators.llm_file_analysis.llm_file_analysis_task
+    name: llm_file_analysis
   - class-name: airflow.providers.common.ai.decorators.llm_branch.llm_branch_task
     name: llm_branch
   - class-name: airflow.providers.common.ai.decorators.llm_sql.llm_sql_task
diff --git a/providers/common/ai/pyproject.toml b/providers/common/ai/pyproject.toml
index ec23da07b82..fe5ed3d6b69 100644
--- a/providers/common/ai/pyproject.toml
+++ b/providers/common/ai/pyproject.toml
@@ -80,6 +80,14 @@ dependencies = [
 "google" = ["pydantic-ai-slim[google]"]
 "openai" = ["pydantic-ai-slim[openai]"]
 "mcp" = ["pydantic-ai-slim[mcp]"]
+"avro" = [
+    'fastavro>=1.10.0; python_version < "3.14"',
+    'fastavro>=1.12.1; python_version >= "3.14"',
+]
+"parquet" = [
+    "pyarrow>=18.0.0; python_version < '3.14'",
+    "pyarrow>=22.0.0; python_version >= '3.14'",
+]
 "sql" = [
     "apache-airflow-providers-common-sql",
     "sqlglot>=30.0.0",
diff --git a/providers/common/ai/src/airflow/providers/common/ai/decorators/llm_file_analysis.py b/providers/common/ai/src/airflow/providers/common/ai/decorators/llm_file_analysis.py
new file mode 100644
index 00000000000..c9451b3fbee
--- /dev/null
+++ b/providers/common/ai/src/airflow/providers/common/ai/decorators/llm_file_analysis.py
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""TaskFlow decorator for LLM-backed file analysis."""
+
+from __future__ import annotations
+
+from collections.abc import Callable, Collection, Mapping, Sequence
+from typing import TYPE_CHECKING, Any, ClassVar
+
+from airflow.providers.common.ai.operators.llm_file_analysis import 
LLMFileAnalysisOperator
+from airflow.providers.common.compat.sdk import (
+    DecoratedOperator,
+    TaskDecorator,
+    context_merge,
+    determine_kwargs,
+    task_decorator_factory,
+)
+from airflow.sdk.definitions._internal.types import SET_DURING_EXECUTION
+
+if TYPE_CHECKING:
+    from airflow.sdk import Context
+
+
+class _LLMFileAnalysisDecoratedOperator(DecoratedOperator, 
LLMFileAnalysisOperator):
+    """Wrap a callable that returns the prompt string for file analysis."""
+
+    template_fields: Sequence[str] = (
+        *DecoratedOperator.template_fields,
+        *LLMFileAnalysisOperator.template_fields,
+    )
+    template_fields_renderers: ClassVar[dict[str, str]] = {
+        **DecoratedOperator.template_fields_renderers,
+    }
+
+    custom_operator_name: str = "@task.llm_file_analysis"
+
+    def __init__(
+        self,
+        *,
+        python_callable: Callable,
+        op_args: Collection[Any] | None = None,
+        op_kwargs: Mapping[str, Any] | None = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(
+            python_callable=python_callable,
+            op_args=op_args,
+            op_kwargs=op_kwargs,
+            prompt=SET_DURING_EXECUTION,
+            **kwargs,
+        )
+
+    def execute(self, context: Context) -> Any:
+        context_merge(context, self.op_kwargs)
+        kwargs = determine_kwargs(self.python_callable, self.op_args, context)
+
+        self.prompt = self.python_callable(*self.op_args, **kwargs)
+        if not isinstance(self.prompt, str) or not self.prompt.strip():
+            raise TypeError(
+                "The returned value from the @task.llm_file_analysis callable 
must be a non-empty string."
+            )
+
+        self.render_template_fields(context)
+        return LLMFileAnalysisOperator.execute(self, context)
+
+
+def llm_file_analysis_task(
+    python_callable: Callable | None = None,
+    **kwargs: Any,
+) -> TaskDecorator:
+    """
+    Wrap a callable that returns a prompt into an LLM-backed file-analysis 
task.
+
+    Any file-analysis keyword arguments accepted by
+    
:class:`~airflow.providers.common.ai.operators.llm_file_analysis.LLMFileAnalysisOperator`,
+    including ``sample_rows``, can be passed through this decorator.
+    """
+    return task_decorator_factory(
+        python_callable=python_callable,
+        decorated_operator_class=_LLMFileAnalysisDecoratedOperator,
+        **kwargs,
+    )
diff --git a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_file_analysis.py b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_file_analysis.py
new file mode 100644
index 00000000000..7fe2bb89a64
--- /dev/null
+++ b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_file_analysis.py
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Example DAGs demonstrating LLMFileAnalysisOperator usage."""
+
+from __future__ import annotations
+
+from pydantic import BaseModel
+
+from airflow.providers.common.ai.operators.llm_file_analysis import 
LLMFileAnalysisOperator
+from airflow.providers.common.compat.sdk import dag, task
+
+
+# [START howto_operator_llm_file_analysis_basic]
+@dag
+def example_llm_file_analysis_basic():
+    LLMFileAnalysisOperator(
+        task_id="analyze_error_logs",
+        prompt="Find error patterns and correlate them with deployment 
timestamps.",
+        llm_conn_id="pydanticai_default",
+        file_path="s3://logs/app/2024-01-15/",
+        file_conn_id="aws_default",
+    )
+
+
+# [END howto_operator_llm_file_analysis_basic]
+
+example_llm_file_analysis_basic()
+
+
+# [START howto_operator_llm_file_analysis_prefix]
+@dag
+def example_llm_file_analysis_prefix():
+    LLMFileAnalysisOperator(
+        task_id="summarize_partitioned_logs",
+        prompt=(
+            "Summarize recurring errors across these partitioned log files and 
call out "
+            "which partition keys appear in the highest-severity findings."
+        ),
+        llm_conn_id="pydanticai_default",
+        file_path="s3://logs/app/dt=2024-01-15/",
+        file_conn_id="aws_default",
+        max_files=10,
+        max_total_size_bytes=10 * 1024 * 1024,
+        max_text_chars=20_000,
+    )
+
+
+# [END howto_operator_llm_file_analysis_prefix]
+
+example_llm_file_analysis_prefix()
+
+
+# [START howto_operator_llm_file_analysis_multimodal]
+@dag
+def example_llm_file_analysis_multimodal():
+    LLMFileAnalysisOperator(
+        task_id="validate_dashboards",
+        prompt="Check charts for visual anomalies or stale data indicators.",
+        llm_conn_id="pydanticai_default",
+        file_path="s3://monitoring/dashboards/latest.png",
+        file_conn_id="aws_default",
+        multi_modal=True,
+    )
+
+
+# [END howto_operator_llm_file_analysis_multimodal]
+
+example_llm_file_analysis_multimodal()
+
+
+# [START howto_operator_llm_file_analysis_structured]
+@dag
+def example_llm_file_analysis_structured():
+
+    class FileAnalysisSummary(BaseModel):
+        """Structured output schema for the file-analysis examples."""
+
+        findings: list[str]
+        highest_severity: str
+        truncated_inputs: bool
+
+    LLMFileAnalysisOperator(
+        task_id="analyze_parquet_quality",
+        prompt=(
+            "Return the top data-quality findings from this Parquet dataset. "
+            "Include whether any inputs were truncated."
+        ),
+        llm_conn_id="pydanticai_default",
+        file_path="s3://analytics/warehouse/customers/",
+        file_conn_id="aws_default",
+        output_type=FileAnalysisSummary,
+        sample_rows=5,
+        max_files=5,
+    )
+
+
+# [END howto_operator_llm_file_analysis_structured]
+
+example_llm_file_analysis_structured()
+
+
+# [START howto_decorator_llm_file_analysis]
+@dag
+def example_llm_file_analysis_decorator():
+    @task.llm_file_analysis(
+        llm_conn_id="pydanticai_default",
+        file_path="s3://analytics/reports/quarterly.pdf",
+        file_conn_id="aws_default",
+        multi_modal=True,
+    )
+    def review_quarterly_report():
+        return "Extract the key revenue, risk, and compliance findings from 
this report."
+
+    review_quarterly_report()
+
+
+# [END howto_decorator_llm_file_analysis]
+
+example_llm_file_analysis_decorator()
diff --git a/providers/common/ai/src/airflow/providers/common/ai/exceptions.py b/providers/common/ai/src/airflow/providers/common/ai/exceptions.py
index 7c83f30d68a..d0412fb2a48 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/exceptions.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/exceptions.py
@@ -21,3 +21,19 @@ from airflow.providers.common.compat.sdk import AirflowException
 
 class HITLMaxIterationsError(AirflowException):
     """Raised when the HITL review loop exhausts max iterations without 
approval or rejection."""
+
+
+class LLMFileAnalysisError(ValueError):
+    """Base class for file-analysis validation errors."""
+
+
+class LLMFileAnalysisUnsupportedFormatError(LLMFileAnalysisError):
+    """Raised when a file format is not supported by LLM file analysis."""
+
+
+class LLMFileAnalysisLimitExceededError(LLMFileAnalysisError):
+    """Raised when file-analysis safety limits are exceeded."""
+
+
+class 
LLMFileAnalysisMultimodalRequiredError(LLMFileAnalysisUnsupportedFormatError):
+    """Raised when image/PDF inputs are used without ``multi_modal=True``."""
diff --git a/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py b/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py
index 90ae393d64d..9a216856fb0 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py
@@ -33,6 +33,7 @@ def get_provider_info():
                 "how-to-guide": [
                     
"/docs/apache-airflow-providers-common-ai/operators/agent.rst",
                     
"/docs/apache-airflow-providers-common-ai/operators/llm.rst",
+                    
"/docs/apache-airflow-providers-common-ai/operators/llm_file_analysis.rst",
                     
"/docs/apache-airflow-providers-common-ai/operators/llm_branch.rst",
                     
"/docs/apache-airflow-providers-common-ai/operators/llm_sql.rst",
                     
"/docs/apache-airflow-providers-common-ai/operators/llm_schema_compare.rst",
@@ -241,6 +242,7 @@ def get_provider_info():
                 "python-modules": [
                     "airflow.providers.common.ai.operators.agent",
                     "airflow.providers.common.ai.operators.llm",
+                    "airflow.providers.common.ai.operators.llm_file_analysis",
                     "airflow.providers.common.ai.operators.llm_branch",
                     "airflow.providers.common.ai.operators.llm_sql",
                     "airflow.providers.common.ai.operators.llm_schema_compare",
@@ -250,6 +252,10 @@ def get_provider_info():
         "task-decorators": [
             {"class-name": 
"airflow.providers.common.ai.decorators.agent.agent_task", "name": "agent"},
             {"class-name": 
"airflow.providers.common.ai.decorators.llm.llm_task", "name": "llm"},
+            {
+                "class-name": 
"airflow.providers.common.ai.decorators.llm_file_analysis.llm_file_analysis_task",
+                "name": "llm_file_analysis",
+            },
             {
                 "class-name": 
"airflow.providers.common.ai.decorators.llm_branch.llm_branch_task",
                 "name": "llm_branch",
diff --git a/providers/common/ai/src/airflow/providers/common/ai/operators/llm_file_analysis.py b/providers/common/ai/src/airflow/providers/common/ai/operators/llm_file_analysis.py
new file mode 100644
index 00000000000..bb10b66680d
--- /dev/null
+++ b/providers/common/ai/src/airflow/providers/common/ai/operators/llm_file_analysis.py
@@ -0,0 +1,165 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Operator for analyzing files with LLMs."""
+
+from __future__ import annotations
+
+from collections.abc import Sequence
+from typing import TYPE_CHECKING, Any
+
+from pydantic import BaseModel
+
+from airflow.providers.common.ai.operators.llm import LLMOperator
+from airflow.providers.common.ai.utils.file_analysis import 
build_file_analysis_request
+from airflow.providers.common.ai.utils.logging import log_run_summary
+
+if TYPE_CHECKING:
+    from pydantic_ai import Agent
+
+    from airflow.sdk import Context
+
+
+class LLMFileAnalysisOperator(LLMOperator):
+    """
+    Analyze files from object storage or local storage using a single LLM call.
+
+    The operator resolves ``file_path`` via
+    :class:`~airflow.providers.common.compat.sdk.ObjectStoragePath`, normalizes
+    supported formats into text context, and optionally attaches images/PDFs as
+    multimodal inputs when ``multi_modal=True``.
+
+    :param prompt: The analysis prompt for the LLM.
+    :param llm_conn_id: Connection ID for the LLM provider.
+    :param file_path: File or prefix to analyze.
+    :param file_conn_id: Optional connection ID for the storage backend.
+        Overrides a connection embedded in ``file_path``.
+    :param multi_modal: Allow PNG/JPG/PDF inputs as binary attachments.
+        Default ``False``.
+    :param max_files: Maximum number of files to include from a prefix.
+        Excess files are omitted and noted in the prompt. Default ``20``.
+    :param max_file_size_bytes: Maximum size of any single input file.
+        Default ``5 MiB``.
+    :param max_total_size_bytes: Maximum cumulative size across all resolved
+        files. Default ``20 MiB``.
+    :param max_text_chars: Maximum normalized text context passed to the LLM
+        after sampling/truncation. Default ``100000``.
+    :param sample_rows: Maximum number of sampled rows or records included for
+        CSV, Parquet, and Avro inputs. This limits structural preview depth,
+        while ``max_file_size_bytes`` and ``max_total_size_bytes`` limit bytes
+        read from storage and ``max_text_chars`` limits the final prompt text
+        budget. Default ``10``.
+    """
+
+    template_fields: Sequence[str] = (
+        *LLMOperator.template_fields,
+        "file_path",
+        "file_conn_id",
+    )
+
+    def __init__(
+        self,
+        *,
+        file_path: str,
+        file_conn_id: str | None = None,
+        multi_modal: bool = False,
+        max_files: int = 20,
+        max_file_size_bytes: int = 5 * 1024 * 1024,
+        max_total_size_bytes: int = 20 * 1024 * 1024,
+        max_text_chars: int = 100_000,
+        sample_rows: int = 10,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        if max_files <= 0:
+            raise ValueError("max_files must be greater than zero.")
+        if max_file_size_bytes <= 0:
+            raise ValueError("max_file_size_bytes must be greater than zero.")
+        if max_total_size_bytes <= 0:
+            raise ValueError("max_total_size_bytes must be greater than zero.")
+        if max_text_chars <= 0:
+            raise ValueError("max_text_chars must be greater than zero.")
+        if sample_rows <= 0:
+            raise ValueError("sample_rows must be greater than zero.")
+
+        self.file_path = file_path
+        self.file_conn_id = file_conn_id
+        self.multi_modal = multi_modal
+        self.max_files = max_files
+        self.max_file_size_bytes = max_file_size_bytes
+        self.max_total_size_bytes = max_total_size_bytes
+        self.max_text_chars = max_text_chars
+        self.sample_rows = sample_rows
+
+    def execute(self, context: Context) -> Any:
+        request = build_file_analysis_request(
+            file_path=self.file_path,
+            file_conn_id=self.file_conn_id,
+            prompt=self.prompt,
+            multi_modal=self.multi_modal,
+            max_files=self.max_files,
+            max_file_size_bytes=self.max_file_size_bytes,
+            max_total_size_bytes=self.max_total_size_bytes,
+            max_text_chars=self.max_text_chars,
+            sample_rows=self.sample_rows,
+        )
+        self.log.info(
+            "Calling model for file analysis: files=%s, attachments=%s, 
text_files=%s, total_size_bytes=%s, "
+            "omitted_files=%s, text_truncated=%s, multi_modal=%s, 
sample_rows=%s",
+            len(request.resolved_paths),
+            request.attachment_count,
+            request.text_file_count,
+            request.total_size_bytes,
+            request.omitted_files,
+            request.text_truncated,
+            self.multi_modal,
+            self.sample_rows,
+        )
+        self.log.debug("Resolved file analysis paths: %s", 
request.resolved_paths)
+        agent: Agent[None, Any] = self.llm_hook.create_agent(
+            output_type=self.output_type,
+            instructions=self._build_system_prompt(),
+            **self.agent_params,
+        )
+        result = agent.run_sync(request.user_content)
+        log_run_summary(self.log, result)
+        output = result.output
+
+        if self.require_approval:
+            self.defer_for_approval(context, output)  # type: ignore[misc]
+
+        if isinstance(output, BaseModel):
+            output = output.model_dump()
+
+        return output
+
+    def execute_complete(self, context: Context, generated_output: str, event: 
dict[str, Any]) -> Any:
+        """Resume after human review, restoring structured outputs for XCom 
consumers."""
+        output = super().execute_complete(context, generated_output, event)
+        if isinstance(self.output_type, type) and issubclass(self.output_type, 
BaseModel):
+            return self.output_type.model_validate_json(output).model_dump()
+        return output
+
+    def _build_system_prompt(self) -> str:
+        prompt = (
+            "You are a read-only file analysis assistant.\n"
+            "Use only the provided metadata, normalized file content, and 
multimodal attachments.\n"
+            "Do not claim to have modified files or executed any external 
actions.\n"
+            "If file content is truncated or sampled, say so in your answer."
+        )
+        if self.system_prompt:
+            prompt += f"\n\nAdditional instructions:\n{self.system_prompt}"
+        return prompt
diff --git a/providers/common/ai/src/airflow/providers/common/ai/utils/file_analysis.py b/providers/common/ai/src/airflow/providers/common/ai/utils/file_analysis.py
new file mode 100644
index 00000000000..f962119616e
--- /dev/null
+++ b/providers/common/ai/src/airflow/providers/common/ai/utils/file_analysis.py
@@ -0,0 +1,670 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Helpers for building file-analysis prompts for LLM operators."""
+
+from __future__ import annotations
+
+import csv
+import gzip
+import io
+import json
+import logging
+from bisect import insort
+from dataclasses import dataclass
+from pathlib import PurePosixPath
+from typing import TYPE_CHECKING, Any
+
+from pydantic_ai.messages import BinaryContent
+
+from airflow.providers.common.ai.exceptions import (
+    LLMFileAnalysisLimitExceededError,
+    LLMFileAnalysisMultimodalRequiredError,
+    LLMFileAnalysisUnsupportedFormatError,
+)
+from airflow.providers.common.compat.sdk import 
AirflowOptionalProviderFeatureException, ObjectStoragePath
+
+if TYPE_CHECKING:
+    from collections.abc import Sequence
+
+    from pydantic_ai.messages import UserContent
+
+# Lower-case file extensions (without the leading dot) accepted by detect_file_format.
+SUPPORTED_FILE_FORMATS: tuple[str, ...] = (
+    "avro",
+    "csv",
+    "jpeg",
+    "jpg",
+    "json",
+    "log",
+    "parquet",
+    "pdf",
+    "png",
+)
+
+# Formats that are rendered into prompt text.
+_TEXT_LIKE_FORMATS = frozenset({"csv", "json", "log", "avro", "parquet"})
+# Formats that are sent as binary attachments and therefore need multi_modal=True.
+_MULTI_MODAL_FORMATS = frozenset({"jpeg", "jpg", "pdf", "png"})
+# Trailing path suffix -> compression codec name. Only "gzip" is accepted by
+# detect_file_format; the other entries exist so they can be rejected explicitly.
+_COMPRESSION_SUFFIXES = {
+    "bz2": "bzip2",
+    "gz": "gzip",
+    "snappy": "snappy",
+    "xz": "xz",
+    "zst": "zstd",
+}
+# Formats for which a ".gz" suffix is allowed.
+_GZIP_SUPPORTED_FORMATS = frozenset({"csv", "json", "log"})
+# Default head/tail window sizes (in characters) used by _truncate_text.
+_TEXT_SAMPLE_HEAD_CHARS = 8_000
+_TEXT_SAMPLE_TAIL_CHARS = 2_000
+# Media type passed to BinaryContent for each multimodal format.
+_MEDIA_TYPES = {
+    "jpeg": "image/jpeg",
+    "jpg": "image/jpeg",
+    "pdf": "application/pdf",
+    "png": "image/png",
+}
+log = logging.getLogger(__name__)
+
+
+@dataclass
+class FileAnalysisRequest:
+    """
+    Prepared prompt content and discovery metadata for the file-analysis operator.
+
+    Instances are produced by ``build_file_analysis_request``.
+    """
+
+    # Text preamble alone, or a list of the preamble followed by binary attachments.
+    user_content: str | Sequence[UserContent]
+    # String form of every path selected for analysis.
+    resolved_paths: list[str]
+    # Sum of the stored sizes (``stat().st_size``) of the selected files.
+    total_size_bytes: int
+    # Number of files that were found but dropped because of the max_files limit.
+    omitted_files: int = 0
+    # True when any normalized text was shortened or dropped to fit max_text_chars.
+    text_truncated: bool = False
+    # Number of binary attachments included in user_content.
+    attachment_count: int = 0
+    # Number of files whose normalized text is included in the preamble.
+    text_file_count: int = 0
+
+
+@dataclass
+class _PreparedFile:
+    """One selected file together with the content prepared for the prompt."""
+
+    path: ObjectStoragePath
+    # Logical format and compression codec as returned by detect_file_format.
+    file_format: str
+    # Stored size from ``stat().st_size``.
+    size_bytes: int
+    compression: str | None
+    # Path segments containing "=", as returned by _infer_partitions.
+    partitions: tuple[str, ...]
+    # Row count reported by the renderer; None when the renderer does not provide one.
+    estimated_rows: int | None = None
+    # Normalized text for text-like formats; None for attachments or when omitted.
+    text_content: str | None = None
+    # Binary payload for multimodal formats; None for text-like formats.
+    attachment: BinaryContent | None = None
+    # Number of content bytes actually read for this file (counted against the total budget).
+    content_size_bytes: int = 0
+    # Set by _apply_text_budget when the text was shortened or dropped entirely.
+    content_truncated: bool = False
+    content_omitted: bool = False
+
+
+@dataclass
+class _DiscoveredFile:
+    """Metadata gathered for a file by ``_discover_file`` before its content is read."""
+
+    path: ObjectStoragePath
+    # Logical format detected from the path suffix.
+    file_format: str
+    # Stored size from ``stat().st_size``.
+    size_bytes: int
+    # Compression codec detected from the path suffix, if any.
+    compression: str | None
+
+
+@dataclass
+class _RenderResult:
+    """Output of a ``_render_*`` helper for a single text-like file."""
+
+    # Normalized (and possibly truncated) text for the prompt.
+    text: str
+    # Row count when the renderer can determine one, otherwise None.
+    estimated_rows: int | None
+    # Number of content bytes the renderer accounted for.
+    content_size_bytes: int
+
+
+def build_file_analysis_request(
+    *,
+    file_path: str,
+    file_conn_id: str | None,
+    prompt: str,
+    multi_modal: bool,
+    max_files: int,
+    max_file_size_bytes: int,
+    max_total_size_bytes: int,
+    max_text_chars: int,
+    sample_rows: int,
+) -> FileAnalysisRequest:
+    """
+    Resolve files, normalize supported formats, and build prompt content for an LLM run.
+
+    The work happens in three passes: every resolved file is first checked against the
+    byte limits using its stored size, then its content is read and normalized, and
+    finally the normalized text is trimmed to the character budget and assembled
+    into the prompt.
+
+    :param file_path: File or prefix to analyze; wrapped in ``ObjectStoragePath``.
+    :param file_conn_id: Connection ID passed to ``ObjectStoragePath``.
+    :param prompt: User request placed at the top of the text preamble.
+    :param multi_modal: Whether image/PDF files may be attached as binary content.
+    :param max_files: Maximum number of files selected from a prefix.
+    :param max_file_size_bytes: Per-file byte limit.
+    :param max_total_size_bytes: Byte limit across all selected files.
+    :param max_text_chars: Character budget shared by all normalized text.
+    :param sample_rows: Number of rows sampled from tabular formats; must be positive.
+    :return: The prepared request, including prompt content and discovery metadata.
+    :raises ValueError: If ``sample_rows`` is not greater than zero.
+    :raises LLMFileAnalysisLimitExceededError: If the cumulative stored size exceeds
+        ``max_total_size_bytes``, or no content budget remains for a file.
+    """
+    if sample_rows <= 0:
+        raise ValueError("sample_rows must be greater than zero.")
+    log.info(
+        "Preparing file analysis request for path=%s, file_conn_id=%s, 
multi_modal=%s, "
+        "max_files=%s, max_file_size_bytes=%s, max_total_size_bytes=%s, 
max_text_chars=%s, sample_rows=%s",
+        file_path,
+        file_conn_id,
+        multi_modal,
+        max_files,
+        max_file_size_bytes,
+        max_total_size_bytes,
+        max_text_chars,
+        sample_rows,
+    )
+    root = ObjectStoragePath(file_path, conn_id=file_conn_id)
+    resolved_paths, omitted_files = _resolve_paths(root=root, 
max_files=max_files)
+    log.info(
+        "Resolved %s file(s) from %s%s",
+        len(resolved_paths),
+        file_path,
+        f"; omitted {omitted_files} additional file(s) due to max_files limit" 
if omitted_files else "",
+    )
+    if log.isEnabledFor(logging.DEBUG):
+        log.debug("Resolved file paths: %s", [str(path) for path in 
resolved_paths])
+
+    # Pass 1: validate stored sizes for the whole file set before any content is read.
+    discovered_files: list[_DiscoveredFile] = []
+    total_size_bytes = 0
+    for path in resolved_paths:
+        discovered = _discover_file(
+            path=path,
+            max_file_size_bytes=max_file_size_bytes,
+        )
+        total_size_bytes += discovered.size_bytes
+        if total_size_bytes > max_total_size_bytes:
+            log.info(
+                "Rejecting file set before content reads because cumulative 
size reached %s bytes (limit=%s bytes).",
+                total_size_bytes,
+                max_total_size_bytes,
+            )
+            raise LLMFileAnalysisLimitExceededError(
+                "Total input size exceeds the configured limit: "
+                f"{total_size_bytes} bytes > {max_total_size_bytes} bytes."
+            )
+        discovered_files.append(discovered)
+
+    log.info(
+        "Validated byte limits for %s file(s) before reading file contents; 
total_size_bytes=%s.",
+        len(discovered_files),
+        total_size_bytes,
+    )
+
+    # Pass 2: read content. The bytes actually read (which for gzip input are the
+    # decompressed bytes) are counted against the same total limit, so each file
+    # only gets whatever budget the previous files left over.
+    prepared_files: list[_PreparedFile] = []
+    processed_size_bytes = 0
+    for discovered in discovered_files:
+        remaining_content_bytes = max_total_size_bytes - processed_size_bytes
+        if remaining_content_bytes <= 0:
+            raise LLMFileAnalysisLimitExceededError(
+                "Total processed input size exceeds the configured limit after 
decompression."
+            )
+        prepared = _prepare_file(
+            discovered_file=discovered,
+            multi_modal=multi_modal,
+            sample_rows=sample_rows,
+            max_content_bytes=min(max_file_size_bytes, 
remaining_content_bytes),
+        )
+        processed_size_bytes += prepared.content_size_bytes
+        prepared_files.append(prepared)
+
+    # Pass 3: fit the normalized text into the character budget and assemble the prompt.
+    text_truncated = _apply_text_budget(prepared_files=prepared_files, 
max_text_chars=max_text_chars)
+    if text_truncated:
+        log.info("Normalized text content exceeded max_text_chars=%s and was 
truncated.", max_text_chars)
+    text_preamble = _build_text_preamble(
+        prompt=prompt,
+        prepared_files=prepared_files,
+        omitted_files=omitted_files,
+        text_truncated=text_truncated,
+    )
+    attachments = [prepared.attachment for prepared in prepared_files if 
prepared.attachment is not None]
+    text_file_count = sum(1 for prepared in prepared_files if 
prepared.text_content is not None)
+    user_content: str | list[UserContent]
+    # A plain string is used when there is nothing to attach; otherwise the
+    # preamble is followed by the binary attachments.
+    if attachments:
+        user_content = [text_preamble, *attachments]
+    else:
+        user_content = text_preamble
+    log.info(
+        "Prepared file analysis request with %s text file(s), %s 
attachment(s), total_size_bytes=%s.",
+        text_file_count,
+        len(attachments),
+        total_size_bytes,
+    )
+    if log.isEnabledFor(logging.DEBUG):
+        log.debug("Prepared text preamble length=%s", len(text_preamble))
+    return FileAnalysisRequest(
+        user_content=user_content,
+        resolved_paths=[str(path) for path in resolved_paths],
+        total_size_bytes=total_size_bytes,
+        omitted_files=omitted_files,
+        text_truncated=text_truncated,
+        attachment_count=len(attachments),
+        text_file_count=text_file_count,
+    )
+
+
+def _resolve_paths(*, root: ObjectStoragePath, max_files: int) -> tuple[list[ObjectStoragePath], int]:
+    """
+    Resolve ``root`` to the files that should be analyzed.
+
+    If ``root`` is itself a file it is returned alone. Otherwise ``root`` is scanned
+    recursively and the ``max_files`` files whose string form sorts lowest are kept,
+    so the selection does not depend on listing order.
+
+    :param root: File or prefix to resolve.
+    :param max_files: Maximum number of files to keep from a prefix scan.
+    :return: The selected paths in ascending string order, and the number of
+        additional files that were found but not selected.
+    :raises ValueError: If a prefix scan is needed and ``max_files`` is not positive.
+    :raises FileNotFoundError: If no file is found under ``root``.
+    """
+    try:
+        if root.is_file():
+            return [root], 0
+    except FileNotFoundError:
+        # NOTE(review): presumably some backends raise instead of returning False
+        # for a missing key; fall through to the prefix scan in that case.
+        pass
+
+    # Without this guard a non-positive limit reaches ``selected[-1]`` on an empty
+    # list below and surfaces as an unrelated IndexError.
+    if max_files <= 0:
+        raise ValueError("max_files must be greater than zero.")
+
+    try:
+        # ``selected`` stays sorted by path string and never grows beyond max_files.
+        selected: list[tuple[str, ObjectStoragePath]] = []
+        omitted_files = 0
+        for path in root.rglob("*"):
+            if not path.is_file():
+                continue
+            path_key = str(path)
+            if len(selected) < max_files:
+                insort(selected, (path_key, path))
+                continue
+            # The list is full: a candidate that sorts before the current maximum
+            # displaces it. Either way exactly one file is left out.
+            if path_key < selected[-1][0]:
+                insort(selected, (path_key, path))
+                selected.pop()
+            omitted_files += 1
+    except (FileNotFoundError, NotADirectoryError):
+        selected = []
+        omitted_files = 0
+
+    if not selected:
+        raise FileNotFoundError(f"No files found for {root}.")
+
+    return [path for _, path in selected], omitted_files
+
+
+def _discover_file(*, path: ObjectStoragePath, max_file_size_bytes: int) -> _DiscoveredFile:
+    """
+    Detect the format of ``path`` and check its stored size against the per-file limit.
+
+    :raises LLMFileAnalysisLimitExceededError: If the stored size exceeds
+        ``max_file_size_bytes``.
+    """
+    detected_format, codec = detect_file_format(path)
+    stored_size = path.stat().st_size
+    log.debug(
+        "Discovered file %s (format=%s, size_bytes=%s%s).",
+        path,
+        detected_format,
+        stored_size,
+        f", compression={codec}" if codec else "",
+    )
+    if stored_size <= max_file_size_bytes:
+        return _DiscoveredFile(
+            path=path,
+            file_format=detected_format,
+            size_bytes=stored_size,
+            compression=codec,
+        )
+    log.info(
+        "Rejecting file %s because size_bytes=%s exceeds the per-file limit=%s.",
+        path,
+        stored_size,
+        max_file_size_bytes,
+    )
+    raise LLMFileAnalysisLimitExceededError(
+        f"File {path} exceeds the configured per-file limit: {stored_size} bytes > {max_file_size_bytes} bytes."
+    )
+
+
+def _prepare_file(
+    *,
+    discovered_file: _DiscoveredFile,
+    multi_modal: bool,
+    sample_rows: int,
+    max_content_bytes: int,
+) -> _PreparedFile:
+    """
+    Read one discovered file and turn it into prompt-ready content.
+
+    Image and PDF formats become a binary attachment (only when ``multi_modal`` is
+    true); every other format is rendered to normalized text.
+
+    :raises LLMFileAnalysisMultimodalRequiredError: If the file is an image or PDF
+        and ``multi_modal`` is false.
+    """
+    target = discovered_file.path
+    fmt = discovered_file.file_format
+    stored_size = discovered_file.size_bytes
+    codec = discovered_file.compression
+    log.debug(
+        "Preparing file content for %s (format=%s, size_bytes=%s%s).",
+        target,
+        fmt,
+        stored_size,
+        f", compression={codec}" if codec else "",
+    )
+    result = _PreparedFile(
+        path=target,
+        file_format=fmt,
+        size_bytes=stored_size,
+        compression=codec,
+        partitions=_infer_partitions(target),
+    )
+
+    if fmt not in _MULTI_MODAL_FORMATS:
+        rendered = _render_text_content(
+            path=target,
+            file_format=fmt,
+            compression=codec,
+            sample_rows=sample_rows,
+            max_content_bytes=max_content_bytes,
+        )
+        result.text_content = rendered.text
+        result.estimated_rows = rendered.estimated_rows
+        result.content_size_bytes = rendered.content_size_bytes
+        log.debug(
+            "Normalized %s into text content of %s characters%s.",
+            target,
+            len(rendered.text),
+            f"; estimated_rows={rendered.estimated_rows}" if rendered.estimated_rows is not None else "",
+        )
+        return result
+
+    if not multi_modal:
+        log.info("Rejecting file %s because format=%s requires multi_modal=True.", target, fmt)
+        raise LLMFileAnalysisMultimodalRequiredError(
+            f"File {target} has format {fmt!r}; set multi_modal=True to analyze images or PDFs."
+        )
+    payload = _read_raw_bytes(target, compression=codec, max_bytes=max_content_bytes)
+    result.attachment = BinaryContent(
+        data=payload,
+        media_type=_MEDIA_TYPES[fmt],
+        identifier=str(target),
+    )
+    result.content_size_bytes = len(result.attachment.data)
+    log.debug("Attached %s as multimodal binary content with media_type=%s.", target, _MEDIA_TYPES[fmt])
+    return result
+
+
+def detect_file_format(path: ObjectStoragePath) -> tuple[str, str | None]:
+    """
+    Detect the logical file format and compression codec from a path suffix.
+
+    A trailing compression suffix is peeled off first; the suffix before it is the
+    format. A path with no format suffix is treated as ``"log"``.
+
+    :return: ``(file_format, compression)`` where ``compression`` is ``"gzip"`` or ``None``.
+    :raises LLMFileAnalysisUnsupportedFormatError: If the format is not supported, the
+        codec is not gzip, or gzip is used with a format that does not allow it.
+    """
+    parts = [suffix.removeprefix(".").lower() for suffix in path.suffixes]
+    codec: str | None = _COMPRESSION_SUFFIXES.get(parts[-1]) if parts else None
+    if codec is not None:
+        parts = parts[:-1]
+    file_format = parts[-1] if parts else "log"
+
+    if file_format not in SUPPORTED_FILE_FORMATS:
+        raise LLMFileAnalysisUnsupportedFormatError(
+            f"Unsupported file format {file_format!r} for {path}. Supported formats: {', '.join(SUPPORTED_FILE_FORMATS)}."
+        )
+    if codec is None:
+        return file_format, None
+    if codec != "gzip":
+        log.info("Rejecting file %s because compression=%s is not supported.", path, codec)
+        raise LLMFileAnalysisUnsupportedFormatError(
+            f"Compression {codec!r} is not supported for file analysis."
+        )
+    if file_format not in _GZIP_SUPPORTED_FORMATS:
+        raise LLMFileAnalysisUnsupportedFormatError(
+            f"Compression {codec!r} is not supported for {file_format!r} file analysis."
+        )
+    return file_format, codec
+
+
+def _render_text_content(
+    *,
+    path: ObjectStoragePath,
+    file_format: str,
+    compression: str | None,
+    sample_rows: int,
+    max_content_bytes: int,
+) -> _RenderResult:
+    """Dispatch to the renderer for ``file_format``; unknown formats are rendered as plain text."""
+    # Columnar/binary formats: no compression argument is passed to these renderers.
+    if file_format == "parquet":
+        return _render_parquet(path, sample_rows=sample_rows, max_content_bytes=max_content_bytes)
+    if file_format == "avro":
+        return _render_avro(path, sample_rows=sample_rows, max_content_bytes=max_content_bytes)
+    # Byte-stream formats: may be gzip-compressed.
+    if file_format == "csv":
+        return _render_csv(
+            path,
+            compression=compression,
+            sample_rows=sample_rows,
+            max_content_bytes=max_content_bytes,
+        )
+    if file_format == "json":
+        return _render_json(path, compression=compression, max_content_bytes=max_content_bytes)
+    return _render_text_like(path, compression=compression, max_content_bytes=max_content_bytes)
+
+
+def _render_text_like(
+    path: ObjectStoragePath, *, compression: str | None, max_content_bytes: int
+) -> _RenderResult:
+    """Read ``path`` as text and return it truncated to the default sample window."""
+    payload = _read_raw_bytes(path, compression=compression, max_bytes=max_content_bytes)
+    sampled = _truncate_text(_decode_text(payload))
+    return _RenderResult(text=sampled, estimated_rows=None, content_size_bytes=len(payload))
+
+
+def _render_json(
+    path: ObjectStoragePath, *, compression: str | None, max_content_bytes: int
+) -> _RenderResult:
+    """
+    Parse ``path`` as a JSON document and return it pretty-printed with sorted keys.
+
+    ``estimated_rows`` is the number of elements when the document is a list and
+    ``None`` otherwise.
+    """
+    payload = _read_raw_bytes(path, compression=compression, max_bytes=max_content_bytes)
+    parsed = json.loads(_decode_text(payload))
+    row_estimate = len(parsed) if isinstance(parsed, list) else None
+    rendered = json.dumps(parsed, indent=2, sort_keys=True, default=str)
+    return _RenderResult(
+        text=_truncate_text(rendered),
+        estimated_rows=row_estimate,
+        content_size_bytes=len(payload),
+    )
+
+
+def _render_csv(
+    path: ObjectStoragePath, *, compression: str | None, sample_rows: int, max_content_bytes: int
+) -> _RenderResult:
+    """
+    Render a CSV file as its header line plus up to ``sample_rows`` data rows.
+
+    The first record is treated as the header. ``estimated_rows`` counts every
+    record after the header; an empty file yields empty text and zero rows.
+    """
+    payload = _read_raw_bytes(path, compression=compression, max_bytes=max_content_bytes)
+    records = list(csv.reader(io.StringIO(_decode_text(payload))))
+    if not records:
+        return _RenderResult(text="", estimated_rows=0, content_size_bytes=len(payload))
+    header, *data_rows = records
+    lines = ["Header: " + ", ".join(header)]
+    preview = data_rows[:sample_rows]
+    if preview:
+        lines.append("Sample rows:")
+        lines.extend(", ".join(str(cell) for cell in row) for row in preview)
+    return _RenderResult(
+        text=_truncate_text("\n".join(lines)),
+        estimated_rows=len(data_rows),
+        content_size_bytes=len(payload),
+    )
+
+
+def _render_parquet(path: ObjectStoragePath, *, sample_rows: int, 
max_content_bytes: int) -> _RenderResult:
+    """
+    Render a Parquet file as its Arrow schema plus up to ``sample_rows`` rows as JSON.
+
+    :return: Rendered text, the row count from the file metadata, and the file size
+        measured on the open handle.
+    :raises AirflowOptionalProviderFeatureException: If ``pyarrow`` is not installed.
+    :raises LLMFileAnalysisLimitExceededError: If the file is larger than
+        ``max_content_bytes``.
+    """
+    try:
+        import pyarrow.parquet as pq
+    except ImportError as exc:
+        raise AirflowOptionalProviderFeatureException(
+            "Parquet analysis requires the `parquet` extra for 
apache-airflow-providers-common-ai."
+        ) from exc
+
+    with path.open("rb") as handle:
+        parquet_file = pq.ParquetFile(handle)
+        metadata = parquet_file.metadata
+        num_rows = metadata.num_rows if metadata is not None else 0
+
+        # Measure the file by seeking to its end, then rewind before any rows are read.
+        handle.seek(0, io.SEEK_END)
+        content_size_bytes = handle.tell()
+        handle.seek(0)
+        if content_size_bytes > max_content_bytes:
+            raise LLMFileAnalysisLimitExceededError(
+                f"File {path} exceeds the configured processed-content limit: 
{content_size_bytes} bytes > {max_content_bytes} bytes."
+            )
+
+        schema = ", ".join(f"{field.name}: {field.type}" for field in 
parquet_file.schema_arrow)
+        sampled_rows: list[dict[str, Any]] = []
+        if sample_rows > 0 and num_rows > 0 and parquet_file.num_row_groups > 
0:
+            # Walk row groups in order, taking rows from each until the sample is full.
+            remaining_rows = sample_rows
+            for row_group_index in range(parquet_file.num_row_groups):
+                if remaining_rows <= 0:
+                    break
+                row_group = parquet_file.read_row_group(row_group_index)
+                if row_group.num_rows == 0:
+                    continue
+                group_rows = row_group.slice(0, remaining_rows).to_pylist()
+                sampled_rows.extend(group_rows)
+                remaining_rows -= len(group_rows)
+    payload = [f"Schema: {schema}", "Sample rows:", json.dumps(sampled_rows, 
indent=2, default=str)]
+    return _RenderResult(
+        text=_truncate_text("\n".join(payload)),
+        estimated_rows=num_rows,
+        content_size_bytes=content_size_bytes,
+    )
+
+
+def _render_avro(path: ObjectStoragePath, *, sample_rows: int, 
max_content_bytes: int) -> _RenderResult:
+    """
+    Render an Avro file as its writer schema plus up to ``sample_rows`` records as JSON.
+
+    :return: Rendered text, a row count that is only set when the reader was
+        exhausted while sampling, and the file size measured on the open handle.
+    :raises AirflowOptionalProviderFeatureException: If ``fastavro`` is not installed.
+    :raises LLMFileAnalysisLimitExceededError: If the file is larger than
+        ``max_content_bytes``.
+    """
+    try:
+        import fastavro
+    except ImportError as exc:
+        raise AirflowOptionalProviderFeatureException(
+            "Avro analysis requires the `avro` extra for 
apache-airflow-providers-common-ai."
+        ) from exc
+
+    sampled_rows: list[dict[str, Any]] = []
+    total_rows = 0
+    with path.open("rb") as handle:
+        # Measure the file by seeking to its end, then rewind before parsing.
+        handle.seek(0, io.SEEK_END)
+        content_size_bytes = handle.tell()
+        handle.seek(0)
+        if content_size_bytes > max_content_bytes:
+            raise LLMFileAnalysisLimitExceededError(
+                f"File {path} exceeds the configured processed-content limit: 
{content_size_bytes} bytes > {max_content_bytes} bytes."
+            )
+        reader = fastavro.reader(handle)
+        writer_schema = getattr(reader, "writer_schema", None)
+        fully_read = False
+        if sample_rows > 0:
+            for record in reader:
+                total_rows += 1
+                # Records that are not dicts are counted but not included in the sample.
+                if isinstance(record, dict):
+                    sampled_rows.append({str(key): value for key, value in 
record.items()})
+                if total_rows >= sample_rows:
+                    break
+            else:
+                # Reached only when the loop ended without ``break``, i.e. the
+                # reader ran out of records before the sample limit was hit.
+                fully_read = True
+    payload = [
+        f"Schema: {json.dumps(writer_schema, indent=2, default=str)}",
+        "Sample rows:",
+        json.dumps(sampled_rows, indent=2, default=str),
+    ]
+    return _RenderResult(
+        text=_truncate_text("\n".join(payload)),
+        # When sampling stopped early the true row count is unknown.
+        estimated_rows=total_rows if fully_read else None,
+        content_size_bytes=content_size_bytes,
+    )
+
+
+def _read_raw_bytes(path: ObjectStoragePath, *, compression: str | None, max_bytes: int) -> bytes:
+    """
+    Read the content of ``path``, decompressing gzip input on the fly.
+
+    The ``max_bytes`` limit applies to the bytes returned, so for gzip input it
+    bounds the decompressed size.
+    """
+    with path.open("rb") as raw_handle:
+        if compression != "gzip":
+            return _read_limited_bytes(raw_handle, path=path, max_bytes=max_bytes)
+        with gzip.GzipFile(fileobj=raw_handle) as inflated:
+            return _read_limited_bytes(inflated, path=path, max_bytes=max_bytes)
+
+
+def _read_limited_bytes(handle: io.BufferedIOBase, *, path: ObjectStoragePath, max_bytes: int) -> bytes:
+    """
+    Read ``handle`` to the end in chunks of at most 64 KiB, enforcing ``max_bytes``.
+
+    :raises LLMFileAnalysisLimitExceededError: As soon as more than ``max_bytes``
+        bytes have been read.
+    """
+    collected = bytearray()
+    # Ask for one byte beyond the remaining budget so an oversized stream is
+    # detected without reading it to the end.
+    while piece := handle.read(min(64 * 1024, max_bytes - len(collected) + 1)):
+        if len(collected) + len(piece) > max_bytes:
+            raise LLMFileAnalysisLimitExceededError(
+                f"File {path} exceeds the configured processed-content limit: > {max_bytes} bytes."
+            )
+        collected += piece
+    return bytes(collected)
+
+
+def _decode_text(data: bytes) -> str:
+    """
+    Decode ``data`` as UTF-8, dropping a leading byte-order mark if present.
+
+    Undecodable bytes are replaced with U+FFFD rather than raising. The BOM is
+    stripped because ``json.loads`` rejects text that starts with one, and because
+    it would otherwise become part of the first CSV header cell.
+    """
+    return data.decode("utf-8-sig", errors="replace")
+
+
+def _apply_text_budget(*, prepared_files: list[_PreparedFile], max_text_chars: int) -> bool:
+    """
+    Fit the normalized text of all files into a shared budget of ``max_text_chars``.
+
+    Files are processed in order and mutated in place: text that does not fit in the
+    remaining budget is truncated (``content_truncated``), and once the budget is
+    used up the text of later files is dropped (``content_omitted``).
+
+    :return: True if any file's text was truncated or dropped.
+    """
+    budget = max_text_chars
+    changed = False
+    for item in prepared_files:
+        content = item.text_content
+        if content is None:
+            continue
+        if budget <= 0:
+            item.text_content = None
+            item.content_omitted = True
+            changed = True
+            log.debug(
+                "Omitted normalized text for %s because the prompt text budget was exhausted.", item.path
+            )
+            continue
+        if len(content) > budget:
+            shortened = _truncate_text(content, max_chars=budget)
+            item.text_content = shortened
+            item.content_truncated = True
+            changed = True
+            log.debug(
+                "Truncated normalized text for %s from %s to %s characters to fit the remaining budget.",
+                item.path,
+                len(content),
+                len(shortened),
+            )
+            content = shortened
+        budget -= len(content)
+    return changed
+
+
+def _build_text_preamble(
+    *,
+    prompt: str,
+    prepared_files: list[_PreparedFile],
+    omitted_files: int,
+    text_truncated: bool,
+) -> str:
+    """
+    Assemble the text part of the prompt.
+
+    The result contains the user request, one metadata line per file, optional
+    notes about omitted files and truncation, the normalized text of each file,
+    and a closing note when binary attachments follow.
+    """
+    lines = ["User request:", prompt, "", "Resolved files:"]
+    lines += [f"- {_format_file_metadata(item)}" for item in prepared_files]
+    if omitted_files:
+        lines.append(f"- omitted_files={omitted_files} (max_files limit reached)")
+    if text_truncated:
+        lines.append("- text_context_truncated=True")
+
+    sections = [
+        f"### File: {item.path}\n{item.text_content}"
+        for item in prepared_files
+        if item.text_content is not None
+    ]
+    if sections:
+        lines += ["", "Normalized content:", *sections]
+
+    if any(item.attachment is not None for item in prepared_files):
+        lines += [
+            "",
+            "Attached multimodal files follow this text block.",
+            "Use the matching file metadata above when referring to those attachments.",
+        ]
+    return "\n".join(lines)
+
+
+def _truncate_text(text: str, *, max_chars: int = _TEXT_SAMPLE_HEAD_CHARS + _TEXT_SAMPLE_TAIL_CHARS) -> str:
+    """
+    Shorten ``text`` to at most ``max_chars`` characters, keeping its head and tail.
+
+    Text that already fits is returned unchanged. Otherwise the result is the head,
+    a ``"\\n...\\n"`` separator, and the tail. For very small limits, where no room
+    is left for a tail, the text is simply cut at ``max_chars``.
+    """
+    if len(text) <= max_chars:
+        return text
+    separator = "\n...\n"
+    default_window = _TEXT_SAMPLE_HEAD_CHARS + _TEXT_SAMPLE_TAIL_CHARS
+    # The head is capped at the default head size; below the default window it is half the budget.
+    head_len = _TEXT_SAMPLE_HEAD_CHARS if max_chars >= default_window else max_chars // 2
+    tail_len = max_chars - head_len - len(separator)
+    if max_chars <= 32 or tail_len <= 0:
+        return text[:max_chars]
+    return text[:head_len] + separator + text[-tail_len:]
+
+
+def _format_file_metadata(prepared: _PreparedFile) -> str:
+    """
+    Format the metadata of one file as a comma-separated ``key=value`` line.
+
+    Path, format and size are always present; the remaining entries appear only
+    when the corresponding attribute is set.
+    """
+    fields = [
+        f"path={prepared.path}",
+        f"format={prepared.file_format}",
+        f"size_bytes={prepared.size_bytes}",
+    ]
+    fields += [f"compression={prepared.compression}"] if prepared.compression else []
+    fields += [f"estimated_rows={prepared.estimated_rows}"] if prepared.estimated_rows is not None else []
+    fields += [f"partitions={list(prepared.partitions)}"] if prepared.partitions else []
+    fields += ["content_truncated=True"] if prepared.content_truncated else []
+    fields += ["content_omitted=True"] if prepared.content_omitted else []
+    fields += ["attached_as_binary=True"] if prepared.attachment is not None else []
+    return ", ".join(fields)
+
+
+def _infer_partitions(path: ObjectStoragePath) -> tuple[str, ...]:
+    """
+    Return the ``key=value`` directory segments of ``path``, in path order.
+
+    Only the directories leading to the file are inspected. The file name itself
+    is skipped so that a file such as ``k=v.csv`` is not reported as a partition.
+    """
+    parent_dirs = PurePosixPath(path.path).parent.parts
+    return tuple(segment for segment in parent_dirs if "=" in segment)
diff --git a/providers/common/ai/src/airflow/providers/common/ai/exceptions.py 
b/providers/common/ai/tests/unit/common/ai/assets/__init__.py
similarity index 75%
copy from providers/common/ai/src/airflow/providers/common/ai/exceptions.py
copy to providers/common/ai/tests/unit/common/ai/assets/__init__.py
index 7c83f30d68a..13a83393a91 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/exceptions.py
+++ b/providers/common/ai/tests/unit/common/ai/assets/__init__.py
@@ -14,10 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from __future__ import annotations
-
-from airflow.providers.common.compat.sdk import AirflowException
-
-
-class HITLMaxIterationsError(AirflowException):
-    """Raised when the HITL review loop exhausts max iterations without 
approval or rejection."""
diff --git 
a/providers/common/ai/tests/unit/common/ai/assets/airflow-3-task-sdk.png 
b/providers/common/ai/tests/unit/common/ai/assets/airflow-3-task-sdk.png
new file mode 100644
index 00000000000..e7179f8cac5
Binary files /dev/null and 
b/providers/common/ai/tests/unit/common/ai/assets/airflow-3-task-sdk.png differ
diff --git 
a/providers/common/ai/tests/unit/common/ai/decorators/test_llm_file_analysis.py 
b/providers/common/ai/tests/unit/common/ai/decorators/test_llm_file_analysis.py
new file mode 100644
index 00000000000..5b32f061862
--- /dev/null
+++ 
b/providers/common/ai/tests/unit/common/ai/decorators/test_llm_file_analysis.py
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from airflow.providers.common.ai.decorators.llm_file_analysis import 
_LLMFileAnalysisDecoratedOperator
+from airflow.providers.common.ai.utils.file_analysis import FileAnalysisRequest
+
+
+def _make_mock_run_result(output):
+    """Build a stand-in agent run result that exposes only the attributes named in ``spec``."""
+    usage = MagicMock(
+        spec=["requests", "tool_calls", "input_tokens", "output_tokens", "total_tokens"],
+        requests=1,
+        tool_calls=0,
+        input_tokens=0,
+        output_tokens=0,
+        total_tokens=0,
+    )
+    run_result = MagicMock(spec=["output", "usage", "response", "all_messages"])
+    run_result.output = output
+    run_result.usage.return_value = usage
+    run_result.response = MagicMock(spec=["model_name"], model_name="test-model")
+    run_result.all_messages.return_value = []
+    return run_result
+
+
+class TestLLMFileAnalysisDecoratedOperator:
+    """Unit tests for the operator behind ``@task.llm_file_analysis``."""
+
+    def test_custom_operator_name(self):
+        """The operator advertises the decorator name as its custom operator name."""
+        assert _LLMFileAnalysisDecoratedOperator.custom_operator_name == 
"@task.llm_file_analysis"
+
+    @patch("airflow.providers.common.ai.operators.llm.PydanticAIHook", 
autospec=True)
+    @patch(
+        
"airflow.providers.common.ai.operators.llm_file_analysis.build_file_analysis_request",
 autospec=True
+    )
+    def test_execute_calls_callable_and_returns_output(self, 
mock_build_request, mock_hook_cls):
+        """The callable's return value becomes the prompt and the agent output is returned."""
+        mock_build_request.return_value = FileAnalysisRequest(
+            user_content="prepared prompt",
+            resolved_paths=["/tmp/app.log"],
+            total_size_bytes=10,
+        )
+        mock_agent = MagicMock(spec=["run_sync"])
+        mock_agent.run_sync.return_value = _make_mock_run_result("This is a 
summary.")
+        mock_hook_cls.get_hook.return_value.create_agent.return_value = 
mock_agent
+
+        def my_prompt():
+            return "Summarize this text"
+
+        op = _LLMFileAnalysisDecoratedOperator(
+            task_id="test",
+            python_callable=my_prompt,
+            llm_conn_id="my_llm",
+            file_path="/tmp/app.log",
+        )
+        result = op.execute(context={})
+
+        assert result == "This is a summary."
+        assert op.prompt == "Summarize this text"
+        # The agent receives the prepared request content, not the raw prompt.
+        mock_agent.run_sync.assert_called_once_with("prepared prompt")
+
+    @pytest.mark.parametrize(
+        "return_value",
+        [42, "", "   ", None],
+        ids=["non-string", "empty", "whitespace", "none"],
+    )
+    def test_execute_raises_on_invalid_prompt(self, return_value):
+        """A callable that does not return a non-empty string makes ``execute`` raise ``TypeError``."""
+        op = _LLMFileAnalysisDecoratedOperator(
+            task_id="test",
+            python_callable=lambda: return_value,
+            llm_conn_id="my_llm",
+            file_path="/tmp/app.log",
+        )
+        with pytest.raises(TypeError, match="non-empty string"):
+            op.execute(context={})
+
+    @patch("airflow.providers.common.ai.operators.llm.PydanticAIHook", 
autospec=True)
+    @patch(
+        
"airflow.providers.common.ai.operators.llm_file_analysis.build_file_analysis_request",
 autospec=True
+    )
+    def test_execute_merges_op_kwargs_into_callable(self, mock_build_request, 
mock_hook_cls):
+        """Values from ``op_kwargs`` are passed to the callable when building the prompt."""
+        mock_build_request.return_value = FileAnalysisRequest(
+            user_content="prepared prompt",
+            resolved_paths=["/tmp/app.log"],
+            total_size_bytes=10,
+        )
+        mock_agent = MagicMock(spec=["run_sync"])
+        mock_agent.run_sync.return_value = _make_mock_run_result("done")
+        mock_hook_cls.get_hook.return_value.create_agent.return_value = 
mock_agent
+
+        def my_prompt(topic):
+            return f"Summarize {topic}"
+
+        op = _LLMFileAnalysisDecoratedOperator(
+            task_id="test",
+            python_callable=my_prompt,
+            llm_conn_id="my_llm",
+            file_path="/tmp/app.log",
+            op_kwargs={"topic": "system logs"},
+        )
+        op.execute(context={"task_instance": MagicMock(spec=["task_id"])})
+
+        assert op.prompt == "Summarize system logs"
+        mock_agent.run_sync.assert_called_once_with("prepared prompt")
diff --git 
a/providers/common/ai/tests/unit/common/ai/operators/test_llm_file_analysis.py 
b/providers/common/ai/tests/unit/common/ai/operators/test_llm_file_analysis.py
new file mode 100644
index 00000000000..abd98d92326
--- /dev/null
+++ 
b/providers/common/ai/tests/unit/common/ai/operators/test_llm_file_analysis.py
@@ -0,0 +1,303 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import timedelta
+from unittest.mock import MagicMock, patch
+from uuid import uuid4
+
+import pytest
+from pydantic import BaseModel
+
+from airflow.providers.common.ai.operators.llm_file_analysis import 
LLMFileAnalysisOperator
+from airflow.providers.common.ai.utils.file_analysis import FileAnalysisRequest
+
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_1_PLUS
+
+
+def _make_mock_run_result(output):
+    """
+    Build a spec-restricted mock run result carrying *output*.
+
+    The mock exposes ``output``, ``usage()`` (one request, zero tool calls and
+    zero tokens), ``response.model_name`` (``"test-model"``) and
+    ``all_messages()`` (an empty list); any other attribute raises ``AttributeError``.
+    """
+    usage = MagicMock(
+        spec=["requests", "tool_calls", "input_tokens", "output_tokens", "total_tokens"],
+        requests=1,
+        tool_calls=0,
+        input_tokens=0,
+        output_tokens=0,
+        total_tokens=0,
+    )
+    response = MagicMock(spec=["model_name"], model_name="test-model")
+
+    run_result = MagicMock(spec=["output", "usage", "response", "all_messages"])
+    run_result.output = output
+    run_result.usage.return_value = usage
+    run_result.response = response
+    run_result.all_messages.return_value = []
+    return run_result
+
+
+def _make_context(ti_id=None):
+    """
+    Build a mock context that only supports ``context["task_instance"]``.
+
+    The returned task instance mock exposes a single ``id`` attribute, set to
+    *ti_id* or to a fresh ``uuid4()`` when *ti_id* is falsy. Any other key
+    raises ``KeyError``.
+    """
+    task_instance = MagicMock(spec=["id"])
+    task_instance.id = ti_id if ti_id else uuid4()
+
+    entries = {"task_instance": task_instance}
+    ctx = MagicMock(spec=["__getitem__"])
+    ctx.__getitem__.side_effect = entries.__getitem__
+    return ctx
+
+
+class TestLLMFileAnalysisOperator:
+    """Tests for ``LLMFileAnalysisOperator`` with the LLM hook and the request builder patched out."""
+
+    def test_template_fields(self):
+        # Compared as sets, so the declaration order of ``template_fields`` is not pinned.
+        declared = set(LLMFileAnalysisOperator.template_fields)
+        assert declared == {
+            "prompt",
+            "llm_conn_id",
+            "model_id",
+            "system_prompt",
+            "agent_params",
+            "file_path",
+            "file_conn_id",
+        }
+
+    @patch("airflow.providers.common.ai.operators.llm.PydanticAIHook", autospec=True)
+    @patch(
+        "airflow.providers.common.ai.operators.llm_file_analysis.build_file_analysis_request", autospec=True
+    )
+    def test_execute_returns_string_output(self, mock_build_request, mock_hook_cls):
+        """A plain-string LLM output is returned as-is and the builder receives the operator defaults."""
+        agent = MagicMock(spec=["run_sync"])
+        agent.run_sync.return_value = _make_mock_run_result("Analysis complete")
+        mock_hook_cls.get_hook.return_value.create_agent.return_value = agent
+        mock_build_request.return_value = FileAnalysisRequest(
+            user_content="prepared prompt",
+            resolved_paths=["/tmp/app.log"],
+            total_size_bytes=10,
+        )
+
+        operator = LLMFileAnalysisOperator(
+            task_id="test",
+            prompt="Summarize the file",
+            llm_conn_id="my_llm",
+            file_path="/tmp/app.log",
+        )
+        output = operator.execute(context={})
+
+        assert output == "Analysis complete"
+        # Everything except file_path and prompt is the operator's default value.
+        expected_builder_kwargs = {
+            "file_path": "/tmp/app.log",
+            "file_conn_id": None,
+            "prompt": "Summarize the file",
+            "multi_modal": False,
+            "max_files": 20,
+            "max_file_size_bytes": 5 * 1024 * 1024,
+            "max_total_size_bytes": 20 * 1024 * 1024,
+            "max_text_chars": 100_000,
+            "sample_rows": 10,
+        }
+        mock_build_request.assert_called_once_with(**expected_builder_kwargs)
+        agent.run_sync.assert_called_once_with("prepared prompt")
+
+    @patch("airflow.providers.common.ai.operators.llm.PydanticAIHook", autospec=True)
+    @patch(
+        "airflow.providers.common.ai.operators.llm_file_analysis.build_file_analysis_request", autospec=True
+    )
+    def test_execute_structured_output_serializes_model(self, mock_build_request, mock_hook_cls):
+        """A pydantic model returned by the agent comes back from ``execute`` as a plain dict."""
+
+        class Summary(BaseModel):
+            findings: list[str]
+
+        agent = MagicMock(spec=["run_sync"])
+        agent.run_sync.return_value = _make_mock_run_result(Summary(findings=["error spike"]))
+        mock_hook_cls.get_hook.return_value.create_agent.return_value = agent
+        mock_build_request.return_value = FileAnalysisRequest(
+            user_content="prepared prompt",
+            resolved_paths=["/tmp/app.log"],
+            total_size_bytes=10,
+        )
+
+        operator = LLMFileAnalysisOperator(
+            task_id="test",
+            prompt="Summarize the file",
+            llm_conn_id="my_llm",
+            file_path="/tmp/app.log",
+            output_type=Summary,
+        )
+        output = operator.execute(context={})
+
+        assert output == {"findings": ["error spike"]}
+
+    @patch(
+        "airflow.providers.common.ai.operators.llm_file_analysis.build_file_analysis_request", autospec=True
+    )
+    def test_parameter_validation(self, mock_build_request):
+        """Zero for ``max_files`` or ``sample_rows`` is rejected in the constructor, before any file access."""
+        for bad_param in ("max_files", "sample_rows"):
+            with pytest.raises(ValueError, match=bad_param):
+                LLMFileAnalysisOperator(
+                    task_id="test",
+                    prompt="p",
+                    llm_conn_id="my_llm",
+                    file_path="/tmp/app.log",
+                    **{bad_param: 0},
+                )
+        mock_build_request.assert_not_called()
+
+
+@pytest.mark.skipif(
+    not AIRFLOW_V_3_1_PLUS, reason="Human in the loop is only compatible with Airflow >= 3.1.0"
+)
+class TestLLMFileAnalysisOperatorApproval:
+    """
+    Tests for the ``require_approval=True`` path of ``LLMFileAnalysisOperator``.
+
+    The whole class is skipped unless ``AIRFLOW_V_3_1_PLUS`` is true. The HITL
+    trigger, ``upsert_hitl_detail``, the LLM hook and the request builder are
+    patched in every test that calls ``execute``.
+    """
+
+    class Summary(BaseModel):
+        # Structured output type shared by the structured-output approval tests.
+        findings: list[str]
+
+    @patch("airflow.providers.standard.triggers.hitl.HITLTrigger", autospec=True)
+    @patch("airflow.sdk.execution_time.hitl.upsert_hitl_detail")
+    @patch("airflow.providers.common.ai.operators.llm.PydanticAIHook", autospec=True)
+    @patch(
+        "airflow.providers.common.ai.operators.llm_file_analysis.build_file_analysis_request", autospec=True
+    )
+    def test_execute_with_approval_defers(
+        self, mock_build_request, mock_hook_cls, mock_upsert, mock_trigger_cls
+    ):
+        """``execute`` defers to ``execute_complete`` and carries the LLM output in the deferral kwargs."""
+        from airflow.providers.common.compat.sdk import TaskDeferred
+
+        mock_build_request.return_value = FileAnalysisRequest(
+            user_content="prepared prompt",
+            resolved_paths=["/tmp/app.log"],
+            total_size_bytes=10,
+        )
+        mock_agent = MagicMock(spec=["run_sync"])
+        mock_agent.run_sync.return_value = _make_mock_run_result("LLM response")
+        mock_hook_cls.get_hook.return_value.create_agent.return_value = mock_agent
+
+        op = LLMFileAnalysisOperator(
+            task_id="approval_test",
+            prompt="Summarize this",
+            llm_conn_id="my_llm",
+            file_path="/tmp/app.log",
+            require_approval=True,
+        )
+        ctx = _make_context()
+
+        with pytest.raises(TaskDeferred) as exc_info:
+            op.execute(context=ctx)
+
+        assert exc_info.value.method_name == "execute_complete"
+        assert exc_info.value.kwargs["generated_output"] == "LLM response"
+        mock_upsert.assert_called_once()
+
+    @patch("airflow.providers.standard.triggers.hitl.HITLTrigger", autospec=True)
+    @patch("airflow.sdk.execution_time.hitl.upsert_hitl_detail")
+    @patch("airflow.providers.common.ai.operators.llm.PydanticAIHook", autospec=True)
+    @patch(
+        "airflow.providers.common.ai.operators.llm_file_analysis.build_file_analysis_request", autospec=True
+    )
+    def test_execute_with_approval_defers_structured_output_as_json(
+        self, mock_build_request, mock_hook_cls, mock_upsert, mock_trigger_cls
+    ):
+        """A pydantic-model output is handed to the deferral as a compact JSON string."""
+        from airflow.providers.common.compat.sdk import TaskDeferred
+
+        mock_build_request.return_value = FileAnalysisRequest(
+            user_content="prepared prompt",
+            resolved_paths=["/tmp/app.log"],
+            total_size_bytes=10,
+        )
+        mock_agent = MagicMock(spec=["run_sync"])
+        mock_agent.run_sync.return_value = _make_mock_run_result(self.Summary(findings=["error spike"]))
+        mock_hook_cls.get_hook.return_value.create_agent.return_value = mock_agent
+
+        op = LLMFileAnalysisOperator(
+            task_id="approval_structured_test",
+            prompt="Summarize this",
+            llm_conn_id="my_llm",
+            file_path="/tmp/app.log",
+            output_type=self.Summary,
+            require_approval=True,
+        )
+
+        with pytest.raises(TaskDeferred) as exc_info:
+            op.execute(context=_make_context())
+
+        assert exc_info.value.kwargs["generated_output"] == '{"findings":["error spike"]}'
+        mock_upsert.assert_called_once()
+
+    def test_execute_complete_with_approval_restores_structured_output(self):
+        """On approval without edits, the JSON string is turned back into a dict."""
+        op = LLMFileAnalysisOperator(
+            task_id="approval_complete_test",
+            prompt="Summarize this",
+            llm_conn_id="my_llm",
+            file_path="/tmp/app.log",
+            output_type=self.Summary,
+            require_approval=True,
+        )
+        event = {"chosen_options": [op.APPROVE], "params_input": {}, "responded_by_user": "reviewer"}
+
+        result = op.execute_complete({}, generated_output='{"findings":["error spike"]}', event=event)
+
+        assert result == {"findings": ["error spike"]}
+
+    def test_execute_complete_with_approval_restores_modified_structured_output(self):
+        """With ``allow_modifications=True`` the reviewer's ``params_input["output"]`` wins over the original."""
+        op = LLMFileAnalysisOperator(
+            task_id="approval_complete_modified_test",
+            prompt="Summarize this",
+            llm_conn_id="my_llm",
+            file_path="/tmp/app.log",
+            output_type=self.Summary,
+            require_approval=True,
+            allow_modifications=True,
+        )
+        event = {
+            "chosen_options": [op.APPROVE],
+            "params_input": {"output": '{"findings":["reviewed output"]}'},
+            "responded_by_user": "reviewer",
+        }
+
+        result = op.execute_complete({}, generated_output='{"findings":["error spike"]}', event=event)
+
+        assert result == {"findings": ["reviewed output"]}
+
+    @patch("airflow.providers.standard.triggers.hitl.HITLTrigger", autospec=True)
+    @patch("airflow.sdk.execution_time.hitl.upsert_hitl_detail")
+    @patch("airflow.providers.common.ai.operators.llm.PydanticAIHook", autospec=True)
+    @patch(
+        "airflow.providers.common.ai.operators.llm_file_analysis.build_file_analysis_request", autospec=True
+    )
+    def test_execute_with_approval_timeout(
+        self, mock_build_request, mock_hook_cls, mock_upsert, mock_trigger_cls
+    ):
+        """``approval_timeout`` is propagated unchanged as the deferral timeout."""
+        from airflow.providers.common.compat.sdk import TaskDeferred
+
+        mock_build_request.return_value = FileAnalysisRequest(
+            user_content="prepared prompt",
+            resolved_paths=["/tmp/app.log"],
+            total_size_bytes=10,
+        )
+        mock_agent = MagicMock(spec=["run_sync"])
+        mock_agent.run_sync.return_value = _make_mock_run_result("output")
+        mock_hook_cls.get_hook.return_value.create_agent.return_value = mock_agent
+
+        timeout = timedelta(hours=1)
+        op = LLMFileAnalysisOperator(
+            task_id="timeout_test",
+            prompt="p",
+            llm_conn_id="my_llm",
+            file_path="/tmp/app.log",
+            require_approval=True,
+            approval_timeout=timeout,
+        )
+
+        with pytest.raises(TaskDeferred) as exc_info:
+            op.execute(context=_make_context())
+
+        assert exc_info.value.timeout == timeout
diff --git a/providers/common/ai/tests/unit/common/ai/utils/test_file_analysis.py b/providers/common/ai/tests/unit/common/ai/utils/test_file_analysis.py
new file mode 100644
index 00000000000..32091ffa59b
--- /dev/null
+++ b/providers/common/ai/tests/unit/common/ai/utils/test_file_analysis.py
@@ -0,0 +1,525 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import builtins
+import gzip
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+import pytest
+from pydantic_ai.messages import BinaryContent
+
+from airflow.providers.common.ai.exceptions import (
+    LLMFileAnalysisLimitExceededError,
+    LLMFileAnalysisMultimodalRequiredError,
+    LLMFileAnalysisUnsupportedFormatError,
+)
+from airflow.providers.common.ai.utils.file_analysis import (
+    FileAnalysisRequest,
+    _infer_partitions,
+    _read_raw_bytes,
+    _render_avro,
+    _render_parquet,
+    _resolve_paths,
+    _truncate_text,
+    build_file_analysis_request,
+    detect_file_format,
+)
+from airflow.providers.common.compat.sdk import 
AirflowOptionalProviderFeatureException, ObjectStoragePath
+
+# PNG fixture in the sibling ``assets`` directory, one level above this test module's folder.
+AIRFLOW_PNG_PATH = Path(__file__).resolve().parents[1].joinpath("assets", "airflow-3-task-sdk.png")
+
+
+class TestBuildFileAnalysisRequest:
+    """Tests for ``build_file_analysis_request``."""
+
+    @staticmethod
+    def _build(
+        file_path,
+        *,
+        prompt="Analyze",
+        file_conn_id=None,
+        multi_modal=False,
+        max_files=1,
+        max_file_size_bytes=1024,
+        max_total_size_bytes=1024,
+        max_text_chars=500,
+        sample_rows=10,
+    ):
+        """Call ``build_file_analysis_request`` with every argument passed by keyword."""
+        return build_file_analysis_request(
+            file_path=file_path,
+            file_conn_id=file_conn_id,
+            prompt=prompt,
+            multi_modal=multi_modal,
+            max_files=max_files,
+            max_file_size_bytes=max_file_size_bytes,
+            max_total_size_bytes=max_total_size_bytes,
+            max_text_chars=max_text_chars,
+            sample_rows=sample_rows,
+        )
+
+    def test_text_file_analysis(self, tmp_path):
+        log_file = tmp_path / "app.log"
+        log_file.write_text("first line\nsecond line\n", encoding="utf-8")
+
+        request = self._build(
+            str(log_file),
+            prompt="Summarize the log",
+            max_files=20,
+            max_total_size_bytes=2048,
+        )
+
+        assert isinstance(request, FileAnalysisRequest)
+        assert request.resolved_paths == [str(log_file)]
+        for fragment in ("Summarize the log", "first line", "format=log"):
+            assert fragment in request.user_content
+
+    def test_file_conn_id_overrides_embedded_connection(self, tmp_path):
+        json_file = tmp_path / "data.json"
+        json_file.write_text('{"status": "ok"}', encoding="utf-8")
+
+        # One payload read followed by an empty read to signal end-of-file.
+        handle = MagicMock(spec=["read"])
+        handle.read.side_effect = [b'{"status": "ok"}', b""]
+
+        storage_path = MagicMock(spec=ObjectStoragePath)
+        storage_path.is_file.return_value = True
+        storage_path.stat.return_value.st_size = 16
+        storage_path.suffixes = [".json"]
+        storage_path.path = str(json_file)
+        storage_path.open.return_value.__enter__.return_value = handle
+
+        with patch(
+            "airflow.providers.common.ai.utils.file_analysis.ObjectStoragePath", autospec=True
+        ) as mock_path:
+            mock_path.return_value = storage_path
+
+            self._build(
+                "s3://embedded_conn@bucket/data.json",
+                file_conn_id="override_conn",
+                prompt="Inspect",
+            )
+
+        mock_path.assert_called_once_with("s3://embedded_conn@bucket/data.json", conn_id="override_conn")
+
+    def test_sample_rows_must_be_positive(self, tmp_path):
+        csv_file = tmp_path / "metrics.csv"
+        csv_file.write_text("name,value\nalpha,1\n", encoding="utf-8")
+
+        with pytest.raises(ValueError, match="sample_rows must be greater than zero"):
+            self._build(str(csv_file), sample_rows=0)
+
+    def test_prefix_aggregation_is_sorted_and_omits_extra_files(self, tmp_path):
+        # Created out of order on purpose so the sort is observable.
+        for stem in ("b", "a", "c"):
+            (tmp_path / f"{stem}.log").write_text(stem, encoding="utf-8")
+
+        request = self._build(
+            str(tmp_path),
+            prompt="Summarize",
+            max_files=2,
+            max_total_size_bytes=2048,
+        )
+
+        assert request.resolved_paths == [str(tmp_path / "a.log"), str(tmp_path / "b.log")]
+        assert request.omitted_files == 1
+        assert "omitted_files=1" in request.user_content
+
+    def test_multi_modal_image_creates_binary_attachment(self):
+        size_limit = 256 * 1024
+        request = self._build(
+            str(AIRFLOW_PNG_PATH),
+            prompt="Check for anomalies",
+            multi_modal=True,
+            max_file_size_bytes=size_limit,
+            max_total_size_bytes=size_limit,
+        )
+
+        assert isinstance(request.user_content, list)
+        attachment = request.user_content[1]
+        assert isinstance(attachment, BinaryContent)
+        assert attachment.media_type == "image/png"
+        assert request.resolved_paths == [str(AIRFLOW_PNG_PATH)]
+        assert "attached_as_binary=True" in request.user_content[0]
+
+    def test_multimodal_required_for_image_and_pdf(self, tmp_path):
+        pdf_file = tmp_path / "report.pdf"
+        pdf_file.write_bytes(b"%PDF-1.7")
+
+        with pytest.raises(LLMFileAnalysisMultimodalRequiredError, match="multi_modal=True"):
+            self._build(str(pdf_file))
+
+    def test_unsupported_suffix_raises(self, tmp_path):
+        binary_file = tmp_path / "payload.bin"
+        binary_file.write_bytes(b"abc")
+
+        with pytest.raises(LLMFileAnalysisUnsupportedFormatError, match="Unsupported file format"):
+            self._build(str(binary_file))
+
+    def test_single_file_limit_failure(self, tmp_path):
+        big_file = tmp_path / "big.log"
+        big_file.write_text("abcdef", encoding="utf-8")
+
+        with (
+            patch(
+                "airflow.providers.common.ai.utils.file_analysis._prepare_file", autospec=True
+            ) as mock_prepare,
+            pytest.raises(LLMFileAnalysisLimitExceededError, match="per-file limit"),
+        ):
+            self._build(str(big_file), max_file_size_bytes=5)
+
+        # The limit must be enforced before any file content is prepared.
+        mock_prepare.assert_not_called()
+
+    def test_total_size_limit_failure(self, tmp_path):
+        (tmp_path / "a.log").write_text("abc", encoding="utf-8")
+        (tmp_path / "b.log").write_text("def", encoding="utf-8")
+
+        with (
+            patch(
+                "airflow.providers.common.ai.utils.file_analysis._prepare_file", autospec=True
+            ) as mock_prepare,
+            pytest.raises(LLMFileAnalysisLimitExceededError, match="Total input size exceeds"),
+        ):
+            self._build(str(tmp_path), max_files=10, max_total_size_bytes=5)
+
+        # The limit must be enforced before any file content is prepared.
+        mock_prepare.assert_not_called()
+
+    def test_gzip_expansion_respects_processed_content_limit(self, tmp_path):
+        archive = tmp_path / "big.log.gz"
+        archive.write_bytes(gzip.compress(b"A" * 20_000))
+
+        with pytest.raises(LLMFileAnalysisLimitExceededError, match="processed-content limit"):
+            self._build(str(archive), max_file_size_bytes=256, max_total_size_bytes=256)
+
+    def test_gzip_expansion_respects_total_processed_content_limit(self, tmp_path):
+        (tmp_path / "a.log.gz").write_bytes(gzip.compress(b"A" * 1_000))
+        (tmp_path / "b.log.gz").write_bytes(gzip.compress(b"B" * 1_000))
+
+        with pytest.raises(LLMFileAnalysisLimitExceededError, match="processed-content limit"):
+            self._build(
+                str(tmp_path),
+                max_files=10,
+                max_file_size_bytes=2_048,
+                max_total_size_bytes=1_500,
+            )
+
+    def test_json_is_pretty_printed(self, tmp_path):
+        json_file = tmp_path / "data.json"
+        json_file.write_text('{"b": 2, "a": 1}', encoding="utf-8")
+
+        request = self._build(str(json_file))
+
+        assert '"a": 1' in request.user_content
+        assert '"b": 2' in request.user_content
+
+    def test_text_context_truncation_is_marked(self, tmp_path):
+        huge_file = tmp_path / "huge.log"
+        huge_file.write_text("line\n" * 400, encoding="utf-8")
+
+        request = self._build(
+            str(huge_file),
+            max_file_size_bytes=16_384,
+            max_total_size_bytes=16_384,
+            max_text_chars=100,
+        )
+
+        assert request.text_truncated is True
+        assert "text_context_truncated=True" in request.user_content
+
+    def test_csv_sample_rows_is_configurable(self, tmp_path):
+        csv_file = tmp_path / "metrics.csv"
+        csv_file.write_text("name,value\nalpha,1\nbeta,2\ngamma,3\n", encoding="utf-8")
+
+        request = self._build(str(csv_file), sample_rows=1)
+
+        assert "alpha, 1" in request.user_content
+        assert "beta, 2" not in request.user_content
+        assert "gamma, 3" not in request.user_content
+
+
+class TestFileAnalysisHelpers:
+    """Tests for the private helpers in ``utils.file_analysis`` and for ``detect_file_format``."""
+
+    @pytest.mark.parametrize(
+        ("filename", "expected_format", "expected_compression"),
+        [
+            ("events.csv", "csv", None),
+            ("events.csv.gz", "csv", "gzip"),
+            ("dashboard.jpg", "jpg", None),
+            ("report.pdf", "pdf", None),
+            ("app", "log", None),
+        ],
+    )
+    def test_detect_file_format(self, tmp_path, filename, expected_format, expected_compression):
+        target = tmp_path / filename
+        target.write_bytes(b"content")
+        storage_path = ObjectStoragePath(str(target))
+
+        detected_format, detected_compression = detect_file_format(storage_path)
+
+        assert detected_format == expected_format
+        assert detected_compression == expected_compression
+
+    def test_detect_file_format_rejects_unsupported_compression(self, tmp_path):
+        target = tmp_path / "events.csv.zst"
+        target.write_bytes(b"content")
+        storage_path = ObjectStoragePath(str(target))
+
+        with pytest.raises(LLMFileAnalysisUnsupportedFormatError, match="Compression"):
+            detect_file_format(storage_path)
+
+    @pytest.mark.parametrize("filename", ["sample.parquet.gz", "sample.avro.gz", "sample.png.gz"])
+    def test_detect_file_format_rejects_unsupported_gzip_format_combinations(self, tmp_path, filename):
+        target = tmp_path / filename
+        target.write_bytes(b"content")
+        storage_path = ObjectStoragePath(str(target))
+
+        with pytest.raises(LLMFileAnalysisUnsupportedFormatError, match="not supported for"):
+            detect_file_format(storage_path)
+
+    def test_read_raw_bytes_decompresses_gzip(self, tmp_path):
+        payload = b"line one\nline two\n"
+        archive = tmp_path / "events.log.gz"
+        archive.write_bytes(gzip.compress(payload))
+
+        content = _read_raw_bytes(ObjectStoragePath(str(archive)), compression="gzip", max_bytes=1_024)
+
+        assert content == payload
+
+    def test_truncate_text_preserves_head_and_tail(self):
+        # 12,000 characters in total; run with the helper's default ``max_chars``.
+        text = "A" * 9_000 + "B" * 3_000
+
+        shortened = _truncate_text(text)
+
+        assert shortened.startswith("A" * 8_000)
+        assert shortened.endswith("B" * 1_995)
+        assert "\n...\n" in shortened
+
+    def test_truncate_text_with_small_max_chars_returns_prefix_only(self):
+        shortened = _truncate_text("abcdefghij", max_chars=8)
+        assert shortened == "abcdefgh"
+
+    def test_infer_partitions(self, tmp_path):
+        partition_dir = tmp_path / "dt=2024-01-15" / "region=us"
+        partition_dir.mkdir(parents=True)
+        data_file = partition_dir / "events.csv"
+        data_file.write_text("id\n1\n", encoding="utf-8")
+
+        partitions = _infer_partitions(ObjectStoragePath(str(data_file)))
+
+        assert partitions == ("dt=2024-01-15", "region=us")
+
+    def test_resolve_paths_raises_when_no_files_found(self, tmp_path):
+        # The directory is never created, so there is nothing to resolve.
+        missing_root = ObjectStoragePath(str(tmp_path / "missing"))
+
+        with pytest.raises(FileNotFoundError, match="No files found"):
+            _resolve_paths(root=missing_root, max_files=10)
+
+
+class TestFormatReaders:
+    """Tests for the parquet and avro renderers, including their optional-dependency handling."""
+
+    @staticmethod
+    def _write_parquet(path, rows, **write_options):
+        """Write *rows* to *path* as parquet; skips the calling test when pyarrow is unavailable."""
+        pyarrow = pytest.importorskip("pyarrow")
+        pq = pytest.importorskip("pyarrow.parquet")
+        pq.write_table(pyarrow.Table.from_pylist(rows), path, **write_options)
+
+    @staticmethod
+    def _write_avro(path, records):
+        """Write *records* to *path* as avro; skips the calling test when fastavro is unavailable."""
+        fastavro = pytest.importorskip("fastavro")
+        schema = {
+            "type": "record",
+            "name": "sample",
+            "fields": [{"name": "id", "type": "long"}],
+        }
+        with path.open("wb") as handle:
+            fastavro.writer(handle, schema, records)
+
+    @staticmethod
+    def _blocked_imports(blocked_names, message):
+        """Return a patcher that makes importing any of *blocked_names* raise ``ImportError(message)``."""
+        real_import = builtins.__import__
+
+        def failing_import(name, *args, **kwargs):
+            if name in blocked_names:
+                raise ImportError(message)
+            return real_import(name, *args, **kwargs)
+
+        return patch("builtins.__import__", side_effect=failing_import)
+
+    def test_render_parquet_uses_lazy_import(self, tmp_path):
+        parquet_file = tmp_path / "sample.parquet"
+        self._write_parquet(parquet_file, [{"id": 1}, {"id": 2}])
+
+        rendered = _render_parquet(
+            ObjectStoragePath(str(parquet_file)), sample_rows=1, max_content_bytes=1_024
+        )
+
+        assert rendered.estimated_rows == 2
+        assert "Schema: id: int64" in rendered.text
+        assert '"id": 1' in rendered.text
+        assert '"id": 2' not in rendered.text
+
+    def test_render_parquet_does_not_materialize_full_table(self, tmp_path):
+        parquet_file = tmp_path / "sample.parquet"
+        # One row per row group, so a sample can be served without reading the whole file.
+        self._write_parquet(parquet_file, [{"id": 1}, {"id": 2}, {"id": 3}], row_group_size=1)
+
+        # Any call to ParquetFile.read() fails the test.
+        with patch("pyarrow.parquet.ParquetFile.read", autospec=True, side_effect=AssertionError):
+            rendered = _render_parquet(
+                ObjectStoragePath(str(parquet_file)), sample_rows=2, max_content_bytes=4_096
+            )
+
+        assert rendered.estimated_rows == 3
+        assert '"id": 1' in rendered.text
+        assert '"id": 2' in rendered.text
+        assert '"id": 3' not in rendered.text
+
+    def test_render_parquet_enforces_processed_content_limit_before_read(self, tmp_path):
+        parquet_file = tmp_path / "sample.parquet"
+        self._write_parquet(parquet_file, [{"id": 1}, {"id": 2}])
+
+        with pytest.raises(LLMFileAnalysisLimitExceededError, match="processed-content limit"):
+            _render_parquet(
+                ObjectStoragePath(str(parquet_file)),
+                sample_rows=1,
+                max_content_bytes=parquet_file.stat().st_size - 1,
+            )
+
+    def test_render_parquet_missing_dependency_raises(self, tmp_path):
+        parquet_file = tmp_path / "sample.parquet"
+        parquet_file.write_bytes(b"parquet")
+
+        with self._blocked_imports({"pyarrow", "pyarrow.parquet"}, "missing pyarrow"):
+            with pytest.raises(AirflowOptionalProviderFeatureException, match="parquet"):
+                _render_parquet(ObjectStoragePath(str(parquet_file)), sample_rows=1, max_content_bytes=1_024)
+
+    def test_render_avro_uses_lazy_import(self, tmp_path):
+        avro_file = tmp_path / "sample.avro"
+        self._write_avro(avro_file, [{"id": 1}, {"id": 2}])
+
+        rendered = _render_avro(ObjectStoragePath(str(avro_file)), sample_rows=1, max_content_bytes=1_024)
+
+        # Only one of the two records was sampled, so no row estimate is reported.
+        assert rendered.estimated_rows is None
+        assert '"name": "sample"' in rendered.text
+        assert '"id": 1' in rendered.text
+        assert '"id": 2' not in rendered.text
+
+    def test_render_avro_estimates_rows_when_fully_read(self, tmp_path):
+        avro_file = tmp_path / "sample.avro"
+        self._write_avro(avro_file, [{"id": 1}, {"id": 2}])
+
+        rendered = _render_avro(ObjectStoragePath(str(avro_file)), sample_rows=5, max_content_bytes=1_024)
+
+        assert rendered.estimated_rows == 2
+        assert '"id": 2' in rendered.text
+
+    def test_render_avro_enforces_processed_content_limit_before_scan(self, tmp_path):
+        avro_file = tmp_path / "sample.avro"
+        self._write_avro(avro_file, [{"id": 1}, {"id": 2}])
+
+        with pytest.raises(LLMFileAnalysisLimitExceededError, match="processed-content limit"):
+            _render_avro(
+                ObjectStoragePath(str(avro_file)),
+                sample_rows=1,
+                max_content_bytes=avro_file.stat().st_size - 1,
+            )
+
+    def test_render_avro_missing_dependency_raises(self, tmp_path):
+        avro_file = tmp_path / "sample.avro"
+        avro_file.write_bytes(b"avro")
+
+        with self._blocked_imports({"fastavro"}, "missing fastavro"):
+            with pytest.raises(AirflowOptionalProviderFeatureException, match="avro"):
+                _render_avro(ObjectStoragePath(str(avro_file)), sample_rows=1, max_content_bytes=1_024)
diff --git a/uv.lock b/uv.lock
index a06e11966a3..b79d7562ac9 100644
--- a/uv.lock
+++ b/uv.lock
@@ -3874,6 +3874,9 @@ dependencies = [
 anthropic = [
     { name = "pydantic-ai-slim", extra = ["anthropic"] },
 ]
+avro = [
+    { name = "fastavro" },
+]
 bedrock = [
     { name = "pydantic-ai-slim", extra = ["bedrock"] },
 ]
@@ -3889,6 +3892,9 @@ mcp = [
 openai = [
     { name = "pydantic-ai-slim", extra = ["openai"] },
 ]
+parquet = [
+    { name = "pyarrow" },
+]
 sql = [
     { name = "apache-airflow-providers-common-sql" },
     { name = "sqlglot" },
@@ -3916,6 +3922,10 @@ requires-dist = [
     { name = "apache-airflow-providers-common-sql", marker = "extra == 'common-sql'", editable = "providers/common/sql" },
     { name = "apache-airflow-providers-common-sql", marker = "extra == 'sql'", editable = "providers/common/sql" },
     { name = "apache-airflow-providers-standard", editable = "providers/standard" },
+    { name = "fastavro", marker = "python_full_version >= '3.14' and extra == 'avro'", specifier = ">=1.12.1" },
+    { name = "fastavro", marker = "python_full_version < '3.14' and extra == 'avro'", specifier = ">=1.10.0" },
+    { name = "pyarrow", marker = "python_full_version >= '3.14' and extra == 'parquet'", specifier = ">=22.0.0" },
+    { name = "pyarrow", marker = "python_full_version < '3.14' and extra == 'parquet'", specifier = ">=18.0.0" },
     { name = "pydantic-ai-slim", specifier = ">=1.34.0" },
     { name = "pydantic-ai-slim", extras = ["anthropic"], marker = "extra == 'anthropic'" },
     { name = "pydantic-ai-slim", extras = ["bedrock"], marker = "extra == 'bedrock'" },
@@ -3924,7 +3934,7 @@ requires-dist = [
     { name = "pydantic-ai-slim", extras = ["openai"], marker = "extra == 'openai'" },
     { name = "sqlglot", marker = "extra == 'sql'", specifier = ">=30.0.0" },
 ]
-provides-extras = ["anthropic", "bedrock", "google", "openai", "mcp", "sql", "common-sql"]
+provides-extras = ["anthropic", "bedrock", "google", "openai", "mcp", "avro", "parquet", "sql", "common-sql"]
 
 [package.metadata.requires-dev]
 dev = [

Reply via email to