This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new c4b0677 test: introduce python integration tests (#341)
c4b0677 is described below
commit c4b0677db3a267ff21c8e080ef38e5bf31c4b068
Author: Keith Lee <[email protected]>
AuthorDate: Tue Feb 17 10:18:14 2026 +0000
test: introduce python integration tests (#341)
---
.github/workflows/ci.yml                     |  42 ++
.gitignore                                   |   3 +
bindings/python/GENERATED_README.md          |   1 +
bindings/python/pyproject.toml               |   5 +
bindings/python/test/conftest.py             | 137 ++++++
bindings/python/test/test_admin.py           | 301 ++++++++++++
bindings/python/test/test_kv_table.py        | 428 +++++++++++++++++
bindings/python/test/test_log_table.py       | 675 +++++++++++++++++++++++++++
website/docs/developer-guide/contributing.md |  14 +-
9 files changed, 1602 insertions(+), 4 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index e9048fb..20bee87 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -137,3 +137,45 @@ jobs:
env:
RUST_LOG: DEBUG
RUST_BACKTRACE: full
+
+ python-integration-test:
+ timeout-minutes: 60
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ python: ["3.9", "3.10", "3.11", "3.12"]
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Set up Python
+ uses: actions/setup-python@v5
+ with:
+ python-version: ${{ matrix.python }}
+
+ - name: Install uv
+ uses: astral-sh/setup-uv@v4
+
+ - name: Install protoc
+ run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
+
+ - name: Rust Cache
+ uses: actions/cache@v4
+ with:
+ path: |
+ ~/.cargo/registry
+ ~/.cargo/git
+ target
+ key: python-test-${{ runner.os }}-${{ matrix.python }}-${{ hashFiles('**/Cargo.lock') }}
+
+ - name: Build Python bindings
+ working-directory: bindings/python
+ run: |
+ uv sync --extra dev
+ uv run maturin develop
+
+ - name: Run Python integration tests
+ working-directory: bindings/python
+ run: uv run pytest test/ -v
+ env:
+ RUST_LOG: DEBUG
+ RUST_BACKTRACE: full
diff --git a/.gitignore b/.gitignore
index 476f84e..f251aab 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,9 +28,12 @@ __pycache__/
*.egg-info/
dist/
build/
+.venv/
+uv.lock
# CPP
*CMakeFiles/
+.cache/
# Website (Docusaurus)
website/node_modules
diff --git a/bindings/python/GENERATED_README.md b/bindings/python/GENERATED_README.md
new file mode 100644
index 0000000..0a011ba
--- /dev/null
+++ b/bindings/python/GENERATED_README.md
@@ -0,0 +1 @@
+This README can be generated automatically by generate_readme.py.
\ No newline at end of file
diff --git a/bindings/python/pyproject.toml b/bindings/python/pyproject.toml
index 0be25a0..f5b0b68 100644
--- a/bindings/python/pyproject.toml
+++ b/bindings/python/pyproject.toml
@@ -52,6 +52,7 @@ dev = [
"pytest-asyncio>=0.25.3",
"ruff>=0.9.10",
"maturin>=1.8.2",
+ "testcontainers>=4.0.0",
]
docs = [
"pdoc>=15.0.4",
@@ -90,6 +91,10 @@ docstring-code-format = true
[tool.ruff.lint.isort]
known-first-party = ["fluss"]
+[tool.pytest.ini_options]
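+# "auto" treats async test functions as asyncio tests without per-test
+# markers; async fixtures default to a session-scoped event loop.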
+asyncio_mode = "auto"
+asyncio_default_fixture_loop_scope = "session"
+
[tool.mypy]
python_version = "3.9"
warn_return_any = true
diff --git a/bindings/python/test/conftest.py b/bindings/python/test/conftest.py
new file mode 100644
index 0000000..fbd7396
--- /dev/null
+++ b/bindings/python/test/conftest.py
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Shared fixtures for Fluss Python integration tests.
+
+If FLUSS_BOOTSTRAP_SERVERS is set, tests connect to an existing cluster.
+Otherwise, a Fluss cluster is started automatically via testcontainers.
+
+Run with:
+ uv run maturin develop && uv run pytest test/ -v
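+
+Or, against an already-running cluster:
+ FLUSS_BOOTSTRAP_SERVERS=127.0.0.1:9123 uv run pytest test/ -v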
+"""
+
+import os
+import socket
+import time
+
+import pytest
+import pytest_asyncio
+
+import fluss
+
+FLUSS_VERSION = "0.7.0"
+BOOTSTRAP_SERVERS_ENV = os.environ.get("FLUSS_BOOTSTRAP_SERVERS")
+
+
+def _wait_for_port(host, port, timeout=60):
+ """Wait for a TCP port to become available."""
+ start = time.time()
+ while time.time() - start < timeout:
+ try:
+ with socket.create_connection((host, port), timeout=1):
+ return
+ except (ConnectionRefusedError, TimeoutError, OSError):
+ time.sleep(1)
+ raise TimeoutError(f"Port {port} on {host} not available after {timeout}s")
+
+
[email protected](scope="session")
+def fluss_cluster():
+ """Start a Fluss cluster using testcontainers, or use an existing one."""
+ if BOOTSTRAP_SERVERS_ENV:
+ yield BOOTSTRAP_SERVERS_ENV
+ return
+
+ from testcontainers.core.container import DockerContainer
+ from testcontainers.core.network import Network
+
+ network = Network()
+ network.create()
+
+ zookeeper = (
+ DockerContainer("zookeeper:3.9.2")
+ .with_network(network)
+ .with_name("zookeeper-python-test")
+ )
+
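+ # Both servers bind an INTERNAL listener for traffic inside the Docker
+ # network and a CLIENT listener advertised as localhost, so tests on the
+ # host can reach the containers through the mapped ports below.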
+ coordinator_props = "\n".join([
+ "zookeeper.address: zookeeper-python-test:2181",
+ "bind.listeners: INTERNAL://coordinator-server-python-test:0,"
+ " CLIENT://coordinator-server-python-test:9123",
+ "advertised.listeners: CLIENT://localhost:9123",
+ "internal.listener.name: INTERNAL",
+ "netty.server.num-network-threads: 1",
+ "netty.server.num-worker-threads: 3",
+ ])
+ coordinator = (
+ DockerContainer(f"fluss/fluss:{FLUSS_VERSION}")
+ .with_network(network)
+ .with_name("coordinator-server-python-test")
+ .with_bind_ports(9123, 9123)
+ .with_command("coordinatorServer")
+ .with_env("FLUSS_PROPERTIES", coordinator_props)
+ )
+
+ tablet_props = "\n".join([
+ "zookeeper.address: zookeeper-python-test:2181",
+ "bind.listeners: INTERNAL://tablet-server-python-test:0,"
+ " CLIENT://tablet-server-python-test:9123",
+ "advertised.listeners: CLIENT://localhost:9124",
+ "internal.listener.name: INTERNAL",
+ "tablet-server.id: 0",
+ "netty.server.num-network-threads: 1",
+ "netty.server.num-worker-threads: 3",
+ ])
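+ # The tablet server binds container port 9123 like the coordinator, but it
+ # is mapped to host port 9124 to avoid a clash (hence localhost:9124 above).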
+ tablet_server = (
+ DockerContainer(f"fluss/fluss:{FLUSS_VERSION}")
+ .with_network(network)
+ .with_name("tablet-server-python-test")
+ .with_bind_ports(9123, 9124)
+ .with_command("tabletServer")
+ .with_env("FLUSS_PROPERTIES", tablet_props)
+ )
+
+ zookeeper.start()
+ coordinator.start()
+ tablet_server.start()
+
+ _wait_for_port("localhost", 9123)
+ _wait_for_port("localhost", 9124)
+ # Extra wait for cluster to fully initialize
+ time.sleep(10)
+
+ yield "127.0.0.1:9123"
+
+ tablet_server.stop()
+ coordinator.stop()
+ zookeeper.stop()
+ network.remove()
+
+
+@pytest_asyncio.fixture(scope="session")
+async def connection(fluss_cluster):
+ """Session-scoped connection to the Fluss cluster."""
+ config = fluss.Config({"bootstrap.servers": fluss_cluster})
+ conn = await fluss.FlussConnection.create(config)
+ yield conn
+ conn.close()
+
+
+@pytest_asyncio.fixture(scope="session")
+async def admin(connection):
+ """Session-scoped admin client."""
+ return await connection.get_admin()
diff --git a/bindings/python/test/test_admin.py b/bindings/python/test/test_admin.py
new file mode 100644
index 0000000..f203400
--- /dev/null
+++ b/bindings/python/test/test_admin.py
@@ -0,0 +1,301 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Integration tests for FlussAdmin operations.
+
+Mirrors the Rust integration tests in crates/fluss/tests/integration/admin.rs.
+"""
+
+import pyarrow as pa
+import pytest
+
+import fluss
+
+
+async def test_create_database(admin):
+ """Test database create, exists, get_info, and drop lifecycle."""
+ db_name = "py_test_create_database"
+
+ # Cleanup in case of prior failed run
+ await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
+
+ assert not await admin.database_exists(db_name)
+
+ db_descriptor = fluss.DatabaseDescriptor(
+ comment="test_db",
+ custom_properties={"k1": "v1", "k2": "v2"},
+ )
+ await admin.create_database(db_name, db_descriptor, ignore_if_exists=False)
+
+ assert await admin.database_exists(db_name)
+
+ db_info = await admin.get_database_info(db_name)
+ assert db_info.database_name == db_name
+
+ descriptor = db_info.get_database_descriptor()
+ assert descriptor.comment == "test_db"
+ assert descriptor.get_custom_properties() == {"k1": "v1", "k2": "v2"}
+
+ await admin.drop_database(db_name, ignore_if_not_exists=False, cascade=True)
+
+ assert not await admin.database_exists(db_name)
+
+
+async def test_create_table(admin):
+ """Test table create, exists, get_info, list, and drop lifecycle."""
+ db_name = "py_test_create_table_db"
+
+ await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
+
+ assert not await admin.database_exists(db_name)
+ await admin.create_database(
+ db_name,
+ fluss.DatabaseDescriptor(comment="Database for test_create_table"),
+ ignore_if_exists=False,
+ )
+
+ table_name = "test_user_table"
+ table_path = fluss.TablePath(db_name, table_name)
+
+ schema = fluss.Schema(
+ pa.schema(
+ [
+ pa.field("id", pa.int32()),
+ pa.field("name", pa.string()),
+ pa.field("age", pa.int32()),
+ pa.field("email", pa.string()),
+ ]
+ ),
+ primary_keys=["id"],
+ )
+
+ table_descriptor = fluss.TableDescriptor(
+ schema,
+ bucket_count=3,
+ bucket_keys=["id"],
+ comment="Test table for user data (id, name, age, email)",
+ log_format="arrow",
+ kv_format="indexed",
+ properties={"table.replication.factor": "1"},
+ )
+
+ await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+ assert await admin.table_exists(table_path)
+
+ tables = await admin.list_tables(db_name)
+ assert len(tables) == 1
+ assert table_name in tables
+
+ table_info = await admin.get_table_info(table_path)
+
+ assert table_info.comment == "Test table for user data (id, name, age, email)"
+ assert table_info.get_primary_keys() == ["id"]
+ assert table_info.num_buckets == 3
+ assert table_info.get_bucket_keys() == ["id"]
+ assert table_info.get_column_names() == ["id", "name", "age", "email"]
+
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
+ assert not await admin.table_exists(table_path)
+
+ await admin.drop_database(db_name, ignore_if_not_exists=False, cascade=True)
+ assert not await admin.database_exists(db_name)
+
+
+async def test_partition_apis(admin):
+ """Test partition create, list, and drop lifecycle."""
+ db_name = "py_test_partition_apis_db"
+
+ await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
+ await admin.create_database(
+ db_name,
+ fluss.DatabaseDescriptor(comment="Database for test_partition_apis"),
+ ignore_if_exists=True,
+ )
+
+ table_path = fluss.TablePath(db_name, "partitioned_table")
+
+ schema = fluss.Schema(
+ pa.schema(
+ [
+ pa.field("id", pa.int32()),
+ pa.field("name", pa.string()),
+ pa.field("dt", pa.string()),
+ pa.field("region", pa.string()),
+ ]
+ ),
+ primary_keys=["id", "dt", "region"],
+ )
+
+ table_descriptor = fluss.TableDescriptor(
+ schema,
+ partition_keys=["dt", "region"],
+ bucket_count=3,
+ bucket_keys=["id"],
+ log_format="arrow",
+ kv_format="compacted",
+ properties={"table.replication.factor": "1"},
+ )
+
+ await admin.create_table(table_path, table_descriptor, ignore_if_exists=True)
+
+ # Initially no partitions
+ partitions = await admin.list_partition_infos(table_path)
+ assert len(partitions) == 0
+
+ # Create a partition
+ await admin.create_partition(
+ table_path,
+ {"dt": "2024-01-15", "region": "EMEA"},
+ ignore_if_exists=False,
+ )
+
+ partitions = await admin.list_partition_infos(table_path)
+ assert len(partitions) == 1
+ assert partitions[0].partition_name == "2024-01-15$EMEA"
+
+ # Drop the partition
+ await admin.drop_partition(
+ table_path,
+ {"dt": "2024-01-15", "region": "EMEA"},
+ ignore_if_not_exists=False,
+ )
+
+ partitions = await admin.list_partition_infos(table_path)
+ assert len(partitions) == 0
+
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+ await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
+
+
+async def test_fluss_error_response(admin):
+ """Test that API errors are raised as FlussError with correct error
codes."""
+ table_path = fluss.TablePath("fluss", "py_not_exist")
+
+ with pytest.raises(fluss.FlussError) as exc_info:
+ await admin.get_table_info(table_path)
+
+ assert exc_info.value.error_code == fluss.ErrorCode.TABLE_NOT_EXIST
+
+
+async def test_error_database_not_exist(admin):
+ """Test error handling for non-existent database operations."""
+ # get_database_info
+ with pytest.raises(fluss.FlussError) as exc_info:
+ await admin.get_database_info("py_no_such_db")
+ assert exc_info.value.error_code == fluss.ErrorCode.DATABASE_NOT_EXIST
+
+ # drop_database without ignore flag
+ with pytest.raises(fluss.FlussError) as exc_info:
+ await admin.drop_database("py_no_such_db", ignore_if_not_exists=False)
+ assert exc_info.value.error_code == fluss.ErrorCode.DATABASE_NOT_EXIST
+
+ # list_tables for non-existent database
+ with pytest.raises(fluss.FlussError) as exc_info:
+ await admin.list_tables("py_no_such_db")
+ assert exc_info.value.error_code == fluss.ErrorCode.DATABASE_NOT_EXIST
+
+
+async def test_error_database_already_exist(admin):
+ """Test error when creating a database that already exists."""
+ db_name = "py_test_error_db_already_exist"
+
+ await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
+ await admin.create_database(db_name, ignore_if_exists=False)
+
+ # Create same database again without ignore flag
+ with pytest.raises(fluss.FlussError) as exc_info:
+ await admin.create_database(db_name, ignore_if_exists=False)
+ assert exc_info.value.error_code == fluss.ErrorCode.DATABASE_ALREADY_EXIST
+
+ # With ignore flag should succeed
+ await admin.create_database(db_name, ignore_if_exists=True)
+
+ await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
+
+
+async def test_error_table_already_exist(admin):
+ """Test error when creating a table that already exists."""
+ db_name = "py_test_error_tbl_already_exist_db"
+
+ await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
+ await admin.create_database(db_name, ignore_if_exists=True)
+
+ table_path = fluss.TablePath(db_name, "my_table")
+ schema = fluss.Schema(
+ pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
+ )
+ table_descriptor = fluss.TableDescriptor(
+ schema,
+ bucket_count=1,
+ properties={"table.replication.factor": "1"},
+ )
+
+ await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+ # Create same table again without ignore flag
+ with pytest.raises(fluss.FlussError) as exc_info:
+ await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+ assert exc_info.value.error_code == fluss.ErrorCode.TABLE_ALREADY_EXIST
+
+ # With ignore flag should succeed
+ await admin.create_table(table_path, table_descriptor, ignore_if_exists=True)
+
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+ await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
+
+
+async def test_error_table_not_exist(admin):
+ """Test error handling for non-existent table operations."""
+ table_path = fluss.TablePath("fluss", "py_no_such_table")
+
+ # drop without ignore flag
+ with pytest.raises(fluss.FlussError) as exc_info:
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
+ assert exc_info.value.error_code == fluss.ErrorCode.TABLE_NOT_EXIST
+
+ # drop with ignore flag should succeed
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+
+async def test_error_table_not_partitioned(admin):
+ """Test error when calling partition operations on non-partitioned
table."""
+ db_name = "py_test_error_not_partitioned_db"
+
+ await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
+ await admin.create_database(db_name, ignore_if_exists=True)
+
+ table_path = fluss.TablePath(db_name, "non_partitioned_table")
+ schema = fluss.Schema(
+ pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
+ )
+ table_descriptor = fluss.TableDescriptor(
+ schema,
+ bucket_count=1,
+ properties={"table.replication.factor": "1"},
+ )
+
+ await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+ with pytest.raises(fluss.FlussError) as exc_info:
+ await admin.list_partition_infos(table_path)
+ assert (
+ exc_info.value.error_code == fluss.ErrorCode.TABLE_NOT_PARTITIONED_EXCEPTION
+ )
+
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+ await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
diff --git a/bindings/python/test/test_kv_table.py b/bindings/python/test/test_kv_table.py
new file mode 100644
index 0000000..98b0cee
--- /dev/null
+++ b/bindings/python/test/test_kv_table.py
@@ -0,0 +1,428 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Integration tests for KV (primary key) table operations.
+
+Mirrors the Rust integration tests in crates/fluss/tests/integration/kv_table.rs.
+"""
+
+import math
+from datetime import date, datetime, timezone
+from datetime import time as dt_time
+from decimal import Decimal
+
+import pyarrow as pa
+
+import fluss
+
+
+async def test_upsert_delete_and_lookup(connection, admin):
+ """Test upsert, lookup, update, delete, and non-existent key lookup."""
+ table_path = fluss.TablePath("fluss", "py_test_upsert_and_lookup")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ schema = fluss.Schema(
+ pa.schema(
+ [
+ pa.field("id", pa.int32()),
+ pa.field("name", pa.string()),
+ pa.field("age", pa.int64()),
+ ]
+ ),
+ primary_keys=["id"],
+ )
+ table_descriptor = fluss.TableDescriptor(schema)
+ await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+ table = await connection.get_table(table_path)
+ upsert_writer = table.new_upsert().create_writer()
+
+ test_data = [(1, "Verso", 32), (2, "Noco", 25), (3, "Esquie", 35)]
+
+ # Upsert rows (fire-and-forget, then flush)
+ for id_, name, age in test_data:
+ upsert_writer.upsert({"id": id_, "name": name, "age": age})
+ await upsert_writer.flush()
+
+ # Lookup and verify
+ lookuper = table.new_lookup().create_lookuper()
+
+ for id_, expected_name, expected_age in test_data:
+ result = await lookuper.lookup({"id": id_})
+ assert result is not None, f"Row with id={id_} should exist"
+ assert result["id"] == id_
+ assert result["name"] == expected_name
+ assert result["age"] == expected_age
+
+ # Update record with id=1 (await acknowledgment)
+ handle = upsert_writer.upsert({"id": 1, "name": "Verso", "age": 33})
+ await handle.wait()
+
+ result = await lookuper.lookup({"id": 1})
+ assert result is not None
+ assert result["age"] == 33
+ assert result["name"] == "Verso"
+
+ # Delete record with id=1 (await acknowledgment)
+ handle = upsert_writer.delete({"id": 1})
+ await handle.wait()
+
+ result = await lookuper.lookup({"id": 1})
+ assert result is None, "Record 1 should not exist after delete"
+
+ # Verify other records still exist
+ for id_ in [2, 3]:
+ result = await lookuper.lookup({"id": id_})
+ assert result is not None, f"Record {id_} should still exist"
+
+ # Lookup non-existent key
+ result = await lookuper.lookup({"id": 999})
+ assert result is None, "Non-existent key should return None"
+
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_composite_primary_keys(connection, admin):
+ """Test upsert and lookup with composite (multi-column) primary keys."""
+ table_path = fluss.TablePath("fluss", "py_test_composite_pk")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ schema = fluss.Schema(
+ pa.schema(
+ [
+ pa.field("region", pa.string()),
+ pa.field("user_id", pa.int32()),
+ pa.field("score", pa.int64()),
+ ]
+ ),
+ primary_keys=["region", "user_id"],
+ )
+ table_descriptor = fluss.TableDescriptor(schema)
+ await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+ table = await connection.get_table(table_path)
+ upsert_writer = table.new_upsert().create_writer()
+
+ test_data = [
+ ("US", 1, 100),
+ ("US", 2, 200),
+ ("EU", 1, 150),
+ ("EU", 2, 250),
+ ]
+
+ for region, user_id, score in test_data:
+ upsert_writer.upsert({"region": region, "user_id": user_id, "score":
score})
+ await upsert_writer.flush()
+
+ lookuper = table.new_lookup().create_lookuper()
+
+ # Lookup (US, 1) -> score 100
+ result = await lookuper.lookup({"region": "US", "user_id": 1})
+ assert result is not None
+ assert result["score"] == 100
+
+ # Lookup (EU, 2) -> score 250
+ result = await lookuper.lookup({"region": "EU", "user_id": 2})
+ assert result is not None
+ assert result["score"] == 250
+
+ # Update (US, 1) score (await acknowledgment)
+ handle = upsert_writer.upsert({"region": "US", "user_id": 1, "score": 500})
+ await handle.wait()
+
+ result = await lookuper.lookup({"region": "US", "user_id": 1})
+ assert result is not None
+ assert result["score"] == 500
+
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_partial_update(connection, admin):
+ """Test partial column update via partial_update_by_name."""
+ table_path = fluss.TablePath("fluss", "py_test_partial_update")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ schema = fluss.Schema(
+ pa.schema(
+ [
+ pa.field("id", pa.int32()),
+ pa.field("name", pa.string()),
+ pa.field("age", pa.int64()),
+ pa.field("score", pa.int64()),
+ ]
+ ),
+ primary_keys=["id"],
+ )
+ table_descriptor = fluss.TableDescriptor(schema)
+ await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+ table = await connection.get_table(table_path)
+
+ # Insert initial record
+ upsert_writer = table.new_upsert().create_writer()
+ handle = upsert_writer.upsert(
+ {"id": 1, "name": "Verso", "age": 32, "score": 6942}
+ )
+ await handle.wait()
+
+ lookuper = table.new_lookup().create_lookuper()
+ result = await lookuper.lookup({"id": 1})
+ assert result is not None
+ assert result["id"] == 1
+ assert result["name"] == "Verso"
+ assert result["age"] == 32
+ assert result["score"] == 6942
+
+ # Partial update: only update score column
+ partial_writer = (
+ table.new_upsert().partial_update_by_name(["id", "score"]).create_writer()
+ )
+ handle = partial_writer.upsert({"id": 1, "score": 420})
+ await handle.wait()
+
+ result = await lookuper.lookup({"id": 1})
+ assert result is not None
+ assert result["id"] == 1
+ assert result["name"] == "Verso", "name should remain unchanged"
+ assert result["age"] == 32, "age should remain unchanged"
+ assert result["score"] == 420, "score should be updated to 420"
+
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_partial_update_by_index(connection, admin):
+ """Test partial column update via partial_update_by_index."""
+ table_path = fluss.TablePath("fluss", "py_test_partial_update_by_index")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ schema = fluss.Schema(
+ pa.schema(
+ [
+ pa.field("id", pa.int32()),
+ pa.field("name", pa.string()),
+ pa.field("age", pa.int64()),
+ pa.field("score", pa.int64()),
+ ]
+ ),
+ primary_keys=["id"],
+ )
+ table_descriptor = fluss.TableDescriptor(schema)
+ await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+ table = await connection.get_table(table_path)
+
+ upsert_writer = table.new_upsert().create_writer()
+ handle = upsert_writer.upsert(
+ {"id": 1, "name": "Verso", "age": 32, "score": 6942}
+ )
+ await handle.wait()
+
+ # Partial update by indices: columns 0=id (PK), 1=name
+ partial_writer = (
+ table.new_upsert().partial_update_by_index([0, 1]).create_writer()
+ )
+ handle = partial_writer.upsert([1, "Verso Renamed"])
+ await handle.wait()
+
+ lookuper = table.new_lookup().create_lookuper()
+ result = await lookuper.lookup({"id": 1})
+ assert result is not None
+ assert result["name"] == "Verso Renamed", "name should be updated"
+ assert result["score"] == 6942, "score should remain unchanged"
+
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_partitioned_table_upsert_and_lookup(connection, admin):
+ """Test upsert/lookup/delete on a partitioned KV table."""
+ table_path = fluss.TablePath("fluss", "py_test_partitioned_kv_table")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ schema = fluss.Schema(
+ pa.schema(
+ [
+ pa.field("region", pa.string()),
+ pa.field("user_id", pa.int32()),
+ pa.field("name", pa.string()),
+ pa.field("score", pa.int64()),
+ ]
+ ),
+ primary_keys=["region", "user_id"],
+ )
+ table_descriptor = fluss.TableDescriptor(
+ schema,
+ partition_keys=["region"],
+ )
+ await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+ # Create partitions
+ for region in ["US", "EU", "APAC"]:
+ await admin.create_partition(
+ table_path, {"region": region}, ignore_if_exists=True
+ )
+
+ table = await connection.get_table(table_path)
+ upsert_writer = table.new_upsert().create_writer()
+
+ test_data = [
+ ("US", 1, "Gustave", 100),
+ ("US", 2, "Lune", 200),
+ ("EU", 1, "Sciel", 150),
+ ("EU", 2, "Maelle", 250),
+ ("APAC", 1, "Noco", 300),
+ ]
+
+ for region, user_id, name, score in test_data:
+ upsert_writer.upsert(
+ {"region": region, "user_id": user_id, "name": name, "score":
score}
+ )
+ await upsert_writer.flush()
+
+ lookuper = table.new_lookup().create_lookuper()
+
+ # Verify all rows across partitions
+ for region, user_id, expected_name, expected_score in test_data:
+ result = await lookuper.lookup({"region": region, "user_id": user_id})
+ assert result is not None, f"Row ({region}, {user_id}) should exist"
+ assert result["region"] == region
+ assert result["user_id"] == user_id
+ assert result["name"] == expected_name
+ assert result["score"] == expected_score
+
+ # Update within a partition (await acknowledgment)
+ handle = upsert_writer.upsert(
+ {"region": "US", "user_id": 1, "name": "Gustave Updated", "score": 999}
+ )
+ await handle.wait()
+
+ result = await lookuper.lookup({"region": "US", "user_id": 1})
+ assert result is not None
+ assert result["name"] == "Gustave Updated"
+ assert result["score"] == 999
+
+ # Lookup in non-existent partition should return None
+ result = await lookuper.lookup({"region": "UNKNOWN_REGION", "user_id": 1})
+ assert result is None, "Lookup in non-existent partition should return
None"
+
+ # Delete within a partition (await acknowledgment)
+ handle = upsert_writer.delete({"region": "EU", "user_id": 1})
+ await handle.wait()
+
+ result = await lookuper.lookup({"region": "EU", "user_id": 1})
+ assert result is None, "Deleted record should not exist"
+
+ # Verify sibling record still exists
+ result = await lookuper.lookup({"region": "EU", "user_id": 2})
+ assert result is not None
+ assert result["name"] == "Maelle"
+
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_all_supported_datatypes(connection, admin):
+ """Test upsert/lookup for all supported data types, including nulls."""
+ table_path = fluss.TablePath("fluss", "py_test_kv_all_datatypes")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ schema = fluss.Schema(
+ pa.schema(
+ [
+ pa.field("pk_int", pa.int32()),
+ pa.field("col_boolean", pa.bool_()),
+ pa.field("col_tinyint", pa.int8()),
+ pa.field("col_smallint", pa.int16()),
+ pa.field("col_int", pa.int32()),
+ pa.field("col_bigint", pa.int64()),
+ pa.field("col_float", pa.float32()),
+ pa.field("col_double", pa.float64()),
+ pa.field("col_string", pa.string()),
+ pa.field("col_decimal", pa.decimal128(10, 2)),
+ pa.field("col_date", pa.date32()),
+ pa.field("col_time", pa.time32("ms")),
+ pa.field("col_timestamp_ntz", pa.timestamp("us")),
+ pa.field("col_timestamp_ltz", pa.timestamp("us", tz="UTC")),
+ pa.field("col_bytes", pa.binary()),
+ ]
+ ),
+ primary_keys=["pk_int"],
+ )
+ table_descriptor = fluss.TableDescriptor(schema)
+ await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+ table = await connection.get_table(table_path)
+ upsert_writer = table.new_upsert().create_writer()
+
+ # Test data for all types
+ row_data = {
+ "pk_int": 1,
+ "col_boolean": True,
+ "col_tinyint": 127,
+ "col_smallint": 32767,
+ "col_int": 2147483647,
+ "col_bigint": 9223372036854775807,
+ "col_float": 3.14,
+ "col_double": 2.718281828459045,
+ "col_string": "world of fluss python client",
+ "col_decimal": Decimal("123.45"),
+ "col_date": date(2026, 1, 23),
+ "col_time": dt_time(10, 13, 47, 123000), # millisecond precision
+ "col_timestamp_ntz": datetime(2026, 1, 23, 10, 13, 47, 123000),
+ "col_timestamp_ltz": datetime(2026, 1, 23, 10, 13, 47, 123000),
+ "col_bytes": b"binary data",
+ }
+
+ handle = upsert_writer.upsert(row_data)
+ await handle.wait()
+
+ lookuper = table.new_lookup().create_lookuper()
+ result = await lookuper.lookup({"pk_int": 1})
+ assert result is not None, "Row should exist"
+
+ assert result["pk_int"] == 1
+ assert result["col_boolean"] is True
+ assert result["col_tinyint"] == 127
+ assert result["col_smallint"] == 32767
+ assert result["col_int"] == 2147483647
+ assert result["col_bigint"] == 9223372036854775807
+ assert math.isclose(result["col_float"], 3.14, rel_tol=1e-6)
+ assert math.isclose(result["col_double"], 2.718281828459045, rel_tol=1e-15)
+ assert result["col_string"] == "world of fluss python client"
+ assert result["col_decimal"] == Decimal("123.45")
+ assert result["col_date"] == date(2026, 1, 23)
+ assert result["col_time"] == dt_time(10, 13, 47, 123000)
+ assert result["col_timestamp_ntz"] == datetime(2026, 1, 23, 10, 13, 47,
123000)
+ assert result["col_timestamp_ltz"] == datetime(
+ 2026, 1, 23, 10, 13, 47, 123000, tzinfo=timezone.utc
+ )
+ assert result["col_bytes"] == b"binary data"
+
+ # Test with null values for all nullable columns
+ null_row = {"pk_int": 2}
+ for col in row_data:
+ if col != "pk_int":
+ null_row[col] = None
+ handle = upsert_writer.upsert(null_row)
+ await handle.wait()
+
+ result = await lookuper.lookup({"pk_int": 2})
+ assert result is not None, "Row with nulls should exist"
+ assert result["pk_int"] == 2
+ for col in row_data:
+ if col != "pk_int":
+ assert result[col] is None, f"{col} should be null"
+
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
diff --git a/bindings/python/test/test_log_table.py b/bindings/python/test/test_log_table.py
new file mode 100644
index 0000000..3219f03
--- /dev/null
+++ b/bindings/python/test/test_log_table.py
@@ -0,0 +1,675 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Integration tests for log (append-only) table operations.
+
+Mirrors the Rust integration tests in crates/fluss/tests/integration/log_table.rs.
+"""
+
+import asyncio
+import time
+
+import pyarrow as pa
+
+import fluss
+
+
+async def test_append_and_scan(connection, admin):
+ """Test appending record batches and scanning with a record-based
scanner."""
+ table_path = fluss.TablePath("fluss", "py_test_append_and_scan")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ schema = fluss.Schema(
+ pa.schema([pa.field("c1", pa.int32()), pa.field("c2", pa.string())])
+ )
+ table_descriptor = fluss.TableDescriptor(schema)
+ await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+ table = await connection.get_table(table_path)
+ append_writer = table.new_append().create_writer()
+
+ batch1 = pa.RecordBatch.from_arrays(
+ [pa.array([1, 2, 3], type=pa.int32()), pa.array(["a1", "a2", "a3"])],
+ schema=pa.schema([pa.field("c1", pa.int32()), pa.field("c2",
pa.string())]),
+ )
+ append_writer.write_arrow_batch(batch1)
+
+ batch2 = pa.RecordBatch.from_arrays(
+ [pa.array([4, 5, 6], type=pa.int32()), pa.array(["a4", "a5", "a6"])],
+ schema=pa.schema([pa.field("c1", pa.int32()), pa.field("c2",
pa.string())]),
+ )
+ append_writer.write_arrow_batch(batch2)
+
+ await append_writer.flush()
+
+ # Scan with record-based scanner
+ scanner = await table.new_scan().create_log_scanner()
+ num_buckets = (await admin.get_table_info(table_path)).num_buckets
+ scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
+
+ records = _poll_records(scanner, expected_count=6)
+
+ assert len(records) == 6, f"Expected 6 records, got {len(records)}"
+
+ records.sort(key=lambda r: r.row["c1"])
+
+ expected_c1 = [1, 2, 3, 4, 5, 6]
+ expected_c2 = ["a1", "a2", "a3", "a4", "a5", "a6"]
+ for i, record in enumerate(records):
+ assert record.row["c1"] == expected_c1[i], f"c1 mismatch at row {i}"
+ assert record.row["c2"] == expected_c2[i], f"c2 mismatch at row {i}"
+
+ # Test unsubscribe
+ scanner.unsubscribe(bucket_id=0)
+
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_append_dict_rows(connection, admin):
+ """Test appending rows as dicts and scanning."""
+ table_path = fluss.TablePath("fluss", "py_test_append_dict_rows")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ schema = fluss.Schema(
+ pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
+ )
+ table_descriptor = fluss.TableDescriptor(schema)
+ await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+ table = await connection.get_table(table_path)
+ append_writer = table.new_append().create_writer()
+
+ # Append using dicts
+ append_writer.append({"id": 1, "name": "Alice"})
+ append_writer.append({"id": 2, "name": "Bob"})
+ # Append using lists
+ append_writer.append([3, "Charlie"])
+ await append_writer.flush()
+
+ scanner = await table.new_scan().create_log_scanner()
+ num_buckets = (await admin.get_table_info(table_path)).num_buckets
+ scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
+
+ records = _poll_records(scanner, expected_count=3)
+ assert len(records) == 3
+
+ rows = sorted([r.row for r in records], key=lambda r: r["id"])
+ assert rows[0] == {"id": 1, "name": "Alice"}
+ assert rows[1] == {"id": 2, "name": "Bob"}
+ assert rows[2] == {"id": 3, "name": "Charlie"}
+
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_list_offsets(connection, admin):
+ """Test listing earliest, latest, and timestamp-based offsets."""
+ table_path = fluss.TablePath("fluss", "py_test_list_offsets")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ schema = fluss.Schema(
+ pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
+ )
+ table_descriptor = fluss.TableDescriptor(schema)
+ await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+ await asyncio.sleep(2) # Wait for table initialization
+
+ # Earliest offset should be 0 for empty table
+ earliest = await admin.list_offsets(
+ table_path, bucket_ids=[0], offset_spec=fluss.OffsetSpec.earliest()
+ )
+ assert earliest[0] == 0
+
+ # Latest offset should be 0 for empty table
+ latest = await admin.list_offsets(
+ table_path, bucket_ids=[0], offset_spec=fluss.OffsetSpec.latest()
+ )
+ assert latest[0] == 0
+
+ before_append_ms = int(time.time() * 1000)
+
+ # Append some records
+ table = await connection.get_table(table_path)
+ append_writer = table.new_append().create_writer()
+ batch = pa.RecordBatch.from_arrays(
+ [
+ pa.array([1, 2, 3], type=pa.int32()),
+ pa.array(["alice", "bob", "charlie"]),
+ ],
+ schema=pa.schema([pa.field("id", pa.int32()), pa.field("name",
pa.string())]),
+ )
+ append_writer.write_arrow_batch(batch)
+ await append_writer.flush()
+
+ await asyncio.sleep(1)
+
+ after_append_ms = int(time.time() * 1000)
+
+ # Latest offset should be 3 after appending 3 records
+ latest_after = await admin.list_offsets(
+ table_path, bucket_ids=[0], offset_spec=fluss.OffsetSpec.latest()
+ )
+ assert latest_after[0] == 3
+
+ # Earliest offset should still be 0
+ earliest_after = await admin.list_offsets(
+ table_path, bucket_ids=[0], offset_spec=fluss.OffsetSpec.earliest()
+ )
+ assert earliest_after[0] == 0
+
+ # Timestamp before append should resolve to offset 0
+ ts_before = await admin.list_offsets(
+ table_path,
+ bucket_ids=[0],
+ offset_spec=fluss.OffsetSpec.timestamp(before_append_ms),
+ )
+ assert ts_before[0] == 0
+
+ # Timestamp after append should resolve to offset 3
+ ts_after = await admin.list_offsets(
+ table_path,
+ bucket_ids=[0],
+ offset_spec=fluss.OffsetSpec.timestamp(after_append_ms),
+ )
+ assert ts_after[0] == 3
+
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_project(connection, admin):
+ """Test column projection by name and by index."""
+ table_path = fluss.TablePath("fluss", "py_test_project")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ schema = fluss.Schema(
+ pa.schema(
+ [
+ pa.field("col_a", pa.int32()),
+ pa.field("col_b", pa.string()),
+ pa.field("col_c", pa.int32()),
+ ]
+ )
+ )
+ table_descriptor = fluss.TableDescriptor(schema)
+ await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+ table = await connection.get_table(table_path)
+ append_writer = table.new_append().create_writer()
+
+ batch = pa.RecordBatch.from_arrays(
+ [
+ pa.array([1, 2, 3], type=pa.int32()),
+ pa.array(["x", "y", "z"]),
+ pa.array([10, 20, 30], type=pa.int32()),
+ ],
+ schema=pa.schema(
+ [
+ pa.field("col_a", pa.int32()),
+ pa.field("col_b", pa.string()),
+ pa.field("col_c", pa.int32()),
+ ]
+ ),
+ )
+ append_writer.write_arrow_batch(batch)
+ await append_writer.flush()
+
+ # Test project_by_name: select col_b and col_c only
+ scan = table.new_scan().project_by_name(["col_b", "col_c"])
+ scanner = await scan.create_log_scanner()
+ scanner.subscribe_buckets({0: 0})
+
+ records = _poll_records(scanner, expected_count=3)
+ assert len(records) == 3
+
+ records.sort(key=lambda r: r.row["col_c"])
+ expected_col_b = ["x", "y", "z"]
+ expected_col_c = [10, 20, 30]
+ for i, record in enumerate(records):
+ assert record.row["col_b"] == expected_col_b[i]
+ assert record.row["col_c"] == expected_col_c[i]
+ # col_a should not be present in projected results
+ assert "col_a" not in record.row
+
+ # Test project by indices [1, 0] -> (col_b, col_a)
+ scanner2 = await table.new_scan().project([1, 0]).create_log_scanner()
+ scanner2.subscribe_buckets({0: 0})
+
+ records2 = _poll_records(scanner2, expected_count=3)
+ assert len(records2) == 3
+
+ records2.sort(key=lambda r: r.row["col_a"])
+ for i, record in enumerate(records2):
+ assert record.row["col_b"] == expected_col_b[i]
+ assert record.row["col_a"] == [1, 2, 3][i]
+ assert "col_c" not in record.row
+
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_poll_batches(connection, admin):
+ """Test batch-based scanning with poll_arrow and poll_record_batch."""
+ table_path = fluss.TablePath("fluss", "py_test_poll_batches")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ schema = fluss.Schema(
+ pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
+ )
+ table_descriptor = fluss.TableDescriptor(schema)
+ await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+ await asyncio.sleep(1)
+
+ table = await connection.get_table(table_path)
+ scanner = await table.new_scan().create_record_batch_log_scanner()
+ scanner.subscribe(bucket_id=0, start_offset=0)
+
+ # Empty table should return empty result
+ result = scanner.poll_arrow(500)
+ assert result.num_rows == 0
+
+ writer = table.new_append().create_writer()
+ pa_schema = pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
+ writer.write_arrow_batch(
+ pa.RecordBatch.from_arrays(
+ [pa.array([1, 2], type=pa.int32()), pa.array(["a", "b"])],
+ schema=pa_schema,
+ )
+ )
+ writer.write_arrow_batch(
+ pa.RecordBatch.from_arrays(
+ [pa.array([3, 4], type=pa.int32()), pa.array(["c", "d"])],
+ schema=pa_schema,
+ )
+ )
+ writer.write_arrow_batch(
+ pa.RecordBatch.from_arrays(
+ [pa.array([5, 6], type=pa.int32()), pa.array(["e", "f"])],
+ schema=pa_schema,
+ )
+ )
+ await writer.flush()
+
+ # Poll until we get all 6 records
+ all_ids = _poll_arrow_ids(scanner, expected_count=6)
+ assert all_ids == [1, 2, 3, 4, 5, 6]
+
+ # Append more and verify offset continuation (no duplicates)
+ writer.write_arrow_batch(
+ pa.RecordBatch.from_arrays(
+ [pa.array([7, 8], type=pa.int32()), pa.array(["g", "h"])],
+ schema=pa_schema,
+ )
+ )
+ await writer.flush()
+
+ new_ids = _poll_arrow_ids(scanner, expected_count=2)
+ assert new_ids == [7, 8]
+
+ # Subscribe from mid-offset should truncate (skip earlier records)
+ trunc_scanner = await table.new_scan().create_record_batch_log_scanner()
+ trunc_scanner.subscribe(bucket_id=0, start_offset=3)
+
+ trunc_ids = _poll_arrow_ids(trunc_scanner, expected_count=5)
+ assert trunc_ids == [4, 5, 6, 7, 8]
+
+ # Projection with batch scanner
+ proj_scanner = (
+ await table.new_scan()
+ .project_by_name(["id"])
+ .create_record_batch_log_scanner()
+ )
+ proj_scanner.subscribe(bucket_id=0, start_offset=0)
+ batches = proj_scanner.poll_record_batch(10000)
+ assert len(batches) > 0
+ assert batches[0].batch.num_columns == 1
+
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_to_arrow_and_to_pandas(connection, admin):
+ """Test to_arrow() and to_pandas() convenience methods."""
+ table_path = fluss.TablePath("fluss", "py_test_to_arrow_pandas")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ schema = fluss.Schema(
+ pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
+ )
+ table_descriptor = fluss.TableDescriptor(schema)
+ await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+ table = await connection.get_table(table_path)
+ writer = table.new_append().create_writer()
+
+ pa_schema = pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
+ writer.write_arrow_batch(
+ pa.RecordBatch.from_arrays(
+ [pa.array([1, 2, 3], type=pa.int32()), pa.array(["a", "b", "c"])],
+ schema=pa_schema,
+ )
+ )
+ await writer.flush()
+
+ num_buckets = (await admin.get_table_info(table_path)).num_buckets
+
+ # to_arrow()
+ scanner = await table.new_scan().create_record_batch_log_scanner()
+ scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
+ arrow_table = scanner.to_arrow()
+ assert arrow_table.num_rows == 3
+ assert arrow_table.schema.names == ["id", "name"]
+
+ # to_pandas()
+ scanner2 = await table.new_scan().create_record_batch_log_scanner()
+ scanner2.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
+ df = scanner2.to_pandas()
+ assert len(df) == 3
+ assert list(df.columns) == ["id", "name"]
+
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_partitioned_table_append_scan(connection, admin):
+ """Test append and scan on a partitioned log table."""
+ table_path = fluss.TablePath("fluss", "py_test_partitioned_log_append")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ schema = fluss.Schema(
+ pa.schema(
+ [
+ pa.field("id", pa.int32()),
+ pa.field("region", pa.string()),
+ pa.field("value", pa.int64()),
+ ]
+ )
+ )
+ table_descriptor = fluss.TableDescriptor(
+ schema,
+ partition_keys=["region"],
+ )
+ await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+ # Create partitions
+ for region in ["US", "EU"]:
+ await admin.create_partition(
+ table_path, {"region": region}, ignore_if_exists=True
+ )
+
+ await asyncio.sleep(2) # Wait for partitions to be available
+
+ table = await connection.get_table(table_path)
+ append_writer = table.new_append().create_writer()
+
+ # Append rows
+ test_data = [
+ (1, "US", 100),
+ (2, "US", 200),
+ (3, "EU", 300),
+ (4, "EU", 400),
+ ]
+ for id_, region, value in test_data:
+ append_writer.append({"id": id_, "region": region, "value": value})
+ await append_writer.flush()
+
+ # Append arrow batches per partition
+ pa_schema = pa.schema(
+ [
+ pa.field("id", pa.int32()),
+ pa.field("region", pa.string()),
+ pa.field("value", pa.int64()),
+ ]
+ )
+ us_batch = pa.RecordBatch.from_arrays(
+ [
+ pa.array([5, 6], type=pa.int32()),
+ pa.array(["US", "US"]),
+ pa.array([500, 600], type=pa.int64()),
+ ],
+ schema=pa_schema,
+ )
+ append_writer.write_arrow_batch(us_batch)
+
+ eu_batch = pa.RecordBatch.from_arrays(
+ [
+ pa.array([7, 8], type=pa.int32()),
+ pa.array(["EU", "EU"]),
+ pa.array([700, 800], type=pa.int64()),
+ ],
+ schema=pa_schema,
+ )
+ append_writer.write_arrow_batch(eu_batch)
+ await append_writer.flush()
+
+ # Verify partition offsets
+ us_offsets = await admin.list_partition_offsets(
+ table_path,
+ partition_name="US",
+ bucket_ids=[0],
+ offset_spec=fluss.OffsetSpec.latest(),
+ )
+ assert us_offsets[0] == 4, "US partition should have 4 records"
+
+ eu_offsets = await admin.list_partition_offsets(
+ table_path,
+ partition_name="EU",
+ bucket_ids=[0],
+ offset_spec=fluss.OffsetSpec.latest(),
+ )
+ assert eu_offsets[0] == 4, "EU partition should have 4 records"
+
+ # Scan all partitions
+ scanner = await table.new_scan().create_log_scanner()
+ partition_infos = await admin.list_partition_infos(table_path)
+ for p in partition_infos:
+ scanner.subscribe_partition(
+ partition_id=p.partition_id, bucket_id=0, start_offset=0
+ )
+
+ expected = [
+ (1, "US", 100),
+ (2, "US", 200),
+ (3, "EU", 300),
+ (4, "EU", 400),
+ (5, "US", 500),
+ (6, "US", 600),
+ (7, "EU", 700),
+ (8, "EU", 800),
+ ]
+
+ records = _poll_records(scanner, expected_count=8)
+ assert len(records) == 8
+
+ collected = sorted(
+ [(r.row["id"], r.row["region"], r.row["value"]) for r in records],
+ key=lambda x: x[0],
+ )
+ assert collected == expected
+
+ # Test unsubscribe_partition: unsubscribe from EU, only US data should remain
+ unsub_scanner = await table.new_scan().create_log_scanner()
+ eu_partition_id = next(
+ p.partition_id for p in partition_infos if p.partition_name == "EU"
+ )
+ for p in partition_infos:
+ unsub_scanner.subscribe_partition(p.partition_id, 0, 0)
+ unsub_scanner.unsubscribe_partition(eu_partition_id, 0)
+
+ remaining = _poll_records(unsub_scanner, expected_count=4, timeout_s=5)
+ assert len(remaining) == 4
+ assert all(r.row["region"] == "US" for r in remaining)
+
+ # Test subscribe_partition_buckets (batch subscribe)
+ batch_scanner = await table.new_scan().create_log_scanner()
+ partition_bucket_offsets = {
+ (p.partition_id, 0): fluss.EARLIEST_OFFSET for p in partition_infos
+ }
+ batch_scanner.subscribe_partition_buckets(partition_bucket_offsets)
+
+ batch_records = _poll_records(batch_scanner, expected_count=8)
+ assert len(batch_records) == 8
+ batch_collected = sorted(
+ [(r.row["id"], r.row["region"], r.row["value"]) for r in
batch_records],
+ key=lambda x: x[0],
+ )
+ assert batch_collected == expected
+
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_write_arrow(connection, admin):
+ """Test writing a full PyArrow Table via write_arrow()."""
+ table_path = fluss.TablePath("fluss", "py_test_write_arrow")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ schema = fluss.Schema(
+ pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
+ )
+ table_descriptor = fluss.TableDescriptor(schema)
+ await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+ table = await connection.get_table(table_path)
+ writer = table.new_append().create_writer()
+
+ pa_schema = pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
+ arrow_table = pa.table(
+ {
+ "id": pa.array([1, 2, 3, 4, 5], type=pa.int32()),
+ "name": pa.array(["alice", "bob", "charlie", "dave", "eve"]),
+ },
+ schema=pa_schema,
+ )
+ writer.write_arrow(arrow_table)
+ await writer.flush()
+
+ num_buckets = (await admin.get_table_info(table_path)).num_buckets
+ scanner = await table.new_scan().create_record_batch_log_scanner()
+ scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
+
+ result = scanner.to_arrow()
+ assert result.num_rows == 5
+
+ ids = sorted(result.column("id").to_pylist())
+ names = [
+ n
+ for _, n in sorted(
+ zip(result.column("id").to_pylist(), result.column("name").to_pylist())
+ )
+ ]
+ assert ids == [1, 2, 3, 4, 5]
+ assert names == ["alice", "bob", "charlie", "dave", "eve"]
+
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_write_pandas(connection, admin):
+ """Test writing a Pandas DataFrame via write_pandas()."""
+ import pandas as pd
+
+ table_path = fluss.TablePath("fluss", "py_test_write_pandas")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ schema = fluss.Schema(
+ pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
+ )
+ table_descriptor = fluss.TableDescriptor(schema)
+ await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+ table = await connection.get_table(table_path)
+ writer = table.new_append().create_writer()
+
+ df = pd.DataFrame({"id": [10, 20, 30], "name": ["x", "y", "z"]})
+ writer.write_pandas(df)
+ await writer.flush()
+
+ num_buckets = (await admin.get_table_info(table_path)).num_buckets
+ scanner = await table.new_scan().create_record_batch_log_scanner()
+ scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
+
+ result = scanner.to_pandas()
+ assert len(result) == 3
+
+ result_sorted = result.sort_values("id").reset_index(drop=True)
+ assert result_sorted["id"].tolist() == [10, 20, 30]
+ assert result_sorted["name"].tolist() == ["x", "y", "z"]
+
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_partitioned_table_to_arrow(connection, admin):
+ """Test to_arrow() on partitioned tables."""
+ table_path = fluss.TablePath("fluss", "py_test_partitioned_to_arrow")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ schema = fluss.Schema(
+ pa.schema(
+ [
+ pa.field("id", pa.int32()),
+ pa.field("region", pa.string()),
+ pa.field("value", pa.int64()),
+ ]
+ )
+ )
+ table_descriptor = fluss.TableDescriptor(schema, partition_keys=["region"])
+ await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+ for region in ["US", "EU"]:
+ await admin.create_partition(
+ table_path, {"region": region}, ignore_if_exists=True
+ )
+
+ await asyncio.sleep(2)
+
+ table = await connection.get_table(table_path)
+ writer = table.new_append().create_writer()
+ writer.append({"id": 1, "region": "US", "value": 100})
+ writer.append({"id": 2, "region": "EU", "value": 200})
+ await writer.flush()
+
+ scanner = await table.new_scan().create_record_batch_log_scanner()
+ partition_infos = await admin.list_partition_infos(table_path)
+ for p in partition_infos:
+ scanner.subscribe_partition(p.partition_id, 0, fluss.EARLIEST_OFFSET)
+
+ arrow_table = scanner.to_arrow()
+ assert arrow_table.num_rows == 2
+
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _poll_records(scanner, expected_count, timeout_s=10):
+ """Poll a record-based scanner until expected_count records are
collected."""
+ collected = []
+ deadline = time.monotonic() + timeout_s
+ while len(collected) < expected_count and time.monotonic() < deadline:
+ records = scanner.poll(5000)
+ collected.extend(records)
+ return collected
+
+
+def _poll_arrow_ids(scanner, expected_count, timeout_s=10):
+ """Poll a batch scanner and extract 'id' column values."""
+ all_ids = []
+ deadline = time.monotonic() + timeout_s
+ while len(all_ids) < expected_count and time.monotonic() < deadline:
+ arrow_table = scanner.poll_arrow(5000)
+ if arrow_table.num_rows > 0:
+ all_ids.extend(arrow_table.column("id").to_pylist())
+ return all_ids
diff --git a/website/docs/developer-guide/contributing.md b/website/docs/developer-guide/contributing.md
index eced106..38b792e 100644
--- a/website/docs/developer-guide/contributing.md
+++ b/website/docs/developer-guide/contributing.md
@@ -82,7 +82,7 @@ cargo build --workspace --all-targets
# Run unit tests
cargo test --workspace
-# Run integration tests (requires a running Fluss cluster)
+# Run integration tests (requires Docker)
RUST_TEST_THREADS=1 cargo test --features integration_tests --workspace
# Run a single test
@@ -93,9 +93,15 @@ cargo test test_name
```bash
cd bindings/python
-pip install maturin
-pip install -e ".[dev]"
-maturin develop
+
+# Install dev dependencies and build the extension
+uv sync --extra dev && uv run maturin develop
+
+# Run integration tests (requires Docker)
+uv run pytest test/ -v
+
+# To run against an existing cluster instead
+FLUSS_BOOTSTRAP_SERVERS=127.0.0.1:9123 uv run pytest test/ -v
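+
+# Run a single test file or test case (standard pytest selectors; examples)
+uv run pytest test/test_admin.py -v
+uv run pytest test/test_kv_table.py::test_partial_update -v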
```
### C++ Bindings