This is an automated email from the ASF dual-hosted git repository.

vikramkoka pushed a commit to branch aip99-doc-loader
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5e1c8abea8ed7b75f0adcc5cc56bff90820e570e
Author: Vikram Koka <[email protected]>
AuthorDate: Mon May 18 15:48:14 2026 +0100

    Add DocumentLoaderOperator to common.ai provider
    
     - Adds DocumentLoaderOperator, a framework-agnostic file parser that 
bridges Airflow's connectivity layer (hooks returning bytes/files) and the
      AI embedding layer (operators needing list[dict(text, metadata)]). No 
LlamaIndex, LangChain, or other AI framework dependency.
      - Built-in parsers for .txt, .md, .csv, .json with zero extra deps. PDF 
(via pypdf, BSD) and DOCX (via python-docx, MIT) available as optional
      extras: pip install apache-airflow-providers-common-ai[pdf] / [docx].
      - Supports two input modes: source_path (local file, directory, or glob 
pattern) and source_bytes (raw bytes from XCom). Output is
      list[dict(text, metadata)], the same shape consumed by downstream 
embedding operators.
    
      Motivation
    
      File parsing is the highest-volume gap in Airflow's AI story
      Every RAG pipeline on Airflow currently requires custom parsing code. 
This operator makes it a single line in a Dag.
    
      What's included
    
      
┌────────────────────────────────────┬───────────────────────────────────────────┐
      │                File                │                  Purpose           
       │
      
├────────────────────────────────────┼───────────────────────────────────────────┤
      │ operators/document_loader.py       │ Operator (~270 lines)              
       │
      
├────────────────────────────────────┼───────────────────────────────────────────┤
      │ tests/.../test_document_loader.py  │ 26 unit tests                      
       │
      
├────────────────────────────────────┼───────────────────────────────────────────┤
      │ docs/operators/document_loader.rst │ Usage docs                         
       │
      
├────────────────────────────────────┼───────────────────────────────────────────┤
      │ provider.yaml                      │ Operator registration + 
how-to-guide link │
      
├────────────────────────────────────┼───────────────────────────────────────────┤
      │ pyproject.toml                     │ [pdf] and [docx] optional 
dependencies    │
      
├────────────────────────────────────┼───────────────────────────────────────────┤
      │ docs/operators/index.rst           │ Chooser table row                  
       │
      
└────────────────────────────────────┴───────────────────────────────────────────┘
    
      Test plan
    
      - uv run --project providers/common/ai pytest 
providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py -xvs 
(26 tests)
      - Built-in parsers: txt, md, csv (one doc per row), json (single object 
and array)
      - PDF/DOCX parsers: mocked via sys.modules injection (packages not 
installed in test env)
      - ImportError guidance when optional packages are missing
      - Init validation: mutual exclusion of source_path/source_bytes, 
file_type required with source_bytes
      - File discovery: glob patterns, extension filtering, empty directories
      - Output shape: every item has text and metadata, file_name/file_path in 
metadata, custom metadata_fields merged
---
 .../common/ai/docs/operators/document_loader.rst   | 194 +++++++++++++
 providers/common/ai/docs/operators/index.rst       |   3 +
 providers/common/ai/provider.yaml                  |   2 +
 providers/common/ai/pyproject.toml                 |   2 +
 .../common/ai/operators/document_loader.py         | 270 +++++++++++++++++
 .../common/ai/operators/test_document_loader.py    | 319 +++++++++++++++++++++
 6 files changed, 790 insertions(+)

