This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new c4b0677  test: introduce python integration tests (#341)
c4b0677 is described below

commit c4b0677db3a267ff21c8e080ef38e5bf31c4b068
Author: Keith Lee <[email protected]>
AuthorDate: Tue Feb 17 10:18:14 2026 +0000

    test: introduce python integration tests (#341)
---
 .github/workflows/ci.yml                     |  42 ++
 .gitignore                                   |   3 +
 bindings/python/GENERATED_README.md          |   1 +
 bindings/python/pyproject.toml               |   5 +
 bindings/python/test/conftest.py             | 137 ++++++
 bindings/python/test/test_admin.py           | 301 ++++++++++++
 bindings/python/test/test_kv_table.py        | 428 +++++++++++++++++
 bindings/python/test/test_log_table.py       | 675 +++++++++++++++++++++++++++
 website/docs/developer-guide/contributing.md |  14 +-
 9 files changed, 1602 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index e9048fb..20bee87 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -137,3 +137,45 @@ jobs:
         env:
           RUST_LOG: DEBUG
           RUST_BACKTRACE: full
+
+  python-integration-test:
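+    # Builds the bindings with maturin and runs pytest against a local Fluss
+    # cluster started by testcontainers (see bindings/python/test/conftest.py).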
+    timeout-minutes: 60
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        python: ["3.9", "3.10", "3.11", "3.12"]
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Set up Python
+        uses: actions/setup-python@v5
+        with:
+          python-version: ${{ matrix.python }}
+
+      - name: Install uv
+        uses: astral-sh/setup-uv@v4
+
+      - name: Install protoc
+        run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
+
+      - name: Rust Cache
+        uses: actions/cache@v4
+        with:
+          path: |
+            ~/.cargo/registry
+            ~/.cargo/git
+            target
+          key: python-test-${{ runner.os }}-${{ matrix.python }}-${{ hashFiles('**/Cargo.lock') }}
+
+      - name: Build Python bindings
+        working-directory: bindings/python
+        run: |
+          uv sync --extra dev
+          uv run maturin develop
+
+      - name: Run Python integration tests
+        working-directory: bindings/python
+        run: uv run pytest test/ -v
+        env:
+          RUST_LOG: DEBUG
+          RUST_BACKTRACE: full
diff --git a/.gitignore b/.gitignore
index 476f84e..f251aab 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,9 +28,12 @@ __pycache__/
 *.egg-info/
 dist/
 build/
+.venv/
+uv.lock
 
 # CPP
 *CMakeFiles/
+.cache/
 
 # Website (Docusaurus)
 website/node_modules
diff --git a/bindings/python/GENERATED_README.md b/bindings/python/GENERATED_README.md
new file mode 100644
index 0000000..0a011ba
--- /dev/null
+++ b/bindings/python/GENERATED_README.md
@@ -0,0 +1 @@
+This README can be automatically generated by generate_readme.py.
\ No newline at end of file
diff --git a/bindings/python/pyproject.toml b/bindings/python/pyproject.toml
index 0be25a0..f5b0b68 100644
--- a/bindings/python/pyproject.toml
+++ b/bindings/python/pyproject.toml
@@ -52,6 +52,7 @@ dev = [
     "pytest-asyncio>=0.25.3",
     "ruff>=0.9.10",
     "maturin>=1.8.2",
+    "testcontainers>=4.0.0",
 ]
 docs = [
     "pdoc>=15.0.4",
@@ -90,6 +91,10 @@ docstring-code-format = true
 [tool.ruff.lint.isort]
 known-first-party = ["fluss"]
 
+[tool.pytest.ini_options]
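+# "auto" lets pytest-asyncio collect plain async def tests without per-test
+# marks; the session-scoped loop matches the session fixtures in test/conftest.py.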
+asyncio_mode = "auto"
+asyncio_default_fixture_loop_scope = "session"
+
 [tool.mypy]
 python_version = "3.9"
 warn_return_any = true
diff --git a/bindings/python/test/conftest.py b/bindings/python/test/conftest.py
new file mode 100644
index 0000000..fbd7396
--- /dev/null
+++ b/bindings/python/test/conftest.py
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Shared fixtures for Fluss Python integration tests.
+
+If FLUSS_BOOTSTRAP_SERVERS is set, tests connect to an existing cluster.
+Otherwise, a Fluss cluster is started automatically via testcontainers.
+
+Run with:
+    uv run maturin develop && uv run pytest test/ -v
+"""
+
+import os
+import socket
+import time
+
+import pytest
+import pytest_asyncio
+
+import fluss
+
+FLUSS_VERSION = "0.7.0"
+BOOTSTRAP_SERVERS_ENV = os.environ.get("FLUSS_BOOTSTRAP_SERVERS")
+
+
+def _wait_for_port(host, port, timeout=60):
+    """Wait for a TCP port to become available."""
+    start = time.time()
+    while time.time() - start < timeout:
+        try:
+            with socket.create_connection((host, port), timeout=1):
+                return
+        except (ConnectionRefusedError, TimeoutError, OSError):
+            time.sleep(1)
+    raise TimeoutError(f"Port {port} on {host} not available after {timeout}s")
+
+
[email protected](scope="session")
+def fluss_cluster():
+    """Start a Fluss cluster using testcontainers, or use an existing one."""
+    if BOOTSTRAP_SERVERS_ENV:
+        yield BOOTSTRAP_SERVERS_ENV
+        return
+
+    from testcontainers.core.container import DockerContainer
+    from testcontainers.core.network import Network
+
+    network = Network()
+    network.create()
+
+    zookeeper = (
+        DockerContainer("zookeeper:3.9.2")
+        .with_network(network)
+        .with_name("zookeeper-python-test")
+    )
+
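+    # The INTERNAL listener binds inside the Docker network; the CLIENT listener
+    # is advertised as localhost so tests running on the host can connect.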
+    coordinator_props = "\n".join([
+        "zookeeper.address: zookeeper-python-test:2181",
+        "bind.listeners: INTERNAL://coordinator-server-python-test:0,"
+        " CLIENT://coordinator-server-python-test:9123",
+        "advertised.listeners: CLIENT://localhost:9123",
+        "internal.listener.name: INTERNAL",
+        "netty.server.num-network-threads: 1",
+        "netty.server.num-worker-threads: 3",
+    ])
+    coordinator = (
+        DockerContainer(f"fluss/fluss:{FLUSS_VERSION}")
+        .with_network(network)
+        .with_name("coordinator-server-python-test")
+        .with_bind_ports(9123, 9123)
+        .with_command("coordinatorServer")
+        .with_env("FLUSS_PROPERTIES", coordinator_props)
+    )
+
+    tablet_props = "\n".join([
+        "zookeeper.address: zookeeper-python-test:2181",
+        "bind.listeners: INTERNAL://tablet-server-python-test:0,"
+        " CLIENT://tablet-server-python-test:9123",
+        "advertised.listeners: CLIENT://localhost:9124",
+        "internal.listener.name: INTERNAL",
+        "tablet-server.id: 0",
+        "netty.server.num-network-threads: 1",
+        "netty.server.num-worker-threads: 3",
+    ])
+    tablet_server = (
+        DockerContainer(f"fluss/fluss:{FLUSS_VERSION}")
+        .with_network(network)
+        .with_name("tablet-server-python-test")
+        .with_bind_ports(9123, 9124)
+        .with_command("tabletServer")
+        .with_env("FLUSS_PROPERTIES", tablet_props)
+    )
+
+    zookeeper.start()
+    coordinator.start()
+    tablet_server.start()
+
+    _wait_for_port("localhost", 9123)
+    _wait_for_port("localhost", 9124)
+    # Extra wait for cluster to fully initialize
+    time.sleep(10)
+
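+    # Only the coordinator address is needed for bootstrap; tablet servers
+    # are discovered through cluster metadata.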
+    yield "127.0.0.1:9123"
+
+    tablet_server.stop()
+    coordinator.stop()
+    zookeeper.stop()
+    network.remove()
+
+
+@pytest_asyncio.fixture(scope="session")
+async def connection(fluss_cluster):
+    """Session-scoped connection to the Fluss cluster."""
+    config = fluss.Config({"bootstrap.servers": fluss_cluster})
+    conn = await fluss.FlussConnection.create(config)
+    yield conn
+    conn.close()
+
+
+@pytest_asyncio.fixture(scope="session")
+async def admin(connection):
+    """Session-scoped admin client."""
+    return await connection.get_admin()
diff --git a/bindings/python/test/test_admin.py b/bindings/python/test/test_admin.py
new file mode 100644
index 0000000..f203400
--- /dev/null
+++ b/bindings/python/test/test_admin.py
@@ -0,0 +1,301 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Integration tests for FlussAdmin operations.
+
+Mirrors the Rust integration tests in crates/fluss/tests/integration/admin.rs.
+"""
+
+import pyarrow as pa
+import pytest
+
+import fluss
+
+
+async def test_create_database(admin):
+    """Test database create, exists, get_info, and drop lifecycle."""
+    db_name = "py_test_create_database"
+
+    # Cleanup in case of prior failed run
+    await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
+
+    assert not await admin.database_exists(db_name)
+
+    db_descriptor = fluss.DatabaseDescriptor(
+        comment="test_db",
+        custom_properties={"k1": "v1", "k2": "v2"},
+    )
+    await admin.create_database(db_name, db_descriptor, ignore_if_exists=False)
+
+    assert await admin.database_exists(db_name)
+
+    db_info = await admin.get_database_info(db_name)
+    assert db_info.database_name == db_name
+
+    descriptor = db_info.get_database_descriptor()
+    assert descriptor.comment == "test_db"
+    assert descriptor.get_custom_properties() == {"k1": "v1", "k2": "v2"}
+
+    await admin.drop_database(db_name, ignore_if_not_exists=False, cascade=True)
+
+    assert not await admin.database_exists(db_name)
+
+
+async def test_create_table(admin):
+    """Test table create, exists, get_info, list, and drop lifecycle."""
+    db_name = "py_test_create_table_db"
+
+    await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
+
+    assert not await admin.database_exists(db_name)
+    await admin.create_database(
+        db_name,
+        fluss.DatabaseDescriptor(comment="Database for test_create_table"),
+        ignore_if_exists=False,
+    )
+
+    table_name = "test_user_table"
+    table_path = fluss.TablePath(db_name, table_name)
+
+    schema = fluss.Schema(
+        pa.schema(
+            [
+                pa.field("id", pa.int32()),
+                pa.field("name", pa.string()),
+                pa.field("age", pa.int32()),
+                pa.field("email", pa.string()),
+            ]
+        ),
+        primary_keys=["id"],
+    )
+
+    table_descriptor = fluss.TableDescriptor(
+        schema,
+        bucket_count=3,
+        bucket_keys=["id"],
+        comment="Test table for user data (id, name, age, email)",
+        log_format="arrow",
+        kv_format="indexed",
+        properties={"table.replication.factor": "1"},
+    )
+
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+    assert await admin.table_exists(table_path)
+
+    tables = await admin.list_tables(db_name)
+    assert len(tables) == 1
+    assert table_name in tables
+
+    table_info = await admin.get_table_info(table_path)
+
+    assert table_info.comment == "Test table for user data (id, name, age, email)"
+    assert table_info.get_primary_keys() == ["id"]
+    assert table_info.num_buckets == 3
+    assert table_info.get_bucket_keys() == ["id"]
+    assert table_info.get_column_names() == ["id", "name", "age", "email"]
+
+    await admin.drop_table(table_path, ignore_if_not_exists=False)
+    assert not await admin.table_exists(table_path)
+
+    await admin.drop_database(db_name, ignore_if_not_exists=False, cascade=True)
+    assert not await admin.database_exists(db_name)
+
+
+async def test_partition_apis(admin):
+    """Test partition create, list, and drop lifecycle."""
+    db_name = "py_test_partition_apis_db"
+
+    await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
+    await admin.create_database(
+        db_name,
+        fluss.DatabaseDescriptor(comment="Database for test_partition_apis"),
+        ignore_if_exists=True,
+    )
+
+    table_path = fluss.TablePath(db_name, "partitioned_table")
+
+    schema = fluss.Schema(
+        pa.schema(
+            [
+                pa.field("id", pa.int32()),
+                pa.field("name", pa.string()),
+                pa.field("dt", pa.string()),
+                pa.field("region", pa.string()),
+            ]
+        ),
+        primary_keys=["id", "dt", "region"],
+    )
+
+    table_descriptor = fluss.TableDescriptor(
+        schema,
+        partition_keys=["dt", "region"],
+        bucket_count=3,
+        bucket_keys=["id"],
+        log_format="arrow",
+        kv_format="compacted",
+        properties={"table.replication.factor": "1"},
+    )
+
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=True)
+
+    # Initially no partitions
+    partitions = await admin.list_partition_infos(table_path)
+    assert len(partitions) == 0
+
+    # Create a partition
+    await admin.create_partition(
+        table_path,
+        {"dt": "2024-01-15", "region": "EMEA"},
+        ignore_if_exists=False,
+    )
+
+    partitions = await admin.list_partition_infos(table_path)
+    assert len(partitions) == 1
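+    # Partition names join the partition values with '$' in partition-key order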
+    assert partitions[0].partition_name == "2024-01-15$EMEA"
+
+    # Drop the partition
+    await admin.drop_partition(
+        table_path,
+        {"dt": "2024-01-15", "region": "EMEA"},
+        ignore_if_not_exists=False,
+    )
+
+    partitions = await admin.list_partition_infos(table_path)
+    assert len(partitions) == 0
+
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+    await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
+
+
+async def test_fluss_error_response(admin):
+    """Test that API errors are raised as FlussError with correct error 
codes."""
+    table_path = fluss.TablePath("fluss", "py_not_exist")
+
+    with pytest.raises(fluss.FlussError) as exc_info:
+        await admin.get_table_info(table_path)
+
+    assert exc_info.value.error_code == fluss.ErrorCode.TABLE_NOT_EXIST
+
+
+async def test_error_database_not_exist(admin):
+    """Test error handling for non-existent database operations."""
+    # get_database_info
+    with pytest.raises(fluss.FlussError) as exc_info:
+        await admin.get_database_info("py_no_such_db")
+    assert exc_info.value.error_code == fluss.ErrorCode.DATABASE_NOT_EXIST
+
+    # drop_database without ignore flag
+    with pytest.raises(fluss.FlussError) as exc_info:
+        await admin.drop_database("py_no_such_db", ignore_if_not_exists=False)
+    assert exc_info.value.error_code == fluss.ErrorCode.DATABASE_NOT_EXIST
+
+    # list_tables for non-existent database
+    with pytest.raises(fluss.FlussError) as exc_info:
+        await admin.list_tables("py_no_such_db")
+    assert exc_info.value.error_code == fluss.ErrorCode.DATABASE_NOT_EXIST
+
+
+async def test_error_database_already_exist(admin):
+    """Test error when creating a database that already exists."""
+    db_name = "py_test_error_db_already_exist"
+
+    await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
+    await admin.create_database(db_name, ignore_if_exists=False)
+
+    # Create same database again without ignore flag
+    with pytest.raises(fluss.FlussError) as exc_info:
+        await admin.create_database(db_name, ignore_if_exists=False)
+    assert exc_info.value.error_code == fluss.ErrorCode.DATABASE_ALREADY_EXIST
+
+    # With ignore flag should succeed
+    await admin.create_database(db_name, ignore_if_exists=True)
+
+    await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
+
+
+async def test_error_table_already_exist(admin):
+    """Test error when creating a table that already exists."""
+    db_name = "py_test_error_tbl_already_exist_db"
+
+    await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
+    await admin.create_database(db_name, ignore_if_exists=True)
+
+    table_path = fluss.TablePath(db_name, "my_table")
+    schema = fluss.Schema(
+        pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
+    )
+    table_descriptor = fluss.TableDescriptor(
+        schema,
+        bucket_count=1,
+        properties={"table.replication.factor": "1"},
+    )
+
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+    # Create same table again without ignore flag
+    with pytest.raises(fluss.FlussError) as exc_info:
+        await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+    assert exc_info.value.error_code == fluss.ErrorCode.TABLE_ALREADY_EXIST
+
+    # With ignore flag should succeed
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=True)
+
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+    await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
+
+
+async def test_error_table_not_exist(admin):
+    """Test error handling for non-existent table operations."""
+    table_path = fluss.TablePath("fluss", "py_no_such_table")
+
+    # drop without ignore flag
+    with pytest.raises(fluss.FlussError) as exc_info:
+        await admin.drop_table(table_path, ignore_if_not_exists=False)
+    assert exc_info.value.error_code == fluss.ErrorCode.TABLE_NOT_EXIST
+
+    # drop with ignore flag should succeed
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+
+async def test_error_table_not_partitioned(admin):
+    """Test error when calling partition operations on non-partitioned 
table."""
+    db_name = "py_test_error_not_partitioned_db"
+
+    await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
+    await admin.create_database(db_name, ignore_if_exists=True)
+
+    table_path = fluss.TablePath(db_name, "non_partitioned_table")
+    schema = fluss.Schema(
+        pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
+    )
+    table_descriptor = fluss.TableDescriptor(
+        schema,
+        bucket_count=1,
+        properties={"table.replication.factor": "1"},
+    )
+
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+    with pytest.raises(fluss.FlussError) as exc_info:
+        await admin.list_partition_infos(table_path)
+    assert (
+        exc_info.value.error_code == fluss.ErrorCode.TABLE_NOT_PARTITIONED_EXCEPTION
+    )
+
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+    await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
diff --git a/bindings/python/test/test_kv_table.py b/bindings/python/test/test_kv_table.py
new file mode 100644
index 0000000..98b0cee
--- /dev/null
+++ b/bindings/python/test/test_kv_table.py
@@ -0,0 +1,428 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Integration tests for KV (primary key) table operations.
+
+Mirrors the Rust integration tests in crates/fluss/tests/integration/kv_table.rs.
+"""
+
+import math
+from datetime import date, datetime, timezone
+from datetime import time as dt_time
+from decimal import Decimal
+
+import pyarrow as pa
+
+import fluss
+
+
+async def test_upsert_delete_and_lookup(connection, admin):
+    """Test upsert, lookup, update, delete, and non-existent key lookup."""
+    table_path = fluss.TablePath("fluss", "py_test_upsert_and_lookup")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+    schema = fluss.Schema(
+        pa.schema(
+            [
+                pa.field("id", pa.int32()),
+                pa.field("name", pa.string()),
+                pa.field("age", pa.int64()),
+            ]
+        ),
+        primary_keys=["id"],
+    )
+    table_descriptor = fluss.TableDescriptor(schema)
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+    table = await connection.get_table(table_path)
+    upsert_writer = table.new_upsert().create_writer()
+
+    test_data = [(1, "Verso", 32), (2, "Noco", 25), (3, "Esquie", 35)]
+
+    # Upsert rows (fire-and-forget, then flush)
+    for id_, name, age in test_data:
+        upsert_writer.upsert({"id": id_, "name": name, "age": age})
+    await upsert_writer.flush()
+
+    # Lookup and verify
+    lookuper = table.new_lookup().create_lookuper()
+
+    for id_, expected_name, expected_age in test_data:
+        result = await lookuper.lookup({"id": id_})
+        assert result is not None, f"Row with id={id_} should exist"
+        assert result["id"] == id_
+        assert result["name"] == expected_name
+        assert result["age"] == expected_age
+
+    # Update record with id=1 (await acknowledgment)
+    handle = upsert_writer.upsert({"id": 1, "name": "Verso", "age": 33})
+    await handle.wait()
+
+    result = await lookuper.lookup({"id": 1})
+    assert result is not None
+    assert result["age"] == 33
+    assert result["name"] == "Verso"
+
+    # Delete record with id=1 (await acknowledgment)
+    handle = upsert_writer.delete({"id": 1})
+    await handle.wait()
+
+    result = await lookuper.lookup({"id": 1})
+    assert result is None, "Record 1 should not exist after delete"
+
+    # Verify other records still exist
+    for id_ in [2, 3]:
+        result = await lookuper.lookup({"id": id_})
+        assert result is not None, f"Record {id_} should still exist"
+
+    # Lookup non-existent key
+    result = await lookuper.lookup({"id": 999})
+    assert result is None, "Non-existent key should return None"
+
+    await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_composite_primary_keys(connection, admin):
+    """Test upsert and lookup with composite (multi-column) primary keys."""
+    table_path = fluss.TablePath("fluss", "py_test_composite_pk")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+    schema = fluss.Schema(
+        pa.schema(
+            [
+                pa.field("region", pa.string()),
+                pa.field("user_id", pa.int32()),
+                pa.field("score", pa.int64()),
+            ]
+        ),
+        primary_keys=["region", "user_id"],
+    )
+    table_descriptor = fluss.TableDescriptor(schema)
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+    table = await connection.get_table(table_path)
+    upsert_writer = table.new_upsert().create_writer()
+
+    test_data = [
+        ("US", 1, 100),
+        ("US", 2, 200),
+        ("EU", 1, 150),
+        ("EU", 2, 250),
+    ]
+
+    for region, user_id, score in test_data:
+        upsert_writer.upsert({"region": region, "user_id": user_id, "score": 
score})
+    await upsert_writer.flush()
+
+    lookuper = table.new_lookup().create_lookuper()
+
+    # Lookup (US, 1) -> score 100
+    result = await lookuper.lookup({"region": "US", "user_id": 1})
+    assert result is not None
+    assert result["score"] == 100
+
+    # Lookup (EU, 2) -> score 250
+    result = await lookuper.lookup({"region": "EU", "user_id": 2})
+    assert result is not None
+    assert result["score"] == 250
+
+    # Update (US, 1) score (await acknowledgment)
+    handle = upsert_writer.upsert({"region": "US", "user_id": 1, "score": 500})
+    await handle.wait()
+
+    result = await lookuper.lookup({"region": "US", "user_id": 1})
+    assert result is not None
+    assert result["score"] == 500
+
+    await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_partial_update(connection, admin):
+    """Test partial column update via partial_update_by_name."""
+    table_path = fluss.TablePath("fluss", "py_test_partial_update")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+    schema = fluss.Schema(
+        pa.schema(
+            [
+                pa.field("id", pa.int32()),
+                pa.field("name", pa.string()),
+                pa.field("age", pa.int64()),
+                pa.field("score", pa.int64()),
+            ]
+        ),
+        primary_keys=["id"],
+    )
+    table_descriptor = fluss.TableDescriptor(schema)
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+    table = await connection.get_table(table_path)
+
+    # Insert initial record
+    upsert_writer = table.new_upsert().create_writer()
+    handle = upsert_writer.upsert(
+        {"id": 1, "name": "Verso", "age": 32, "score": 6942}
+    )
+    await handle.wait()
+
+    lookuper = table.new_lookup().create_lookuper()
+    result = await lookuper.lookup({"id": 1})
+    assert result is not None
+    assert result["id"] == 1
+    assert result["name"] == "Verso"
+    assert result["age"] == 32
+    assert result["score"] == 6942
+
+    # Partial update: only update score column
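+    # (note the primary-key column "id" is included alongside the updated column)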
+    partial_writer = (
+        table.new_upsert().partial_update_by_name(["id", "score"]).create_writer()
+    )
+    handle = partial_writer.upsert({"id": 1, "score": 420})
+    await handle.wait()
+
+    result = await lookuper.lookup({"id": 1})
+    assert result is not None
+    assert result["id"] == 1
+    assert result["name"] == "Verso", "name should remain unchanged"
+    assert result["age"] == 32, "age should remain unchanged"
+    assert result["score"] == 420, "score should be updated to 420"
+
+    await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_partial_update_by_index(connection, admin):
+    """Test partial column update via partial_update_by_index."""
+    table_path = fluss.TablePath("fluss", "py_test_partial_update_by_index")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+    schema = fluss.Schema(
+        pa.schema(
+            [
+                pa.field("id", pa.int32()),
+                pa.field("name", pa.string()),
+                pa.field("age", pa.int64()),
+                pa.field("score", pa.int64()),
+            ]
+        ),
+        primary_keys=["id"],
+    )
+    table_descriptor = fluss.TableDescriptor(schema)
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+    table = await connection.get_table(table_path)
+
+    upsert_writer = table.new_upsert().create_writer()
+    handle = upsert_writer.upsert(
+        {"id": 1, "name": "Verso", "age": 32, "score": 6942}
+    )
+    await handle.wait()
+
+    # Partial update by indices: columns 0=id (PK), 1=name
+    partial_writer = (
+        table.new_upsert().partial_update_by_index([0, 1]).create_writer()
+    )
+    handle = partial_writer.upsert([1, "Verso Renamed"])
+    await handle.wait()
+
+    lookuper = table.new_lookup().create_lookuper()
+    result = await lookuper.lookup({"id": 1})
+    assert result is not None
+    assert result["name"] == "Verso Renamed", "name should be updated"
+    assert result["score"] == 6942, "score should remain unchanged"
+
+    await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_partitioned_table_upsert_and_lookup(connection, admin):
+    """Test upsert/lookup/delete on a partitioned KV table."""
+    table_path = fluss.TablePath("fluss", "py_test_partitioned_kv_table")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+    schema = fluss.Schema(
+        pa.schema(
+            [
+                pa.field("region", pa.string()),
+                pa.field("user_id", pa.int32()),
+                pa.field("name", pa.string()),
+                pa.field("score", pa.int64()),
+            ]
+        ),
+        primary_keys=["region", "user_id"],
+    )
+    table_descriptor = fluss.TableDescriptor(
+        schema,
+        partition_keys=["region"],
+    )
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+    # Create partitions
+    for region in ["US", "EU", "APAC"]:
+        await admin.create_partition(
+            table_path, {"region": region}, ignore_if_exists=True
+        )
+
+    table = await connection.get_table(table_path)
+    upsert_writer = table.new_upsert().create_writer()
+
+    test_data = [
+        ("US", 1, "Gustave", 100),
+        ("US", 2, "Lune", 200),
+        ("EU", 1, "Sciel", 150),
+        ("EU", 2, "Maelle", 250),
+        ("APAC", 1, "Noco", 300),
+    ]
+
+    for region, user_id, name, score in test_data:
+        upsert_writer.upsert(
+            {"region": region, "user_id": user_id, "name": name, "score": 
score}
+        )
+    await upsert_writer.flush()
+
+    lookuper = table.new_lookup().create_lookuper()
+
+    # Verify all rows across partitions
+    for region, user_id, expected_name, expected_score in test_data:
+        result = await lookuper.lookup({"region": region, "user_id": user_id})
+        assert result is not None, f"Row ({region}, {user_id}) should exist"
+        assert result["region"] == region
+        assert result["user_id"] == user_id
+        assert result["name"] == expected_name
+        assert result["score"] == expected_score
+
+    # Update within a partition (await acknowledgment)
+    handle = upsert_writer.upsert(
+        {"region": "US", "user_id": 1, "name": "Gustave Updated", "score": 999}
+    )
+    await handle.wait()
+
+    result = await lookuper.lookup({"region": "US", "user_id": 1})
+    assert result is not None
+    assert result["name"] == "Gustave Updated"
+    assert result["score"] == 999
+
+    # Lookup in non-existent partition should return None
+    result = await lookuper.lookup({"region": "UNKNOWN_REGION", "user_id": 1})
+    assert result is None, "Lookup in non-existent partition should return 
None"
+
+    # Delete within a partition (await acknowledgment)
+    handle = upsert_writer.delete({"region": "EU", "user_id": 1})
+    await handle.wait()
+
+    result = await lookuper.lookup({"region": "EU", "user_id": 1})
+    assert result is None, "Deleted record should not exist"
+
+    # Verify sibling record still exists
+    result = await lookuper.lookup({"region": "EU", "user_id": 2})
+    assert result is not None
+    assert result["name"] == "Maelle"
+
+    await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_all_supported_datatypes(connection, admin):
+    """Test upsert/lookup for all supported data types, including nulls."""
+    table_path = fluss.TablePath("fluss", "py_test_kv_all_datatypes")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+    schema = fluss.Schema(
+        pa.schema(
+            [
+                pa.field("pk_int", pa.int32()),
+                pa.field("col_boolean", pa.bool_()),
+                pa.field("col_tinyint", pa.int8()),
+                pa.field("col_smallint", pa.int16()),
+                pa.field("col_int", pa.int32()),
+                pa.field("col_bigint", pa.int64()),
+                pa.field("col_float", pa.float32()),
+                pa.field("col_double", pa.float64()),
+                pa.field("col_string", pa.string()),
+                pa.field("col_decimal", pa.decimal128(10, 2)),
+                pa.field("col_date", pa.date32()),
+                pa.field("col_time", pa.time32("ms")),
+                pa.field("col_timestamp_ntz", pa.timestamp("us")),
+                pa.field("col_timestamp_ltz", pa.timestamp("us", tz="UTC")),
+                pa.field("col_bytes", pa.binary()),
+            ]
+        ),
+        primary_keys=["pk_int"],
+    )
+    table_descriptor = fluss.TableDescriptor(schema)
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+    table = await connection.get_table(table_path)
+    upsert_writer = table.new_upsert().create_writer()
+
+    # Test data for all types
+    row_data = {
+        "pk_int": 1,
+        "col_boolean": True,
+        "col_tinyint": 127,
+        "col_smallint": 32767,
+        "col_int": 2147483647,
+        "col_bigint": 9223372036854775807,
+        "col_float": 3.14,
+        "col_double": 2.718281828459045,
+        "col_string": "world of fluss python client",
+        "col_decimal": Decimal("123.45"),
+        "col_date": date(2026, 1, 23),
+        "col_time": dt_time(10, 13, 47, 123000),  # millisecond precision
+        "col_timestamp_ntz": datetime(2026, 1, 23, 10, 13, 47, 123000),
+        "col_timestamp_ltz": datetime(2026, 1, 23, 10, 13, 47, 123000),
+        "col_bytes": b"binary data",
+    }
+
+    handle = upsert_writer.upsert(row_data)
+    await handle.wait()
+
+    lookuper = table.new_lookup().create_lookuper()
+    result = await lookuper.lookup({"pk_int": 1})
+    assert result is not None, "Row should exist"
+
+    assert result["pk_int"] == 1
+    assert result["col_boolean"] is True
+    assert result["col_tinyint"] == 127
+    assert result["col_smallint"] == 32767
+    assert result["col_int"] == 2147483647
+    assert result["col_bigint"] == 9223372036854775807
+    assert math.isclose(result["col_float"], 3.14, rel_tol=1e-6)
+    assert math.isclose(result["col_double"], 2.718281828459045, rel_tol=1e-15)
+    assert result["col_string"] == "world of fluss python client"
+    assert result["col_decimal"] == Decimal("123.45")
+    assert result["col_date"] == date(2026, 1, 23)
+    assert result["col_time"] == dt_time(10, 13, 47, 123000)
+    assert result["col_timestamp_ntz"] == datetime(2026, 1, 23, 10, 13, 47, 
123000)
+    assert result["col_timestamp_ltz"] == datetime(
+        2026, 1, 23, 10, 13, 47, 123000, tzinfo=timezone.utc
+    )
+    assert result["col_bytes"] == b"binary data"
+
+    # Test with null values for all nullable columns
+    null_row = {"pk_int": 2}
+    for col in row_data:
+        if col != "pk_int":
+            null_row[col] = None
+    handle = upsert_writer.upsert(null_row)
+    await handle.wait()
+
+    result = await lookuper.lookup({"pk_int": 2})
+    assert result is not None, "Row with nulls should exist"
+    assert result["pk_int"] == 2
+    for col in row_data:
+        if col != "pk_int":
+            assert result[col] is None, f"{col} should be null"
+
+    await admin.drop_table(table_path, ignore_if_not_exists=False)
diff --git a/bindings/python/test/test_log_table.py b/bindings/python/test/test_log_table.py
new file mode 100644
index 0000000..3219f03
--- /dev/null
+++ b/bindings/python/test/test_log_table.py
@@ -0,0 +1,675 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Integration tests for log (append-only) table operations.
+
+Mirrors the Rust integration tests in crates/fluss/tests/integration/log_table.rs.
+"""
+
+import asyncio
+import time
+
+import pyarrow as pa
+
+import fluss
+
+
+async def test_append_and_scan(connection, admin):
+    """Test appending record batches and scanning with a record-based 
scanner."""
+    table_path = fluss.TablePath("fluss", "py_test_append_and_scan")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+    schema = fluss.Schema(
+        pa.schema([pa.field("c1", pa.int32()), pa.field("c2", pa.string())])
+    )
+    table_descriptor = fluss.TableDescriptor(schema)
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+    table = await connection.get_table(table_path)
+    append_writer = table.new_append().create_writer()
+
+    batch1 = pa.RecordBatch.from_arrays(
+        [pa.array([1, 2, 3], type=pa.int32()), pa.array(["a1", "a2", "a3"])],
+        schema=pa.schema([pa.field("c1", pa.int32()), pa.field("c2", 
pa.string())]),
+    )
+    append_writer.write_arrow_batch(batch1)
+
+    batch2 = pa.RecordBatch.from_arrays(
+        [pa.array([4, 5, 6], type=pa.int32()), pa.array(["a4", "a5", "a6"])],
+        schema=pa.schema([pa.field("c1", pa.int32()), pa.field("c2", 
pa.string())]),
+    )
+    append_writer.write_arrow_batch(batch2)
+
+    await append_writer.flush()
+
+    # Scan with record-based scanner
+    scanner = await table.new_scan().create_log_scanner()
+    num_buckets = (await admin.get_table_info(table_path)).num_buckets
+    scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
+
+    records = _poll_records(scanner, expected_count=6)
+
+    assert len(records) == 6, f"Expected 6 records, got {len(records)}"
+
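+    # Record order across buckets is not guaranteed, so sort before comparing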
+    records.sort(key=lambda r: r.row["c1"])
+
+    expected_c1 = [1, 2, 3, 4, 5, 6]
+    expected_c2 = ["a1", "a2", "a3", "a4", "a5", "a6"]
+    for i, record in enumerate(records):
+        assert record.row["c1"] == expected_c1[i], f"c1 mismatch at row {i}"
+        assert record.row["c2"] == expected_c2[i], f"c2 mismatch at row {i}"
+
+    # Test unsubscribe
+    scanner.unsubscribe(bucket_id=0)
+
+    await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_append_dict_rows(connection, admin):
+    """Test appending rows as dicts and scanning."""
+    table_path = fluss.TablePath("fluss", "py_test_append_dict_rows")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+    schema = fluss.Schema(
+        pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
+    )
+    table_descriptor = fluss.TableDescriptor(schema)
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+    table = await connection.get_table(table_path)
+    append_writer = table.new_append().create_writer()
+
+    # Append using dicts
+    append_writer.append({"id": 1, "name": "Alice"})
+    append_writer.append({"id": 2, "name": "Bob"})
+    # Append using lists
+    append_writer.append([3, "Charlie"])
+    await append_writer.flush()
+
+    scanner = await table.new_scan().create_log_scanner()
+    num_buckets = (await admin.get_table_info(table_path)).num_buckets
+    scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
+
+    records = _poll_records(scanner, expected_count=3)
+    assert len(records) == 3
+
+    rows = sorted([r.row for r in records], key=lambda r: r["id"])
+    assert rows[0] == {"id": 1, "name": "Alice"}
+    assert rows[1] == {"id": 2, "name": "Bob"}
+    assert rows[2] == {"id": 3, "name": "Charlie"}
+
+    await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_list_offsets(connection, admin):
+    """Test listing earliest, latest, and timestamp-based offsets."""
+    table_path = fluss.TablePath("fluss", "py_test_list_offsets")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+    schema = fluss.Schema(
+        pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
+    )
+    table_descriptor = fluss.TableDescriptor(schema)
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+    await asyncio.sleep(2)  # Wait for table initialization
+
+    # Earliest offset should be 0 for empty table
+    earliest = await admin.list_offsets(
+        table_path, bucket_ids=[0], offset_spec=fluss.OffsetSpec.earliest()
+    )
+    assert earliest[0] == 0
+
+    # Latest offset should be 0 for empty table
+    latest = await admin.list_offsets(
+        table_path, bucket_ids=[0], offset_spec=fluss.OffsetSpec.latest()
+    )
+    assert latest[0] == 0
+
+    before_append_ms = int(time.time() * 1000)
+
+    # Append some records
+    table = await connection.get_table(table_path)
+    append_writer = table.new_append().create_writer()
+    batch = pa.RecordBatch.from_arrays(
+        [
+            pa.array([1, 2, 3], type=pa.int32()),
+            pa.array(["alice", "bob", "charlie"]),
+        ],
+        schema=pa.schema([pa.field("id", pa.int32()), pa.field("name", 
pa.string())]),
+    )
+    append_writer.write_arrow_batch(batch)
+    await append_writer.flush()
+
+    await asyncio.sleep(1)
+
+    after_append_ms = int(time.time() * 1000)
+
+    # Latest offset should be 3 after appending 3 records
+    latest_after = await admin.list_offsets(
+        table_path, bucket_ids=[0], offset_spec=fluss.OffsetSpec.latest()
+    )
+    assert latest_after[0] == 3
+
+    # Earliest offset should still be 0
+    earliest_after = await admin.list_offsets(
+        table_path, bucket_ids=[0], offset_spec=fluss.OffsetSpec.earliest()
+    )
+    assert earliest_after[0] == 0
+
+    # Timestamp before append should resolve to offset 0
+    ts_before = await admin.list_offsets(
+        table_path,
+        bucket_ids=[0],
+        offset_spec=fluss.OffsetSpec.timestamp(before_append_ms),
+    )
+    assert ts_before[0] == 0
+
+    # Timestamp after append should resolve to offset 3
+    ts_after = await admin.list_offsets(
+        table_path,
+        bucket_ids=[0],
+        offset_spec=fluss.OffsetSpec.timestamp(after_append_ms),
+    )
+    assert ts_after[0] == 3
+
+    await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_project(connection, admin):
+    """Test column projection by name and by index."""
+    table_path = fluss.TablePath("fluss", "py_test_project")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+    schema = fluss.Schema(
+        pa.schema(
+            [
+                pa.field("col_a", pa.int32()),
+                pa.field("col_b", pa.string()),
+                pa.field("col_c", pa.int32()),
+            ]
+        )
+    )
+    table_descriptor = fluss.TableDescriptor(schema)
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+    table = await connection.get_table(table_path)
+    append_writer = table.new_append().create_writer()
+
+    batch = pa.RecordBatch.from_arrays(
+        [
+            pa.array([1, 2, 3], type=pa.int32()),
+            pa.array(["x", "y", "z"]),
+            pa.array([10, 20, 30], type=pa.int32()),
+        ],
+        schema=pa.schema(
+            [
+                pa.field("col_a", pa.int32()),
+                pa.field("col_b", pa.string()),
+                pa.field("col_c", pa.int32()),
+            ]
+        ),
+    )
+    append_writer.write_arrow_batch(batch)
+    await append_writer.flush()
+
+    # Test project_by_name: select col_b and col_c only
+    scan = table.new_scan().project_by_name(["col_b", "col_c"])
+    scanner = await scan.create_log_scanner()
+    scanner.subscribe_buckets({0: 0})
+
+    records = _poll_records(scanner, expected_count=3)
+    assert len(records) == 3
+
+    records.sort(key=lambda r: r.row["col_c"])
+    expected_col_b = ["x", "y", "z"]
+    expected_col_c = [10, 20, 30]
+    for i, record in enumerate(records):
+        assert record.row["col_b"] == expected_col_b[i]
+        assert record.row["col_c"] == expected_col_c[i]
+        # col_a should not be present in projected results
+        assert "col_a" not in record.row
+
+    # Test project by indices [1, 0] -> (col_b, col_a)
+    scanner2 = await table.new_scan().project([1, 0]).create_log_scanner()
+    scanner2.subscribe_buckets({0: 0})
+
+    records2 = _poll_records(scanner2, expected_count=3)
+    assert len(records2) == 3
+
+    records2.sort(key=lambda r: r.row["col_a"])
+    for i, record in enumerate(records2):
+        assert record.row["col_b"] == expected_col_b[i]
+        assert record.row["col_a"] == [1, 2, 3][i]
+        assert "col_c" not in record.row
+
+    await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_poll_batches(connection, admin):
+    """Test batch-based scanning with poll_arrow and poll_record_batch."""
+    table_path = fluss.TablePath("fluss", "py_test_poll_batches")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+    schema = fluss.Schema(
+        pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
+    )
+    table_descriptor = fluss.TableDescriptor(schema)
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+    await asyncio.sleep(1)
+
+    table = await connection.get_table(table_path)
+    scanner = await table.new_scan().create_record_batch_log_scanner()
+    scanner.subscribe(bucket_id=0, start_offset=0)
+
+    # Empty table should return empty result
+    result = scanner.poll_arrow(500)
+    assert result.num_rows == 0
+
+    writer = table.new_append().create_writer()
+    pa_schema = pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
+    writer.write_arrow_batch(
+        pa.RecordBatch.from_arrays(
+            [pa.array([1, 2], type=pa.int32()), pa.array(["a", "b"])],
+            schema=pa_schema,
+        )
+    )
+    writer.write_arrow_batch(
+        pa.RecordBatch.from_arrays(
+            [pa.array([3, 4], type=pa.int32()), pa.array(["c", "d"])],
+            schema=pa_schema,
+        )
+    )
+    writer.write_arrow_batch(
+        pa.RecordBatch.from_arrays(
+            [pa.array([5, 6], type=pa.int32()), pa.array(["e", "f"])],
+            schema=pa_schema,
+        )
+    )
+    await writer.flush()
+
+    # Poll until we get all 6 records
+    all_ids = _poll_arrow_ids(scanner, expected_count=6)
+    assert all_ids == [1, 2, 3, 4, 5, 6]
+
+    # Append more and verify offset continuation (no duplicates)
+    writer.write_arrow_batch(
+        pa.RecordBatch.from_arrays(
+            [pa.array([7, 8], type=pa.int32()), pa.array(["g", "h"])],
+            schema=pa_schema,
+        )
+    )
+    await writer.flush()
+
+    new_ids = _poll_arrow_ids(scanner, expected_count=2)
+    assert new_ids == [7, 8]
+
+    # Subscribe from mid-offset should truncate (skip earlier records)
+    trunc_scanner = await table.new_scan().create_record_batch_log_scanner()
+    trunc_scanner.subscribe(bucket_id=0, start_offset=3)
+
+    trunc_ids = _poll_arrow_ids(trunc_scanner, expected_count=5)
+    assert trunc_ids == [4, 5, 6, 7, 8]
+
+    # Projection with batch scanner
+    proj_scanner = (
+        await table.new_scan()
+        .project_by_name(["id"])
+        .create_record_batch_log_scanner()
+    )
+    proj_scanner.subscribe(bucket_id=0, start_offset=0)
+    batches = proj_scanner.poll_record_batch(10000)
+    assert len(batches) > 0
+    assert batches[0].batch.num_columns == 1
+
+    await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_to_arrow_and_to_pandas(connection, admin):
+    """Test to_arrow() and to_pandas() convenience methods."""
+    table_path = fluss.TablePath("fluss", "py_test_to_arrow_pandas")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+    schema = fluss.Schema(
+        pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
+    )
+    table_descriptor = fluss.TableDescriptor(schema)
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+    table = await connection.get_table(table_path)
+    writer = table.new_append().create_writer()
+
+    pa_schema = pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
+    writer.write_arrow_batch(
+        pa.RecordBatch.from_arrays(
+            [pa.array([1, 2, 3], type=pa.int32()), pa.array(["a", "b", "c"])],
+            schema=pa_schema,
+        )
+    )
+    await writer.flush()
+
+    num_buckets = (await admin.get_table_info(table_path)).num_buckets
+
+    # to_arrow()
+    scanner = await table.new_scan().create_record_batch_log_scanner()
+    scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
+    arrow_table = scanner.to_arrow()
+    assert arrow_table.num_rows == 3
+    assert arrow_table.schema.names == ["id", "name"]
+
+    # to_pandas()
+    scanner2 = await table.new_scan().create_record_batch_log_scanner()
+    scanner2.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
+    df = scanner2.to_pandas()
+    assert len(df) == 3
+    assert list(df.columns) == ["id", "name"]
+
+    await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_partitioned_table_append_scan(connection, admin):
+    """Test append and scan on a partitioned log table."""
+    table_path = fluss.TablePath("fluss", "py_test_partitioned_log_append")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+    schema = fluss.Schema(
+        pa.schema(
+            [
+                pa.field("id", pa.int32()),
+                pa.field("region", pa.string()),
+                pa.field("value", pa.int64()),
+            ]
+        )
+    )
+    table_descriptor = fluss.TableDescriptor(
+        schema,
+        partition_keys=["region"],
+    )
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+    # Create partitions
+    for region in ["US", "EU"]:
+        await admin.create_partition(
+            table_path, {"region": region}, ignore_if_exists=True
+        )
+
+    await asyncio.sleep(2)  # Wait for partitions to be available
+
+    table = await connection.get_table(table_path)
+    append_writer = table.new_append().create_writer()
+
+    # Append rows
+    test_data = [
+        (1, "US", 100),
+        (2, "US", 200),
+        (3, "EU", 300),
+        (4, "EU", 400),
+    ]
+    for id_, region, value in test_data:
+        append_writer.append({"id": id_, "region": region, "value": value})
+    await append_writer.flush()
+
+    # Append arrow batches per partition
+    pa_schema = pa.schema(
+        [
+            pa.field("id", pa.int32()),
+            pa.field("region", pa.string()),
+            pa.field("value", pa.int64()),
+        ]
+    )
+    us_batch = pa.RecordBatch.from_arrays(
+        [
+            pa.array([5, 6], type=pa.int32()),
+            pa.array(["US", "US"]),
+            pa.array([500, 600], type=pa.int64()),
+        ],
+        schema=pa_schema,
+    )
+    append_writer.write_arrow_batch(us_batch)
+
+    eu_batch = pa.RecordBatch.from_arrays(
+        [
+            pa.array([7, 8], type=pa.int32()),
+            pa.array(["EU", "EU"]),
+            pa.array([700, 800], type=pa.int64()),
+        ],
+        schema=pa_schema,
+    )
+    append_writer.write_arrow_batch(eu_batch)
+    await append_writer.flush()
+
+    # Verify partition offsets
+    us_offsets = await admin.list_partition_offsets(
+        table_path,
+        partition_name="US",
+        bucket_ids=[0],
+        offset_spec=fluss.OffsetSpec.latest(),
+    )
+    assert us_offsets[0] == 4, "US partition should have 4 records"
+
+    eu_offsets = await admin.list_partition_offsets(
+        table_path,
+        partition_name="EU",
+        bucket_ids=[0],
+        offset_spec=fluss.OffsetSpec.latest(),
+    )
+    assert eu_offsets[0] == 4, "EU partition should have 4 records"
+
+    # Scan all partitions
+    scanner = await table.new_scan().create_log_scanner()
+    partition_infos = await admin.list_partition_infos(table_path)
+    for p in partition_infos:
+        scanner.subscribe_partition(
+            partition_id=p.partition_id, bucket_id=0, start_offset=0
+        )
+
+    expected = [
+        (1, "US", 100),
+        (2, "US", 200),
+        (3, "EU", 300),
+        (4, "EU", 400),
+        (5, "US", 500),
+        (6, "US", 600),
+        (7, "EU", 700),
+        (8, "EU", 800),
+    ]
+
+    records = _poll_records(scanner, expected_count=8)
+    assert len(records) == 8
+
+    collected = sorted(
+        [(r.row["id"], r.row["region"], r.row["value"]) for r in records],
+        key=lambda x: x[0],
+    )
+    assert collected == expected
+
+    # Test unsubscribe_partition: unsubscribe from EU, only US data should remain
+    unsub_scanner = await table.new_scan().create_log_scanner()
+    eu_partition_id = next(
+        p.partition_id for p in partition_infos if p.partition_name == "EU"
+    )
+    for p in partition_infos:
+        unsub_scanner.subscribe_partition(p.partition_id, 0, 0)
+    unsub_scanner.unsubscribe_partition(eu_partition_id, 0)
+
+    remaining = _poll_records(unsub_scanner, expected_count=4, timeout_s=5)
+    assert len(remaining) == 4
+    assert all(r.row["region"] == "US" for r in remaining)
+
+    # Test subscribe_partition_buckets (batch subscribe)
+    batch_scanner = await table.new_scan().create_log_scanner()
+    partition_bucket_offsets = {
+        (p.partition_id, 0): fluss.EARLIEST_OFFSET for p in partition_infos
+    }
+    batch_scanner.subscribe_partition_buckets(partition_bucket_offsets)
+
+    batch_records = _poll_records(batch_scanner, expected_count=8)
+    assert len(batch_records) == 8
+    batch_collected = sorted(
+        [(r.row["id"], r.row["region"], r.row["value"]) for r in 
batch_records],
+        key=lambda x: x[0],
+    )
+    assert batch_collected == expected
+
+    await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_write_arrow(connection, admin):
+    """Test writing a full PyArrow Table via write_arrow()."""
+    table_path = fluss.TablePath("fluss", "py_test_write_arrow")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+    schema = fluss.Schema(
+        pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
+    )
+    table_descriptor = fluss.TableDescriptor(schema)
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+    table = await connection.get_table(table_path)
+    writer = table.new_append().create_writer()
+
+    pa_schema = pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
+    arrow_table = pa.table(
+        {
+            "id": pa.array([1, 2, 3, 4, 5], type=pa.int32()),
+            "name": pa.array(["alice", "bob", "charlie", "dave", "eve"]),
+        },
+        schema=pa_schema,
+    )
+    writer.write_arrow(arrow_table)
+    await writer.flush()
+
+    num_buckets = (await admin.get_table_info(table_path)).num_buckets
+    scanner = await table.new_scan().create_record_batch_log_scanner()
+    scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
+
+    result = scanner.to_arrow()
+    assert result.num_rows == 5
+
+    ids = sorted(result.column("id").to_pylist())
+    names = [
+        n
+        for _, n in sorted(
+            zip(result.column("id").to_pylist(), 
result.column("name").to_pylist())
+        )
+    ]
+    assert ids == [1, 2, 3, 4, 5]
+    assert names == ["alice", "bob", "charlie", "dave", "eve"]
+
+    await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_write_pandas(connection, admin):
+    """Test writing a Pandas DataFrame via write_pandas()."""
+    import pandas as pd
+
+    table_path = fluss.TablePath("fluss", "py_test_write_pandas")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+    schema = fluss.Schema(
+        pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
+    )
+    table_descriptor = fluss.TableDescriptor(schema)
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+    table = await connection.get_table(table_path)
+    writer = table.new_append().create_writer()
+
+    df = pd.DataFrame({"id": [10, 20, 30], "name": ["x", "y", "z"]})
+    writer.write_pandas(df)
+    await writer.flush()
+
+    num_buckets = (await admin.get_table_info(table_path)).num_buckets
+    scanner = await table.new_scan().create_record_batch_log_scanner()
+    scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
+
+    result = scanner.to_pandas()
+    assert len(result) == 3
+
+    result_sorted = result.sort_values("id").reset_index(drop=True)
+    assert result_sorted["id"].tolist() == [10, 20, 30]
+    assert result_sorted["name"].tolist() == ["x", "y", "z"]
+
+    await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_partitioned_table_to_arrow(connection, admin):
+    """Test to_arrow() on partitioned tables."""
+    table_path = fluss.TablePath("fluss", "py_test_partitioned_to_arrow")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+    schema = fluss.Schema(
+        pa.schema(
+            [
+                pa.field("id", pa.int32()),
+                pa.field("region", pa.string()),
+                pa.field("value", pa.int64()),
+            ]
+        )
+    )
+    table_descriptor = fluss.TableDescriptor(schema, partition_keys=["region"])
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+    for region in ["US", "EU"]:
+        await admin.create_partition(
+            table_path, {"region": region}, ignore_if_exists=True
+        )
+
+    await asyncio.sleep(2)
+
+    table = await connection.get_table(table_path)
+    writer = table.new_append().create_writer()
+    writer.append({"id": 1, "region": "US", "value": 100})
+    writer.append({"id": 2, "region": "EU", "value": 200})
+    await writer.flush()
+
+    scanner = await table.new_scan().create_record_batch_log_scanner()
+    partition_infos = await admin.list_partition_infos(table_path)
+    for p in partition_infos:
+        scanner.subscribe_partition(p.partition_id, 0, fluss.EARLIEST_OFFSET)
+
+    arrow_table = scanner.to_arrow()
+    assert arrow_table.num_rows == 2
+
+    await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _poll_records(scanner, expected_count, timeout_s=10):
+    """Poll a record-based scanner until expected_count records are 
collected."""
+    collected = []
+    deadline = time.monotonic() + timeout_s
+    while len(collected) < expected_count and time.monotonic() < deadline:
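+        # poll timeout is in milliseconds; keep retrying until the deadline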
+        records = scanner.poll(5000)
+        collected.extend(records)
+    return collected
+
+
+def _poll_arrow_ids(scanner, expected_count, timeout_s=10):
+    """Poll a batch scanner and extract 'id' column values."""
+    all_ids = []
+    deadline = time.monotonic() + timeout_s
+    while len(all_ids) < expected_count and time.monotonic() < deadline:
+        arrow_table = scanner.poll_arrow(5000)
+        if arrow_table.num_rows > 0:
+            all_ids.extend(arrow_table.column("id").to_pylist())
+    return all_ids
diff --git a/website/docs/developer-guide/contributing.md b/website/docs/developer-guide/contributing.md
index eced106..38b792e 100644
--- a/website/docs/developer-guide/contributing.md
+++ b/website/docs/developer-guide/contributing.md
@@ -82,7 +82,7 @@ cargo build --workspace --all-targets
 # Run unit tests
 cargo test --workspace
 
-# Run integration tests (requires a running Fluss cluster)
+# Run integration tests (requires Docker)
 RUST_TEST_THREADS=1 cargo test --features integration_tests --workspace
 
 # Run a single test
@@ -93,9 +93,15 @@ cargo test test_name
 
 ```bash
 cd bindings/python
-pip install maturin
-pip install -e ".[dev]"
-maturin develop
+
+# Install dev dependencies and build the extension
+uv sync --extra dev && uv run maturin develop
+
+# Run integration tests (requires Docker)
+uv run pytest test/ -v
+
+# To run against an existing cluster instead
+FLUSS_BOOTSTRAP_SERVERS=127.0.0.1:9123 uv run pytest test/ -v
 ```
 
 ### C++ Bindings
