This is an automated email from the ASF dual-hosted git repository.

jerryjing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c08827bd0f [python] Introduce Full Text Search and Tantivy index in Python (#7560)
c08827bd0f is described below

commit c08827bd0facf5b28c4aff4e704921c3ec250f74
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Apr 1 18:14:14 2026 +0800

    [python] Introduce Full Text Search and Tantivy index in Python (#7560)
---
 .github/workflows/paimon-python-checks.yml         |  25 ++
 docs/content/append-table/global-index.md          |  21 ++
 docs/content/pypaimon/cli.md                       |  42 +++
 paimon-python/dev/run_mixed_tests.sh               |  54 +++-
 paimon-python/pypaimon/cli/cli_table.py            | 114 ++++++++
 paimon-python/pypaimon/globalindex/__init__.py     |   4 +
 .../pypaimon/globalindex/full_text_search.py       |  53 ++++
 .../pypaimon/globalindex/global_index_reader.py    |   4 +
 .../globalindex/offset_global_index_reader.py      | 102 +++++++
 .../pypaimon/globalindex/{ => tantivy}/__init__.py |  28 +-
 .../tantivy_full_text_global_index_reader.py       | 294 +++++++++++++++++++++
 .../pypaimon/globalindex/vector_search_result.py   |  31 +++
 .../pypaimon/read/scanner/file_scanner.py          |   8 +-
 paimon-python/pypaimon/read/table_scan.py          |   4 +
 paimon-python/pypaimon/table/file_store_table.py   |   4 +
 .../pypaimon/table/format/format_table.py          |   3 +
 .../pypaimon/table/iceberg/iceberg_table.py        |   3 +
 .../pypaimon/table/object/object_table.py          |   5 +
 .../pypaimon/table/source/full_text_read.py        | 120 +++++++++
 .../pypaimon/table/source/full_text_scan.py        |  93 +++++++
 .../table/source/full_text_search_builder.py       | 100 +++++++
 .../table/source/full_text_search_split.py         |  53 ++++
 paimon-python/pypaimon/table/table.py              |   5 +
 .../pypaimon/tests/e2e/java_py_read_write_test.py  |  68 +++++
 paimon-tantivy/paimon-tantivy-index/pom.xml        | 103 ++++++++
 .../paimon/tantivy/index/JavaPyTantivyE2ETest.java | 226 ++++++++++++++++
 26 files changed, 1539 insertions(+), 28 deletions(-)

diff --git a/.github/workflows/paimon-python-checks.yml 
b/.github/workflows/paimon-python-checks.yml
index 722f37fa05..a797257e06 100755
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -79,6 +79,20 @@ jobs:
           java -version
           mvn -version
 
+      - name: Install Rust toolchain
+        uses: dtolnay/rust-toolchain@stable
+
+      - name: Build Tantivy native library
+        run: |
+          cd paimon-tantivy/paimon-tantivy-jni/rust
+          cargo build --release
+
+      - name: Copy Tantivy native library to resources
+        run: |
+          RESOURCE_DIR=paimon-tantivy/paimon-tantivy-jni/src/main/resources/native/linux-amd64
+          mkdir -p ${RESOURCE_DIR}
+          cp paimon-tantivy/paimon-tantivy-jni/rust/target/release/libtantivy_jni.so ${RESOURCE_DIR}/
+
       - name: Verify Python version
         run: python --version
 
@@ -118,6 +132,17 @@ jobs:
             fi
           fi
           df -h
+
+      - name: Build and install tantivy-py from source
+        if: matrix.python-version != '3.6.15'
+        shell: bash
+        run: |
+          pip install maturin
+          git clone -b support_directory https://github.com/JingsongLi/tantivy-py.git /tmp/tantivy-py
+          cd /tmp/tantivy-py
+          maturin build --release
+          pip install target/wheels/tantivy-*.whl
+
       - name: Run lint-python.sh
         shell: bash
         run: |
diff --git a/docs/content/append-table/global-index.md 
b/docs/content/append-table/global-index.md
index 172c5bedcb..d88a3348a6 100644
--- a/docs/content/append-table/global-index.md
+++ b/docs/content/append-table/global-index.md
@@ -211,4 +211,25 @@ try (RecordReader<InternalRow> reader = 
readBuilder.newRead().createReader(plan)
 ```
 {{< /tab >}}
 
+{{< tab "Python SDK" >}}
+```python
+table = catalog.get_table('db.my_table')
+
+# Step 1: Build full-text search
+builder = table.new_full_text_search_builder()
+builder.with_text_column('content')
+builder.with_query_text('paimon lake format')
+builder.with_limit(10)
+result = builder.execute_local()
+
+# Step 2: Read matching rows using the search result
+read_builder = table.new_read_builder()
+scan = read_builder.new_scan().with_global_index_result(result)
+plan = scan.plan()
+table_read = read_builder.new_read()
+pa_table = table_read.to_arrow(plan.splits())
+print(pa_table)
+```
+{{< /tab >}}
+
 {{< /tabs >}}
diff --git a/docs/content/pypaimon/cli.md b/docs/content/pypaimon/cli.md
index 22bdbd3c18..a328fc4744 100644
--- a/docs/content/pypaimon/cli.md
+++ b/docs/content/pypaimon/cli.md
@@ -362,6 +362,48 @@ Table 'mydb.old_name' renamed to 'mydb.new_name' 
successfully.
 
 **Note:** Both filesystem and REST catalogs support table rename. For 
filesystem catalogs, the rename is performed by renaming the underlying table 
directory.
 
+### Table Full-Text Search
+
+Perform full-text search on a Paimon table with a Tantivy full-text index and display matching rows.
+
+```shell
+paimon table full-text-search mydb.articles --column content --query "paimon lake"
+```
+
+**Options:**
+
+- `--column, -c`: Text column to search on - **Required**
+- `--query, -q`: Query text to search for - **Required**
+- `--limit, -l`: Maximum number of results to return (default: 10)
+- `--select, -s`: Select specific columns to display (comma-separated)
+- `--format, -f`: Output format: `table` (default) or `json`
+
+**Examples:**
+
+```shell
+# Basic full-text search
+paimon table full-text-search mydb.articles -c content -q "paimon lake"
+
+# Search with limit
+paimon table full-text-search mydb.articles -c content -q "streaming data" -l 20
+
+# Search with column projection
+paimon table full-text-search mydb.articles -c content -q "paimon" -s "id,title,content"
+
+# Output as JSON
+paimon table full-text-search mydb.articles -c content -q "paimon" -f json
+```
+
+Output:
+```
+ id                                            content
+  0  Apache Paimon is a streaming data lake platform
+  2  Paimon supports real-time data ingestion and...
+  4  Data lake platforms like Paimon handle large-...
+```
+
+**Note:** The table must have a Tantivy full-text index built on the target column. See [Global Index]({{< ref "append-table/global-index" >}}) for how to create full-text indexes.
+
 ### Table Drop
 
 Drop a table from the catalog. This will permanently delete the table and all 
its data.
diff --git a/paimon-python/dev/run_mixed_tests.sh 
b/paimon-python/dev/run_mixed_tests.sh
index a7e5d5dd76..839a750780 100755
--- a/paimon-python/dev/run_mixed_tests.sh
+++ b/paimon-python/dev/run_mixed_tests.sh
@@ -138,9 +138,6 @@ run_java_read_test() {
 
     cd "$PROJECT_ROOT"
 
-    PYTHON_VERSION=$(python -c "import sys; 
print(f'{sys.version_info.major}.{sys.version_info.minor}')" 2>/dev/null || 
echo "unknown")
-    echo "Detected Python version: $PYTHON_VERSION"
-
     # Run Java test for Parquet/Orc/Avro format in paimon-core
     echo "Running Maven test for JavaPyE2ETest.testReadPkTable (Java Read 
Parquet/Orc/Avro)..."
     echo "Note: Maven may download dependencies on first run, this may take a 
while..."
@@ -171,6 +168,7 @@ run_java_read_test() {
         return 1
     fi
 }
+
 run_pk_dv_test() {
     echo -e "${YELLOW}=== Step 5: Running Primary Key & Deletion Vector Test 
(testPKDeletionVectorWriteRead) ===${NC}"
 
@@ -244,6 +242,30 @@ run_compressed_text_test() {
     fi
 }
 
+# Function to run Tantivy full-text index test (Java write index, Python read and search)
+run_tantivy_fulltext_test() {
+    echo -e "${YELLOW}=== Step 8: Running Tantivy Full-Text Index Test (Java Write, Python Read) ===${NC}"
+
+    cd "$PROJECT_ROOT"
+
+    echo "Running Maven test for JavaPyTantivyE2ETest.testTantivyFullTextIndexWrite..."
+    if mvn test -Dtest=org.apache.paimon.tantivy.index.JavaPyTantivyE2ETest#testTantivyFullTextIndexWrite -pl paimon-tantivy/paimon-tantivy-index -q -Drun.e2e.tests=true; then
+        echo -e "${GREEN}✓ Java test completed successfully${NC}"
+    else
+        echo -e "${RED}✗ Java test failed${NC}"
+        return 1
+    fi
+    cd "$PAIMON_PYTHON_DIR"
+    echo "Running Python test for JavaPyReadWriteTest.test_read_tantivy_full_text_index..."
+    if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest::test_read_tantivy_full_text_index -v; then
+        echo -e "${GREEN}✓ Python test completed successfully${NC}"
+        return 0
+    else
+        echo -e "${RED}✗ Python test failed${NC}"
+        return 1
+    fi
+}
+
 # Main execution
 main() {
     local java_write_result=0
@@ -253,6 +275,12 @@ main() {
     local pk_dv_result=0
     local btree_index_result=0
     local compressed_text_result=0
+    local tantivy_fulltext_result=0
+
+    # Detect Python version
+    PYTHON_VERSION=$(python -c "import sys; 
print(f'{sys.version_info.major}.{sys.version_info.minor}')" 2>/dev/null || 
echo "unknown")
+    PYTHON_MINOR=$(python -c "import sys; print(sys.version_info.minor)" 
2>/dev/null || echo "0")
+    echo "Detected Python version: $PYTHON_VERSION"
 
     echo -e "${YELLOW}Starting mixed language test execution...${NC}"
     echo ""
@@ -311,6 +339,18 @@ main() {
 
     echo ""
 
+    # Run Tantivy full-text index test (requires Python >= 3.10)
+    if [[ "$PYTHON_MINOR" -ge 10 ]]; then
+        if ! run_tantivy_fulltext_test; then
+            tantivy_fulltext_result=1
+        fi
+    else
+        echo -e "${YELLOW}⏭ Skipping Tantivy Full-Text Index Test (requires 
Python >= 3.10, current: $PYTHON_VERSION)${NC}"
+        tantivy_fulltext_result=0
+    fi
+
+    echo ""
+
     echo -e "${YELLOW}=== Test Results Summary ===${NC}"
 
     if [[ $java_write_result -eq 0 ]]; then
@@ -355,12 +395,18 @@ main() {
         echo -e "${RED}✗ Compressed Text Test (Java Write, Python Read): 
FAILED${NC}"
     fi
 
+    if [[ $tantivy_fulltext_result -eq 0 ]]; then
+        echo -e "${GREEN}✓ Tantivy Full-Text Index Test (Java Write, Python 
Read): PASSED${NC}"
+    else
+        echo -e "${RED}✗ Tantivy Full-Text Index Test (Java Write, Python 
Read): FAILED${NC}"
+    fi
+
     echo ""
 
     # Clean up warehouse directory after all tests
     cleanup_warehouse
 
-    if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && 
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && 
$btree_index_result -eq 0 && $compressed_text_result -eq 0 ]]; then
+    if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && 
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && 
$btree_index_result -eq 0 && $compressed_text_result -eq 0 && 
$tantivy_fulltext_result -eq 0 ]]; then
         echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability 
verified.${NC}"
         return 0
     else
diff --git a/paimon-python/pypaimon/cli/cli_table.py 
b/paimon-python/pypaimon/cli/cli_table.py
index 6a7c383bb9..0f829f4553 100644
--- a/paimon-python/pypaimon/cli/cli_table.py
+++ b/paimon-python/pypaimon/cli/cli_table.py
@@ -147,6 +147,83 @@ def cmd_table_read(args):
         print(df.to_string(index=False))
 
 
+def cmd_table_full_text_search(args):
+    """
+    Execute the 'table full-text-search' command.
+
+    Performs full-text search on a Paimon table and displays matching rows.
+
+    Args:
+        args: Parsed command line arguments.
+    """
+    from pypaimon.cli.cli import load_catalog_config, create_catalog
+
+    config_path = args.config
+    config = load_catalog_config(config_path)
+    catalog = create_catalog(config)
+
+    table_identifier = args.table
+    parts = table_identifier.split('.')
+    if len(parts) != 2:
+        print(f"Error: Invalid table identifier '{table_identifier}'. "
+              f"Expected format: 'database.table'", file=sys.stderr)
+        sys.exit(1)
+
+    database_name, table_name = parts
+
+    try:
+        table = catalog.get_table(f"{database_name}.{table_name}")
+    except Exception as e:
+        print(f"Error: Failed to get table '{table_identifier}': {e}", 
file=sys.stderr)
+        sys.exit(1)
+
+    # Build full-text search
+    text_column = args.column
+    query_text = args.query
+    limit = args.limit
+
+    try:
+        builder = table.new_full_text_search_builder()
+        builder.with_text_column(text_column)
+        builder.with_query_text(query_text)
+        builder.with_limit(limit)
+        result = builder.execute_local()
+    except Exception as e:
+        print(f"Error: Full-text search failed: {e}", file=sys.stderr)
+        sys.exit(1)
+
+    if result.is_empty():
+        print("No matching rows found.")
+        return
+
+    # Read matching rows using global index result
+    read_builder = table.new_read_builder()
+
+    select_columns = args.select
+    if select_columns:
+        projection = [col.strip() for col in select_columns.split(',')]
+        available_fields = set(field.name for field in 
table.table_schema.fields)
+        invalid_columns = [col for col in projection if col not in 
available_fields]
+        if invalid_columns:
+            print(f"Error: Column(s) {invalid_columns} do not exist in table 
'{table_identifier}'.",
+                  file=sys.stderr)
+            sys.exit(1)
+        read_builder = read_builder.with_projection(projection)
+
+    scan = read_builder.new_scan().with_global_index_result(result)
+    plan = scan.plan()
+    splits = plan.splits()
+    read = read_builder.new_read()
+    df = read.to_pandas(splits)
+
+    output_format = getattr(args, 'format', 'table')
+    if output_format == 'json':
+        import json
+        print(json.dumps(df.to_dict(orient='records'), ensure_ascii=False))
+    else:
+        print(df.to_string(index=False))
+
+
 def cmd_table_get(args):
     """
     Execute the 'table get' command.
@@ -773,6 +850,43 @@ def add_table_subcommands(table_parser):
 
     # table rename command
     rename_parser = table_subparsers.add_parser('rename', help='Rename a 
table')
+
+    # table full-text-search command
+    fts_parser = table_subparsers.add_parser('full-text-search', 
help='Full-text search on a table')
+    fts_parser.add_argument(
+        'table',
+        help='Table identifier in format: database.table'
+    )
+    fts_parser.add_argument(
+        '--column', '-c',
+        required=True,
+        help='Text column to search on'
+    )
+    fts_parser.add_argument(
+        '--query', '-q',
+        required=True,
+        help='Query text to search for'
+    )
+    fts_parser.add_argument(
+        '--limit', '-l',
+        type=int,
+        default=10,
+        help='Maximum number of results to return (default: 10)'
+    )
+    fts_parser.add_argument(
+        '--select', '-s',
+        type=str,
+        default=None,
+        help='Select specific columns to display (comma-separated, e.g., 
"id,name,content")'
+    )
+    fts_parser.add_argument(
+        '--format', '-f',
+        type=str,
+        choices=['table', 'json'],
+        default='table',
+        help='Output format: table (default) or json'
+    )
+    fts_parser.set_defaults(func=cmd_table_full_text_search)
     rename_parser.add_argument(
         'table',
         help='Source table identifier in format: database.table'
diff --git a/paimon-python/pypaimon/globalindex/__init__.py 
b/paimon-python/pypaimon/globalindex/__init__.py
index e96d2d6a80..39be69490a 100644
--- a/paimon-python/pypaimon/globalindex/__init__.py
+++ b/paimon-python/pypaimon/globalindex/__init__.py
@@ -19,6 +19,7 @@
 from pypaimon.globalindex.global_index_result import GlobalIndexResult
 from pypaimon.globalindex.global_index_reader import GlobalIndexReader, 
FieldRef
 from pypaimon.globalindex.vector_search import VectorSearch
+from pypaimon.globalindex.full_text_search import FullTextSearch
 from pypaimon.globalindex.vector_search_result import (
     ScoredGlobalIndexResult,
     DictBasedScoredIndexResult,
@@ -27,6 +28,7 @@ from pypaimon.globalindex.vector_search_result import (
 from pypaimon.globalindex.global_index_meta import GlobalIndexMeta, 
GlobalIndexIOMeta
 from pypaimon.globalindex.global_index_evaluator import GlobalIndexEvaluator
 from pypaimon.globalindex.global_index_scanner import GlobalIndexScanner
+from pypaimon.globalindex.offset_global_index_reader import 
OffsetGlobalIndexReader
 from pypaimon.utils.range import Range
 
 __all__ = [
@@ -34,6 +36,7 @@ __all__ = [
     'GlobalIndexReader',
     'FieldRef',
     'VectorSearch',
+    'FullTextSearch',
     'ScoredGlobalIndexResult',
     'DictBasedScoredIndexResult',
     'ScoreGetter',
@@ -41,5 +44,6 @@ __all__ = [
     'GlobalIndexIOMeta',
     'GlobalIndexEvaluator',
     'GlobalIndexScanner',
+    'OffsetGlobalIndexReader',
     'Range',
 ]
diff --git a/paimon-python/pypaimon/globalindex/full_text_search.py 
b/paimon-python/pypaimon/globalindex/full_text_search.py
new file mode 100644
index 0000000000..463941a317
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/full_text_search.py
@@ -0,0 +1,53 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""FullTextSearch for performing full-text search on a text column."""
+
+from dataclasses import dataclass
+from typing import Optional
+
+
+@dataclass
+class FullTextSearch:
+    """
+    FullTextSearch to perform full-text search on a text column.
+
+    Attributes:
+        query_text: The query text to search
+        limit: Maximum number of results to return
+        field_name: Name of the text field to search
+    """
+
+    query_text: str
+    limit: int
+    field_name: str
+
+    def __post_init__(self):
+        if not self.query_text:
+            raise ValueError("Query text cannot be None or empty")
+        if self.limit <= 0:
+            raise ValueError(f"Limit must be positive, got: {self.limit}")
+        if not self.field_name:
+            raise ValueError("Field name cannot be null or empty")
+
+    def visit(self, visitor: 'GlobalIndexReader') -> 
Optional['ScoredGlobalIndexResult']:
+        """Visit the global index reader with this full-text search."""
+        return visitor.visit_full_text_search(self)
+
+    def __repr__(self) -> str:
+        return f"FullTextSearch(field={self.field_name}, 
query='{self.query_text}', limit={self.limit})"
diff --git a/paimon-python/pypaimon/globalindex/global_index_reader.py 
b/paimon-python/pypaimon/globalindex/global_index_reader.py
index cb9760ed2d..e5ba29e19b 100644
--- a/paimon-python/pypaimon/globalindex/global_index_reader.py
+++ b/paimon-python/pypaimon/globalindex/global_index_reader.py
@@ -42,6 +42,10 @@ class GlobalIndexReader(ABC):
         """Visit a vector search query."""
         raise NotImplementedError("Vector search not supported by this reader")
 
+    def visit_full_text_search(self, full_text_search: 'FullTextSearch') -> 
Optional['GlobalIndexResult']:
+        """Visit a full-text search query."""
+        raise NotImplementedError("Full-text search not supported by this 
reader")
+
     def visit_equal(self, field_ref: FieldRef, literal: object) -> 
Optional['GlobalIndexResult']:
         """Visit an equality predicate."""
         return None
diff --git a/paimon-python/pypaimon/globalindex/offset_global_index_reader.py 
b/paimon-python/pypaimon/globalindex/offset_global_index_reader.py
new file mode 100644
index 0000000000..acd9a6be95
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/offset_global_index_reader.py
@@ -0,0 +1,102 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""A GlobalIndexReader that wraps another reader and applies an offset to all 
row IDs."""
+
+from typing import List, Optional
+
+from pypaimon.globalindex.global_index_reader import GlobalIndexReader, 
FieldRef
+from pypaimon.globalindex.global_index_result import GlobalIndexResult
+
+
+class OffsetGlobalIndexReader(GlobalIndexReader):
+    """
+    A GlobalIndexReader that wraps another reader and applies an offset
+    to all row IDs in the results.
+    """
+
+    def __init__(self, wrapped: GlobalIndexReader, offset: int, to: int):
+        self._wrapped = wrapped
+        self._offset = offset
+        self._to = to
+
+    def visit_vector_search(self, vector_search) -> 
Optional[GlobalIndexResult]:
+        result = self._wrapped.visit_vector_search(
+            vector_search.offset_range(self._offset, self._to))
+        if result is not None:
+            return result.offset(self._offset)
+        return None
+
+    def visit_full_text_search(self, full_text_search) -> 
Optional[GlobalIndexResult]:
+        result = self._wrapped.visit_full_text_search(full_text_search)
+        if result is not None:
+            return result.offset(self._offset)
+        return None
+
+    def visit_equal(self, field_ref: FieldRef, literal: object) -> 
Optional[GlobalIndexResult]:
+        return self._apply_offset(self._wrapped.visit_equal(field_ref, 
literal))
+
+    def visit_not_equal(self, field_ref: FieldRef, literal: object) -> 
Optional[GlobalIndexResult]:
+        return self._apply_offset(self._wrapped.visit_not_equal(field_ref, 
literal))
+
+    def visit_less_than(self, field_ref: FieldRef, literal: object) -> 
Optional[GlobalIndexResult]:
+        return self._apply_offset(self._wrapped.visit_less_than(field_ref, 
literal))
+
+    def visit_less_or_equal(self, field_ref: FieldRef, literal: object) -> 
Optional[GlobalIndexResult]:
+        return self._apply_offset(self._wrapped.visit_less_or_equal(field_ref, 
literal))
+
+    def visit_greater_than(self, field_ref: FieldRef, literal: object) -> 
Optional[GlobalIndexResult]:
+        return self._apply_offset(self._wrapped.visit_greater_than(field_ref, 
literal))
+
+    def visit_greater_or_equal(self, field_ref: FieldRef, literal: object) -> 
Optional[GlobalIndexResult]:
+        return 
self._apply_offset(self._wrapped.visit_greater_or_equal(field_ref, literal))
+
+    def visit_is_null(self, field_ref: FieldRef) -> 
Optional[GlobalIndexResult]:
+        return self._apply_offset(self._wrapped.visit_is_null(field_ref))
+
+    def visit_is_not_null(self, field_ref: FieldRef) -> 
Optional[GlobalIndexResult]:
+        return self._apply_offset(self._wrapped.visit_is_not_null(field_ref))
+
+    def visit_in(self, field_ref: FieldRef, literals: List[object]) -> 
Optional[GlobalIndexResult]:
+        return self._apply_offset(self._wrapped.visit_in(field_ref, literals))
+
+    def visit_not_in(self, field_ref: FieldRef, literals: List[object]) -> 
Optional[GlobalIndexResult]:
+        return self._apply_offset(self._wrapped.visit_not_in(field_ref, 
literals))
+
+    def visit_starts_with(self, field_ref: FieldRef, literal: object) -> 
Optional[GlobalIndexResult]:
+        return self._apply_offset(self._wrapped.visit_starts_with(field_ref, 
literal))
+
+    def visit_ends_with(self, field_ref: FieldRef, literal: object) -> 
Optional[GlobalIndexResult]:
+        return self._apply_offset(self._wrapped.visit_ends_with(field_ref, 
literal))
+
+    def visit_contains(self, field_ref: FieldRef, literal: object) -> 
Optional[GlobalIndexResult]:
+        return self._apply_offset(self._wrapped.visit_contains(field_ref, 
literal))
+
+    def visit_like(self, field_ref: FieldRef, literal: object) -> 
Optional[GlobalIndexResult]:
+        return self._apply_offset(self._wrapped.visit_like(field_ref, literal))
+
+    def visit_between(self, field_ref: FieldRef, min_v: object, max_v: object) 
-> Optional[GlobalIndexResult]:
+        return self._apply_offset(self._wrapped.visit_between(field_ref, 
min_v, max_v))
+
+    def _apply_offset(self, result: Optional[GlobalIndexResult]) -> 
Optional[GlobalIndexResult]:
+        if result is not None:
+            return result.offset(self._offset)
+        return None
+
+    def close(self) -> None:
+        self._wrapped.close()
diff --git a/paimon-python/pypaimon/globalindex/__init__.py 
b/paimon-python/pypaimon/globalindex/tantivy/__init__.py
similarity index 51%
copy from paimon-python/pypaimon/globalindex/__init__.py
copy to paimon-python/pypaimon/globalindex/tantivy/__init__.py
index e96d2d6a80..91054c7984 100644
--- a/paimon-python/pypaimon/globalindex/__init__.py
+++ b/paimon-python/pypaimon/globalindex/tantivy/__init__.py
@@ -16,30 +16,12 @@
 # limitations under the License.
 
################################################################################
 
-from pypaimon.globalindex.global_index_result import GlobalIndexResult
-from pypaimon.globalindex.global_index_reader import GlobalIndexReader, 
FieldRef
-from pypaimon.globalindex.vector_search import VectorSearch
-from pypaimon.globalindex.vector_search_result import (
-    ScoredGlobalIndexResult,
-    DictBasedScoredIndexResult,
-    ScoreGetter,
+from pypaimon.globalindex.tantivy.tantivy_full_text_global_index_reader import 
(
+    TantivyFullTextGlobalIndexReader,
+    TANTIVY_FULLTEXT_IDENTIFIER,
 )
-from pypaimon.globalindex.global_index_meta import GlobalIndexMeta, 
GlobalIndexIOMeta
-from pypaimon.globalindex.global_index_evaluator import GlobalIndexEvaluator
-from pypaimon.globalindex.global_index_scanner import GlobalIndexScanner
-from pypaimon.utils.range import Range
 
 __all__ = [
-    'GlobalIndexResult',
-    'GlobalIndexReader',
-    'FieldRef',
-    'VectorSearch',
-    'ScoredGlobalIndexResult',
-    'DictBasedScoredIndexResult',
-    'ScoreGetter',
-    'GlobalIndexMeta',
-    'GlobalIndexIOMeta',
-    'GlobalIndexEvaluator',
-    'GlobalIndexScanner',
-    'Range',
+    'TantivyFullTextGlobalIndexReader',
+    'TANTIVY_FULLTEXT_IDENTIFIER',
 ]
diff --git 
a/paimon-python/pypaimon/globalindex/tantivy/tantivy_full_text_global_index_reader.py
 
b/paimon-python/pypaimon/globalindex/tantivy/tantivy_full_text_global_index_reader.py
new file mode 100644
index 0000000000..c8b1711b59
--- /dev/null
+++ 
b/paimon-python/pypaimon/globalindex/tantivy/tantivy_full_text_global_index_reader.py
@@ -0,0 +1,294 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Full-text global index reader using Tantivy.
+
+Reads the archive header to get file layout, then opens a Tantivy searcher
+backed by a stream-based Directory. No temp files are created on disk.
+"""
+
+import os
+import struct
+import threading
+from typing import Dict, List, Optional
+
+from pypaimon.globalindex.global_index_reader import GlobalIndexReader, 
FieldRef
+from pypaimon.globalindex.global_index_result import GlobalIndexResult
+from pypaimon.globalindex.vector_search_result import (
+    ScoredGlobalIndexResult,
+    DictBasedScoredIndexResult,
+)
+from pypaimon.globalindex.global_index_meta import GlobalIndexIOMeta
+
+TANTIVY_FULLTEXT_IDENTIFIER = "tantivy-fulltext"
+
+
+class StreamDirectory:
+    """Directory backed by a seekable input stream.
+
+    Implements the Python Directory protocol expected by tantivy-py's
+    support_directory branch. Archived files are read on demand via seek+read
+    on the underlying stream, so the whole archive is never loaded into memory.
+    Files written through this directory are kept in memory only.
+
+    Thread-safety: every seek+read pair on the shared stream is serialized by
+    a lock. The in-memory file/writer bookkeeping is not guarded by that lock.
+    """
+
+    def __init__(self, stream, file_names: List[str],
+                 file_offsets: List[int], file_lengths: List[int]):
+        """Build the path -> (offset, length) layout over ``stream``.
+
+        Args:
+            stream: seekable input stream holding the archive.
+            file_names: archived file names.
+            file_offsets: absolute offset of each file's data in ``stream``.
+            file_lengths: byte length of each file's data.
+
+        Raises:
+            ValueError: if the three layout lists differ in length (zip()
+                would otherwise silently drop the trailing entries).
+        """
+        if not (len(file_names) == len(file_offsets) == len(file_lengths)):
+            raise ValueError(
+                "file_names, file_offsets and file_lengths must have the same length"
+            )
+        self._stream = stream
+        self._lock = threading.Lock()
+        self._layout: Dict[str, tuple] = {
+            name: (offset, length)
+            for name, offset, length in zip(file_names, file_offsets, file_lengths)
+        }
+        # In-memory store for files written by tantivy (e.g. lock files, meta)
+        self._mem_files: Dict[str, bytes] = {}
+        # In-memory writers: writer_id -> (path, buffer)
+        self._writers: Dict[int, tuple] = {}
+        self._next_writer_id = 0
+
+    def get_file_handle(self, path: str) -> bytes:
+        """Return the full content of ``path``; raise FileNotFoundError if unknown."""
+        return self._read_file(path)
+
+    def exists(self, path: str) -> bool:
+        """Tell whether ``path`` is an archived file or an in-memory file."""
+        return path in self._layout or path in self._mem_files
+
+    def atomic_read(self, path: str) -> bytes:
+        """Return the full content of ``path``; raise FileNotFoundError if unknown."""
+        return self._read_file(path)
+
+    def atomic_write(self, path: str, data: bytes) -> None:
+        """Store ``data`` under ``path`` in memory (the archive is never modified)."""
+        self._mem_files[path] = data
+
+    def open_write(self, path: str) -> int:
+        """Open an in-memory writer for ``path`` and return its id."""
+        writer_id = self._next_writer_id
+        self._next_writer_id += 1
+        self._writers[writer_id] = (path, bytearray())
+        return writer_id
+
+    def delete(self, path: str) -> None:
+        """Drop the in-memory file ``path`` if present; archived files are untouched."""
+        self._mem_files.pop(path, None)
+
+    def write(self, writer_id: int, data: bytes) -> None:
+        """Append ``data`` to the buffer of writer ``writer_id``."""
+        self._writers[writer_id][1].extend(data)
+
+    def flush(self, writer_id: int) -> None:
+        """No-op: buffers live in memory."""
+        pass
+
+    def terminate(self, writer_id: int) -> None:
+        """Finish writer ``writer_id`` and publish its buffer as an in-memory file."""
+        path, buf = self._writers.pop(writer_id)
+        self._mem_files[path] = bytes(buf)
+
+    def sync_directory(self) -> None:
+        """No-op: nothing is persisted."""
+        pass
+
+    def close(self, writer_id: int) -> None:
+        """Called when a writer is dropped (e.g., lock files dropped without terminate)."""
+        self._writers.pop(writer_id, None)
+
+    def _read_file(self, path: str) -> bytes:
+        """Shared read path: in-memory files take precedence over archived ones."""
+        if path in self._mem_files:
+            return self._mem_files[path]
+        if path not in self._layout:
+            raise FileNotFoundError(path)
+        offset, length = self._layout[path]
+        # seek and read must not interleave with another reader on the same stream
+        with self._lock:
+            self._stream.seek(offset)
+            return self._read_fully(length)
+
+    def _read_fully(self, length: int) -> bytes:
+        """Read exactly ``length`` bytes from the current stream position.
+
+        Raises:
+            IOError: if the stream ends before ``length`` bytes were read.
+        """
+        buf = bytearray()
+        remaining = length
+        while remaining > 0:
+            chunk = self._stream.read(remaining)
+            if not chunk:
+                raise IOError("Unexpected end of stream")
+            buf.extend(chunk)
+            remaining -= len(chunk)
+        return bytes(buf)
+
+
+class TantivyFullTextGlobalIndexReader(GlobalIndexReader):
+    """Full-text global index reader using Tantivy.
+
+    Reads the archive header to get file layout, then opens a Tantivy searcher
+    backed by stream-based callbacks. No temp files are created.
+
+    Archive format (big-endian):
+    [fileCount(4)] then for each file: [nameLen(4)] [name(utf8)] [dataLen(8)] [data]
+    """
+
+    def __init__(self, file_io, index_path: str, io_metas: List[GlobalIndexIOMeta]):
+        """Remember where the index lives; the index is opened lazily on first search."""
+        assert len(io_metas) == 1, "Expected exactly one index file per shard"
+        self._file_io = file_io
+        self._index_path = index_path
+        self._io_meta = io_metas[0]
+        self._searcher = None
+        self._index = None
+        self._stream = None
+
+    def visit_full_text_search(self, full_text_search) -> Optional[ScoredGlobalIndexResult]:
+        """Run ``full_text_search`` against the "text" field.
+
+        Returns:
+            A result mapping each hit's stored ``row_id`` to its score; at most
+            ``full_text_search.limit`` hits are requested from the searcher.
+        """
+        self._ensure_loaded()
+
+        query_text = full_text_search.query_text
+        limit = full_text_search.limit
+
+        searcher = self._searcher
+        query = self._index.parse_query(query_text, ["text"])
+        results = searcher.search(query, limit)
+
+        id_to_scores: Dict[int, float] = {}
+        for score, doc_address in results.hits:
+            doc = searcher.doc(doc_address)
+            row_id = doc["row_id"][0]
+            id_to_scores[row_id] = score
+
+        return DictBasedScoredIndexResult(id_to_scores)
+
+    def _ensure_loaded(self):
+        """Open the index on first use.
+
+        Reader state is published only after every step succeeded, so a failed
+        attempt leaves the reader untouched (no index bound to a closed stream)
+        and a later call can retry from scratch.
+        """
+        if self._searcher is not None:
+            return
+
+        import tantivy
+
+        # Open the archive stream
+        file_path = os.path.join(self._index_path, self._io_meta.file_name)
+        stream = self._file_io.new_input_stream(file_path)
+        try:
+            # Parse archive header to get file layout
+            file_names, file_offsets, file_lengths = self._parse_archive_header(stream)
+            directory = StreamDirectory(stream, file_names, file_offsets, file_lengths)
+
+            # Open tantivy index from stream-backed directory
+            schema_builder = tantivy.SchemaBuilder()
+            schema_builder.add_unsigned_field("row_id", stored=True, indexed=True, fast=True)
+            schema_builder.add_text_field("text", stored=False)
+            schema = schema_builder.build()
+
+            index = tantivy.Index(schema, directory=directory)
+            index.reload()
+            searcher = index.searcher()
+        except Exception:
+            stream.close()
+            raise
+
+        self._index = index
+        self._searcher = searcher
+        self._stream = stream
+
+    @staticmethod
+    def _parse_archive_header(stream):
+        """Parse the archive header to extract file names, offsets, and lengths.
+
+        Only reads the header; does not load file data into memory.
+        Computes the absolute byte offset of each file's data within the stream.
+
+        Raises:
+            IOError: if the stream ends early or a count/length field is
+                negative, which indicates a corrupted archive.
+        """
+        file_count = _read_int(stream)
+        if file_count < 0:
+            raise IOError(f"Corrupted archive: negative file count {file_count}")
+        file_names = []
+        file_offsets = []
+        file_lengths = []
+
+        for _ in range(file_count):
+            name_len = _read_int(stream)
+            if name_len < 0:
+                raise IOError(f"Corrupted archive: negative name length {name_len}")
+            name_bytes = _read_fully(stream, name_len)
+            file_names.append(name_bytes.decode('utf-8'))
+
+            data_len = _read_long(stream)
+            if data_len < 0:
+                # A negative length would make the seek below move backwards.
+                raise IOError(f"Corrupted archive: negative data length {data_len}")
+            data_offset = stream.tell()
+            file_offsets.append(data_offset)
+            file_lengths.append(data_len)
+
+            # Skip past the file data
+            stream.seek(data_offset + data_len)
+
+        return file_names, file_offsets, file_lengths
+
+    # =================== unsupported =====================
+    # Every predicate visitor below returns None.
+
+    def visit_equal(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
+        return None
+
+    def visit_not_equal(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
+        return None
+
+    def visit_less_than(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
+        return None
+
+    def visit_less_or_equal(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
+        return None
+
+    def visit_greater_than(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
+        return None
+
+    def visit_greater_or_equal(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
+        return None
+
+    def visit_is_null(self, field_ref: FieldRef) -> Optional[GlobalIndexResult]:
+        return None
+
+    def visit_is_not_null(self, field_ref: FieldRef) -> Optional[GlobalIndexResult]:
+        return None
+
+    def visit_in(self, field_ref: FieldRef, literals: List[object]) -> Optional[GlobalIndexResult]:
+        return None
+
+    def visit_not_in(self, field_ref: FieldRef, literals: List[object]) -> Optional[GlobalIndexResult]:
+        return None
+
+    def visit_starts_with(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
+        return None
+
+    def visit_ends_with(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
+        return None
+
+    def visit_contains(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
+        return None
+
+    def visit_like(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
+        return None
+
+    def visit_between(self, field_ref: FieldRef, min_v: object, max_v: object) -> Optional[GlobalIndexResult]:
+        return None
+
+    def close(self) -> None:
+        """Drop the searcher and index, then close the archive stream if one is open."""
+        self._searcher = None
+        self._index = None
+        if self._stream is not None:
+            self._stream.close()
+            self._stream = None
+
+
+def _read_int(stream) -> int:
+    """Read a big-endian signed 32-bit integer from ``stream``."""
+    (value,) = struct.unpack('>i', _read_fully(stream, 4))
+    return value
+
+
+def _read_long(stream) -> int:
+    """Read a big-endian signed 64-bit integer from ``stream``."""
+    (value,) = struct.unpack('>q', _read_fully(stream, 8))
+    return value
+
+
+def _read_fully(stream, length: int) -> bytes:
+    """Read exactly ``length`` bytes from ``stream``.
+
+    Keeps calling ``stream.read`` until enough bytes were collected, since a
+    single call may return fewer bytes than requested.
+
+    Raises:
+        IOError: if the stream is exhausted before ``length`` bytes were read.
+    """
+    collected = bytearray()
+    while len(collected) < length:
+        piece = stream.read(length - len(collected))
+        if not piece:
+            raise IOError("Unexpected end of stream")
+        collected += piece
+    return bytes(collected)
diff --git a/paimon-python/pypaimon/globalindex/vector_search_result.py 
b/paimon-python/pypaimon/globalindex/vector_search_result.py
index 60afe331a9..a93247e690 100644
--- a/paimon-python/pypaimon/globalindex/vector_search_result.py
+++ b/paimon-python/pypaimon/globalindex/vector_search_result.py
@@ -78,6 +78,37 @@ class ScoredGlobalIndexResult(GlobalIndexResult):
         
         return SimpleScoredGlobalIndexResult(result_or, combined_score_getter)
 
+    def top_k(self, k: int) -> 'ScoredGlobalIndexResult':
+        """Return the top-k results by score.
+
+        Args:
+            k: maximum number of rows to keep. A non-positive ``k`` yields an
+                empty result.
+
+        Returns:
+            ``self`` when it already holds at most ``k`` rows (and ``k`` is
+            positive); otherwise a new result holding the ``k`` best-scored
+            rows and the same score getter. Rows without a score
+            (``None``) are ranked as 0.0.
+        """
+        import heapq
+
+        score_getter_fn = self.score_getter()
+        if k <= 0:
+            # Without this guard the loop below would peek at heap[0] of an
+            # empty heap and raise IndexError.
+            return SimpleScoredGlobalIndexResult(RoaringBitmap64(), score_getter_fn)
+
+        row_ids = self.results()
+        if row_ids.cardinality() <= k:
+            return self
+
+        # Use a min-heap of size k to find top-k scores in O(n log k)
+        heap = []
+        for row_id in row_ids:
+            score = score_getter_fn(row_id)
+            if score is None:
+                score = 0.0
+            if len(heap) < k:
+                heapq.heappush(heap, (score, row_id))
+            elif score > heap[0][0]:
+                heapq.heapreplace(heap, (score, row_id))
+
+        top_k_bitmap = RoaringBitmap64()
+        for _, row_id in heap:
+            top_k_bitmap.add(row_id)
+
+        return SimpleScoredGlobalIndexResult(top_k_bitmap, score_getter_fn)
+
+    @staticmethod
+    def create_empty() -> 'ScoredGlobalIndexResult':
+        """Build a scored result with no rows; its score getter always yields 0.0."""
+        no_rows = RoaringBitmap64()
+        return SimpleScoredGlobalIndexResult(no_rows, lambda row_id: 0.0)
+
     @staticmethod
     def create(
         supplier: Callable[[], RoaringBitmap64],
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py 
b/paimon-python/pypaimon/read/scanner/file_scanner.py
index aea709669e..b0293dac41 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -199,6 +199,7 @@ class FileScanner:
         self.only_read_real_buckets = options.bucket() == 
BucketMode.POSTPONE_BUCKET.value
         self.data_evolution = options.data_evolution_enabled()
         self.deletion_vectors_enabled = options.deletion_vectors_enabled()
+        self._global_index_result = None
 
         def schema_fields_func(schema_id: int):
             return self.table.schema_manager.get_schema(schema_id).fields
@@ -262,7 +263,8 @@ class FileScanner:
     def _create_data_evolution_split_generator(self):
         row_ranges = None
         score_getter = None
-        global_index_result = self._eval_global_index()
+        global_index_result = self._global_index_result if 
self._global_index_result is not None \
+            else self._eval_global_index()
         if global_index_result is not None:
             row_ranges = global_index_result.results().to_range_list()
             if isinstance(global_index_result, ScoredGlobalIndexResult):
@@ -347,6 +349,10 @@ class FileScanner:
         self.end_pos_of_this_subtask = end_pos
         return self
 
+    def with_global_index_result(self, result) -> 'FileScanner':
+        """Store a precomputed global index result and return this scanner for chaining."""
+        self._global_index_result = result
+        return self
+
     def _apply_push_down_limit(self, splits: List[DataSplit]) -> 
List[DataSplit]:
         if self.limit is None:
             return splits
diff --git a/paimon-python/pypaimon/read/table_scan.py 
b/paimon-python/pypaimon/read/table_scan.py
index 416ef1717d..9579f875cb 100755
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -131,3 +131,7 @@ class TableScan:
     def with_slice(self, start_pos, end_pos) -> 'TableScan':
         self.file_scanner.with_slice(start_pos, end_pos)
         return self
+
+    def with_global_index_result(self, result) -> 'TableScan':
+        """Forward a precomputed global index result to the file scanner; returns this scan."""
+        self.file_scanner.with_global_index_result(result)
+        return self
diff --git a/paimon-python/pypaimon/table/file_store_table.py 
b/paimon-python/pypaimon/table/file_store_table.py
index 9115c8e885..61a1455f36 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -369,6 +369,10 @@ class FileStoreTable(Table):
     def new_stream_write_builder(self) -> StreamWriteBuilder:
         return StreamWriteBuilder(self)
 
+    def new_full_text_search_builder(self) -> 'FullTextSearchBuilder':
+        """Create a full-text search builder bound to this table."""
+        # Imported at call time rather than at module load
+        # (presumably to avoid a circular import -- confirm).
+        from pypaimon.table.source import full_text_search_builder as builder_module
+        return builder_module.FullTextSearchBuilderImpl(self)
+
     def create_row_key_extractor(self) -> RowKeyExtractor:
         bucket_mode = self.bucket_mode()
         if bucket_mode == BucketMode.HASH_FIXED:
diff --git a/paimon-python/pypaimon/table/format/format_table.py 
b/paimon-python/pypaimon/table/format/format_table.py
index 8c7e82415e..41767e1da8 100644
--- a/paimon-python/pypaimon/table/format/format_table.py
+++ b/paimon-python/pypaimon/table/format/format_table.py
@@ -102,3 +102,6 @@ class FormatTable(Table):
 
     def new_stream_write_builder(self):
         raise NotImplementedError("Format table does not support stream 
write.")
+
+    def new_full_text_search_builder(self):
+        """Always raises NotImplementedError: full-text search is unsupported here."""
+        raise NotImplementedError("Format table does not support full text search.")
diff --git a/paimon-python/pypaimon/table/iceberg/iceberg_table.py 
b/paimon-python/pypaimon/table/iceberg/iceberg_table.py
index 137b7c8622..8e0ddfa07b 100644
--- a/paimon-python/pypaimon/table/iceberg/iceberg_table.py
+++ b/paimon-python/pypaimon/table/iceberg/iceberg_table.py
@@ -107,3 +107,6 @@ class IcebergTable(Table):
         raise NotImplementedError(
             "IcebergTable does not support stream write operation in 
paimon-python yet."
         )
+
+    def new_full_text_search_builder(self):
+        """Always raises NotImplementedError: full-text search is unsupported here."""
+        raise NotImplementedError("IcebergTable does not support full text search.")
diff --git a/paimon-python/pypaimon/table/object/object_table.py 
b/paimon-python/pypaimon/table/object/object_table.py
index f3dc44b761..df6d6f2f84 100644
--- a/paimon-python/pypaimon/table/object/object_table.py
+++ b/paimon-python/pypaimon/table/object/object_table.py
@@ -101,3 +101,8 @@ class ObjectTable(Table):
         raise NotImplementedError(
             "ObjectTable is read-only and does not support stream write."
         )
+
+    def new_full_text_search_builder(self):
+        """Always raises NotImplementedError: full-text search is unsupported here."""
+        message = "ObjectTable is read-only and does not support full text search."
+        raise NotImplementedError(message)
diff --git a/paimon-python/pypaimon/table/source/full_text_read.py 
b/paimon-python/pypaimon/table/source/full_text_read.py
new file mode 100644
index 0000000000..9be1c2e4ca
--- /dev/null
+++ b/paimon-python/pypaimon/table/source/full_text_read.py
@@ -0,0 +1,120 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Full-text read to read index files."""
+
+from abc import ABC, abstractmethod
+from typing import List, Optional
+
+from pypaimon.globalindex.full_text_search import FullTextSearch
+from pypaimon.globalindex.global_index_meta import GlobalIndexIOMeta
+from pypaimon.globalindex.global_index_result import GlobalIndexResult
+from pypaimon.globalindex.offset_global_index_reader import 
OffsetGlobalIndexReader
+from pypaimon.globalindex.vector_search_result import ScoredGlobalIndexResult
+from pypaimon.table.source.full_text_search_split import FullTextSearchSplit
+from pypaimon.table.source.full_text_scan import FullTextScanPlan
+
+
+class FullTextRead(ABC):
+    """Reads full-text index files for a set of splits and returns the matching rows."""
+
+    def read_plan(self, plan: FullTextScanPlan) -> GlobalIndexResult:
+        """Read every split contained in ``plan``."""
+        plan_splits = plan.splits()
+        return self.read(plan_splits)
+
+    @abstractmethod
+    def read(self, splits: List[FullTextSearchSplit]) -> GlobalIndexResult:
+        """Evaluate the search over ``splits``; implemented by subclasses."""
+        pass
+
+
+class FullTextReadImpl(FullTextRead):
+    """Implementation for FullTextRead.
+
+    Runs the query against the index files of each split, merges the scored
+    per-split results and keeps the ``limit`` best-scored rows.
+    """
+
+    def __init__(
+        self,
+        table: 'FileStoreTable',
+        limit: int,
+        text_column: 'DataField',
+        query_text: str
+    ):
+        self._table = table
+        self._limit = limit
+        self._text_column = text_column
+        self._query_text = query_text
+
+    def read(self, splits: List[FullTextSearchSplit]) -> GlobalIndexResult:
+        """Search all ``splits`` and return the merged top-``limit`` result.
+
+        The returned object is always a ScoredGlobalIndexResult, including for
+        empty input, so callers see one consistent result type.
+        """
+        result = ScoredGlobalIndexResult.create_empty()
+        if not splits:
+            return result
+
+        for split in splits:
+            split_result = self._eval(
+                split.row_range_start, split.row_range_end,
+                split.full_text_index_files
+            )
+            if split_result is not None:
+                result = result.or_(split_result)
+
+        return result.top_k(self._limit)
+
+    def _eval(self, row_range_start, row_range_end, full_text_index_files
+              ) -> Optional[ScoredGlobalIndexResult]:
+        """Search the index files of one row range.
+
+        Returns:
+            The scored result produced by the offset reader, or None when the
+            split carries no index file.
+        """
+        if not full_text_index_files:
+            # Nothing to search; read() skips None results.
+            return None
+
+        index_io_meta_list = []
+        for index_file in full_text_index_files:
+            meta = index_file.global_index_meta
+            assert meta is not None
+            index_io_meta_list.append(
+                GlobalIndexIOMeta(
+                    file_name=index_file.file_name,
+                    file_size=index_file.file_size,
+                    metadata=meta.index_meta
+                )
+            )
+
+        # Build the query object before the reader so that a failure here
+        # cannot leave an unclosed reader behind.
+        full_text_search = FullTextSearch(
+            query_text=self._query_text,
+            limit=self._limit,
+            field_name=self._text_column.name
+        )
+
+        index_type = full_text_index_files[0].index_type
+        index_path = self._table.path_factory().global_index_path_factory().index_path()
+        file_io = self._table.file_io
+
+        reader = _create_full_text_reader(
+            index_type, file_io, index_path,
+            index_io_meta_list
+        )
+        try:
+            offset_reader = OffsetGlobalIndexReader(reader, row_range_start, row_range_end)
+            return offset_reader.visit_full_text_search(full_text_search)
+        finally:
+            reader.close()
+
+
+def _create_full_text_reader(index_type, file_io, index_path, index_io_meta_list):
+    """Instantiate the full-text reader registered for ``index_type``.
+
+    Raises:
+        ValueError: if ``index_type`` is not a supported full-text index type.
+    """
+    from pypaimon.globalindex.tantivy import tantivy_full_text_global_index_reader as tantivy_reader
+
+    if index_type != tantivy_reader.TANTIVY_FULLTEXT_IDENTIFIER:
+        raise ValueError(f"Unsupported full-text index type: '{index_type}'")
+    return tantivy_reader.TantivyFullTextGlobalIndexReader(file_io, index_path, index_io_meta_list)
diff --git a/paimon-python/pypaimon/table/source/full_text_scan.py 
b/paimon-python/pypaimon/table/source/full_text_scan.py
new file mode 100644
index 0000000000..06f72405b3
--- /dev/null
+++ b/paimon-python/pypaimon/table/source/full_text_scan.py
@@ -0,0 +1,93 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Full-text scan to scan index files."""
+
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from typing import List
+
+from pypaimon.table.source.full_text_search_split import FullTextSearchSplit
+from pypaimon.utils.range import Range
+
+
+class FullTextScanPlan:
+    """Result of a full-text scan: the list of splits to be read."""
+
+    def __init__(self, splits: List[FullTextSearchSplit]):
+        # Kept by reference; the list is not copied.
+        self._splits = splits
+
+    def splits(self) -> List[FullTextSearchSplit]:
+        """Return the splits this plan was created with."""
+        return self._splits
+
+
+class FullTextScan(ABC):
+    """Scans index files and produces a FullTextScanPlan."""
+
+    @abstractmethod
+    def scan(self) -> FullTextScanPlan:
+        """Produce the plan of splits to search; implemented by subclasses."""
+        pass
+
+
+class FullTextScanImpl(FullTextScan):
+    """Implementation for FullTextScan.
+
+    Collects the global index files built on the text column and groups them
+    into one split per row range.
+    """
+
+    def __init__(self, table: 'FileStoreTable', text_column: 'DataField'):
+        self._table = table
+        self._text_column = text_column
+
+    def scan(self) -> FullTextScanPlan:
+        """Plan the splits for the configured text column.
+
+        Uses the time-travel snapshot when the table options select one and
+        falls back to the latest snapshot otherwise.
+        """
+        from pypaimon.common.options.options import Options
+        from pypaimon.index.index_file_handler import IndexFileHandler
+        from pypaimon.snapshot.snapshot_manager import SnapshotManager
+        from pypaimon.snapshot.time_travel_util import TimeTravelUtil
+
+        text_column = self._text_column
+
+        snapshot = TimeTravelUtil.try_travel_to_snapshot(
+            Options(self._table.table_schema.options),
+            self._table.tag_manager()
+        )
+        if snapshot is None:
+            # Look up the latest snapshot only when it is actually needed.
+            snapshot = SnapshotManager(self._table).get_latest_snapshot()
+
+        index_file_handler = IndexFileHandler(table=self._table)
+
+        def index_file_filter(entry):
+            # Keep only global index files built on the requested column.
+            global_index_meta = entry.index_file.global_index_meta
+            if global_index_meta is None:
+                return False
+            return text_column.id == global_index_meta.index_field_id
+
+        entries = index_file_handler.scan(snapshot, index_file_filter)
+        all_index_files = [entry.index_file for entry in entries]
+
+        # Group full-text index files by (rowRangeStart, rowRangeEnd)
+        by_range = defaultdict(list)
+        for index_file in all_index_files:
+            meta = index_file.global_index_meta
+            assert meta is not None
+            range_key = Range(meta.row_range_start, meta.row_range_end)
+            by_range[range_key].append(index_file)
+
+        splits = [
+            FullTextSearchSplit(range_key.from_, range_key.to, files)
+            for range_key, files in by_range.items()
+        ]
+
+        return FullTextScanPlan(splits)
diff --git a/paimon-python/pypaimon/table/source/full_text_search_builder.py 
b/paimon-python/pypaimon/table/source/full_text_search_builder.py
new file mode 100644
index 0000000000..70bdb28a46
--- /dev/null
+++ b/paimon-python/pypaimon/table/source/full_text_search_builder.py
@@ -0,0 +1,100 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Builder to build full-text search."""
+
+from abc import ABC, abstractmethod
+from typing import Optional
+
+from pypaimon.globalindex.global_index_result import GlobalIndexResult
+from pypaimon.table.source.full_text_read import FullTextRead, FullTextReadImpl
+from pypaimon.table.source.full_text_scan import FullTextScan, FullTextScanImpl
+
+
+class FullTextSearchBuilder(ABC):
+    """Fluent builder that configures and runs a full-text search."""
+
+    @abstractmethod
+    def with_limit(self, limit: int) -> 'FullTextSearchBuilder':
+        """Set how many top results to return."""
+        pass
+
+    @abstractmethod
+    def with_text_column(self, name: str) -> 'FullTextSearchBuilder':
+        """Select the text column to search, by name."""
+        pass
+
+    @abstractmethod
+    def with_query_text(self, query_text: str) -> 'FullTextSearchBuilder':
+        """Set the query text."""
+        pass
+
+    @abstractmethod
+    def new_full_text_scan(self) -> FullTextScan:
+        """Create the scan that discovers index files."""
+        pass
+
+    @abstractmethod
+    def new_full_text_read(self) -> FullTextRead:
+        """Create the read that evaluates the query on index files."""
+        pass
+
+    def execute_local(self) -> GlobalIndexResult:
+        """Scan and read in the current process and return the search result."""
+        # The read is created before the scan runs, so its validation fails first.
+        reader = self.new_full_text_read()
+        plan = self.new_full_text_scan().scan()
+        return reader.read_plan(plan)
+
+
+class FullTextSearchBuilderImpl(FullTextSearchBuilder):
+    """Default FullTextSearchBuilder: collects the settings, validates them on build."""
+
+    def __init__(self, table: 'FileStoreTable'):
+        self._table = table
+        self._limit: int = 0
+        self._text_column: Optional['DataField'] = None
+        self._query_text: Optional[str] = None
+
+    def with_limit(self, limit: int) -> 'FullTextSearchBuilder':
+        """Record the limit; it is validated in new_full_text_read()."""
+        self._limit = limit
+        return self
+
+    def with_text_column(self, name: str) -> 'FullTextSearchBuilder':
+        """Resolve ``name`` against the table fields; raise ValueError if absent."""
+        fields_by_name = {field.name: field for field in self._table.fields}
+        try:
+            self._text_column = fields_by_name[name]
+        except KeyError:
+            raise ValueError(f"Text column '{name}' not found in table schema") from None
+        return self
+
+    def with_query_text(self, query_text: str) -> 'FullTextSearchBuilder':
+        """Record the query text."""
+        self._query_text = query_text
+        return self
+
+    def new_full_text_scan(self) -> FullTextScan:
+        """Create the scan; requires the text column to be set."""
+        column = self._text_column
+        if column is None:
+            raise ValueError("Text column must be set via with_text_column()")
+        return FullTextScanImpl(self._table, column)
+
+    def new_full_text_read(self) -> FullTextRead:
+        """Create the read; requires a positive limit, a text column and a query text."""
+        if not self._limit > 0:
+            raise ValueError("Limit must be positive, set via with_limit()")
+        if self._text_column is None:
+            raise ValueError("Text column must be set via with_text_column()")
+        if self._query_text is None:
+            raise ValueError("Query text must be set via with_query_text()")
+        return FullTextReadImpl(
+            self._table, self._limit, self._text_column, self._query_text
+        )
diff --git a/paimon-python/pypaimon/table/source/full_text_search_split.py 
b/paimon-python/pypaimon/table/source/full_text_search_split.py
new file mode 100644
index 0000000000..b3ce272aa1
--- /dev/null
+++ b/paimon-python/pypaimon/table/source/full_text_search_split.py
@@ -0,0 +1,53 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Split of full-text search."""
+
+from dataclasses import dataclass
+from typing import List
+
+from pypaimon.index.index_file_meta import IndexFileMeta
+
+
+@dataclass
+class FullTextSearchSplit:
+    """Split of full-text search.
+
+    Carries a row range (start and end) and a list of full-text index file
+    metas. Equality, hashing and repr are all based on these three fields.
+    """
+
+    row_range_start: int
+    row_range_end: int
+    full_text_index_files: List[IndexFileMeta]
+
+    def __eq__(self, other):
+        # Hand the comparison back to Python for foreign types instead of
+        # answering False directly; ``split == other`` still evaluates to False
+        # when the other operand cannot compare either.
+        if not isinstance(other, FullTextSearchSplit):
+            return NotImplemented
+        return (
+            self.row_range_start == other.row_range_start
+            and self.row_range_end == other.row_range_end
+            and self.full_text_index_files == other.full_text_index_files
+        )
+
+    def __hash__(self):
+        # The list is converted to a tuple because lists are not hashable.
+        # NOTE(review): this assumes IndexFileMeta itself is hashable - confirm.
+        return hash((
+            self.row_range_start,
+            self.row_range_end,
+            tuple(self.full_text_index_files),
+        ))
+
+    def __repr__(self):
+        return (
+            f"FullTextSearchSplit("
+            f"row_range_start={self.row_range_start}, "
+            f"row_range_end={self.row_range_end}, "
+            f"full_text_index_files={self.full_text_index_files})"
+        )
diff --git a/paimon-python/pypaimon/table/table.py 
b/paimon-python/pypaimon/table/table.py
index dfae232810..68ad3a9d13 100644
--- a/paimon-python/pypaimon/table/table.py
+++ b/paimon-python/pypaimon/table/table.py
@@ -20,6 +20,7 @@ from abc import ABC, abstractmethod
 
 from pypaimon.read.read_builder import ReadBuilder
 from pypaimon.read.stream_read_builder import StreamReadBuilder
+from pypaimon.table.source.full_text_search_builder import 
FullTextSearchBuilder
 from pypaimon.write.write_builder import BatchWriteBuilder, StreamWriteBuilder
 
 
@@ -41,3 +42,7 @@ class Table(ABC):
     @abstractmethod
     def new_stream_write_builder(self) -> StreamWriteBuilder:
         """Returns a builder for building stream table write and table commit."""
+
+    @abstractmethod
+    def new_full_text_search_builder(self) -> FullTextSearchBuilder:
+        """Returns a new full-text search builder.
+
+        Abstract method: each concrete table class has to supply its own
+        implementation.
+        """
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py 
b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 4d888c3cb4..61cd48ca67 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -496,3 +496,71 @@ class JavaPyReadWriteTest(unittest.TestCase):
             table_read.to_arrow(splits)
         self.assertIn(file_format, str(ctx.exception))
         self.assertIn("not yet supported", str(ctx.exception))
+
+    def test_read_tantivy_full_text_index(self):
+        """Test reading a Tantivy full-text index built by Java.
+
+        Each query goes through FullTextSearchBuilder on the 'content' column;
+        the search result is then handed to a regular scan so the matching rows
+        can be read back and checked by their 'id' values.
+        """
+        table = self.catalog.get_table('default.test_tantivy_fulltext')
+
+        def search(query_text):
+            # Search with a limit of 10; return the raw result and its sorted row ids.
+            builder = table.new_full_text_search_builder()
+            builder.with_text_column('content')
+            builder.with_query_text(query_text)
+            builder.with_limit(10)
+            result = builder.execute_local()
+            row_ids = sorted(list(result.results()))
+            print(f"Tantivy full-text search for '{query_text}': row_ids={row_ids}")
+            return result, row_ids
+
+        def read_sorted(result):
+            # Read the rows selected by the search result, ordered by 'id'.
+            read_builder = table.new_read_builder()
+            scan = read_builder.new_scan().with_global_index_result(result)
+            plan = scan.plan()
+            pa_table = read_builder.new_read().to_arrow(plan.splits())
+            return table_sort_by(pa_table, 'id')
+
+        # Row 0, 2, 4 mention "paimon"
+        result, row_ids = search('paimon')
+        self.assertEqual(row_ids, [0, 2, 4])
+        pa_table = read_sorted(result)
+        self.assertEqual(pa_table.num_rows, 3)
+        self.assertEqual(pa_table.column('id').to_pylist(), [0, 2, 4])
+
+        # Search for "tantivy" - only row 1
+        result2, row_ids2 = search('tantivy')
+        self.assertEqual(row_ids2, [1])
+        pa_table2 = read_sorted(result2)
+        self.assertEqual(pa_table2.num_rows, 1)
+        self.assertEqual(pa_table2.column('id').to_pylist(), [1])
+
+        # Search for "full-text search" - rows 1, 3
+        # Only membership is asserted here, so further matching rows are tolerated.
+        result3, row_ids3 = search('full-text search')
+        self.assertIn(1, row_ids3)
+        self.assertIn(3, row_ids3)
+        ids3 = read_sorted(result3).column('id').to_pylist()
+        self.assertIn(1, ids3)
+        self.assertIn(3, ids3)
diff --git a/paimon-tantivy/paimon-tantivy-index/pom.xml 
b/paimon-tantivy/paimon-tantivy-index/pom.xml
index b4cd732546..e416d9ef5c 100644
--- a/paimon-tantivy/paimon-tantivy-index/pom.xml
+++ b/paimon-tantivy/paimon-tantivy-index/pom.xml
@@ -60,6 +60,22 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-format</artifactId>
@@ -73,6 +89,93 @@ under the License.
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs-client</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>jdk.tools</groupId>
+                    <artifactId>jdk.tools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>ch.qos.reload4j</groupId>
+                    <artifactId>reload4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-reload4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>jdk.tools</groupId>
+                    <artifactId>jdk.tools</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/paimon-tantivy/paimon-tantivy-index/src/test/java/org/apache/paimon/tantivy/index/JavaPyTantivyE2ETest.java
 
b/paimon-tantivy/paimon-tantivy-index/src/test/java/org/apache/paimon/tantivy/index/JavaPyTantivyE2ETest.java
new file mode 100644
index 0000000000..fa46e898c3
--- /dev/null
+++ 
b/paimon-tantivy/paimon-tantivy-index/src/test/java/org/apache/paimon/tantivy/index/JavaPyTantivyE2ETest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tantivy.index;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.FileIOFinder;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.globalindex.GlobalIndexBuilderUtils;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.tantivy.NativeLoader;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.paimon.CoreOptions.DATA_EVOLUTION_ENABLED;
+import static org.apache.paimon.CoreOptions.GLOBAL_INDEX_ENABLED;
+import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.CoreOptions.ROW_TRACKING_ENABLED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ * Mixed language E2E test for Java Tantivy full-text index building and Python reading.
+ *
+ * <p>Java writes data and builds a Tantivy full-text index, then Python reads and searches it.
+ */
+public class JavaPyTantivyE2ETest {
+
+    /**
+     * Texts stored in the {@code content} column. The array position doubles as the {@code id}
+     * column value, and the same texts are fed to the index writer in the same order, so the
+     * table data and the index cannot drift apart.
+     */
+    private static final String[] DOCUMENTS = {
+        "Apache Paimon is a streaming data lake platform",
+        "Tantivy is a full-text search engine written in Rust",
+        "Paimon supports real-time data ingestion and analytics",
+        "Full-text search enables efficient text retrieval",
+        "Data lake platforms like Paimon handle large-scale data"
+    };
+
+    /** Skips every test of this class when the Tantivy JNI library cannot be loaded. */
+    @BeforeAll
+    public static void checkNativeLibrary() {
+        assumeTrue(isNativeAvailable(), "Tantivy native library not available, skipping tests");
+    }
+
+    /** Returns whether {@link NativeLoader#loadJni()} completes without throwing. */
+    private static boolean isNativeAvailable() {
+        try {
+            NativeLoader.loadJni();
+            return true;
+        } catch (Throwable t) {
+            // Throwable on purpose: a load failure of any kind means "not available".
+            return false;
+        }
+    }
+
+    // Parent directory of the warehouse, resolved against the working directory of the test run.
+    java.nio.file.Path tempDir =
+            Paths.get("../../paimon-python/pypaimon/tests/e2e").toAbsolutePath();
+
+    protected Path warehouse;
+
+    /** Creates the warehouse directory if it is missing and points {@link #warehouse} at it. */
+    @BeforeEach
+    public void before() throws Exception {
+        java.nio.file.Path warehouseDir = tempDir.resolve("warehouse");
+        if (!Files.exists(warehouseDir)) {
+            Files.createDirectories(warehouseDir);
+        }
+        warehouse = new Path("file://" + warehouseDir);
+    }
+
+    /**
+     * Writes {@link #DOCUMENTS} into the table {@code test_tantivy_fulltext}, builds a Tantivy
+     * full-text index over its {@code content} column, commits the index files and verifies
+     * that exactly one index manifest entry of the Tantivy type exists afterwards.
+     *
+     * <p>Runs only when the system property {@code run.e2e.tests} is {@code true}.
+     */
+    @Test
+    @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+    public void testTantivyFullTextIndexWrite() throws Exception {
+        String tableName = "test_tantivy_fulltext";
+        Path tablePath = new Path(warehouse.toString() + "/default.db/" + tableName);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.STRING()},
+                        new String[] {"id", "content"});
+
+        Options options = new Options();
+        options.set(PATH, tablePath.toString());
+        options.set(ROW_TRACKING_ENABLED, true);
+        options.set(DATA_EVOLUTION_ENABLED, true);
+        options.set(GLOBAL_INDEX_ENABLED, true);
+
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), tablePath),
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                options.toMap(),
+                                ""));
+
+        AppendOnlyFileStoreTable table =
+                new AppendOnlyFileStoreTable(
+                        FileIOFinder.find(tablePath),
+                        tablePath,
+                        tableSchema,
+                        CatalogEnvironment.empty());
+
+        // Write data: one row per document, using the array position as the id.
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        try (BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            for (int id = 0; id < DOCUMENTS.length; id++) {
+                write.write(GenericRow.of(id, BinaryString.fromString(DOCUMENTS[id])));
+            }
+            commit.commit(write.prepareCommit());
+        }
+
+        // Build tantivy full-text index on the "content" column
+        DataField contentField = table.rowType().getField("content");
+        Options indexOptions = table.coreOptions().toConfiguration();
+
+        GlobalIndexSingletonWriter writer =
+                (GlobalIndexSingletonWriter)
+                        GlobalIndexBuilderUtils.createIndexWriter(
+                                table,
+                                TantivyFullTextGlobalIndexerFactory.IDENTIFIER,
+                                contentField,
+                                indexOptions);
+
+        // Write the same text data to the index, in the same order as the table rows
+        for (String document : DOCUMENTS) {
+            writer.write(BinaryString.fromString(document));
+        }
+
+        List<ResultEntry> entries = writer.finish();
+        assertThat(entries).hasSize(1);
+        assertThat(entries.get(0).rowCount()).isEqualTo(DOCUMENTS.length);
+
+        // The range runs from the first to the last row position (0 to 4 for five documents).
+        Range rowRange = new Range(0, DOCUMENTS.length - 1);
+        List<IndexFileMeta> indexFiles =
+                GlobalIndexBuilderUtils.toIndexFileMetas(
+                        table.fileIO(),
+                        table.store().pathFactory().globalIndexFileFactory(),
+                        table.coreOptions(),
+                        rowRange,
+                        contentField.id(),
+                        TantivyFullTextGlobalIndexerFactory.IDENTIFIER,
+                        entries);
+
+        // Commit the index
+        DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFiles);
+        CommitMessage message =
+                new CommitMessageImpl(
+                        BinaryRow.EMPTY_ROW,
+                        0,
+                        null,
+                        dataIncrement,
+                        CompactIncrement.emptyIncrement());
+        try (BatchTableCommit commit = writeBuilder.newCommit()) {
+            commit.commit(Collections.singletonList(message));
+        }
+
+        // Verify the index was committed
+        List<org.apache.paimon.manifest.IndexManifestEntry> indexEntries =
+                table.indexManifestFileReader()
+                        .read(table.latestSnapshot().get().indexManifest());
+        assertThat(indexEntries).hasSize(1);
+        assertThat(indexEntries.get(0).indexFile().indexType())
+                .isEqualTo(TantivyFullTextGlobalIndexerFactory.IDENTIFIER);
+    }
+}

Reply via email to