This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new e0b3dac  doc: add python docs and API reference (#291)
e0b3dac is described below

commit e0b3dac8313bb3df7ac2a338cbb86037cf0f0326
Author: Anton Borisov <[email protected]>
AuthorDate: Tue Feb 10 01:29:40 2026 +0000

    doc: add python docs and API reference (#291)
---
 bindings/python/API_REFERENCE.md   | 278 +++++++++++++++++++++
 bindings/python/DEVELOPMENT.md     | 114 +++++++++
 bindings/python/README.md          | 494 +++++++++++++++++++++++++++++--------
 bindings/python/fluss/__init__.pyi |   3 +-
 4 files changed, 790 insertions(+), 99 deletions(-)

diff --git a/bindings/python/API_REFERENCE.md b/bindings/python/API_REFERENCE.md
new file mode 100644
index 0000000..258b26e
--- /dev/null
+++ b/bindings/python/API_REFERENCE.md
@@ -0,0 +1,278 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+# Python API Reference
+
+Complete API reference for the Fluss Python client. For a usage guide with 
examples, see the [Python Client Guide](README.md).
+
+## `Config`
+
+| Method / Property | Description |
+|---|---|
+| `Config(properties: dict = None)` | Create config from a dict of key-value 
pairs |
+| `.bootstrap_server` | Get/set coordinator server address |
+| `.request_max_size` | Get/set max request size in bytes |
+| `.writer_batch_size` | Get/set write batch size in bytes |
+
+## `FlussConnection`
+
+| Method | Description |
+|---|---|
+| `await FlussConnection.connect(config) -> FlussConnection` | Connect to a 
Fluss cluster |
+| `await conn.get_admin() -> FlussAdmin` | Get admin interface |
+| `await conn.get_table(table_path) -> FlussTable` | Get a table for 
read/write operations |
+| `conn.close()` | Close the connection |
+
+`FlussConnection` supports the `with` statement (context manager).
+
+## `FlussAdmin`
+
+| Method | Description |
+|---|---|
+| `await create_database(name, ignore_if_exists=False, 
database_descriptor=None)` | Create a database |
+| `await drop_database(name, ignore_if_not_exists=False, cascade=True)` | Drop 
a database |
+| `await list_databases() -> list[str]` | List all databases |
+| `await database_exists(name) -> bool` | Check if a database exists |
+| `await get_database_info(name) -> DatabaseInfo` | Get database metadata |
+| `await create_table(table_path, table_descriptor, ignore_if_exists=False)` | 
Create a table |
+| `await drop_table(table_path, ignore_if_not_exists=False)` | Drop a table |
+| `await get_table(table_path) -> TableInfo` | Get table metadata |
+| `await list_tables(database_name) -> list[str]` | List tables in a database |
+| `await table_exists(table_path) -> bool` | Check if a table exists |
+| `await list_offsets(table_path, bucket_ids, offset_type, timestamp=None) -> 
dict[int, int]` | Get offsets for buckets |
+| `await list_partition_offsets(table_path, partition_name, bucket_ids, 
offset_type, timestamp=None) -> dict[int, int]` | Get offsets for a partition's 
buckets |
+| `await create_partition(table_path, partition_spec, ignore_if_exists=False)` 
| Create a partition |
+| `await drop_partition(table_path, partition_spec, 
ignore_if_not_exists=False)` | Drop a partition |
+| `await list_partition_infos(table_path) -> list[PartitionInfo]` | List 
partitions |
+| `await get_latest_lake_snapshot(table_path) -> LakeSnapshot` | Get latest 
lake snapshot |
+
+## `FlussTable`
+
+| Method | Description |
+|---|---|
+| `new_scan() -> TableScan` | Create a scan builder |
+| `await new_append_writer() -> AppendWriter` | Create writer for log tables |
+| `new_upsert(columns=None, column_indices=None) -> UpsertWriter` | Create 
writer for PK tables (optionally partial) |
+| `new_lookup() -> Lookuper` | Create lookuper for PK tables |
+| `get_table_info() -> TableInfo` | Get table metadata |
+| `get_table_path() -> TablePath` | Get table path |
+| `has_primary_key() -> bool` | Check if table has a primary key |
+
+## `TableScan`
+
+| Method | Description |
+|---|---|
+| `.project(indices) -> TableScan` | Project columns by index |
+| `.project_by_name(names) -> TableScan` | Project columns by name |
+| `await .create_log_scanner() -> LogScanner` | Create record-based scanner 
(for `poll()`) |
+| `await .create_batch_scanner() -> LogScanner` | Create batch-based scanner 
(for `poll_arrow()`, `to_arrow()`, etc.) |
+
+## `AppendWriter`
+
+| Method | Description |
+|---|---|
+| `.append(row) -> WriteResultHandle` | Append a row (dict, list, or tuple) |
+| `.write_arrow(table)` | Write a PyArrow Table |
+| `.write_arrow_batch(batch) -> WriteResultHandle` | Write a PyArrow 
RecordBatch |
+| `.write_pandas(df)` | Write a Pandas DataFrame |
+| `await .flush()` | Flush all pending writes |
+
+## `UpsertWriter`
+
+| Method | Description |
+|---|---|
+| `.upsert(row) -> WriteResultHandle` | Upsert a row (insert or update by PK) |
+| `.delete(pk) -> WriteResultHandle` | Delete a row by primary key |
+| `await .flush()` | Flush all pending operations |
+
+## `WriteResultHandle`
+
+| Method | Description |
+|---|---|
+| `await .wait()` | Wait for server acknowledgment of this write |
+
+## `Lookuper`
+
+| Method | Description |
+|---|---|
+| `await .lookup(pk) -> dict \| None` | Lookup a row by primary key |
+
+## `LogScanner`
+
+| Method | Description |
+|---|---|
+| `.subscribe(bucket_id, start_offset)` | Subscribe to a bucket |
+| `.subscribe_buckets(bucket_offsets)` | Subscribe to multiple buckets 
(`{bucket_id: offset}`) |
+| `.subscribe_partition(partition_id, bucket_id, start_offset)` | Subscribe to 
a partition bucket |
+| `.subscribe_partition_buckets(partition_bucket_offsets)` | Subscribe to 
multiple partition+bucket combos (`{(part_id, bucket_id): offset}`) |
+| `.unsubscribe_partition(partition_id, bucket_id)` | Unsubscribe from a 
partition bucket |
+| `.poll(timeout_ms) -> list[ScanRecord]` | Poll individual records (record 
scanner only) |
+| `.poll_arrow(timeout_ms) -> pa.Table` | Poll as Arrow Table (batch scanner 
only) |
+| `.poll_batches(timeout_ms) -> list[RecordBatch]` | Poll batches with 
metadata (batch scanner only) |
+| `.to_arrow() -> pa.Table` | Read all subscribed data as Arrow Table (batch 
scanner only) |
+| `.to_pandas() -> pd.DataFrame` | Read all subscribed data as DataFrame 
(batch scanner only) |
+
+## `ScanRecord`
+
+| Property | Description |
+|---|---|
+| `.bucket -> TableBucket` | Bucket this record belongs to |
+| `.offset -> int` | Record offset in the log |
+| `.timestamp -> int` | Record timestamp |
+| `.change_type -> ChangeType` | Change type (AppendOnly, Insert, 
UpdateBefore, UpdateAfter, Delete) |
+| `.row -> dict` | Row data as `{column_name: value}` |
+
+## `RecordBatch`
+
+| Property | Description |
+|---|---|
+| `.batch -> pa.RecordBatch` | Arrow RecordBatch data |
+| `.bucket -> TableBucket` | Bucket this batch belongs to |
+| `.base_offset -> int` | First record offset |
+| `.last_offset -> int` | Last record offset |
+
+## `Schema`
+
+| Method | Description |
+|---|---|
+| `Schema(schema: pa.Schema, primary_keys=None)` | Create from PyArrow schema |
+| `.get_column_names() -> list[str]` | Get column names |
+| `.get_column_types() -> list[str]` | Get column type names |
+
+## `TableDescriptor`
+
+| Method | Description |
+|---|---|
+| `TableDescriptor(schema, *, partition_keys=None, bucket_count=None, 
bucket_keys=None, comment=None, log_format=None, kv_format=None, 
properties=None, custom_properties=None)` | Create table descriptor |
+| `.get_schema() -> Schema` | Get the schema |
+
+## `TablePath`
+
+| Method / Property | Description |
+|---|---|
+| `TablePath(database, table)` | Create a table path |
+| `.database_name -> str` | Database name |
+| `.table_name -> str` | Table name |
+
+## `TableInfo`
+
+| Property / Method | Description |
+|---|---|
+| `.table_id -> int` | Table ID |
+| `.table_path -> TablePath` | Table path |
+| `.num_buckets -> int` | Number of buckets |
+| `.schema_id -> int` | Schema ID |
+| `.comment -> str \| None` | Table comment |
+| `.created_time -> int` | Creation timestamp |
+| `.modified_time -> int` | Last modification timestamp |
+| `.get_primary_keys() -> list[str]` | Primary key columns |
+| `.get_partition_keys() -> list[str]` | Partition columns |
+| `.get_bucket_keys() -> list[str]` | Bucket key columns |
+| `.has_primary_key() -> bool` | Has primary key? |
+| `.is_partitioned() -> bool` | Is partitioned? |
+| `.get_schema() -> Schema` | Get table schema |
+| `.get_column_names() -> list[str]` | Column names |
+| `.get_column_count() -> int` | Number of columns |
+| `.get_properties() -> dict` | All table properties |
+| `.get_custom_properties() -> dict` | Custom properties only |
+
+## `PartitionInfo`
+
+| Property | Description |
+|---|---|
+| `.partition_id -> int` | Partition ID |
+| `.partition_name -> str` | Partition name |
+
+## `DatabaseDescriptor`
+
+| Method / Property | Description |
+|---|---|
+| `DatabaseDescriptor(comment=None, custom_properties=None)` | Create 
descriptor |
+| `.comment -> str \| None` | Database comment |
+| `.get_custom_properties() -> dict` | Custom properties |
+
+## `DatabaseInfo`
+
+| Property / Method | Description |
+|---|---|
+| `.database_name -> str` | Database name |
+| `.created_time -> int` | Creation timestamp |
+| `.modified_time -> int` | Last modification timestamp |
+| `.get_database_descriptor() -> DatabaseDescriptor` | Get descriptor |
+
+## `LakeSnapshot`
+
+| Property / Method | Description |
+|---|---|
+| `.snapshot_id -> int` | Snapshot ID |
+| `.table_buckets_offset -> dict[TableBucket, int]` | All bucket offsets |
+| `.get_bucket_offset(bucket) -> int \| None` | Get offset for a bucket |
+| `.get_table_buckets() -> list[TableBucket]` | Get all buckets |
+
+## `TableBucket`
+
+| Method / Property | Description |
+|---|---|
+| `TableBucket(table_id, bucket)` | Create non-partitioned bucket |
+| `TableBucket.with_partition(table_id, partition_id, bucket)` | Create 
partitioned bucket |
+| `.table_id -> int` | Table ID |
+| `.bucket_id -> int` | Bucket ID |
+| `.partition_id -> int \| None` | Partition ID (None if non-partitioned) |
+
+## `FlussError`
+
+| Property | Description |
+|---|---|
+| `.message -> str` | Error message |
+
+Raised for all Fluss-specific errors (connection failures, table not found, 
schema mismatches, etc.). Inherits from `Exception`.
+
+## Constants
+
+| Constant | Value | Description |
+|---|---|---|
+| `fluss.EARLIEST_OFFSET` | `-2` | Start reading from earliest available 
offset |
+| `fluss.LATEST_OFFSET` | `-1` | Start reading from latest offset (only new 
records) |
+| `fluss.OffsetType.EARLIEST` | `"earliest"` | For `list_offsets()` |
+| `fluss.OffsetType.LATEST` | `"latest"` | For `list_offsets()` |
+| `fluss.OffsetType.TIMESTAMP` | `"timestamp"` | For `list_offsets()` with 
timestamp |
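The offset constants above are typically combined with the `{bucket_id: offset}` and `{(part_id, bucket_id): offset}` mappings that `subscribe_buckets()` and `subscribe_partition_buckets()` accept. A minimal sketch of building those mappings follows. The helper names `bucket_offsets` and `partition_bucket_offsets` are hypothetical, and the two constants are local mirrors of the documented values so the snippet runs without a cluster.

```python
# Local mirrors of the documented constant values; with the real client
# you would use fluss.EARLIEST_OFFSET and fluss.LATEST_OFFSET instead.
EARLIEST_OFFSET = -2
LATEST_OFFSET = -1


def bucket_offsets(num_buckets, start_offset=EARLIEST_OFFSET):
    """Build a {bucket_id: offset} mapping covering every bucket of a table."""
    return {bucket_id: start_offset for bucket_id in range(num_buckets)}


def partition_bucket_offsets(partition_ids, num_buckets, start_offset=EARLIEST_OFFSET):
    """Build a {(partition_id, bucket_id): offset} mapping for a partitioned table."""
    return {
        (partition_id, bucket_id): start_offset
        for partition_id in partition_ids
        for bucket_id in range(num_buckets)
    }


print(bucket_offsets(3))
print(partition_bucket_offsets([10, 11], 2, LATEST_OFFSET))
```

The partition IDs `10` and `11` are made-up values; in practice they would come from `list_partition_infos()`.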
+
+## `ChangeType`
+
+| Value | Short String | Description |
+|---|---|---|
+| `ChangeType.AppendOnly` (0) | `+A` | Append-only |
+| `ChangeType.Insert` (1) | `+I` | Insert |
+| `ChangeType.UpdateBefore` (2) | `-U` | Previous value of updated row |
+| `ChangeType.UpdateAfter` (3) | `+U` | New value of updated row |
+| `ChangeType.Delete` (4) | `-D` | Delete |
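To illustrate how the change types in this table relate to each other, here is a small sketch that replays a changelog into a snapshot keyed by primary key. It works on plain `(short_string, row)` tuples rather than real `ScanRecord` objects, and the helper name `apply_changes` is hypothetical. It assumes every `-U` record is followed by the matching `+U`.

```python
def apply_changes(changes, key):
    """Replay (short_string, row) pairs into a {primary_key: row} snapshot."""
    state = {}
    for change, row in changes:
        if change in ("+I", "+U"):
            # Insert, or the new value of an updated row: store the latest row.
            state[row[key]] = row
        elif change == "-D":
            # Delete: drop the row if it is present.
            state.pop(row[key], None)
        # "-U" carries the previous value of an updated row and "+A" comes
        # from append-only tables without a key; both are skipped here.
    return state


changes = [
    ("+I", {"id": 1, "name": "Alice"}),
    ("+I", {"id": 2, "name": "Bob"}),
    ("-U", {"id": 1, "name": "Alice"}),
    ("+U", {"id": 1, "name": "Alicia"}),
    ("-D", {"id": 2, "name": "Bob"}),
]
snapshot = apply_changes(changes, key="id")
print(snapshot)
```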
+
+## Data Types
+
+| PyArrow Type | Fluss Type | Python Type |
+|---|---|---|
+| `pa.boolean()` | Boolean | `bool` |
+| `pa.int8()` / `int16()` / `int32()` / `int64()` | TinyInt / SmallInt / Int / 
BigInt | `int` |
+| `pa.float32()` / `float64()` | Float / Double | `float` |
+| `pa.string()` | String | `str` |
+| `pa.binary()` | Bytes | `bytes` |
+| `pa.date32()` | Date | `datetime.date` |
+| `pa.time32("ms")` | Time | `datetime.time` |
+| `pa.timestamp("us")` | Timestamp (NTZ) | `datetime.datetime` |
+| `pa.timestamp("us", tz="UTC")` | TimestampLTZ | `datetime.datetime` |
+| `pa.decimal128(precision, scale)` | Decimal | `decimal.Decimal` |
diff --git a/bindings/python/DEVELOPMENT.md b/bindings/python/DEVELOPMENT.md
new file mode 100644
index 0000000..e316f5e
--- /dev/null
+++ b/bindings/python/DEVELOPMENT.md
@@ -0,0 +1,114 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+-->
+
+# Development
+
+## Requirements
+
+- Python 3.9+
+- Rust 1.70+
+- [uv](https://docs.astral.sh/uv/) package manager
+- Linux or macOS
+
+> **Before you start:**
+> Please make sure you can successfully build and run the [Fluss Rust 
client](../../crates/fluss/README.md) on your machine.
+> The Python bindings require a working Fluss Rust backend and compatible 
environment.
+
+## Install Development Dependencies
+
+```bash
+cd bindings/python
+uv sync --all-extras
+```
+
+## Build Development Version
+
+```bash
+source .venv/bin/activate
+uv run maturin develop
+```
+
+## Build Release Version
+
+```bash
+uv run maturin build --release
+```
+
+## Code Formatting and Linting
+
+```bash
+uv run ruff format python/
+uv run ruff check python/
+```
+
+## Type Checking
+
+```bash
+uv run mypy python/
+```
+
+## Run Examples
+
+```bash
+uv run python example/example.py
+```
+
+## Build API Docs
+
+```bash
+uv run pdoc fluss
+```
+
+## Release
+
+```bash
+# Build wheel
+uv run maturin build --release
+
+# Publish to PyPI
+uv run maturin publish
+```
+
+## Project Structure
+
+```
+bindings/python/
+├── Cargo.toml            # Rust dependency configuration
+├── pyproject.toml         # Python project configuration
+├── README.md              # User guide
+├── DEVELOPMENT.md         # This file
+├── API_REFERENCE.md       # API reference
+├── src/                   # Rust source code (PyO3 bindings)
+│   ├── lib.rs
+│   ├── config.rs
+│   ├── connection.rs
+│   ├── admin.rs
+│   ├── table.rs
+│   └── error.rs
+├── fluss/                 # Python package
+│   ├── __init__.py
+│   ├── __init__.pyi       # Type stubs
+│   └── py.typed
+└── example/
+    └── example.py
+```
+
+## License
+
+Apache 2.0 License
diff --git a/bindings/python/README.md b/bindings/python/README.md
index b097039..a31c990 100644
--- a/bindings/python/README.md
+++ b/bindings/python/README.md
@@ -1,155 +1,453 @@
 <!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~   http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing,
-  ~ software distributed under the License is distributed on an
-  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  ~ KIND, either express or implied.  See the License for the
-  ~ specific language governing permissions and limitations
-  ~ under the License.
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
 -->
 
-# Apache Fluss™ Python Bindings
+# Fluss Python Client
 
-Python bindings for Fluss using PyO3 and Maturin.
+This guide covers how to use the Fluss Python client to write data to, and read data from, log tables and primary key tables.
 
-## API Overview
+The Python client is async-first, built on top of the Rust core via 
[PyO3](https://pyo3.rs/), and uses 
[PyArrow](https://arrow.apache.org/docs/python/) for schema definitions and 
data interchange.
 
-### Basic Usage
+## Key Concepts
 
-TODO: Add basic usage examples here
+- **Log table** — an append-only table (no primary key). Records are immutable 
once written. Use for event streams, logs, and audit trails.
+- **Primary key (PK) table** — a table with a primary key. Supports upsert, 
delete, and point lookups.
+- **Bucket** — the unit of parallelism within a table (similar to Kafka 
partitions). Each table has one or more buckets. Readers subscribe to 
individual buckets.
+- **Partition** — a way to organize data by column values (e.g. by date or 
region). Each partition contains its own set of buckets. Partitions must be 
created explicitly before writing.
+- **Offset** — the position of a record within a bucket. Used to track reading 
progress. Start from `EARLIEST_OFFSET` to read all data, or `LATEST_OFFSET` to 
only read new records.
 
-### Core Classes
+## Prerequisites
 
-#### `Config`
+You need a running Fluss cluster to use the Python client. See the 
[Quick-Start guide](../../README.md#quick-start) for how to start a local 
cluster.
 
-Configuration for Fluss connection parameters
+## Installation
 
-#### `FlussConnection`
+```bash
+pip install pyfluss
+```
 
-Main interface for connecting to Fluss cluster
+To build from source instead, see the [Development Guide](DEVELOPMENT.md).
+
+## Quick Start
+
+A minimal end-to-end example: connect, create a table, write data, and read it back. It assumes a Fluss cluster is running on `127.0.0.1:9123`.
+
+```python
+import asyncio
+import pyarrow as pa
+import fluss
+
+async def main():
+    # Connect
+    config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"})
+    conn = await fluss.FlussConnection.connect(config)
+    admin = await conn.get_admin()
+
+    # Create a log table
+    schema = fluss.Schema(pa.schema([
+        pa.field("id", pa.int32()),
+        pa.field("name", pa.string()),
+        pa.field("score", pa.float32()),
+    ]))
+    table_path = fluss.TablePath("fluss", "quick_start")
+    await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True)
+
+    # Write
+    table = await conn.get_table(table_path)
+    writer = await table.new_append_writer()
+    writer.append({"id": 1, "name": "Alice", "score": 95.5})
+    writer.append({"id": 2, "name": "Bob", "score": 87.0})
+    await writer.flush()
+
+    # Read
+    num_buckets = (await admin.get_table(table_path)).num_buckets
+    scanner = await table.new_scan().create_batch_scanner()
+    scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
+    print(scanner.to_pandas())
+
+    # Cleanup
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+    conn.close()
+
+asyncio.run(main())
+```
 
-#### `FlussAdmin`
+## Connection Setup
+
+```python
+config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"})
+conn = await fluss.FlussConnection.connect(config)
+```
 
-Administrative operations for managing tables (create, delete, etc.)
+The connection also supports context managers:
 
-#### `FlussTable`
+```python
+with await fluss.FlussConnection.connect(config) as conn:
+    ...
+```
 
-Represents a Fluss table, providing read and write operations
+### Configuration Options
 
-#### `TableWriter`
+| Key | Description | Default |
+|-----|-------------|---------|
+| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` |
+| `request.max.size` | Maximum request size in bytes | `10485760` (10 MB) |
+| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | 
`all` |
+| `writer.retries` | Number of retries on failure | `2147483647` |
+| `writer.batch.size` | Batch size for writes in bytes | `2097152` (2 MB) |
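As a sketch of how the options in this table might be assembled before being passed to `fluss.Config`, the snippet below merges overrides onto the documented defaults. The helper `build_properties` is hypothetical. Keeping every value as a string is an assumption, and rejecting keys that are not in the table is only a typo guard; the real client may accept more keys.

```python
# Defaults copied from the table above. Storing them as strings is an
# assumption about what fluss.Config expects.
DEFAULTS = {
    "bootstrap.servers": "127.0.0.1:9123",
    "request.max.size": "10485760",   # 10 MB
    "writer.acks": "all",
    "writer.retries": "2147483647",
    "writer.batch.size": "2097152",   # 2 MB
}


def build_properties(overrides=None):
    """Return the documented defaults with the given overrides applied."""
    overrides = dict(overrides or {})
    unknown = sorted(set(overrides) - set(DEFAULTS))
    if unknown:
        # Only the keys listed in the table are known to this sketch.
        raise KeyError(f"undocumented config keys: {unknown}")
    return {**DEFAULTS, **overrides}


properties = build_properties({"writer.batch.size": "1048576"})
print(properties)
```

The resulting dict has the same shape as the one passed to `fluss.Config` in the Quick Start.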
 
-Used for writing data to tables, supports PyArrow and Pandas
+## Admin Operations
 
-#### `LogScanner`
+```python
+admin = await conn.get_admin()
+```
 
-Used for scanning table log data
+### Databases
 
+```python
+await admin.create_database("my_database", ignore_if_exists=True)
+databases = await admin.list_databases()
+exists = await admin.database_exists("my_database")
+await admin.drop_database("my_database", ignore_if_not_exists=True, cascade=True)
+```
 
-# todo: we may move the following part to DEVELOPMENT.md
-## Development
+### Tables
 
-## Requirements
+Schemas are defined using PyArrow and wrapped in `fluss.Schema`:
 
-- Python 3.9+
-- Rust 1.70+
-- [uv](https://docs.astral.sh/uv/) package manager
-- Linux or MacOS
+```python
+import pyarrow as pa
 
-> **⚠️ Before you start:**  
-> Please make sure you can successfully build and run the [Fluss Rust 
client](../../crates/fluss/README.md) on your machine.  
-> The Python bindings require a working Fluss Rust backend and compatible 
environment.
+schema = fluss.Schema(pa.schema([
+    pa.field("id", pa.int32()),
+    pa.field("name", pa.string()),
+    pa.field("amount", pa.int64()),
+]))
 
-### Install Development Dependencies
+table_path = fluss.TablePath("my_database", "my_table")
+await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True)
 
-```bash
-cd bindings/python
-uv sync --all-extras
+table_info = await admin.get_table(table_path)
+tables = await admin.list_tables("my_database")
+await admin.drop_table(table_path, ignore_if_not_exists=True)
 ```
 
-### Build Development Version
+`TableDescriptor` accepts these optional parameters:
 
-```bash
-source .venv/bin/activate
-uv run maturin develop
+| Parameter | Description |
+|---|---|
+| `partition_keys` | Column names to partition by (e.g. `["region"]`) |
+| `bucket_count` | Number of buckets (parallelism units) for the table |
+| `bucket_keys` | Columns used to determine bucket assignment |
+| `comment` | Table comment / description |
+| `log_format` | Log storage format: `"ARROW"` or `"INDEXED"` |
+| `kv_format` | KV storage format for primary key tables: `"INDEXED"` or 
`"COMPACTED"` |
+| `properties` | Table configuration properties as a dict (e.g. 
`{"table.replication.factor": "1"}`) |
+| `custom_properties` | User-defined properties as a dict |
+
+### Offsets
+
+```python
+# Latest offsets for buckets
+offsets = await admin.list_offsets(table_path, bucket_ids=[0, 1], offset_type="latest")
+
+# By timestamp
+offsets = await admin.list_offsets(table_path, bucket_ids=[0], offset_type="timestamp", timestamp=1704067200000)
+
+# Per-partition offsets
+offsets = await admin.list_partition_offsets(table_path, partition_name="US", bucket_ids=[0], offset_type="latest")
 ```
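The `timestamp=1704067200000` value in the example above matches 2024-01-01T00:00:00Z expressed in milliseconds since the Unix epoch. The unit is not stated in these docs, so treating it as epoch milliseconds is an assumption. Under that assumption, a value can be derived from a `datetime` as sketched here; `to_epoch_millis` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment):
    """Convert a timezone-aware datetime to whole milliseconds since the Unix epoch."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # Integer timedelta division avoids floating-point rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


start_of_2024 = to_epoch_millis(datetime(2024, 1, 1, tzinfo=timezone.utc))
print(start_of_2024)
```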
 
-### Build Release Version
+## Log Tables
 
-```bash
-uv run maturin build --release
+Log tables are append-only tables without primary keys, suitable for event 
streaming.
+
+### Writing
+
+Rows can be appended as dicts, lists, or tuples. For bulk writes, use 
`write_arrow()`, `write_arrow_batch()`, or `write_pandas()`.
+
+Write methods like `append()` and `write_arrow_batch()` return a 
`WriteResultHandle`. You can ignore it for fire-and-forget semantics (flush at 
the end), or `await handle.wait()` to block until the server acknowledges that 
specific write.
+
+```python
+table = await conn.get_table(table_path)
+writer = await table.new_append_writer()
+
+# Fire-and-forget: queue writes, flush at the end
+writer.append({"id": 1, "name": "Alice", "score": 95.5})
+writer.append([2, "Bob", 87.0])
+await writer.flush()
+
+# Per-record acknowledgment
+handle = writer.append({"id": 3, "name": "Charlie", "score": 91.0})
+await handle.wait()
+
+# Bulk writes
+writer.write_arrow(pa_table)          # PyArrow Table
+writer.write_arrow_batch(record_batch) # PyArrow RecordBatch
+writer.write_pandas(df)                # Pandas DataFrame
+await writer.flush()
 ```
 
-### Code Formatting and Linting
+### Reading
 
-```bash
-uv run ruff format python/
-uv run ruff check python/
+There are two scanner types:
+- **Batch scanner** (`create_batch_scanner()`) — returns Arrow Tables or 
DataFrames, best for analytics
+- **Record scanner** (`create_log_scanner()`) — returns individual records 
with metadata (offset, timestamp, change type), best for streaming
+
+And two reading modes:
+- **`to_arrow()` / `to_pandas()`** — reads all data from subscribed buckets up 
to the current latest offset, then returns. Best for one-shot batch reads.
+- **`poll_arrow()` / `poll()` / `poll_batches()`** — returns whatever data is 
available within the timeout, then returns. Call in a loop for continuous 
streaming.
+
+#### Batch Read (One-Shot)
+
+```python
+num_buckets = (await admin.get_table(table_path)).num_buckets
+
+scanner = await table.new_scan().create_batch_scanner()
+scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
+
+# Reads everything up to current latest offset, then returns
+arrow_table = scanner.to_arrow()
+df = scanner.to_pandas()
 ```
 
-### Type Checking
+#### Continuous Polling
 
-```bash
-uv run mypy python/
+Use `poll_arrow()` or `poll()` in a loop for streaming consumption:
+
+```python
+# Batch scanner: poll as Arrow Tables
+scanner = await table.new_scan().create_batch_scanner()
+scanner.subscribe(bucket_id=0, start_offset=fluss.EARLIEST_OFFSET)
+
+while True:
+    result = scanner.poll_arrow(timeout_ms=5000)
+    if result.num_rows > 0:
+        print(result.to_pandas())
+
+# Alternative consumer (the loop above never exits, so run this one separately).
+# Record scanner: poll individual records with metadata
+scanner = await table.new_scan().create_log_scanner()
+scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
+
+while True:
+    for record in scanner.poll(timeout_ms=5000):
+        print(f"offset={record.offset}, change={record.change_type.short_string()}, row={record.row}")
 ```
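The `while True` loops above run forever, which suits a long-lived consumer. For scripts and tests, one possible variation stops after a number of consecutive empty polls. The sketch below shows that pattern against a stand-in scanner: `FakeScanner` and `drain` are hypothetical names, and only the `poll(timeout_ms)` call shape is taken from this guide.

```python
class FakeScanner:
    """Hypothetical stand-in for a record scanner; poll() replays canned results."""

    def __init__(self, responses):
        self._responses = list(responses)

    def poll(self, timeout_ms):
        # Return the next canned result, or an empty list once exhausted.
        return self._responses.pop(0) if self._responses else []


def drain(scanner, timeout_ms=5000, max_idle_polls=3):
    """Poll until max_idle_polls consecutive polls return no records."""
    records = []
    idle = 0
    while idle < max_idle_polls:
        polled = scanner.poll(timeout_ms)
        if polled:
            records.extend(polled)
            idle = 0  # Data arrived, so reset the idle counter.
        else:
            idle += 1
    return records


print(drain(FakeScanner([["a", "b"], [], ["c"], [], [], []])))
```

An idle-poll limit is only a heuristic: an empty poll means no data arrived within the timeout, not that the log has ended.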
 
-### Run Examples
+#### Subscribe from Latest Offset
 
-```bash
-uv run python example/example.py
+To only consume new records (skip existing data), use `LATEST_OFFSET`:
+
+```python
+scanner = await table.new_scan().create_batch_scanner()
+scanner.subscribe(bucket_id=0, start_offset=fluss.LATEST_OFFSET)
 ```
 
-### Build API docs:
+### Column Projection
 
-```bash
-uv run pdoc fluss
+```python
+scanner = await table.new_scan().project([0, 2]).create_batch_scanner()
+# or by name
+scanner = await table.new_scan().project_by_name(["id", "score"]).create_batch_scanner()
 ```
 
-### Release
+## Primary Key Tables
 
-```bash
-# Build wheel
-uv run maturin build --release
+Primary key tables support upsert, delete, and point lookup operations.
 
-# Publish to PyPI
-uv run maturin publish
+### Creating
+
+Pass `primary_keys` to `fluss.Schema`:
+
+```python
+schema = fluss.Schema(
+    pa.schema([
+        pa.field("id", pa.int32()),
+        pa.field("name", pa.string()),
+        pa.field("age", pa.int64()),
+    ]),
+    primary_keys=["id"],
+)
+table_path = fluss.TablePath("fluss", "users")
+await admin.create_table(table_path, fluss.TableDescriptor(schema, bucket_count=3), ignore_if_exists=True)
 ```
 
-## Project Structure
+### Upsert, Delete, Lookup
+
+```python
+table = await conn.get_table(table_path)
+
+# Upsert (fire-and-forget, flush at the end)
+writer = table.new_upsert()
+writer.upsert({"id": 1, "name": "Alice", "age": 25})
+writer.upsert({"id": 2, "name": "Bob", "age": 30})
+await writer.flush()
+
+# Per-record acknowledgment (for read-after-write)
+handle = writer.upsert({"id": 3, "name": "Charlie", "age": 35})
+await handle.wait()
+
+# Delete by primary key
+handle = writer.delete({"id": 2})
+await handle.wait()
+
+# Lookup
+lookuper = table.new_lookup()
+result = await lookuper.lookup({"id": 1})
+if result:
+    print(f"Found: name={result['name']}, age={result['age']}")
 ```
-bindings/python/
-├── Cargo.toml            # Rust dependency configuration
-├── pyproject.toml        # Python project configuration
-├── README.md             # This file
-├── src/                  # Rust source code
-│   ├── lib.rs            # Main entry module
-│   ├── config.rs         # Configuration related
-│   ├── connection.rs     # Connection management
-│   ├── admin.rs          # Admin operations
-│   ├── table.rs          # Table operations
-│   ├── types.rs          # Data types
-│   └── error.rs          # Error handling
-├── fluss/                # Python package source
-│   ├── __init__.py       # Python package entry
-│   ├── __init__.pyi      # Stub file
-│   └── py.typed          # Type declarations
-└── example/              # Example code
-    └── example.py
+
+### Partial Updates
+
+Update specific columns while preserving others:
+
+```python
+partial_writer = table.new_upsert(columns=["id", "age"])
+partial_writer.upsert({"id": 1, "age": 27})  # only updates age
+await partial_writer.flush()
 ```
 
-## TODO
+## Partitioned Tables
+
+Partitioned tables distribute data across partitions based on column values. 
Partitions must be created before writing.
+
+### Creating and Managing Partitions
+
+```python
+schema = fluss.Schema(pa.schema([
+    pa.field("id", pa.int32()),
+    pa.field("region", pa.string()),
+    pa.field("value", pa.int64()),
+]))
+
+table_path = fluss.TablePath("fluss", "partitioned_events")
+await admin.create_table(
+    table_path,
+    fluss.TableDescriptor(schema, partition_keys=["region"], bucket_count=1),
+    ignore_if_exists=True,
+)
+
+# Create partitions
+await admin.create_partition(table_path, {"region": "US"}, ignore_if_exists=True)
+await admin.create_partition(table_path, {"region": "EU"}, ignore_if_exists=True)
+
+# List partitions
+partition_infos = await admin.list_partition_infos(table_path)
+```
+
+### Writing
+
+Same as non-partitioned tables — include partition column values in each row:
+
+```python
+table = await conn.get_table(table_path)
+writer = await table.new_append_writer()
+writer.append({"id": 1, "region": "US", "value": 100})
+writer.append({"id": 2, "region": "EU", "value": 200})
+await writer.flush()
+```
+
+### Reading
+
+Use `subscribe_partition()` or `subscribe_partition_buckets()` instead of 
`subscribe()`:
+
+```python
+scanner = await table.new_scan().create_batch_scanner()
+
+# Subscribe to individual partitions
+for p in partition_infos:
+    scanner.subscribe_partition(partition_id=p.partition_id, bucket_id=0, start_offset=fluss.EARLIEST_OFFSET)
+
+# Or batch-subscribe
+scanner.subscribe_partition_buckets({
+    (p.partition_id, 0): fluss.EARLIEST_OFFSET for p in partition_infos
+})
+
+print(scanner.to_pandas())
+```
+
+### Partitioned Primary Key Tables
+
+Partition columns must be part of the primary key. Partitions must be created 
before upserting.
+
+```python
+schema = fluss.Schema(
+    pa.schema([
+        pa.field("user_id", pa.int32()),
+        pa.field("region", pa.string()),
+        pa.field("score", pa.int64()),
+    ]),
+    primary_keys=["user_id", "region"],
+)
+
+table_path = fluss.TablePath("fluss", "partitioned_users")
+await admin.create_table(
+    table_path,
+    fluss.TableDescriptor(schema, partition_keys=["region"]),
+    ignore_if_exists=True,
+)
+
+await admin.create_partition(table_path, {"region": "US"}, ignore_if_exists=True)
+
+table = await conn.get_table(table_path)
+writer = table.new_upsert()
+writer.upsert({"user_id": 1, "region": "US", "score": 1234})
+await writer.flush()
+
+# Lookup includes partition columns
+lookuper = table.new_lookup()
+result = await lookuper.lookup({"user_id": 1, "region": "US"})
+```
+
+## Error Handling
+
+The client raises `fluss.FlussError` for Fluss-specific errors (connection 
failures, table not found, invalid operations, etc.):
+
+```python
+try:
+    await admin.create_table(table_path, table_descriptor)
+except fluss.FlussError as e:
+    print(f"Fluss error: {e.message}")
+```
+
+Common error scenarios:
+- **Connection refused** — Fluss cluster is not running or wrong address in 
`bootstrap.servers`
+- **Table not found** — table doesn't exist or wrong database/table name
+- **Partition not found** — writing to a partitioned table before creating 
partitions
+- **Schema mismatch** — row data doesn't match the table schema
+
+## Data Types
+
+The Python client uses PyArrow types for schema definitions:
 
-- [ ] Add basic usage examples in API Overview (code snippets for Config, 
FlussConnection, FlussAdmin, FlussTable).
-- [ ] Add a "Verifying a release" subsection with install-from-TestPyPI/PyPI 
and smoke-test steps.
+| PyArrow Type | Fluss Type | Python Type |
+|---|---|---|
+| `pa.boolean()` | Boolean | `bool` |
+| `pa.int8()` / `int16()` / `int32()` / `int64()` | TinyInt / SmallInt / Int / 
BigInt | `int` |
+| `pa.float32()` / `float64()` | Float / Double | `float` |
+| `pa.string()` | String | `str` |
+| `pa.binary()` | Bytes | `bytes` |
+| `pa.date32()` | Date | `datetime.date` |
+| `pa.time32("ms")` | Time | `datetime.time` |
+| `pa.timestamp("us")` | Timestamp (NTZ) | `datetime.datetime` |
+| `pa.timestamp("us", tz="UTC")` | TimestampLTZ | `datetime.datetime` |
+| `pa.decimal128(precision, scale)` | Decimal | `decimal.Decimal` |
 
-## License
+All Python native types (`date`, `time`, `datetime`, `Decimal`) work when 
appending rows via dicts.
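As an illustration of the Python Type column, the sketch below builds a row dict from native values and checks each value against the expected type before it would be appended. The column names and the helper `type_mismatches` are hypothetical. The check is deliberately strict (`type(...) is`), because `bool` is a subclass of `int` and `datetime.datetime` is a subclass of `datetime.date`.

```python
import datetime
from decimal import Decimal

# Hypothetical columns, one per Python type listed in the table.
EXPECTED_TYPES = {
    "active": bool,
    "id": int,
    "score": float,
    "name": str,
    "payload": bytes,
    "birthday": datetime.date,
    "alarm": datetime.time,
    "created": datetime.datetime,
    "balance": Decimal,
}

row = {
    "active": True,
    "id": 1,
    "score": 95.5,
    "name": "Alice",
    "payload": b"\x00\x01",
    "birthday": datetime.date(1990, 5, 17),
    "alarm": datetime.time(7, 30),
    "created": datetime.datetime(2024, 1, 1, 12, 0),
    "balance": Decimal("12.34"),
}


def type_mismatches(row, expected):
    """Return the sorted column names whose value type differs from the expected one."""
    return sorted(name for name, value in row.items() if type(value) is not expected[name])


print(type_mismatches(row, EXPECTED_TYPES))
```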
 
-Apache 2.0 License
+For a complete list of classes, methods, and properties, see the [API 
Reference](API_REFERENCE.md).
diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi
index cc7053e..adbfc2f 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -610,7 +610,8 @@ class TableDescriptor:
         comment: Optional[str] = None,
         log_format: Optional[str] = None,
         kv_format: Optional[str] = None,
-        **properties: str,
+        properties: Optional[Dict[str, str]] = None,
+        custom_properties: Optional[Dict[str, str]] = None,
     ) -> None: ...
     def get_schema(self) -> Schema: ...
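The stub change above replaces the `**properties: str` catch-all with two explicit dict parameters. The sketch below shows what that likely means at a call site. It uses a stand-in class, `TableDescriptorStub`, which is hypothetical and mirrors only the two changed parameters. The `"owner"` custom property is a made-up example.

```python
from typing import Dict, Optional


class TableDescriptorStub:
    """Hypothetical stand-in mirroring only the two parameters changed in the stub."""

    def __init__(
        self,
        *,
        properties: Optional[Dict[str, str]] = None,
        custom_properties: Optional[Dict[str, str]] = None,
    ) -> None:
        self.properties = dict(properties or {})
        self.custom_properties = dict(custom_properties or {})


# New style: table properties and user-defined properties are separate dicts.
descriptor = TableDescriptorStub(
    properties={"table.replication.factor": "1"},
    custom_properties={"owner": "data-team"},
)
print(descriptor.properties, descriptor.custom_properties)

# Old style: spreading properties as keyword arguments no longer matches the signature.
try:
    TableDescriptorStub(**{"table.replication.factor": "1"})
except TypeError as exc:
    print(f"rejected: {exc}")
```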
 