diff --git a/providers/common/ai/docs/operators/document_loader.rst 
b/providers/common/ai/docs/operators/document_loader.rst
new file mode 100644
index 00000000000..39f4d9d501c
--- /dev/null
+++ b/providers/common/ai/docs/operators/document_loader.rst
@@ -0,0 +1,194 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/operator:document_loader:
+
+``DocumentLoaderOperator``
+==========================
+
+Use 
:class:`~airflow.providers.common.ai.operators.document_loader.DocumentLoaderOperator`
+to parse files into ``list[dict(text, metadata)]`` for downstream embedding
+pipelines.  The operator bridges Airflow's connectivity layer (hooks that
+produce bytes or local files) and the AI embedding layer (operators that
+need structured text with metadata).
+
+The operator is **framework-agnostic** — it has no dependency on LlamaIndex,
+LangChain, or any other AI framework.
+
+Built-in formats
+----------------
+
+``.txt``, ``.md``, ``.csv``, and ``.json`` are handled with zero extra
+dependencies:
+
+.. code-block:: python
+
+    from airflow.providers.common.ai.operators.document_loader import 
DocumentLoaderOperator
+
+    load_docs = DocumentLoaderOperator(
+        task_id="load_docs",
+        source_path="/opt/airflow/data/articles/",
+    )
+
+CSV files produce one document per row.  JSON files with a top-level array
+produce one document per element; a single JSON object produces one document.
+
+PDF parsing
+-----------
+
+Install the ``pdf`` extra to parse PDF files via `pypdf 
<https://pypdf.readthedocs.io/>`__:
+
+.. code-block:: bash
+
+    pip install apache-airflow-providers-common-ai[pdf]
+
+.. code-block:: python
+
+    load_pdfs = DocumentLoaderOperator(
+        task_id="load_pdfs",
+        source_path="/opt/airflow/data/reports/*.pdf",
+    )
+
+Each page with extractable text becomes a separate document.  Empty pages are
+skipped.  The ``page_number`` is included in the document metadata.
+
+DOCX parsing
+------------
+
+Install the ``docx`` extra to parse Word documents via
+`python-docx <https://python-docx.readthedocs.io/>`__:
+
+.. code-block:: bash
+
+    pip install apache-airflow-providers-common-ai[docx]
+
+.. code-block:: python
+
+    load_word = DocumentLoaderOperator(
+        task_id="load_word",
+        source_path="/opt/airflow/data/specs/*.docx",
+    )
+
+All non-empty paragraphs are concatenated into a single document per file.
+
+Glob patterns and filtering
+----------------------------
+
+Pass a glob pattern to ``source_path`` to match multiple files.  Use
+``file_extensions`` to limit which files are processed:
+
+.. code-block:: python
+
+    load_filtered = DocumentLoaderOperator(
+        task_id="load_filtered",
+        source_path="/opt/airflow/data/mixed/*",
+        file_extensions=[".pdf", ".txt"],
+    )
+
+Composing with downstream operators
+------------------------------------
+
+The output format (``list[dict(text, metadata)]``) is designed to feed
+directly into embedding operators.  For example, with the LlamaIndex
+``EmbeddingOperator``:
+
+.. code-block:: python
+
+    load = DocumentLoaderOperator(
+        task_id="load",
+        source_path="/data/docs/*.pdf",
+    )
+
+    embed = EmbeddingOperator(
+        task_id="embed",
+        documents="{{ ti.xcom_pull(task_ids='load') }}",
+        llm_conn_id="openai_default",
+    )
+
+    load >> embed
+
+Composing with Airflow providers
+---------------------------------
+
+Use any Airflow provider to download files, then parse them with
+``DocumentLoaderOperator``:
+
+.. code-block:: python
+
+    from airflow.providers.amazon.aws.transfers.s3_to_local import 
S3ToLocalFilesystemOperator
+
+    download = S3ToLocalFilesystemOperator(
+        task_id="download",
+        bucket_name="my-bucket",
+        key="documents/report.pdf",
+        local_path="/tmp/report.pdf",
+    )
+
+    load = DocumentLoaderOperator(
+        task_id="load",
+        source_path="/tmp/report.pdf",
+    )
+
+    download >> load
+
+For **structured API data** (Salesforce SOQL results, database query exports),
+a ``@task`` that maps fields to text and metadata is more appropriate than
+``DocumentLoaderOperator``, which is designed for binary file parsing:
+
+.. code-block:: python
+
+    @task
+    def transform_cases(records: list[dict]) -> list[dict]:
+        return [
+            {
+                "text": f"{r['Subject']}\n\n{r['Description']}",
+                "metadata": {"case_id": r["Id"], "source": "salesforce"},
+            }
+            for r in records
+        ]
+
+Loading from bytes
+------------------
+
+When upstream tasks pass file content via XCom, use ``source_bytes`` with
+an explicit ``file_type``:
+
+.. code-block:: python
+
+    load = DocumentLoaderOperator(
+        task_id="load",
+        source_bytes="{{ ti.xcom_pull(task_ids='fetch_file') }}",
+        file_type=".pdf",
+    )
+
+Parameters
+----------
+
+- ``source_path``: Local file path, directory, or glob pattern.
+  Mutually exclusive with ``source_bytes``.
+- ``source_bytes``: Raw file bytes from XCom.  Requires ``file_type``.
+  Mutually exclusive with ``source_path``.
+- ``file_type``: File extension hint (e.g. ``".pdf"``).  Required with
+  ``source_bytes``.  Optional with ``source_path`` to override
+  auto-detection.
+- ``parser``: Parsing backend.  ``"auto"`` (default) selects from the file
+  extension.  Set explicitly to force a specific backend (e.g. ``"text"``
+  to treat an unknown extension as plain text).
+- ``file_extensions``: Filter which files to process when ``source_path``
+  matches multiple files.
+- ``metadata_fields``: Extra key-value pairs merged into every document's
+  metadata dict.
diff --git a/providers/common/ai/docs/operators/index.rst 
b/providers/common/ai/docs/operators/index.rst
index 89ba5d15e6c..967a43de281 100644
--- a/providers/common/ai/docs/operators/index.rst
+++ b/providers/common/ai/docs/operators/index.rst
@@ -46,6 +46,9 @@ to pick the one that fits your use case:
    * - Multi-turn reasoning with tools (DB queries, API calls, etc.)
      - :class:`~airflow.providers.common.ai.operators.agent.AgentOperator`
      - ``@task.agent``
+   * - Parse files (PDF, DOCX, CSV, etc.) into document dicts for embedding
+     - 
:class:`~airflow.providers.common.ai.operators.document_loader.DocumentLoaderOperator`
+     - *(no decorator)*
 
 **LLMOperator / @task.llm** — stateless, single-turn calls. Use this for 
classification,
 summarization, extraction, or any prompt that produces one response. Supports 
structured output
diff --git a/providers/common/ai/provider.yaml 
b/providers/common/ai/provider.yaml
index 2a13392ea99..d7ffaee57b9 100644
--- a/providers/common/ai/provider.yaml
+++ b/providers/common/ai/provider.yaml
@@ -41,6 +41,7 @@ integrations:
       - /docs/apache-airflow-providers-common-ai/operators/llm_branch.rst
       - /docs/apache-airflow-providers-common-ai/operators/llm_sql.rst
       - 
/docs/apache-airflow-providers-common-ai/operators/llm_schema_compare.rst
+      - /docs/apache-airflow-providers-common-ai/operators/document_loader.rst
     tags: [ai]
   - integration-name: Pydantic AI
     external-doc-url: https://ai.pydantic.dev/
@@ -323,6 +324,7 @@ operators:
       - airflow.providers.common.ai.operators.llm_branch
       - airflow.providers.common.ai.operators.llm_sql
       - airflow.providers.common.ai.operators.llm_schema_compare
+      - airflow.providers.common.ai.operators.document_loader
 
 task-decorators:
   - class-name: airflow.providers.common.ai.decorators.agent.agent_task
diff --git a/providers/common/ai/pyproject.toml 
b/providers/common/ai/pyproject.toml
index 36dfcf450da..7958d95d2f1 100644
--- a/providers/common/ai/pyproject.toml
+++ b/providers/common/ai/pyproject.toml
@@ -95,6 +95,8 @@ dependencies = [
 "common.sql" = [
     "apache-airflow-providers-common-sql"
 ]
+"pdf" = ["pypdf>=4.0.0"]
+"docx" = ["python-docx>=1.0.0"]
 
 [dependency-groups]
 dev = [
diff --git 
a/providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py
 
b/providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py
new file mode 100644
index 00000000000..e131d61617c
--- /dev/null
+++ 
b/providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py
@@ -0,0 +1,270 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Operator for parsing files into document dicts suitable for embedding."""
+
+from __future__ import annotations
+
+import csv
+import glob
+import io
+import json
+import os
+import tempfile
+from collections.abc import Sequence
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+from airflow.providers.common.compat.sdk import BaseOperator
+
+if TYPE_CHECKING:
+    from airflow.sdk import Context
+
+
+class DocumentLoaderOperator(BaseOperator):
+    """
+    Parse files into ``list[dict(text, metadata)]`` for downstream embedding.
+
+    Bridges Airflow's connectivity layer (hooks that produce bytes or local
+    files) and the AI embedding layer (operators that need structured text
+    with metadata).  Framework-agnostic: no LlamaIndex, LangChain, or other
+    AI framework dependency.
+
+    Built-in parsers handle ``.txt``, ``.md``, ``.csv``, and ``.json`` with
+    zero extra dependencies.  PDF and DOCX support require optional packages
+    installable via extras::
+
+        pip install apache-airflow-providers-common-ai[pdf]    # pypdf
+        pip install apache-airflow-providers-common-ai[docx]   # python-docx
+
+    Provide exactly one of ``source_path`` or ``source_bytes``.  When using
+    ``source_bytes``, ``file_type`` is required so the operator knows which
+    parser to use.
+
+    :param source_path: Local file path or glob pattern (e.g. ``/data/*.pdf``).
+    :param source_bytes: Raw file bytes, typically from XCom.
+    :param file_type: File extension hint when using ``source_bytes``
+        (e.g. ``".pdf"``).  Also accepted with ``source_path`` to override
+        auto-detection.
+    :param parser: Parsing backend selection.  ``"auto"`` (default) picks the
+        backend from the file extension.
+    :param file_extensions: When ``source_path`` is a directory or glob,
+        only process files whose extension is in this list.
+    :param metadata_fields: Extra key-value pairs merged into every
+        document's ``metadata`` dict.
+    """
+
+    template_fields: Sequence[str] = (
+        "source_path",
+        "source_bytes",
+        "metadata_fields",
+    )
+
+    EXTENSION_BACKEND_MAP: dict[str, str] = {
+        ".txt": "text",
+        ".md": "text",
+        ".csv": "csv",
+        ".json": "json",
+        ".pdf": "pypdf",
+        ".docx": "python-docx",
+    }
+
+    def __init__(
+        self,
+        *,
+        source_path: str | None = None,
+        source_bytes: bytes | None = None,
+        file_type: str | None = None,
+        parser: str = "auto",
+        file_extensions: list[str] | None = None,
+        metadata_fields: dict[str, Any] | None = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        if source_path and source_bytes:
+            raise ValueError("Provide exactly one of 'source_path' or 
'source_bytes', not both.")
+        if not source_path and not source_bytes:
+            raise ValueError("Provide exactly one of 'source_path' or 
'source_bytes'.")
+        if source_bytes and not file_type:
+            raise ValueError("'file_type' is required when using 
'source_bytes' (e.g. '.pdf').")
+
+        self.source_path = source_path
+        self.source_bytes = source_bytes
+        self.file_type = file_type
+        self.parser = parser
+        self.file_extensions = file_extensions
+        self.metadata_fields = metadata_fields
+
+    def execute(self, context: Context) -> list[dict[str, Any]]:
+        if self.source_bytes:
+            documents = self._parse_bytes(self.source_bytes, self.file_type)
+            file_count = 1
+        else:
+            files = self._resolve_files(self.source_path)
+            file_count = len(files)
+            documents = []
+            for file_path in files:
+                ext = self.file_type or file_path.suffix.lower()
+                parsed = self._parse_file(file_path, ext)
+                for doc in parsed:
+                    doc["metadata"]["file_name"] = file_path.name
+                    doc["metadata"]["file_path"] = str(file_path)
+                documents.extend(parsed)
+
+        if self.metadata_fields:
+            for doc in documents:
+                doc["metadata"].update(self.metadata_fields)
+
+        self.log.info("Parsed %d documents from %d files", len(documents), 
file_count)
+        return documents
+
+    def _resolve_files(self, source_path: str) -> list[Path]:
+        path = Path(source_path)
+        if path.is_file():
+            return [path]
+
+        if path.is_dir():
+            candidates = sorted(path.iterdir())
+        else:
+            candidates = [Path(p) for p in sorted(glob.glob(source_path))]
+
+        results = [p for p in candidates if p.is_file()]
+
+        if self.file_extensions:
+            allowed = {ext if ext.startswith(".") else f".{ext}" for ext in 
self.file_extensions}
+            results = [p for p in results if p.suffix.lower() in allowed]
+
+        return results
+
+    def _parse_bytes(self, raw: bytes, file_type: str) -> list[dict[str, Any]]:
+        ext = file_type if file_type.startswith(".") else f".{file_type}"
+        backend = self._resolve_backend(ext)
+
+        if backend in ("pypdf", "python-docx"):
+            with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp:
+                tmp.write(raw)
+                tmp_path = Path(tmp.name)
+            try:
+                return self._parse_file(tmp_path, ext)
+            finally:
+                os.unlink(tmp_path)
+
+        text = raw.decode("utf-8")
+        if backend == "csv":
+            return self._parse_csv_text(text)
+        if backend == "json":
+            return self._parse_json_text(text)
+        return [{"text": text, "metadata": {}}]
+
+    def _parse_file(self, file_path: Path, ext: str) -> list[dict[str, Any]]:
+        backend = self._resolve_backend(ext)
+
+        if backend == "text":
+            return self._parse_text(file_path)
+        if backend == "csv":
+            return self._parse_csv(file_path)
+        if backend == "json":
+            return self._parse_json(file_path)
+        if backend == "pypdf":
+            return self._parse_pdf(file_path)
+        if backend == "python-docx":
+            return self._parse_docx(file_path)
+
+        raise ValueError(f"No parser found for backend '{backend}'.")
+
+    def _resolve_backend(self, ext: str) -> str:
+        if self.parser != "auto":
+            return self.parser
+
+        ext = ext.lower()
+        if ext not in self.EXTENSION_BACKEND_MAP:
+            supported = ", ".join(sorted(self.EXTENSION_BACKEND_MAP.keys()))
+            raise ValueError(
+                f"No parser registered for extension '{ext}'. "
+                f"Supported extensions: {supported}. "
+                f"Set 'parser' explicitly to override auto-detection."
+            )
+        return self.EXTENSION_BACKEND_MAP[ext]
+
+    def _parse_text(self, file_path: Path) -> list[dict[str, Any]]:
+        text = file_path.read_text(encoding="utf-8")
+        return [{"text": text, "metadata": {}}]
+
+    def _parse_csv(self, file_path: Path) -> list[dict[str, Any]]:
+        text = file_path.read_text(encoding="utf-8")
+        return self._parse_csv_text(text)
+
+    def _parse_csv_text(self, text: str) -> list[dict[str, Any]]:
+        reader = csv.DictReader(io.StringIO(text))
+        documents = []
+        for row_idx, row in enumerate(reader):
+            row_text = ", ".join(f"{k}: {v}" for k, v in row.items() if v)
+            documents.append(
+                {
+                    "text": row_text,
+                    "metadata": {"row_index": row_idx},
+                }
+            )
+        return documents
+
+    def _parse_json(self, file_path: Path) -> list[dict[str, Any]]:
+        text = file_path.read_text(encoding="utf-8")
+        return self._parse_json_text(text)
+
+    def _parse_json_text(self, text: str) -> list[dict[str, Any]]:
+        data = json.loads(text)
+        if isinstance(data, list):
+            return [
+                {"text": json.dumps(item, ensure_ascii=False), "metadata": 
{"item_index": idx}}
+                for idx, item in enumerate(data)
+            ]
+        return [{"text": json.dumps(data, ensure_ascii=False), "metadata": {}}]
+
+    def _parse_pdf(self, file_path: Path) -> list[dict[str, Any]]:
+        try:
+            from pypdf import PdfReader
+        except ImportError:
+            raise ImportError(
+                "pypdf is required for PDF parsing. "
+                "Install it with: pip install 
apache-airflow-providers-common-ai[pdf]"
+            )
+
+        reader = PdfReader(str(file_path))
+        documents = []
+        for page_num, page in enumerate(reader.pages):
+            text = page.extract_text() or ""
+            if text.strip():
+                documents.append(
+                    {
+                        "text": text,
+                        "metadata": {"page_number": page_num + 1},
+                    }
+                )
+        return documents
+
+    def _parse_docx(self, file_path: Path) -> list[dict[str, Any]]:
+        try:
+            from docx import Document
+        except ImportError:
+            raise ImportError(
+                "python-docx is required for DOCX parsing. "
+                "Install it with: pip install 
apache-airflow-providers-common-ai[docx]"
+            )
+
+        doc = Document(str(file_path))
+        paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]
+        text = "\n\n".join(paragraphs)
+        return [{"text": text, "metadata": {}}]
diff --git 
a/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py 
b/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py
new file mode 100644
index 00000000000..14cf92ba14d
--- /dev/null
+++ b/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py
@@ -0,0 +1,319 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from airflow.providers.common.ai.operators.document_loader import 
DocumentLoaderOperator
+
+
+class TestDocumentLoaderInit:
+    def test_template_fields(self):
+        expected = {"source_path", "source_bytes", "metadata_fields"}
+        assert set(DocumentLoaderOperator.template_fields) == expected
+
+    def test_both_sources_raises(self):
+        with pytest.raises(ValueError, match="not both"):
+            DocumentLoaderOperator(task_id="test", 
source_path="/tmp/file.txt", source_bytes=b"hello")
+
+    def test_neither_source_raises(self):
+        with pytest.raises(ValueError, match="Provide exactly one"):
+            DocumentLoaderOperator(task_id="test")
+
+    def test_source_bytes_without_file_type_raises(self):
+        with pytest.raises(ValueError, match="file_type"):
+            DocumentLoaderOperator(task_id="test", source_bytes=b"hello")
+
+
+class TestTextParser:
+    def test_txt_file(self, tmp_path):
+        f = tmp_path / "doc.txt"
+        f.write_text("Hello world", encoding="utf-8")
+
+        op = DocumentLoaderOperator(task_id="test", source_path=str(f))
+        result = op.execute(context=MagicMock())
+
+        assert len(result) == 1
+        assert result[0]["text"] == "Hello world"
+        assert result[0]["metadata"]["file_name"] == "doc.txt"
+
+    def test_md_file(self, tmp_path):
+        f = tmp_path / "readme.md"
+        f.write_text("# Title\n\nSome content", encoding="utf-8")
+
+        op = DocumentLoaderOperator(task_id="test", source_path=str(f))
+        result = op.execute(context=MagicMock())
+
+        assert len(result) == 1
+        assert "# Title" in result[0]["text"]
+
+
+class TestCsvParser:
+    def test_csv_one_doc_per_row(self, tmp_path):
+        f = tmp_path / "data.csv"
+        f.write_text("name,age\nAlice,30\nBob,25\n", encoding="utf-8")
+
+        op = DocumentLoaderOperator(task_id="test", source_path=str(f))
+        result = op.execute(context=MagicMock())
+
+        assert len(result) == 2
+        assert "Alice" in result[0]["text"]
+        assert "Bob" in result[1]["text"]
+        assert result[0]["metadata"]["row_index"] == 0
+        assert result[1]["metadata"]["row_index"] == 1
+
+    def test_csv_from_bytes(self):
+        raw = b"col1,col2\nval1,val2\n"
+        op = DocumentLoaderOperator(task_id="test", source_bytes=raw, 
file_type=".csv")
+        result = op.execute(context=MagicMock())
+
+        assert len(result) == 1
+        assert "val1" in result[0]["text"]
+
+
+class TestJsonParser:
+    def test_json_array(self, tmp_path):
+        f = tmp_path / "items.json"
+        data = [{"title": "First"}, {"title": "Second"}]
+        f.write_text(json.dumps(data), encoding="utf-8")
+
+        op = DocumentLoaderOperator(task_id="test", source_path=str(f))
+        result = op.execute(context=MagicMock())
+
+        assert len(result) == 2
+        assert result[0]["metadata"]["item_index"] == 0
+
+    def test_json_single_object(self, tmp_path):
+        f = tmp_path / "config.json"
+        f.write_text('{"key": "value"}', encoding="utf-8")
+
+        op = DocumentLoaderOperator(task_id="test", source_path=str(f))
+        result = op.execute(context=MagicMock())
+
+        assert len(result) == 1
+        assert "key" in result[0]["text"]
+
+    def test_json_from_bytes(self):
+        raw = b'[{"a": 1}, {"b": 2}]'
+        op = DocumentLoaderOperator(task_id="test", source_bytes=raw, 
file_type=".json")
+        result = op.execute(context=MagicMock())
+
+        assert len(result) == 2
+
+
+def _make_mock_pypdf_module(mock_reader):
+    """Create a fake pypdf module with a PdfReader that returns mock_reader."""
+    mock_module = MagicMock()
+    mock_module.PdfReader = MagicMock(return_value=mock_reader)
+    return mock_module
+
+
+def _make_mock_docx_module(mock_doc):
+    """Create a fake docx module with a Document that returns mock_doc."""
+    mock_module = MagicMock()
+    mock_module.Document = MagicMock(return_value=mock_doc)
+    return mock_module
+
+
+class TestPdfParser:
+    def test_pdf_parsing(self, tmp_path):
+        mock_page_1 = MagicMock()
+        mock_page_1.extract_text.return_value = "Page one content"
+        mock_page_2 = MagicMock()
+        mock_page_2.extract_text.return_value = "Page two content"
+
+        mock_reader = MagicMock()
+        mock_reader.pages = [mock_page_1, mock_page_2]
+
+        f = tmp_path / "report.pdf"
+        f.write_bytes(b"fake pdf bytes")
+
+        mock_pypdf = _make_mock_pypdf_module(mock_reader)
+        with patch.dict("sys.modules", {"pypdf": mock_pypdf}):
+            op = DocumentLoaderOperator(task_id="test", source_path=str(f))
+            result = op.execute(context=MagicMock())
+
+        assert len(result) == 2
+        assert result[0]["text"] == "Page one content"
+        assert result[0]["metadata"]["page_number"] == 1
+        assert result[1]["metadata"]["page_number"] == 2
+
+    def test_pdf_skips_empty_pages(self, tmp_path):
+        mock_page = MagicMock()
+        mock_page.extract_text.return_value = "   "
+        mock_reader = MagicMock()
+        mock_reader.pages = [mock_page]
+
+        f = tmp_path / "empty.pdf"
+        f.write_bytes(b"fake pdf")
+
+        mock_pypdf = _make_mock_pypdf_module(mock_reader)
+        with patch.dict("sys.modules", {"pypdf": mock_pypdf}):
+            op = DocumentLoaderOperator(task_id="test", source_path=str(f))
+            result = op.execute(context=MagicMock())
+
+        assert len(result) == 0
+
+    def test_pdf_missing_raises_importerror(self, tmp_path):
+        f = tmp_path / "doc.pdf"
+        f.write_bytes(b"fake pdf")
+
+        with patch.dict("sys.modules", {"pypdf": None}):
+            op = DocumentLoaderOperator(task_id="test", source_path=str(f))
+            with pytest.raises(ImportError, 
match="apache-airflow-providers-common-ai\\[pdf\\]"):
+                op.execute(context=MagicMock())
+
+
+class TestDocxParser:
+    def test_docx_parsing(self, tmp_path):
+        mock_para_1 = MagicMock()
+        mock_para_1.text = "First paragraph"
+        mock_para_2 = MagicMock()
+        mock_para_2.text = "Second paragraph"
+        mock_para_empty = MagicMock()
+        mock_para_empty.text = "   "
+
+        mock_doc_obj = MagicMock()
+        mock_doc_obj.paragraphs = [mock_para_1, mock_para_empty, mock_para_2]
+
+        f = tmp_path / "doc.docx"
+        f.write_bytes(b"fake docx")
+
+        mock_docx = _make_mock_docx_module(mock_doc_obj)
+        with patch.dict("sys.modules", {"docx": mock_docx}):
+            op = DocumentLoaderOperator(task_id="test", source_path=str(f))
+            result = op.execute(context=MagicMock())
+
+        assert len(result) == 1
+        assert "First paragraph" in result[0]["text"]
+        assert "Second paragraph" in result[0]["text"]
+
+    def test_docx_missing_raises_importerror(self, tmp_path):
+        f = tmp_path / "doc.docx"
+        f.write_bytes(b"fake docx")
+
+        with patch.dict("sys.modules", {"docx": None}):
+            op = DocumentLoaderOperator(task_id="test", source_path=str(f))
+            with pytest.raises(ImportError, 
match="apache-airflow-providers-common-ai\\[docx\\]"):
+                op.execute(context=MagicMock())
+
+
+class TestFileDiscovery:
+    def test_glob_multiple_files(self, tmp_path):
+        (tmp_path / "a.txt").write_text("file a", encoding="utf-8")
+        (tmp_path / "b.txt").write_text("file b", encoding="utf-8")
+        (tmp_path / "c.md").write_text("file c", encoding="utf-8")
+
+        op = DocumentLoaderOperator(task_id="test", source_path=str(tmp_path / 
"*.txt"))
+        result = op.execute(context=MagicMock())
+
+        assert len(result) == 2
+        texts = {doc["text"] for doc in result}
+        assert texts == {"file a", "file b"}
+
+    def test_directory_source(self, tmp_path):
+        (tmp_path / "x.txt").write_text("hello", encoding="utf-8")
+        (tmp_path / "y.md").write_text("world", encoding="utf-8")
+
+        op = DocumentLoaderOperator(task_id="test", source_path=str(tmp_path))
+        result = op.execute(context=MagicMock())
+
+        assert len(result) == 2
+
+    def test_file_extensions_filter(self, tmp_path):
+        (tmp_path / "keep.txt").write_text("keep me", encoding="utf-8")
+        (tmp_path / "skip.md").write_text("skip me", encoding="utf-8")
+
+        op = DocumentLoaderOperator(task_id="test", source_path=str(tmp_path), 
file_extensions=[".txt"])
+        result = op.execute(context=MagicMock())
+
+        assert len(result) == 1
+        assert result[0]["text"] == "keep me"
+
+    def test_empty_directory_returns_empty(self, tmp_path):
+        op = DocumentLoaderOperator(task_id="test", source_path=str(tmp_path))
+        result = op.execute(context=MagicMock())
+
+        assert result == []
+
+    def test_unknown_extension_raises(self, tmp_path):
+        f = tmp_path / "data.xyz"
+        f.write_text("some data", encoding="utf-8")
+
+        op = DocumentLoaderOperator(task_id="test", source_path=str(f))
+        with pytest.raises(ValueError, match="No parser registered"):
+            op.execute(context=MagicMock())
+
+
+class TestOutputShape:
+    def test_every_item_has_text_and_metadata(self, tmp_path):
+        (tmp_path / "a.txt").write_text("doc a", encoding="utf-8")
+        (tmp_path / "b.txt").write_text("doc b", encoding="utf-8")
+
+        op = DocumentLoaderOperator(task_id="test", source_path=str(tmp_path / 
"*.txt"))
+        result = op.execute(context=MagicMock())
+
+        for doc in result:
+            assert "text" in doc
+            assert "metadata" in doc
+            assert isinstance(doc["text"], str)
+            assert isinstance(doc["metadata"], dict)
+
+    def test_metadata_fields_appended(self, tmp_path):
+        f = tmp_path / "doc.txt"
+        f.write_text("content", encoding="utf-8")
+
+        op = DocumentLoaderOperator(
+            task_id="test",
+            source_path=str(f),
+            metadata_fields={"source": "test_suite", "version": 2},
+        )
+        result = op.execute(context=MagicMock())
+
+        assert result[0]["metadata"]["source"] == "test_suite"
+        assert result[0]["metadata"]["version"] == 2
+
+    def test_file_metadata_included(self, tmp_path):
+        f = tmp_path / "report.txt"
+        f.write_text("content", encoding="utf-8")
+
+        op = DocumentLoaderOperator(task_id="test", source_path=str(f))
+        result = op.execute(context=MagicMock())
+
+        assert result[0]["metadata"]["file_name"] == "report.txt"
+        assert "file_path" in result[0]["metadata"]
+
+    def test_source_bytes_no_file_metadata(self):
+        op = DocumentLoaderOperator(task_id="test", source_bytes=b"hello", 
file_type=".txt")
+        result = op.execute(context=MagicMock())
+
+        assert len(result) == 1
+        assert result[0]["text"] == "hello"
+        assert "file_name" not in result[0]["metadata"]
+
+    def test_explicit_parser_override(self, tmp_path):
+        f = tmp_path / "data.log"
+        f.write_text("log line", encoding="utf-8")
+
+        op = DocumentLoaderOperator(task_id="test", source_path=str(f), 
parser="text")
+        result = op.execute(context=MagicMock())
+
+        assert len(result) == 1
+        assert result[0]["text"] == "log line"


Reply via email to