Copilot commented on code in PR #242:
URL: https://github.com/apache/fluss-rust/pull/242#discussion_r2781097978


##########
docs/rust-client.md:
##########
@@ -0,0 +1,748 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+# Fluss Rust Client Guide
+
+This guide covers how to use the Fluss Rust client for reading and writing 
data to log tables and primary key tables.
+
+## Prerequisites
+
+- Rust 1.85+
+- Protobuf compiler (`protoc`) - only required when [building from 
source](#building-from-source)
+
+## Adding to Your Project
+
+The Fluss Rust client is published to 
[crates.io](https://crates.io/crates/fluss-rs) as `fluss-rs`. The crate's 
library name is `fluss`, so you import it with `use fluss::...`.
+
+```toml
+[dependencies]
+fluss-rs = "0.1"
+tokio = { version = "1", features = ["full"] }
+```
+
+### Feature Flags
+
+The Fluss crate supports optional storage backends:
+
+```toml
+[dependencies]
+# Default: memory and filesystem storage
+fluss-rs = "0.1"
+
+# With S3 storage support
+fluss-rs = { version = "0.1", features = ["storage-s3"] }
+
+# With OSS storage support
+fluss-rs = { version = "0.1", features = ["storage-oss"] }
+
+# All storage backends
+fluss-rs = { version = "0.1", features = ["storage-all"] }
+```
+
+Available features:
+- `storage-memory` (default) - In-memory storage
+- `storage-fs` (default) - Filesystem storage
+- `storage-s3` - Amazon S3 storage
+- `storage-oss` - Alibaba OSS storage
+- `storage-all` - All storage backends
+
+### Alternative: Git or Path Dependency
+
+For development against unreleased changes, you can depend on the Git 
repository or a local checkout:
+
+```toml
+[dependencies]
+# From Git
+fluss = { git = "https://github.com/apache/fluss-rust.git", package = "fluss-rs" }
+
+# From local path
+fluss = { path = "/path/to/fluss-rust/crates/fluss", package = "fluss-rs" }
+```
+
+> **Note:** When using `git` or `path` dependencies, the `package = 
"fluss-rs"` field is required so that Cargo resolves the correct package while 
still allowing `use fluss::...` imports.
+
+## Building from Source
+
+### 1. Clone the Repository
+
+```bash
+git clone https://github.com/apache/fluss-rust.git
+cd fluss-rust
+```
+
+### 2. Install Dependencies
+
+The Protobuf compiler (`protoc`) is required to build from source.
+
+#### macOS
+
+```bash
+brew install protobuf
+```
+
+#### Ubuntu/Debian
+
+```bash
+sudo apt-get install protobuf-compiler
+```
+
+### 3. Build the Library
+
+```bash
+cargo build --workspace --all-targets
+```
+
+### 4. Run Tests
+
+```bash
+# Unit tests
+cargo test --workspace
+
+# Integration tests (requires running Fluss cluster)
+RUST_TEST_THREADS=1 cargo test --features integration_tests --workspace
+```
+
+## Connection Setup
+
+```rust
+use fluss::client::FlussConnection;
+use fluss::config::Config;
+use fluss::error::Result;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let mut config = Config::default();
+    config.bootstrap_server = "127.0.0.1:9123".to_string();
+
+    let conn = FlussConnection::new(config).await?;
+
+    // Use the connection...
+
+    Ok(())
+}
+```
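+
+As a quick sanity check after connecting, the admin API (covered in detail below) can be used to list databases. A minimal sketch, continuing from the `conn` created above:
+
+```rust
+// Continuing from the connection established above.
+let admin = conn.get_admin().await?;
+let databases = admin.list_databases().await?;
+println!("Connected; databases: {:?}", databases);
+```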
+
+### Configuration Options
+
+| Option | Description | Default |
+|--------|-------------|---------|
+| `bootstrap_server` | Coordinator server address | `127.0.0.1:9123` |
+| `request_max_size` | Maximum request size in bytes | 10 MB |
+| `writer_acks` | Acknowledgment setting (`all` waits for all replicas) | 
`all` |
+| `writer_retries` | Number of retries on failure | `i32::MAX` |
+| `writer_batch_size` | Batch size for writes | 2 MB |
+
+## Admin Operations
+
+### Get Admin Interface
+
+```rust
+let admin = conn.get_admin().await?;
+```
+
+### Database Operations
+
+```rust
+// Create database
+admin.create_database("my_database", true, None).await?;
+
+// List all databases
+let databases = admin.list_databases().await?;
+println!("Databases: {:?}", databases);
+
+// Check if database exists
+let exists = admin.database_exists("my_database").await?;
+
+// Get database information
+let db_info = admin.get_database_info("my_database").await?;
+
+// Drop database (with cascade option to drop all tables)
+admin.drop_database("my_database", true, false).await?;
+```
+
+### Table Operations
+
+```rust
+use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
+
+// Define table schema
+let table_descriptor = TableDescriptor::builder()
+    .schema(
+        Schema::builder()
+            .column("id", DataTypes::int())
+            .column("name", DataTypes::string())
+            .column("amount", DataTypes::bigint())
+            .build()?,
+    )
+    .build()?;
+
+let table_path = TablePath::new("my_database", "my_table");
+
+// Create table
+admin.create_table(&table_path, &table_descriptor, true).await?;
+
+// Get table information
+let table_info = admin.get_table(&table_path).await?;
+println!("Table: {}", table_info);
+
+// List tables in database
+let tables = admin.list_tables("my_database").await?;
+
+// Check if table exists
+let exists = admin.table_exists(&table_path).await?;
+
+// Drop table
+admin.drop_table(&table_path, true).await?;
+```
+
+### Partition Operations
+
+```rust
+use fluss::metadata::PartitionSpec;
+use std::collections::HashMap;
+
+// List all partitions
+let partitions = admin.list_partition_infos(&table_path).await?;
+
+// List partitions matching a spec
+let mut filter = HashMap::new();
+filter.insert("year", "2024");
+let spec = PartitionSpec::new(filter);
+let partitions = admin.list_partition_infos_with_spec(&table_path, 
Some(&spec)).await?;
+
+// Create partition
+admin.create_partition(&table_path, &spec, true).await?;
+
+// Drop partition
+admin.drop_partition(&table_path, &spec, true).await?;
+```
+
+### Offset Operations
+
+```rust
+use fluss::rpc::message::OffsetSpec;
+
+let bucket_ids = vec![0, 1, 2];
+
+// Get earliest offsets
+let earliest = admin.list_offsets(&table_path, &bucket_ids, 
OffsetSpec::Earliest).await?;
+
+// Get latest offsets
+let latest = admin.list_offsets(&table_path, &bucket_ids, 
OffsetSpec::Latest).await?;
+
+// Get offsets for a specific timestamp
+let timestamp_ms = 1704067200000; // 2024-01-01 00:00:00 UTC
+let offsets = admin.list_offsets(&table_path, &bucket_ids, 
OffsetSpec::Timestamp(timestamp_ms)).await?;
+
+// Get offsets for a specific partition
+let partition_offsets = admin.list_partition_offsets(
+    &table_path,
+    "partition_name",
+    &bucket_ids,
+    OffsetSpec::Latest,
+).await?;
+```
+
+### Lake Snapshot
+
+```rust
+// Get latest lake snapshot for lakehouse integration
+let snapshot = admin.get_latest_lake_snapshot(&table_path).await?;
+println!("Snapshot ID: {}", snapshot.snapshot_id);
+```
+
+## Log Table Operations
+
+Log tables are append-only tables without primary keys, suitable for event 
streaming.
+
+### Creating a Log Table
+
+```rust
+let table_descriptor = TableDescriptor::builder()
+    .schema(
+        Schema::builder()
+            .column("event_id", DataTypes::int())
+            .column("event_type", DataTypes::string())
+            .column("timestamp", DataTypes::bigint())
+            .build()?,
+    )
+    .build()?;
+
+let table_path = TablePath::new("fluss", "events");
+admin.create_table(&table_path, &table_descriptor, true).await?;
+```
+
+### Writing to Log Tables
+
+```rust
+use fluss::row::{GenericRow, InternalRow};
+
+let table = conn.get_table(&table_path).await?;
+let append_writer = table.new_append()?.create_writer()?;
+
+// Write a single row
+let mut row = GenericRow::new(3);
+row.set_field(0, 1);                    // event_id (int)
+row.set_field(1, "user_login");         // event_type (string)
+row.set_field(2, 1704067200000i64);     // timestamp (bigint)
+
+append_writer.append(&row)?;
+
+// Write multiple rows
+let mut row2 = GenericRow::new(3);
+row2.set_field(0, 2);
+row2.set_field(1, "page_view");
+row2.set_field(2, 1704067201000i64);
+
+append_writer.append(&row2)?;
+
+// Flush to ensure data is persisted
+append_writer.flush().await?;
+```
+
+Write operations (`append`, `upsert`, `delete`) use a **fire-and-forget** 
pattern for efficient batching. Each call queues the write and returns a 
`WriteResultFuture` immediately. Call `flush()` to ensure all queued writes are 
sent to the server.
+
+If you need per-record acknowledgment, you can await the returned future:
+
+```rust
+// Per-record acknowledgment (blocks until server confirms)
+append_writer.append(&row)?.await?;
+```
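+
+For example, a batch of rows can be written with per-record acknowledgment by awaiting each append in turn. A minimal sketch (the event values below are made up for illustration):
+
+```rust
+// Hypothetical events, purely for illustration.
+let events = [
+    (10, "signup", 1704067300000i64),
+    (11, "logout", 1704067301000i64),
+];
+
+for (event_id, event_type, ts) in events {
+    let mut row = GenericRow::new(3);
+    row.set_field(0, event_id);
+    row.set_field(1, event_type);
+    row.set_field(2, ts);
+    // Awaiting the returned future blocks until the server confirms this record.
+    append_writer.append(&row)?.await?;
+}
+```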
+
+### Reading from Log Tables
+
+```rust
+use std::time::Duration;
+
+let table = conn.get_table(&table_path).await?;
+let log_scanner = table.new_scan().create_log_scanner()?;
+
+// Subscribe to bucket 0 starting from offset 0
+log_scanner.subscribe(0, 0).await?;
+
+// Poll for records
+let records = log_scanner.poll(Duration::from_secs(10)).await?;
+
+for record in records {
+    let row = record.row();
+    println!(
+        "event_id={}, event_type={}, timestamp={} @ offset={}",
+        row.get_int(0),
+        row.get_string(1),
+        row.get_long(2),
+        record.offset()
+    );
+}
+```
+
+### Column Projection
+
+```rust
+// Project specific columns by index
+let scanner = table.new_scan().project(&[0, 2])?.create_log_scanner()?;
+
+// Or project by column names
+let scanner = table.new_scan().project_by_name(&["event_id", 
"timestamp"])?.create_log_scanner()?;
+```
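+
+When a projection is applied, the returned rows are assumed to contain only the selected columns, in projection order, so field indices refer to the projected row rather than the full schema. A rough sketch under that assumption, iterating the poll result per bucket and then per record (the `ScanRecords` shape described in one of the review comments below):
+
+```rust
+use std::time::Duration;
+
+// Scanner that projects event_id (index 0) and timestamp (index 2).
+let scanner = table.new_scan().project(&[0, 2])?.create_log_scanner()?;
+scanner.subscribe(0, 0).await?;
+
+let scan_records = scanner.poll(Duration::from_secs(10)).await?;
+for (_bucket, records) in scan_records {
+    for record in records {
+        let row = record.row();
+        // Index 0 = event_id, index 1 = timestamp in the projected row.
+        println!("event_id={}, timestamp={}", row.get_int(0), row.get_long(1));
+    }
+}
+```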
+
+### Subscribe from Specific Offsets
+
+```rust
+use fluss::client::{EARLIEST_OFFSET, LATEST_OFFSET};
+
+// Subscribe from earliest available offset
+log_scanner.subscribe(0, EARLIEST_OFFSET).await?;
+
+// Subscribe from latest offset (only new records)
+log_scanner.subscribe(0, LATEST_OFFSET).await?;
+
+// Subscribe from a specific offset
+log_scanner.subscribe(0, 42).await?;
+
+// Subscribe to all buckets
+let num_buckets = table.get_table_info().get_num_buckets();
+for bucket_id in 0..num_buckets {
+    log_scanner.subscribe(bucket_id, 0).await?;
+}
+```
+
+### Subscribe to Multiple Buckets
+
+```rust
+use std::collections::HashMap;
+
+// Subscribe to multiple buckets at once with specific offsets
+let mut bucket_offsets = HashMap::new();
+bucket_offsets.insert(0, 0i64);    // bucket 0 from offset 0
+bucket_offsets.insert(1, 100i64);  // bucket 1 from offset 100
+log_scanner.subscribe_buckets(&bucket_offsets).await?;
+```
+
+### Unsubscribe from a Partition
+
+```rust
+// Unsubscribe from a specific partition bucket
+log_scanner.unsubscribe_partition(partition_id, bucket_id).await?;
+```
+
+## Partitioned Log Tables
+
+Partitioned tables distribute data across partitions based on partition column 
values, enabling efficient data organization and querying.
+
+### Creating a Partitioned Log Table
+
+```rust
+use fluss::metadata::{DataTypes, LogFormat, Schema, TableDescriptor, 
TablePath};
+
+let table_descriptor = TableDescriptor::builder()
+    .schema(
+        Schema::builder()
+            .column("event_id", DataTypes::int())
+            .column("event_type", DataTypes::string())
+            .column("dt", DataTypes::string())       // partition column
+            .column("region", DataTypes::string())   // partition column
+            .build()?,
+    )
+    .partitioned_by(vec!["dt", "region"])  // Define partition columns
+    .log_format(LogFormat::ARROW)
+    .build()?;
+
+let table_path = TablePath::new("fluss", "partitioned_events");
+admin.create_table(&table_path, &table_descriptor, true).await?;
+```
+
+### Writing to Partitioned Log Tables
+
+Writing works the same as non-partitioned tables. Include partition column 
values in each row:
+
+```rust
+let table = conn.get_table(&table_path).await?;
+let append_writer = table.new_append()?.create_writer()?;
+
+// Partition column values determine which partition the record goes to
+let mut row = GenericRow::new(4);
+row.set_field(0, 1);                  // event_id
+row.set_field(1, "user_login");       // event_type
+row.set_field(2, "2024-01-15");       // dt (partition column)
+row.set_field(3, "US");               // region (partition column)
+
+append_writer.append(&row)?;
+append_writer.flush().await?;
+```
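+
+Rows with different partition column values are routed to their respective partitions by the writer. A small sketch writing one event per region (values are illustrative):
+
+```rust
+// Write one illustrative event into each of two regions for the same date.
+for (event_id, region) in [(2, "US"), (3, "EMEA")] {
+    let mut row = GenericRow::new(4);
+    row.set_field(0, event_id);
+    row.set_field(1, "page_view");
+    row.set_field(2, "2024-01-15");  // dt (partition column)
+    row.set_field(3, region);        // region (partition column)
+    append_writer.append(&row)?;
+}
+append_writer.flush().await?;
+```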
+
+### Reading from Partitioned Log Tables
+
+For partitioned tables, use `subscribe_partition()` instead of `subscribe()`:
+
+```rust
+use std::time::Duration;
+
+let table = conn.get_table(&table_path).await?;
+let admin = conn.get_admin().await?;
+
+// Get partition information
+let partitions = admin.list_partition_infos(&table_path).await?;
+
+let log_scanner = table.new_scan().create_log_scanner()?;
+
+// Subscribe to each partition's buckets
+for partition_info in &partitions {
+    let partition_id = partition_info.get_partition_id();
+    let num_buckets = table.get_table_info().get_num_buckets();
+
+    for bucket_id in 0..num_buckets {
+        log_scanner.subscribe_partition(partition_id, bucket_id, 0).await?;
+    }
+}
+
+// Poll for records
+let records = log_scanner.poll(Duration::from_secs(10)).await?;
+for record in records {
+    println!("Record from partition: {:?}", record.row());
+}
+```
+
+You can also subscribe to multiple partition-buckets at once:
+
+```rust
+use std::collections::HashMap;
+
+let mut partition_bucket_offsets = HashMap::new();
+partition_bucket_offsets.insert((partition_id, 0), 0i64);  // partition, 
bucket 0, offset 0
+partition_bucket_offsets.insert((partition_id, 1), 0i64);  // partition, 
bucket 1, offset 0
+log_scanner.subscribe_partition_buckets(&partition_bucket_offsets).await?;

Review Comment:
   The scanner API doesn’t support subscribing multiple partition-buckets in 
one call (and `subscribe_batch` explicitly errors for partitioned tables). The 
`subscribe_partition_buckets` example won’t compile and suggests unsupported 
functionality; please remove it or replace it with the supported per-partition 
`subscribe_partition(...)` loop.
   ```suggestion
   You can also subscribe to multiple partition-buckets by looping and calling 
`subscribe_partition` for each one:
   
   ```rust
   // (partition_id, bucket_id, offset)
   let partition_buckets = vec![
       (partition_id, 0, 0i64), // partition, bucket 0, offset 0
       (partition_id, 1, 0i64), // partition, bucket 1, offset 0
   ];
   
   for (partition_id, bucket_id, offset) in partition_buckets {
       log_scanner
           .subscribe_partition(partition_id, bucket_id, offset)
           .await?;
   }
   ```



##########
docs/rust-client.md:
##########
@@ -0,0 +1,748 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+# Fluss Rust Client Guide
+
+This guide covers how to use the Fluss Rust client for reading and writing 
data to log tables and primary key tables.
+
+## Prerequisites
+
+- Rust 1.85+
+- Protobuf compiler (`protoc`) - only required when [building from 
source](#building-from-source)
+
+## Adding to Your Project
+
+The Fluss Rust client is published to 
[crates.io](https://crates.io/crates/fluss-rs) as `fluss-rs`. The crate's 
library name is `fluss`, so you import it with `use fluss::...`.
+
+```toml
+[dependencies]
+fluss-rs = "0.1"
+tokio = { version = "1", features = ["full"] }
+```
+
+### Feature Flags
+
+The Fluss crate supports optional storage backends:
+
+```toml
+[dependencies]
+# Default: memory and filesystem storage
+fluss-rs = "0.1"
+
+# With S3 storage support
+fluss-rs = { version = "0.1", features = ["storage-s3"] }
+
+# With OSS storage support
+fluss-rs = { version = "0.1", features = ["storage-oss"] }
+
+# All storage backends
+fluss-rs = { version = "0.1", features = ["storage-all"] }
+```
+
+Available features:
+- `storage-memory` (default) - In-memory storage
+- `storage-fs` (default) - Filesystem storage
+- `storage-s3` - Amazon S3 storage
+- `storage-oss` - Alibaba OSS storage
+- `storage-all` - All storage backends
+
+### Alternative: Git or Path Dependency
+
+For development against unreleased changes, you can depend on the Git 
repository or a local checkout:
+
+```toml
+[dependencies]
+# From Git
+fluss = { git = "https://github.com/apache/fluss-rust.git", package = "fluss-rs" }
+
+# From local path
+fluss = { path = "/path/to/fluss-rust/crates/fluss", package = "fluss-rs" }
+```
+
+> **Note:** When using `git` or `path` dependencies, the `package = 
"fluss-rs"` field is required so that Cargo resolves the correct package while 
still allowing `use fluss::...` imports.
+
+## Building from Source
+
+### 1. Clone the Repository
+
+```bash
+git clone https://github.com/apache/fluss-rust.git
+cd fluss-rust
+```
+
+### 2. Install Dependencies
+
+The Protobuf compiler (`protoc`) is required to build from source.
+
+#### macOS
+
+```bash
+brew install protobuf
+```
+
+#### Ubuntu/Debian
+
+```bash
+sudo apt-get install protobuf-compiler
+```
+
+### 3. Build the Library
+
+```bash
+cargo build --workspace --all-targets
+```
+
+### 4. Run Tests
+
+```bash
+# Unit tests
+cargo test --workspace
+
+# Integration tests (requires running Fluss cluster)
+RUST_TEST_THREADS=1 cargo test --features integration_tests --workspace
+```
+
+## Connection Setup
+
+```rust
+use fluss::client::FlussConnection;
+use fluss::config::Config;
+use fluss::error::Result;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let mut config = Config::default();
+    config.bootstrap_server = "127.0.0.1:9123".to_string();
+
+    let conn = FlussConnection::new(config).await?;
+
+    // Use the connection...
+
+    Ok(())
+}
+```
+
+### Configuration Options
+
+| Option | Description | Default |
+|--------|-------------|---------|
+| `bootstrap_server` | Coordinator server address | `127.0.0.1:9123` |
+| `request_max_size` | Maximum request size in bytes | 10 MB |
+| `writer_acks` | Acknowledgment setting (`all` waits for all replicas) | 
`all` |
+| `writer_retries` | Number of retries on failure | `i32::MAX` |
+| `writer_batch_size` | Batch size for writes | 2 MB |
+
+## Admin Operations
+
+### Get Admin Interface
+
+```rust
+let admin = conn.get_admin().await?;
+```
+
+### Database Operations
+
+```rust
+// Create database
+admin.create_database("my_database", true, None).await?;
+
+// List all databases
+let databases = admin.list_databases().await?;
+println!("Databases: {:?}", databases);
+
+// Check if database exists
+let exists = admin.database_exists("my_database").await?;
+
+// Get database information
+let db_info = admin.get_database_info("my_database").await?;
+
+// Drop database (with cascade option to drop all tables)
+admin.drop_database("my_database", true, false).await?;
+```
+
+### Table Operations
+
+```rust
+use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
+
+// Define table schema
+let table_descriptor = TableDescriptor::builder()
+    .schema(
+        Schema::builder()
+            .column("id", DataTypes::int())
+            .column("name", DataTypes::string())
+            .column("amount", DataTypes::bigint())
+            .build()?,
+    )
+    .build()?;
+
+let table_path = TablePath::new("my_database", "my_table");
+
+// Create table
+admin.create_table(&table_path, &table_descriptor, true).await?;
+
+// Get table information
+let table_info = admin.get_table(&table_path).await?;
+println!("Table: {}", table_info);
+
+// List tables in database
+let tables = admin.list_tables("my_database").await?;
+
+// Check if table exists
+let exists = admin.table_exists(&table_path).await?;
+
+// Drop table
+admin.drop_table(&table_path, true).await?;
+```
+
+### Partition Operations
+
+```rust
+use fluss::metadata::PartitionSpec;
+use std::collections::HashMap;
+
+// List all partitions
+let partitions = admin.list_partition_infos(&table_path).await?;
+
+// List partitions matching a spec
+let mut filter = HashMap::new();
+filter.insert("year", "2024");
+let spec = PartitionSpec::new(filter);
+let partitions = admin.list_partition_infos_with_spec(&table_path, 
Some(&spec)).await?;
+
+// Create partition
+admin.create_partition(&table_path, &spec, true).await?;
+
+// Drop partition
+admin.drop_partition(&table_path, &spec, true).await?;
+```
+
+### Offset Operations
+
+```rust
+use fluss::rpc::message::OffsetSpec;
+
+let bucket_ids = vec![0, 1, 2];
+
+// Get earliest offsets
+let earliest = admin.list_offsets(&table_path, &bucket_ids, 
OffsetSpec::Earliest).await?;
+
+// Get latest offsets
+let latest = admin.list_offsets(&table_path, &bucket_ids, 
OffsetSpec::Latest).await?;
+
+// Get offsets for a specific timestamp
+let timestamp_ms = 1704067200000; // 2024-01-01 00:00:00 UTC
+let offsets = admin.list_offsets(&table_path, &bucket_ids, 
OffsetSpec::Timestamp(timestamp_ms)).await?;
+
+// Get offsets for a specific partition
+let partition_offsets = admin.list_partition_offsets(
+    &table_path,
+    "partition_name",
+    &bucket_ids,
+    OffsetSpec::Latest,
+).await?;
+```
+
+### Lake Snapshot
+
+```rust
+// Get latest lake snapshot for lakehouse integration
+let snapshot = admin.get_latest_lake_snapshot(&table_path).await?;
+println!("Snapshot ID: {}", snapshot.snapshot_id);
+```
+
+## Log Table Operations
+
+Log tables are append-only tables without primary keys, suitable for event 
streaming.
+
+### Creating a Log Table
+
+```rust
+let table_descriptor = TableDescriptor::builder()
+    .schema(
+        Schema::builder()
+            .column("event_id", DataTypes::int())
+            .column("event_type", DataTypes::string())
+            .column("timestamp", DataTypes::bigint())
+            .build()?,
+    )
+    .build()?;
+
+let table_path = TablePath::new("fluss", "events");
+admin.create_table(&table_path, &table_descriptor, true).await?;
+```
+
+### Writing to Log Tables
+
+```rust
+use fluss::row::{GenericRow, InternalRow};
+
+let table = conn.get_table(&table_path).await?;
+let append_writer = table.new_append()?.create_writer()?;
+
+// Write a single row
+let mut row = GenericRow::new(3);
+row.set_field(0, 1);                    // event_id (int)
+row.set_field(1, "user_login");         // event_type (string)
+row.set_field(2, 1704067200000i64);     // timestamp (bigint)
+
+append_writer.append(&row)?;
+
+// Write multiple rows
+let mut row2 = GenericRow::new(3);
+row2.set_field(0, 2);
+row2.set_field(1, "page_view");
+row2.set_field(2, 1704067201000i64);
+
+append_writer.append(&row2)?;
+
+// Flush to ensure data is persisted
+append_writer.flush().await?;
+```
+
+Write operations (`append`, `upsert`, `delete`) use a **fire-and-forget** 
pattern for efficient batching. Each call queues the write and returns a 
`WriteResultFuture` immediately. Call `flush()` to ensure all queued writes are 
sent to the server.
+
+If you need per-record acknowledgment, you can await the returned future:
+
+```rust
+// Per-record acknowledgment (blocks until server confirms)
+append_writer.append(&row)?.await?;
+```
+
+### Reading from Log Tables
+
+```rust
+use std::time::Duration;
+
+let table = conn.get_table(&table_path).await?;
+let log_scanner = table.new_scan().create_log_scanner()?;
+
+// Subscribe to bucket 0 starting from offset 0
+log_scanner.subscribe(0, 0).await?;
+
+// Poll for records
+let records = log_scanner.poll(Duration::from_secs(10)).await?;
+
+for record in records {
+    let row = record.row();
+    println!(
+        "event_id={}, event_type={}, timestamp={} @ offset={}",
+        row.get_int(0),
+        row.get_string(1),
+        row.get_long(2),
+        record.offset()
+    );
+}
+```
+
+### Column Projection
+
+```rust
+// Project specific columns by index
+let scanner = table.new_scan().project(&[0, 2])?.create_log_scanner()?;
+
+// Or project by column names
+let scanner = table.new_scan().project_by_name(&["event_id", 
"timestamp"])?.create_log_scanner()?;
+```
+
+### Subscribe from Specific Offsets
+
+```rust
+use fluss::client::{EARLIEST_OFFSET, LATEST_OFFSET};
+
+// Subscribe from earliest available offset
+log_scanner.subscribe(0, EARLIEST_OFFSET).await?;
+
+// Subscribe from latest offset (only new records)
+log_scanner.subscribe(0, LATEST_OFFSET).await?;
+
+// Subscribe from a specific offset
+log_scanner.subscribe(0, 42).await?;
+
+// Subscribe to all buckets
+let num_buckets = table.get_table_info().get_num_buckets();
+for bucket_id in 0..num_buckets {
+    log_scanner.subscribe(bucket_id, 0).await?;
+}
+```
+
+### Subscribe to Multiple Buckets
+
+```rust
+use std::collections::HashMap;
+
+// Subscribe to multiple buckets at once with specific offsets
+let mut bucket_offsets = HashMap::new();
+bucket_offsets.insert(0, 0i64);    // bucket 0 from offset 0
+bucket_offsets.insert(1, 100i64);  // bucket 1 from offset 100
+log_scanner.subscribe_buckets(&bucket_offsets).await?;
+```
+
+### Unsubscribe from a Partition
+
+```rust
+// Unsubscribe from a specific partition bucket
+log_scanner.unsubscribe_partition(partition_id, bucket_id).await?;

Review Comment:
   There is no public `unsubscribe_partition(...)` API on 
`LogScanner`/`RecordBatchLogScanner` (only internal handling/log messages). 
This example won’t compile as-is; either document the supported way to stop 
consuming (if any), or add/land an unsubscribe API in code.
   ```suggestion
   ### Stopping Consumption
   
   Currently, the Rust client does not expose a dedicated 
`unsubscribe_partition(...)`
   API on `LogScanner`. To stop consuming from a partition (or entirely), break 
out of
   your polling loop and let the `log_scanner` be dropped when it goes out of 
scope:
   
   ```rust
   // inside your async task or function
   loop {
       // read the next batch of records from the log
       let records = log_scanner.poll(Duration::from_secs(10)).await?;

       // application-specific condition to stop consuming
       if should_stop(&records) {
           break;
       }

       // process records ...
   }
   
   // Once you break out of the loop, `log_scanner` will be dropped when it
   // goes out of scope. You can also explicitly drop it if you prefer:
   drop(log_scanner);
   ```



##########
docs/rust-client.md:
##########
@@ -0,0 +1,748 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+# Fluss Rust Client Guide
+
+This guide covers how to use the Fluss Rust client for reading and writing 
data to log tables and primary key tables.
+
+## Prerequisites
+
+- Rust 1.85+
+- Protobuf compiler (`protoc`) - only required when [building from 
source](#building-from-source)
+
+## Adding to Your Project
+
+The Fluss Rust client is published to 
[crates.io](https://crates.io/crates/fluss-rs) as `fluss-rs`. The crate's 
library name is `fluss`, so you import it with `use fluss::...`.
+
+```toml
+[dependencies]
+fluss-rs = "0.1"
+tokio = { version = "1", features = ["full"] }
+```
+
+### Feature Flags
+
+The Fluss crate supports optional storage backends:
+
+```toml
+[dependencies]
+# Default: memory and filesystem storage
+fluss-rs = "0.1"
+
+# With S3 storage support
+fluss-rs = { version = "0.1", features = ["storage-s3"] }
+
+# With OSS storage support
+fluss-rs = { version = "0.1", features = ["storage-oss"] }
+
+# All storage backends
+fluss-rs = { version = "0.1", features = ["storage-all"] }
+```
+
+Available features:
+- `storage-memory` (default) - In-memory storage
+- `storage-fs` (default) - Filesystem storage
+- `storage-s3` - Amazon S3 storage
+- `storage-oss` - Alibaba OSS storage
+- `storage-all` - All storage backends
+
+### Alternative: Git or Path Dependency
+
+For development against unreleased changes, you can depend on the Git 
repository or a local checkout:
+
+```toml
+[dependencies]
+# From Git
+fluss = { git = "https://github.com/apache/fluss-rust.git", package = "fluss-rs" }
+
+# From local path
+fluss = { path = "/path/to/fluss-rust/crates/fluss", package = "fluss-rs" }
+```
+
+> **Note:** When using `git` or `path` dependencies, the `package = 
"fluss-rs"` field is required so that Cargo resolves the correct package while 
still allowing `use fluss::...` imports.
+
+## Building from Source
+
+### 1. Clone the Repository
+
+```bash
+git clone https://github.com/apache/fluss-rust.git
+cd fluss-rust
+```
+
+### 2. Install Dependencies
+
+The Protobuf compiler (`protoc`) is required to build from source.
+
+#### macOS
+
+```bash
+brew install protobuf
+```
+
+#### Ubuntu/Debian
+
+```bash
+sudo apt-get install protobuf-compiler
+```
+
+### 3. Build the Library
+
+```bash
+cargo build --workspace --all-targets
+```
+
+### 4. Run Tests
+
+```bash
+# Unit tests
+cargo test --workspace
+
+# Integration tests (requires running Fluss cluster)
+RUST_TEST_THREADS=1 cargo test --features integration_tests --workspace
+```
+
+## Connection Setup
+
+```rust
+use fluss::client::FlussConnection;
+use fluss::config::Config;
+use fluss::error::Result;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let mut config = Config::default();
+    config.bootstrap_server = "127.0.0.1:9123".to_string();
+
+    let conn = FlussConnection::new(config).await?;
+
+    // Use the connection...
+
+    Ok(())
+}
+```
+
+### Configuration Options
+
+| Option | Description | Default |
+|--------|-------------|---------|
+| `bootstrap_server` | Coordinator server address | `127.0.0.1:9123` |
+| `request_max_size` | Maximum request size in bytes | 10 MB |
+| `writer_acks` | Acknowledgment setting (`all` waits for all replicas) | 
`all` |
+| `writer_retries` | Number of retries on failure | `i32::MAX` |
+| `writer_batch_size` | Batch size for writes | 2 MB |
+
+## Admin Operations
+
+### Get Admin Interface
+
+```rust
+let admin = conn.get_admin().await?;
+```
+
+### Database Operations
+
+```rust
+// Create database
+admin.create_database("my_database", true, None).await?;
+
+// List all databases
+let databases = admin.list_databases().await?;
+println!("Databases: {:?}", databases);
+
+// Check if database exists
+let exists = admin.database_exists("my_database").await?;
+
+// Get database information
+let db_info = admin.get_database_info("my_database").await?;
+
+// Drop database (with cascade option to drop all tables)
+admin.drop_database("my_database", true, false).await?;
+```
+
+### Table Operations
+
+```rust
+use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
+
+// Define table schema
+let table_descriptor = TableDescriptor::builder()
+    .schema(
+        Schema::builder()
+            .column("id", DataTypes::int())
+            .column("name", DataTypes::string())
+            .column("amount", DataTypes::bigint())
+            .build()?,
+    )
+    .build()?;
+
+let table_path = TablePath::new("my_database", "my_table");
+
+// Create table
+admin.create_table(&table_path, &table_descriptor, true).await?;
+
+// Get table information
+let table_info = admin.get_table(&table_path).await?;
+println!("Table: {}", table_info);
+
+// List tables in database
+let tables = admin.list_tables("my_database").await?;
+
+// Check if table exists
+let exists = admin.table_exists(&table_path).await?;
+
+// Drop table
+admin.drop_table(&table_path, true).await?;
+```
+
+### Partition Operations
+
+```rust
+use fluss::metadata::PartitionSpec;
+use std::collections::HashMap;
+
+// List all partitions
+let partitions = admin.list_partition_infos(&table_path).await?;
+
+// List partitions matching a spec
+let mut filter = HashMap::new();
+filter.insert("year", "2024");
+let spec = PartitionSpec::new(filter);
+let partitions = admin.list_partition_infos_with_spec(&table_path, 
Some(&spec)).await?;
+
+// Create partition
+admin.create_partition(&table_path, &spec, true).await?;
+
+// Drop partition
+admin.drop_partition(&table_path, &spec, true).await?;
+```
+
+### Offset Operations
+
+```rust
+use fluss::rpc::message::OffsetSpec;
+
+let bucket_ids = vec![0, 1, 2];
+
+// Get earliest offsets
+let earliest = admin.list_offsets(&table_path, &bucket_ids, 
OffsetSpec::Earliest).await?;
+
+// Get latest offsets
+let latest = admin.list_offsets(&table_path, &bucket_ids, 
OffsetSpec::Latest).await?;
+
+// Get offsets for a specific timestamp
+let timestamp_ms = 1704067200000; // 2024-01-01 00:00:00 UTC
+let offsets = admin.list_offsets(&table_path, &bucket_ids, 
OffsetSpec::Timestamp(timestamp_ms)).await?;
+
+// Get offsets for a specific partition
+let partition_offsets = admin.list_partition_offsets(
+    &table_path,
+    "partition_name",
+    &bucket_ids,
+    OffsetSpec::Latest,
+).await?;
+```
+
+### Lake Snapshot
+
+```rust
+// Get latest lake snapshot for lakehouse integration
+let snapshot = admin.get_latest_lake_snapshot(&table_path).await?;
+println!("Snapshot ID: {}", snapshot.snapshot_id);
+```
+
+## Log Table Operations
+
+Log tables are append-only tables without primary keys, suitable for event 
streaming.
+
+### Creating a Log Table
+
+```rust
+let table_descriptor = TableDescriptor::builder()
+    .schema(
+        Schema::builder()
+            .column("event_id", DataTypes::int())
+            .column("event_type", DataTypes::string())
+            .column("timestamp", DataTypes::bigint())
+            .build()?,
+    )
+    .build()?;
+
+let table_path = TablePath::new("fluss", "events");
+admin.create_table(&table_path, &table_descriptor, true).await?;
+```
+
+### Writing to Log Tables
+
+```rust
+use fluss::row::{GenericRow, InternalRow};
+
+let table = conn.get_table(&table_path).await?;
+let append_writer = table.new_append()?.create_writer()?;
+
+// Write a single row
+let mut row = GenericRow::new(3);
+row.set_field(0, 1);                    // event_id (int)
+row.set_field(1, "user_login");         // event_type (string)
+row.set_field(2, 1704067200000i64);     // timestamp (bigint)
+
+append_writer.append(&row)?;
+
+// Write multiple rows
+let mut row2 = GenericRow::new(3);
+row2.set_field(0, 2);
+row2.set_field(1, "page_view");
+row2.set_field(2, 1704067201000i64);
+
+append_writer.append(&row2)?;
+
+// Flush to ensure data is persisted
+append_writer.flush().await?;
+```
+
+Write operations (`append`, `upsert`, `delete`) use a **fire-and-forget** 
pattern for efficient batching. Each call queues the write and returns a 
`WriteResultFuture` immediately. Call `flush()` to ensure all queued writes are 
sent to the server.
+
+If you need per-record acknowledgment, you can await the returned future:
+
+```rust
+// Per-record acknowledgment (blocks until server confirms)
+append_writer.append(&row)?.await?;
+```
+
+### Reading from Log Tables
+
+```rust
+use std::time::Duration;
+
+let table = conn.get_table(&table_path).await?;
+let log_scanner = table.new_scan().create_log_scanner()?;
+
+// Subscribe to bucket 0 starting from offset 0
+log_scanner.subscribe(0, 0).await?;
+
+// Poll for records
+let records = log_scanner.poll(Duration::from_secs(10)).await?;
+
+for record in records {
+    let row = record.row();
+    println!(
+        "event_id={}, event_type={}, timestamp={} @ offset={}",
+        row.get_int(0),
+        row.get_string(1),
+        row.get_long(2),
+        record.offset()
+    );
+}
+```
+
+### Column Projection
+
+```rust
+// Project specific columns by index
+let scanner = table.new_scan().project(&[0, 2])?.create_log_scanner()?;
+
+// Or project by column names
+let scanner = table.new_scan().project_by_name(&["event_id", 
"timestamp"])?.create_log_scanner()?;
+```
+
+### Subscribe from Specific Offsets
+
+```rust
+use fluss::client::{EARLIEST_OFFSET, LATEST_OFFSET};
+
+// Subscribe from earliest available offset
+log_scanner.subscribe(0, EARLIEST_OFFSET).await?;
+
+// Subscribe from latest offset (only new records)
+log_scanner.subscribe(0, LATEST_OFFSET).await?;
+
+// Subscribe from a specific offset
+log_scanner.subscribe(0, 42).await?;
+
+// Subscribe to all buckets
+let num_buckets = table.get_table_info().get_num_buckets();
+for bucket_id in 0..num_buckets {
+    log_scanner.subscribe(bucket_id, 0).await?;
+}
+```
+
+### Subscribe to Multiple Buckets
+
+```rust
+use std::collections::HashMap;
+
+// Subscribe to multiple buckets at once with specific offsets
+let mut bucket_offsets = HashMap::new();
+bucket_offsets.insert(0, 0i64);    // bucket 0 from offset 0
+bucket_offsets.insert(1, 100i64);  // bucket 1 from offset 100
+log_scanner.subscribe_buckets(&bucket_offsets).await?;
+```
+
+### Unsubscribe from a Partition
+
+```rust
+// Unsubscribe from a specific partition bucket
+log_scanner.unsubscribe_partition(partition_id, bucket_id).await?;
+```
+
+## Partitioned Log Tables
+
+Partitioned tables distribute data across partitions based on partition column 
values, enabling efficient data organization and querying.
+
+### Creating a Partitioned Log Table
+
+```rust
+use fluss::metadata::{DataTypes, LogFormat, Schema, TableDescriptor, 
TablePath};
+
+let table_descriptor = TableDescriptor::builder()
+    .schema(
+        Schema::builder()
+            .column("event_id", DataTypes::int())
+            .column("event_type", DataTypes::string())
+            .column("dt", DataTypes::string())       // partition column
+            .column("region", DataTypes::string())   // partition column
+            .build()?,
+    )
+    .partitioned_by(vec!["dt", "region"])  // Define partition columns
+    .log_format(LogFormat::ARROW)
+    .build()?;
+
+let table_path = TablePath::new("fluss", "partitioned_events");
+admin.create_table(&table_path, &table_descriptor, true).await?;
+```
+
+### Writing to Partitioned Log Tables
+
+Writing works the same as non-partitioned tables. Include partition column 
values in each row:
+
+```rust
+let table = conn.get_table(&table_path).await?;
+let append_writer = table.new_append()?.create_writer()?;
+
+// Partition column values determine which partition the record goes to
+let mut row = GenericRow::new(4);
+row.set_field(0, 1);                  // event_id
+row.set_field(1, "user_login");       // event_type
+row.set_field(2, "2024-01-15");       // dt (partition column)
+row.set_field(3, "US");               // region (partition column)
+
+append_writer.append(&row)?;
+append_writer.flush().await?;
+```
+
+### Reading from Partitioned Log Tables
+
+For partitioned tables, use `subscribe_partition()` instead of `subscribe()`:
+
+```rust
+use std::time::Duration;
+
+let table = conn.get_table(&table_path).await?;
+let admin = conn.get_admin().await?;
+
+// Get partition information
+let partitions = admin.list_partition_infos(&table_path).await?;
+
+let log_scanner = table.new_scan().create_log_scanner()?;
+
+// Subscribe to each partition's buckets
+for partition_info in &partitions {
+    let partition_id = partition_info.get_partition_id();
+    let num_buckets = table.get_table_info().get_num_buckets();
+
+    for bucket_id in 0..num_buckets {
+        log_scanner.subscribe_partition(partition_id, bucket_id, 0).await?;
+    }
+}
+
+// Poll for records
+let records = log_scanner.poll(Duration::from_secs(10)).await?;
+for record in records {
+    println!("Record from partition: {:?}", record.row());
+}
+```
+
+You can also subscribe to multiple partition-buckets at once:
+
+```rust
+use std::collections::HashMap;
+
+let mut partition_bucket_offsets = HashMap::new();
+partition_bucket_offsets.insert((partition_id, 0), 0i64);  // partition, 
bucket 0, offset 0
+partition_bucket_offsets.insert((partition_id, 1), 0i64);  // partition, 
bucket 1, offset 0
+log_scanner.subscribe_partition_buckets(&partition_bucket_offsets).await?;
+```
+
+### Managing Partitions
+
+```rust
+use fluss::metadata::PartitionSpec;
+use std::collections::HashMap;
+
+// Create a partition
+let mut partition_values = HashMap::new();
+partition_values.insert("dt", "2024-01-15");
+partition_values.insert("region", "EMEA");
+let spec = PartitionSpec::new(partition_values);
+admin.create_partition(&table_path, &spec, true).await?;
+
+// List all partitions
+let partitions = admin.list_partition_infos(&table_path).await?;
+for partition in &partitions {
+    println!(
+        "Partition: id={}, name={}",
+        partition.get_partition_id(),
+        partition.get_partition_name()  // Format: "value1$value2"
+    );
+}
+
+// List partitions with filter (partial spec)
+let mut partial_values = HashMap::new();
+partial_values.insert("dt", "2024-01-15");
+let partial_spec = PartitionSpec::new(partial_values);
+let filtered = admin.list_partition_infos_with_spec(&table_path, 
Some(&partial_spec)).await?;
+
+// Drop a partition
+admin.drop_partition(&table_path, &spec, true).await?;
+```
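+
+As a rough illustration of combining these calls, stale partitions could be dropped by filtering on the partition name. This sketch assumes the `value1$value2` name format shown above and the two partition columns (`dt`, `region`) of this example table:
+
+```rust
+use fluss::metadata::PartitionSpec;
+use std::collections::HashMap;
+
+// Drop partitions whose dt value is older than a cutoff date (illustrative).
+let cutoff = "2024-01-01";
+for partition in admin.list_partition_infos(&table_path).await? {
+    // Assumed name format: "<dt>$<region>", e.g. "2023-12-31$EMEA".
+    let name = partition.get_partition_name();
+    let mut parts = name.split('$');
+    if let (Some(dt), Some(region)) = (parts.next(), parts.next()) {
+        if dt < cutoff {
+            let mut values = HashMap::new();
+            values.insert("dt", dt);
+            values.insert("region", region);
+            let spec = PartitionSpec::new(values);
+            admin.drop_partition(&table_path, &spec, true).await?;
+        }
+    }
+}
+```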
+
+## Primary Key Table Operations
+
+Primary key tables (KV tables) support upsert, delete, and lookup operations.
+
+### Creating a Primary Key Table
+
+```rust
+let table_descriptor = TableDescriptor::builder()
+    .schema(
+        Schema::builder()
+            .column("id", DataTypes::int())
+            .column("name", DataTypes::string())
+            .column("age", DataTypes::bigint())
+            .primary_key(vec!["id"])  // Define primary key
+            .build()?,
+    )
+    .build()?;
+
+let table_path = TablePath::new("fluss", "users");
+admin.create_table(&table_path, &table_descriptor, true).await?;
+```
+
+### Upserting Records
+
+```rust
+let table = conn.get_table(&table_path).await?;
+let table_upsert = table.new_upsert()?;
+let upsert_writer = table_upsert.create_writer()?;
+
+// Insert or update records
+for (id, name, age) in [(1, "Alice", 25i64), (2, "Bob", 30), (3, "Charlie", 
35)] {
+    let mut row = GenericRow::new(3);
+    row.set_field(0, id);
+    row.set_field(1, name);
+    row.set_field(2, age);
+    upsert_writer.upsert(&row)?;
+}
+upsert_writer.flush().await?;
+```
+
+### Updating Records
+
+```rust
+// Update existing record (same primary key)
+let mut row = GenericRow::new(3);
+row.set_field(0, 1);        // id (primary key)
+row.set_field(1, "Alice");  // name
+row.set_field(2, 26i64);    // Updated age
+
+upsert_writer.upsert(&row)?;
+upsert_writer.flush().await?;
+```
+
+### Deleting Records
+
+```rust
+// Delete by primary key (only primary key field needs to be set)
+let mut row = GenericRow::new(3);
+row.set_field(0, 2);  // id of record to delete
+
+upsert_writer.delete(&row)?;
+upsert_writer.flush().await?;
+```
+
+### Partial Updates
+
+Update only specific columns while preserving others:
+
+```rust
+// By column indices
+let partial_upsert = table_upsert.partial_update(Some(vec![0, 2]))?;
+let partial_writer = partial_upsert.create_writer()?;
+
+let mut row = GenericRow::new(3);
+row.set_field(0, 1);       // id (primary key, required)
+row.set_field(2, 27i64);   // age (will be updated)
+// name will remain unchanged
+
+partial_writer.upsert(&row)?;
+partial_writer.flush().await?;
+
+// By column names
+let partial_upsert = table_upsert.partial_update_with_column_names(&["id", 
"age"])?;
+let partial_writer = partial_upsert.create_writer()?;

Review Comment:
   In the partial update example, `partial_writer.upsert` is `async` and 
requires `&mut self`. As written (immutable + no `.await`), it won’t compile. 
Please make `partial_writer` mutable and await the write call(s).
   ```suggestion
   let mut partial_writer = partial_upsert.create_writer()?;
   
   let mut row = GenericRow::new(3);
   row.set_field(0, 1);       // id (primary key, required)
   row.set_field(2, 27i64);   // age (will be updated)
   // name will remain unchanged
   
   partial_writer.upsert(&row).await?;
   partial_writer.flush().await?;
   
   // By column names
   let partial_upsert = table_upsert.partial_update_with_column_names(&["id", 
"age"])?;
   let mut partial_writer = partial_upsert.create_writer()?;
   ```



##########
docs/rust-client.md:
##########
@@ -0,0 +1,748 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+# Fluss Rust Client Guide
+
+This guide covers how to use the Fluss Rust client for reading and writing 
data to log tables and primary key tables.
+
+## Prerequisites
+
+- Rust 1.85+
+- Protobuf compiler (`protoc`) - only required when [building from 
source](#building-from-source)
+
+## Adding to Your Project
+
+The Fluss Rust client is published to 
[crates.io](https://crates.io/crates/fluss-rs) as `fluss-rs`. The crate's 
library name is `fluss`, so you import it with `use fluss::...`.
+
+```toml
+[dependencies]
+fluss-rs = "0.1"
+tokio = { version = "1", features = ["full"] }
+```
+
+### Feature Flags
+
+The Fluss crate supports optional storage backends:
+
+```toml
+[dependencies]
+# Default: memory and filesystem storage
+fluss-rs = "0.1"
+
+# With S3 storage support
+fluss-rs = { version = "0.1", features = ["storage-s3"] }
+
+# With OSS storage support
+fluss-rs = { version = "0.1", features = ["storage-oss"] }
+
+# All storage backends
+fluss-rs = { version = "0.1", features = ["storage-all"] }
+```
+
+Available features:
+- `storage-memory` (default) - In-memory storage
+- `storage-fs` (default) - Filesystem storage
+- `storage-s3` - Amazon S3 storage
+- `storage-oss` - Alibaba OSS storage
+- `storage-all` - All storage backends
+
+### Alternative: Git or Path Dependency
+
+For development against unreleased changes, you can depend on the Git 
repository or a local checkout:
+
+```toml
+[dependencies]
+# From Git
+fluss = { git = "https://github.com/apache/fluss-rust.git", package = "fluss-rs" }
+
+# From local path
+fluss = { path = "/path/to/fluss-rust/crates/fluss", package = "fluss-rs" }
+```
+
+> **Note:** When using `git` or `path` dependencies, the `package = 
"fluss-rs"` field is required so that Cargo resolves the correct package while 
still allowing `use fluss::...` imports.
+
+## Building from Source
+
+### 1. Clone the Repository
+
+```bash
+git clone https://github.com/apache/fluss-rust.git
+cd fluss-rust
+```
+
+### 2. Install Dependencies
+
+The Protobuf compiler (`protoc`) is required to build from source.
+
+#### macOS
+
+```bash
+brew install protobuf
+```
+
+#### Ubuntu/Debian
+
+```bash
+sudo apt-get install protobuf-compiler
+```
+
+### 3. Build the Library
+
+```bash
+cargo build --workspace --all-targets
+```
+
+### 4. Run Tests
+
+```bash
+# Unit tests
+cargo test --workspace
+
+# Integration tests (requires running Fluss cluster)
+RUST_TEST_THREADS=1 cargo test --features integration_tests --workspace
+```
+
+## Connection Setup
+
+```rust
+use fluss::client::FlussConnection;
+use fluss::config::Config;
+use fluss::error::Result;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let mut config = Config::default();
+    config.bootstrap_server = "127.0.0.1:9123".to_string();
+
+    let conn = FlussConnection::new(config).await?;
+
+    // Use the connection...
+
+    Ok(())
+}
+```
+
+### Configuration Options
+
+| Option | Description | Default |
+|--------|-------------|---------|
+| `bootstrap_server` | Coordinator server address | `127.0.0.1:9123` |
+| `request_max_size` | Maximum request size in bytes | 10 MB |
+| `writer_acks` | Acknowledgment setting (`all` waits for all replicas) | 
`all` |
+| `writer_retries` | Number of retries on failure | `i32::MAX` |
+| `writer_batch_size` | Batch size for writes | 2 MB |
+
+## Admin Operations
+
+### Get Admin Interface
+
+```rust
+let admin = conn.get_admin().await?;
+```
+
+### Database Operations
+
+```rust
+// Create database
+admin.create_database("my_database", true, None).await?;
+
+// List all databases
+let databases = admin.list_databases().await?;
+println!("Databases: {:?}", databases);
+
+// Check if database exists
+let exists = admin.database_exists("my_database").await?;
+
+// Get database information
+let db_info = admin.get_database_info("my_database").await?;
+
+// Drop database (with cascade option to drop all tables)
+admin.drop_database("my_database", true, false).await?;
+```
+
+### Table Operations
+
+```rust
+use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
+
+// Define table schema
+let table_descriptor = TableDescriptor::builder()
+    .schema(
+        Schema::builder()
+            .column("id", DataTypes::int())
+            .column("name", DataTypes::string())
+            .column("amount", DataTypes::bigint())
+            .build()?,
+    )
+    .build()?;
+
+let table_path = TablePath::new("my_database", "my_table");
+
+// Create table
+admin.create_table(&table_path, &table_descriptor, true).await?;
+
+// Get table information
+let table_info = admin.get_table(&table_path).await?;
+println!("Table: {}", table_info);
+
+// List tables in database
+let tables = admin.list_tables("my_database").await?;
+
+// Check if table exists
+let exists = admin.table_exists(&table_path).await?;
+
+// Drop table
+admin.drop_table(&table_path, true).await?;
+```
+
+### Partition Operations
+
+```rust
+use fluss::metadata::PartitionSpec;
+use std::collections::HashMap;
+
+// List all partitions
+let partitions = admin.list_partition_infos(&table_path).await?;
+
+// List partitions matching a spec
+let mut filter = HashMap::new();
+filter.insert("year", "2024");
+let spec = PartitionSpec::new(filter);
+let partitions = admin.list_partition_infos_with_spec(&table_path, 
Some(&spec)).await?;
+
+// Create partition
+admin.create_partition(&table_path, &spec, true).await?;
+
+// Drop partition
+admin.drop_partition(&table_path, &spec, true).await?;
+```
+
+### Offset Operations
+
+```rust
+use fluss::rpc::message::OffsetSpec;
+
+let bucket_ids = vec![0, 1, 2];
+
+// Get earliest offsets
+let earliest = admin.list_offsets(&table_path, &bucket_ids, 
OffsetSpec::Earliest).await?;
+
+// Get latest offsets
+let latest = admin.list_offsets(&table_path, &bucket_ids, 
OffsetSpec::Latest).await?;
+
+// Get offsets for a specific timestamp
+let timestamp_ms = 1704067200000; // 2024-01-01 00:00:00 UTC
+let offsets = admin.list_offsets(&table_path, &bucket_ids, 
OffsetSpec::Timestamp(timestamp_ms)).await?;
+
+// Get offsets for a specific partition
+let partition_offsets = admin.list_partition_offsets(
+    &table_path,
+    "partition_name",
+    &bucket_ids,
+    OffsetSpec::Latest,
+).await?;
+```
+
+### Lake Snapshot
+
+```rust
+// Get latest lake snapshot for lakehouse integration
+let snapshot = admin.get_latest_lake_snapshot(&table_path).await?;
+println!("Snapshot ID: {}", snapshot.snapshot_id);
+```
+
+## Log Table Operations
+
+Log tables are append-only tables without primary keys, suitable for event 
streaming.
+
+### Creating a Log Table
+
+```rust
+let table_descriptor = TableDescriptor::builder()
+    .schema(
+        Schema::builder()
+            .column("event_id", DataTypes::int())
+            .column("event_type", DataTypes::string())
+            .column("timestamp", DataTypes::bigint())
+            .build()?,
+    )
+    .build()?;
+
+let table_path = TablePath::new("fluss", "events");
+admin.create_table(&table_path, &table_descriptor, true).await?;
+```
+
+### Writing to Log Tables
+
+```rust
+use fluss::row::{GenericRow, InternalRow};
+
+let table = conn.get_table(&table_path).await?;
+let append_writer = table.new_append()?.create_writer()?;
+
+// Write a single row
+let mut row = GenericRow::new(3);
+row.set_field(0, 1);                    // event_id (int)
+row.set_field(1, "user_login");         // event_type (string)
+row.set_field(2, 1704067200000i64);     // timestamp (bigint)
+
+append_writer.append(&row)?;
+
+// Write multiple rows
+let mut row2 = GenericRow::new(3);
+row2.set_field(0, 2);
+row2.set_field(1, "page_view");
+row2.set_field(2, 1704067201000i64);
+
+append_writer.append(&row2)?;
+
+// Flush to ensure data is persisted
+append_writer.flush().await?;
+```
+
+Write operations (`append`, `upsert`, `delete`) use a **fire-and-forget** 
pattern for efficient batching. Each call queues the write and returns a 
`WriteResultFuture` immediately. Call `flush()` to ensure all queued writes are 
sent to the server.
+
+If you need per-record acknowledgment, you can await the returned future:
+
+```rust
+// Per-record acknowledgment (blocks until server confirms)
+append_writer.append(&row)?.await?;
+```
+
+### Reading from Log Tables
+
+```rust
+use std::time::Duration;
+
+let table = conn.get_table(&table_path).await?;
+let log_scanner = table.new_scan().create_log_scanner()?;
+
+// Subscribe to bucket 0 starting from offset 0
+log_scanner.subscribe(0, 0).await?;
+
+// Poll for records
+let records = log_scanner.poll(Duration::from_secs(10)).await?;
+
+for record in records {
+    let row = record.row();
+    println!(
+        "event_id={}, event_type={}, timestamp={} @ offset={}",
+        row.get_int(0),
+        row.get_string(1),
+        row.get_long(2),
+        record.offset()
+    );
+}
+```
+
+### Column Projection
+
+```rust
+// Project specific columns by index
+let scanner = table.new_scan().project(&[0, 2])?.create_log_scanner()?;
+
+// Or project by column names
+let scanner = table.new_scan().project_by_name(&["event_id", 
"timestamp"])?.create_log_scanner()?;
+```
+
+### Subscribe from Specific Offsets
+
+```rust
+use fluss::client::{EARLIEST_OFFSET, LATEST_OFFSET};
+
+// Subscribe from earliest available offset
+log_scanner.subscribe(0, EARLIEST_OFFSET).await?;
+
+// Subscribe from latest offset (only new records)
+log_scanner.subscribe(0, LATEST_OFFSET).await?;
+
+// Subscribe from a specific offset
+log_scanner.subscribe(0, 42).await?;
+
+// Subscribe to all buckets
+let num_buckets = table.get_table_info().get_num_buckets();
+for bucket_id in 0..num_buckets {
+    log_scanner.subscribe(bucket_id, 0).await?;
+}
+```
+
+### Subscribe to Multiple Buckets
+
+```rust
+use std::collections::HashMap;
+
+// Subscribe to multiple buckets at once with specific offsets
+let mut bucket_offsets = HashMap::new();
+bucket_offsets.insert(0, 0i64);    // bucket 0 from offset 0
+bucket_offsets.insert(1, 100i64);  // bucket 1 from offset 100
+log_scanner.subscribe_buckets(&bucket_offsets).await?;
+```
+
+### Unsubscribe from a Partition
+
+```rust
+// Unsubscribe from a specific partition bucket
+log_scanner.unsubscribe_partition(partition_id, bucket_id).await?;
+```
+
+## Partitioned Log Tables
+
+Partitioned tables distribute data across partitions based on partition column 
values, enabling efficient data organization and querying.
+
+### Creating a Partitioned Log Table
+
+```rust
+use fluss::metadata::{DataTypes, LogFormat, Schema, TableDescriptor, 
TablePath};
+
+let table_descriptor = TableDescriptor::builder()
+    .schema(
+        Schema::builder()
+            .column("event_id", DataTypes::int())
+            .column("event_type", DataTypes::string())
+            .column("dt", DataTypes::string())       // partition column
+            .column("region", DataTypes::string())   // partition column
+            .build()?,
+    )
+    .partitioned_by(vec!["dt", "region"])  // Define partition columns
+    .log_format(LogFormat::ARROW)
+    .build()?;
+
+let table_path = TablePath::new("fluss", "partitioned_events");
+admin.create_table(&table_path, &table_descriptor, true).await?;
+```
+
+### Writing to Partitioned Log Tables
+
+Writing works the same as for non-partitioned tables. Include the partition column values in each row:
+
+```rust
+let table = conn.get_table(&table_path).await?;
+let append_writer = table.new_append()?.create_writer()?;
+
+// Partition column values determine which partition the record goes to
+let mut row = GenericRow::new(4);
+row.set_field(0, 1);                  // event_id
+row.set_field(1, "user_login");       // event_type
+row.set_field(2, "2024-01-15");       // dt (partition column)
+row.set_field(3, "US");               // region (partition column)
+
+append_writer.append(&row)?;
+append_writer.flush().await?;
+```
+
+### Reading from Partitioned Log Tables
+
+For partitioned tables, use `subscribe_partition()` instead of `subscribe()`:
+
+```rust
+use std::time::Duration;
+
+let table = conn.get_table(&table_path).await?;
+let admin = conn.get_admin().await?;
+
+// Get partition information
+let partitions = admin.list_partition_infos(&table_path).await?;
+
+let log_scanner = table.new_scan().create_log_scanner()?;
+
+// Subscribe to each partition's buckets
+for partition_info in &partitions {
+    let partition_id = partition_info.get_partition_id();
+    let num_buckets = table.get_table_info().get_num_buckets();
+
+    for bucket_id in 0..num_buckets {
+        log_scanner.subscribe_partition(partition_id, bucket_id, 0).await?;
+    }
+}
+
+// Poll for records
+let records = log_scanner.poll(Duration::from_secs(10)).await?;
+for record in records {
+    println!("Record from partition: {:?}", record.row());

Review Comment:
   Same issue here: `poll(...)` returns `ScanRecords` (bucket->Vec<ScanRecord>) 
rather than a flat iterator of records, so `for record in records { ... }` 
won’t compile. Iterate over buckets and then over each bucket’s record list.
   ```suggestion
   let scan_records = log_scanner.poll(Duration::from_secs(10)).await?;
   for (_bucket_id, records) in scan_records {
       for record in records {
           println!("Record from partition: {:?}", record.row());
       }
   ```



##########
docs/rust-client.md:
##########
@@ -0,0 +1,748 @@
+### Subscribe to Multiple Buckets
+
+```rust
+use std::collections::HashMap;
+
+// Subscribe to multiple buckets at once with specific offsets
+let mut bucket_offsets = HashMap::new();
+bucket_offsets.insert(0, 0i64);    // bucket 0 from offset 0
+bucket_offsets.insert(1, 100i64);  // bucket 1 from offset 100
+log_scanner.subscribe_buckets(&bucket_offsets).await?;

Review Comment:
   `LogScanner` doesn’t have a `subscribe_buckets` method; the API is 
`subscribe_batch(&HashMap<i32, i64>)`. Please update the example to use the 
correct method name.
   ```suggestion
   log_scanner.subscribe_batch(&bucket_offsets).await?;
   ```



##########
docs/rust-client.md:
##########
@@ -0,0 +1,748 @@
+### Alternative: Git or Path Dependency
+
+For development against unreleased changes, you can depend on the Git repository or a local checkout:
+
+```toml
+[dependencies]
+# From Git
+fluss = { git = "https://github.com/apache/fluss-rust.git", package = "fluss-rs" }
+
+# From local path
+fluss = { path = "/path/to/fluss-rust/crates/fluss", package = "fluss-rs" }
+```

Review Comment:
   The Git/path dependency examples specify `package = "fluss-rs"`, but the 
workspace package at `crates/fluss` is named `fluss`. As written, these 
examples won’t resolve. Please update `package = ...` (or the dependency key) 
to match the actual Cargo package name intended for users.
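
   For example, assuming the workspace package really is named `fluss` (as noted above), a sketch of dependency entries that would resolve is:
   ```toml
   [dependencies]
   # From Git: Cargo locates the `fluss` package inside the workspace by its real name
   fluss = { git = "https://github.com/apache/fluss-rust.git" }

   # From local path: point directly at the package directory
   fluss = { path = "/path/to/fluss-rust/crates/fluss" }
   ```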



##########
docs/rust-client.md:
##########
@@ -0,0 +1,748 @@
+You can also subscribe to multiple partition-buckets at once:
+
+```rust
+use std::collections::HashMap;
+
+let mut partition_bucket_offsets = HashMap::new();
+partition_bucket_offsets.insert((partition_id, 0), 0i64);  // partition, bucket 0, offset 0
+partition_bucket_offsets.insert((partition_id, 1), 0i64);  // partition, bucket 1, offset 0
+log_scanner.subscribe_partition_buckets(&partition_bucket_offsets).await?;
+```
+
+### Managing Partitions
+
+```rust
+use fluss::metadata::PartitionSpec;
+use std::collections::HashMap;
+
+// Create a partition
+let mut partition_values = HashMap::new();
+partition_values.insert("dt", "2024-01-15");
+partition_values.insert("region", "EMEA");
+let spec = PartitionSpec::new(partition_values);
+admin.create_partition(&table_path, &spec, true).await?;
+
+// List all partitions
+let partitions = admin.list_partition_infos(&table_path).await?;
+for partition in &partitions {
+    println!(
+        "Partition: id={}, name={}",
+        partition.get_partition_id(),
+        partition.get_partition_name()  // Format: "value1$value2"
+    );
+}
+
+// List partitions with filter (partial spec)
+let mut partial_values = HashMap::new();
+partial_values.insert("dt", "2024-01-15");
+let partial_spec = PartitionSpec::new(partial_values);
+let filtered = admin.list_partition_infos_with_spec(&table_path, Some(&partial_spec)).await?;
+
+// Drop a partition
+admin.drop_partition(&table_path, &spec, true).await?;
+```
+
+## Primary Key Table Operations
+
+Primary key tables (KV tables) support upsert, delete, and lookup operations.
+
+### Creating a Primary Key Table
+
+```rust
+let table_descriptor = TableDescriptor::builder()
+    .schema(
+        Schema::builder()
+            .column("id", DataTypes::int())
+            .column("name", DataTypes::string())
+            .column("age", DataTypes::bigint())
+            .primary_key(vec!["id"])  // Define primary key
+            .build()?,
+    )
+    .build()?;
+
+let table_path = TablePath::new("fluss", "users");
+admin.create_table(&table_path, &table_descriptor, true).await?;
+```
+
+### Upserting Records
+
+```rust
+let table = conn.get_table(&table_path).await?;
+let table_upsert = table.new_upsert()?;
+let upsert_writer = table_upsert.create_writer()?;
+
+// Insert or update records
+for (id, name, age) in [(1, "Alice", 25i64), (2, "Bob", 30), (3, "Charlie", 35)] {
+    let mut row = GenericRow::new(3);
+    row.set_field(0, id);
+    row.set_field(1, name);
+    row.set_field(2, age);
+    upsert_writer.upsert(&row)?;
+}
+upsert_writer.flush().await?;
+```
+
+### Updating Records
+
+```rust
+// Update existing record (same primary key)
+let mut row = GenericRow::new(3);
+row.set_field(0, 1);        // id (primary key)
+row.set_field(1, "Alice");  // name
+row.set_field(2, 26i64);    // Updated age
+
+upsert_writer.upsert(&row)?;
+upsert_writer.flush().await?;
+```
+
+### Deleting Records
+
+```rust
+// Delete by primary key (only primary key field needs to be set)
+let mut row = GenericRow::new(3);
+row.set_field(0, 2);  // id of record to delete
+
+upsert_writer.delete(&row)?;

Review Comment:
   `UpsertWriter::delete` is `async` and takes `&mut self`. The example calls 
`delete` without `.await` and uses an immutable writer, so it won’t compile. 
Please make the writer mutable and await the call.
   ```suggestion
   let mut upsert_writer = upsert_writer;
   upsert_writer.delete(&row).await?;
   ```
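
   A minimal end-to-end sketch under the same assumption (an async `delete` on a mutable writer), reusing the bindings from the surrounding example:
   ```rust
   // Bind the writer mutably when it is created
   let mut upsert_writer = table_upsert.create_writer()?;

   // Delete by primary key (only the key field needs to be set)
   let mut row = GenericRow::new(3);
   row.set_field(0, 2);  // id of the record to delete

   upsert_writer.delete(&row).await?;
   upsert_writer.flush().await?;
   ```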



##########
docs/rust-client.md:
##########
@@ -0,0 +1,748 @@
+Write operations (`append`, `upsert`, `delete`) use a **fire-and-forget** pattern for efficient batching. Each call queues the write and returns a `WriteResultFuture` immediately. Call `flush()` to ensure all queued writes are sent to the server.
+
+If you need per-record acknowledgment, you can await the returned future:
+
+```rust
+// Per-record acknowledgment (blocks until server confirms)
+append_writer.append(&row)?.await?;

Review Comment:
   This section describes a fire-and-forget API returning a 
`WriteResultFuture`, but the current writer APIs (`AppendWriter::append`, 
`UpsertWriter::upsert/delete`) await the send+ack internally and return 
`Result`/`UpsertResult`/`DeleteResult` directly. Please update the narrative 
and the per-record acknowledgment example to match the actual API.
   ```suggestion
   Write operations (`append`, `upsert`, `delete`) send the record and await the server acknowledgment internally. Each call returns a `Result`/`UpsertResult`/`DeleteResult` directly. Call `flush()` to ensure any buffered data is fully persisted.
   
   If you need per-record acknowledgment, handle the result of each write call:
   
   ```rust
   // Per-record acknowledgment (handle the result of each write)
   let result = append_writer.append(&row2)?;
   ```
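
   Following that suggestion, a minimal sketch of acting on the per-write result, assuming the writer's error type implements `Display`:
   ```rust
   // Each append call returns a Result; treat it as the per-record acknowledgment
   if let Err(e) = append_writer.append(&row2) {
       eprintln!("append failed: {e}");
   }
   append_writer.flush().await?;
   ```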



##########
docs/rust-client.md:
##########
@@ -0,0 +1,748 @@
+## Prerequisites
+
+- Rust 1.85+
+- Protobuf compiler (`protoc`) - only required when [building from source](#building-from-source)

Review Comment:
   `protoc` is not only needed when building from this repository. The `fluss` 
crate runs `prost-build` in `crates/fluss/build.rs`, so downstream users 
building from crates.io will also need `protoc` installed unless the build is 
changed to use a vendored/provided protoc or checked-in generated code. Please 
update the prerequisite wording accordingly.
   ```suggestion
   - Protobuf compiler (`protoc`) - required to build the Fluss Rust client and any project that depends on it (including when using the crate from crates.io)
   ```



##########
docs/rust-client.md:
##########
@@ -0,0 +1,748 @@
+### Reading from Log Tables
+
+```rust
+use std::time::Duration;
+
+let table = conn.get_table(&table_path).await?;
+let log_scanner = table.new_scan().create_log_scanner()?;
+
+// Subscribe to bucket 0 starting from offset 0
+log_scanner.subscribe(0, 0).await?;
+
+// Poll for records
+let records = log_scanner.poll(Duration::from_secs(10)).await?;
+
+for record in records {
+    let row = record.row();
+    println!(
+        "event_id={}, event_type={}, timestamp={} @ offset={}",
+        row.get_int(0),
+        row.get_string(1),
+        row.get_long(2),
+        record.offset()
+    );

Review Comment:
   `log_scanner.poll(...)` returns `ScanRecords`, which is a bucket->records 
map wrapper and doesn’t implement `IntoIterator` over `ScanRecord`. The `for 
record in records { ... }` loop won’t compile; you need to iterate through 
`records.records_by_buckets()` / `into_records_by_buckets()` and then over each 
bucket’s `Vec<ScanRecord>`.
   ```suggestion
   for (_bucket, bucket_records) in records.records_by_buckets() {
       for record in bucket_records {
           let row = record.row();
           println!(
               "event_id={}, event_type={}, timestamp={} @ offset={}",
               row.get_int(0),
               row.get_string(1),
               row.get_long(2),
               record.offset()
           );
       }
   ```
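
   For reference, a complete loop under that assumption (closing both scopes) might read:
   ```rust
   let records = log_scanner.poll(Duration::from_secs(10)).await?;

   // Iterate bucket by bucket, then over each bucket's records
   for (_bucket, bucket_records) in records.records_by_buckets() {
       for record in bucket_records {
           let row = record.row();
           println!(
               "event_id={}, event_type={}, timestamp={} @ offset={}",
               row.get_int(0),
               row.get_string(1),
               row.get_long(2),
               record.offset()
           );
       }
   }
   ```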



##########
docs/rust-client.md:
##########
@@ -0,0 +1,748 @@
+### 4. Run Tests
+
+```bash
+# Unit tests
+cargo test --workspace
+
+# Integration tests (requires running Fluss cluster)
+RUST_TEST_THREADS=1 cargo test --features integration_tests --workspace

Review Comment:
   The integration test command uses `--features integration_tests 
--workspace`, but only the `fluss` crate defines the `integration_tests` 
feature (other workspace members like `fluss-examples`/bindings don’t). In a 
virtual workspace this commonly fails with “feature not found” for the other 
packages. Prefer scoping the feature to the `fluss` package (e.g., `cargo test 
-p fluss --features integration_tests`) or document a command that works for 
the whole workspace.
   ```suggestion
   RUST_TEST_THREADS=1 cargo test -p fluss --features integration_tests
   ```
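   If a single workspace-wide invocation is preferred instead, the package-scoped feature syntax may also work (untested here; assumes a recent Cargo):
   ```bash
   # Enables `integration_tests` only for the `fluss` package while testing the whole workspace
   RUST_TEST_THREADS=1 cargo test --workspace --features fluss/integration_tests
   ```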



##########
docs/rust-client.md:
##########
@@ -0,0 +1,748 @@
+## Connection Setup
+
+```rust
+use fluss::client::FlussConnection;
+use fluss::config::Config;
+use fluss::error::Result;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let mut config = Config::default();
+    config.bootstrap_server = "127.0.0.1:9123".to_string();
+
+    let conn = FlussConnection::new(config).await?;
+
+    // Use the connection...
+
+    Ok(())
+}
+```
+
+### Configuration Options
+
+| Option | Description | Default |
+|--------|-------------|---------|
+| `bootstrap_server` | Coordinator server address | `127.0.0.1:9123` |
+| `request_max_size` | Maximum request size in bytes | 10 MB |
+| `writer_acks` | Acknowledgment setting (`all` waits for all replicas) | `all` |
+| `writer_retries` | Number of retries on failure | `i32::MAX` |
+| `writer_batch_size` | Batch size for writes | 2 MB |
+
+## Admin Operations
+
+### Get Admin Interface
+
+```rust
+let admin = conn.get_admin().await?;
+```
+
+### Database Operations
+
+```rust
+// Create database
+admin.create_database("my_database", true, None).await?;
+
+// List all databases
+let databases = admin.list_databases().await?;
+println!("Databases: {:?}", databases);
+
+// Check if database exists
+let exists = admin.database_exists("my_database").await?;
+
+// Get database information
+let db_info = admin.get_database_info("my_database").await?;
+
+// Drop database (with cascade option to drop all tables)
+admin.drop_database("my_database", true, false).await?;
+```
+
+### Table Operations
+
+```rust
+use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
+
+// Define table schema
+let table_descriptor = TableDescriptor::builder()
+    .schema(
+        Schema::builder()
+            .column("id", DataTypes::int())
+            .column("name", DataTypes::string())
+            .column("amount", DataTypes::bigint())
+            .build()?,
+    )
+    .build()?;
+
+let table_path = TablePath::new("my_database", "my_table");
+
+// Create table
+admin.create_table(&table_path, &table_descriptor, true).await?;
+
+// Get table information
+let table_info = admin.get_table(&table_path).await?;
+println!("Table: {}", table_info);
+
+// List tables in database
+let tables = admin.list_tables("my_database").await?;
+
+// Check if table exists
+let exists = admin.table_exists(&table_path).await?;
+
+// Drop table
+admin.drop_table(&table_path, true).await?;
+```
+
+### Partition Operations
+
+```rust
+use fluss::metadata::PartitionSpec;
+use std::collections::HashMap;
+
+// List all partitions
+let partitions = admin.list_partition_infos(&table_path).await?;
+
+// List partitions matching a spec
+let mut filter = HashMap::new();
+filter.insert("year", "2024");
+let spec = PartitionSpec::new(filter);
+let partitions = admin.list_partition_infos_with_spec(&table_path, Some(&spec)).await?;
+
+// Create partition
+admin.create_partition(&table_path, &spec, true).await?;
+
+// Drop partition
+admin.drop_partition(&table_path, &spec, true).await?;
+```
+
+### Offset Operations
+
+```rust
+use fluss::rpc::message::OffsetSpec;
+
+let bucket_ids = vec![0, 1, 2];
+
+// Get earliest offsets
+let earliest = admin.list_offsets(&table_path, &bucket_ids, OffsetSpec::Earliest).await?;
+
+// Get latest offsets
+let latest = admin.list_offsets(&table_path, &bucket_ids, OffsetSpec::Latest).await?;
+
+// Get offsets for a specific timestamp
+let timestamp_ms = 1704067200000; // 2024-01-01 00:00:00 UTC
+let offsets = admin.list_offsets(&table_path, &bucket_ids, OffsetSpec::Timestamp(timestamp_ms)).await?;
+
+// Get offsets for a specific partition
+let partition_offsets = admin.list_partition_offsets(
+    &table_path,
+    "partition_name",
+    &bucket_ids,
+    OffsetSpec::Latest,
+).await?;
+```
+
+### Lake Snapshot
+
+```rust
+// Get latest lake snapshot for lakehouse integration
+let snapshot = admin.get_latest_lake_snapshot(&table_path).await?;
+println!("Snapshot ID: {}", snapshot.snapshot_id);
+```
+
+## Log Table Operations
+
+Log tables are append-only tables without primary keys, suitable for event streaming.
+
+### Creating a Log Table
+
+```rust
+let table_descriptor = TableDescriptor::builder()
+    .schema(
+        Schema::builder()
+            .column("event_id", DataTypes::int())
+            .column("event_type", DataTypes::string())
+            .column("timestamp", DataTypes::bigint())
+            .build()?,
+    )
+    .build()?;
+
+let table_path = TablePath::new("fluss", "events");
+admin.create_table(&table_path, &table_descriptor, true).await?;
+```
+
+### Writing to Log Tables
+
+```rust
+use fluss::row::{GenericRow, InternalRow};
+
+let table = conn.get_table(&table_path).await?;
+let append_writer = table.new_append()?.create_writer()?;
+
+// Write a single row
+let mut row = GenericRow::new(3);
+row.set_field(0, 1);                    // event_id (int)
+row.set_field(1, "user_login");         // event_type (string)
+row.set_field(2, 1704067200000i64);     // timestamp (bigint)
+
+append_writer.append(&row)?;
+
+// Write multiple rows
+let mut row2 = GenericRow::new(3);
+row2.set_field(0, 2);
+row2.set_field(1, "page_view");
+row2.set_field(2, 1704067201000i64);
+
+append_writer.append(&row2)?;
+
+// Flush to ensure data is persisted
+append_writer.flush().await?;
+```
+
+Write operations (`append`, `upsert`, `delete`) use a **fire-and-forget** pattern for efficient batching. Each call queues the write and returns a `WriteResultFuture` immediately. Call `flush()` to ensure all queued writes are sent to the server.
+
+If you need per-record acknowledgment, you can await the returned future:
+
+```rust
+// Per-record acknowledgment (blocks until server confirms)
+append_writer.append(&row)?.await?;
+```
+
+### Reading from Log Tables
+
+```rust
+use std::time::Duration;
+
+let table = conn.get_table(&table_path).await?;
+let log_scanner = table.new_scan().create_log_scanner()?;
+
+// Subscribe to bucket 0 starting from offset 0
+log_scanner.subscribe(0, 0).await?;
+
+// Poll for records
+let records = log_scanner.poll(Duration::from_secs(10)).await?;
+
+for record in records {
+    let row = record.row();
+    println!(
+        "event_id={}, event_type={}, timestamp={} @ offset={}",
+        row.get_int(0),
+        row.get_string(1),
+        row.get_long(2),
+        record.offset()
+    );
+}
+```
+
+### Column Projection
+
+```rust
+// Project specific columns by index
+let scanner = table.new_scan().project(&[0, 2])?.create_log_scanner()?;
+
+// Or project by column names
+let scanner = table.new_scan().project_by_name(&["event_id", "timestamp"])?.create_log_scanner()?;
+```
+
+### Subscribe from Specific Offsets
+
+```rust
+use fluss::client::{EARLIEST_OFFSET, LATEST_OFFSET};
+
+// Subscribe from earliest available offset
+log_scanner.subscribe(0, EARLIEST_OFFSET).await?;
+
+// Subscribe from latest offset (only new records)
+log_scanner.subscribe(0, LATEST_OFFSET).await?;
+
+// Subscribe from a specific offset
+log_scanner.subscribe(0, 42).await?;
+
+// Subscribe to all buckets
+let num_buckets = table.get_table_info().get_num_buckets();
+for bucket_id in 0..num_buckets {
+    log_scanner.subscribe(bucket_id, 0).await?;
+}
+```
+
+### Subscribe to Multiple Buckets
+
+```rust
+use std::collections::HashMap;
+
+// Subscribe to multiple buckets at once with specific offsets
+let mut bucket_offsets = HashMap::new();
+bucket_offsets.insert(0, 0i64);    // bucket 0 from offset 0
+bucket_offsets.insert(1, 100i64);  // bucket 1 from offset 100
+log_scanner.subscribe_buckets(&bucket_offsets).await?;
+```
+
+### Unsubscribe from a Partition
+
+```rust
+// Unsubscribe from a specific partition bucket
+log_scanner.unsubscribe_partition(partition_id, bucket_id).await?;
+```
+
+## Partitioned Log Tables
+
+Partitioned tables distribute data across partitions based on partition column values, enabling efficient data organization and querying.
+
+### Creating a Partitioned Log Table
+
+```rust
+use fluss::metadata::{DataTypes, LogFormat, Schema, TableDescriptor, TablePath};
+
+let table_descriptor = TableDescriptor::builder()
+    .schema(
+        Schema::builder()
+            .column("event_id", DataTypes::int())
+            .column("event_type", DataTypes::string())
+            .column("dt", DataTypes::string())       // partition column
+            .column("region", DataTypes::string())   // partition column
+            .build()?,
+    )
+    .partitioned_by(vec!["dt", "region"])  // Define partition columns
+    .log_format(LogFormat::ARROW)
+    .build()?;
+
+let table_path = TablePath::new("fluss", "partitioned_events");
+admin.create_table(&table_path, &table_descriptor, true).await?;
+```
+
+### Writing to Partitioned Log Tables
+
+Writing works the same as for non-partitioned tables. Include the partition column values in each row:
+
+```rust
+let table = conn.get_table(&table_path).await?;
+let append_writer = table.new_append()?.create_writer()?;
+
+// Partition column values determine which partition the record goes to
+let mut row = GenericRow::new(4);
+row.set_field(0, 1);                  // event_id
+row.set_field(1, "user_login");       // event_type
+row.set_field(2, "2024-01-15");       // dt (partition column)
+row.set_field(3, "US");               // region (partition column)
+
+append_writer.append(&row)?;
+append_writer.flush().await?;
+```
+
+### Reading from Partitioned Log Tables
+
+For partitioned tables, use `subscribe_partition()` instead of `subscribe()`:
+
+```rust
+use std::time::Duration;
+
+let table = conn.get_table(&table_path).await?;
+let admin = conn.get_admin().await?;
+
+// Get partition information
+let partitions = admin.list_partition_infos(&table_path).await?;
+
+let log_scanner = table.new_scan().create_log_scanner()?;
+
+// Subscribe to each partition's buckets
+for partition_info in &partitions {
+    let partition_id = partition_info.get_partition_id();
+    let num_buckets = table.get_table_info().get_num_buckets();
+
+    for bucket_id in 0..num_buckets {
+        log_scanner.subscribe_partition(partition_id, bucket_id, 0).await?;
+    }
+}
+
+// Poll for records
+let records = log_scanner.poll(Duration::from_secs(10)).await?;
+for record in records {
+    println!("Record from partition: {:?}", record.row());
+}
+```
+
+You can also subscribe to multiple partition-buckets at once:
+
+```rust
+use std::collections::HashMap;
+
+let mut partition_bucket_offsets = HashMap::new();
+partition_bucket_offsets.insert((partition_id, 0), 0i64);  // partition, bucket 0, offset 0
+partition_bucket_offsets.insert((partition_id, 1), 0i64);  // partition, bucket 1, offset 0
+log_scanner.subscribe_partition_buckets(&partition_bucket_offsets).await?;
+```
+
+### Managing Partitions
+
+```rust
+use fluss::metadata::PartitionSpec;
+use std::collections::HashMap;
+
+// Create a partition
+let mut partition_values = HashMap::new();
+partition_values.insert("dt", "2024-01-15");
+partition_values.insert("region", "EMEA");
+let spec = PartitionSpec::new(partition_values);
+admin.create_partition(&table_path, &spec, true).await?;
+
+// List all partitions
+let partitions = admin.list_partition_infos(&table_path).await?;
+for partition in &partitions {
+    println!(
+        "Partition: id={}, name={}",
+        partition.get_partition_id(),
+        partition.get_partition_name()  // Format: "value1$value2"
+    );
+}
+
+// List partitions with filter (partial spec)
+let mut partial_values = HashMap::new();
+partial_values.insert("dt", "2024-01-15");
+let partial_spec = PartitionSpec::new(partial_values);
+let filtered = admin.list_partition_infos_with_spec(&table_path, Some(&partial_spec)).await?;
+
+// Drop a partition
+admin.drop_partition(&table_path, &spec, true).await?;
+```
+
+## Primary Key Table Operations
+
+Primary key tables (KV tables) support upsert, delete, and lookup operations.
+
+### Creating a Primary Key Table
+
+```rust
+let table_descriptor = TableDescriptor::builder()
+    .schema(
+        Schema::builder()
+            .column("id", DataTypes::int())
+            .column("name", DataTypes::string())
+            .column("age", DataTypes::bigint())
+            .primary_key(vec!["id"])  // Define primary key
+            .build()?,
+    )
+    .build()?;
+
+let table_path = TablePath::new("fluss", "users");
+admin.create_table(&table_path, &table_descriptor, true).await?;
+```
+
+### Upserting Records
+
+```rust
+let table = conn.get_table(&table_path).await?;
+let table_upsert = table.new_upsert()?;
+let upsert_writer = table_upsert.create_writer()?;
+
+// Insert or update records
+for (id, name, age) in [(1, "Alice", 25i64), (2, "Bob", 30), (3, "Charlie", 35)] {
+    let mut row = GenericRow::new(3);
+    row.set_field(0, id);
+    row.set_field(1, name);
+    row.set_field(2, age);
+    upsert_writer.upsert(&row)?;
+}
+upsert_writer.flush().await?;

Review Comment:
   `UpsertWriter::upsert` is `async` and takes `&mut self`. The example uses an 
immutable writer and calls `upsert` without `.await`, so it won’t compile. 
Please make the writer `mut` and await `upsert`/`delete` calls throughout this 
section.
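   For example, a corrected sketch of this section, assuming the `async fn upsert(&mut self, row: &GenericRow)` signature described above:
   ```rust
   let table = conn.get_table(&table_path).await?;
   let table_upsert = table.new_upsert()?;
   // The writer must be mutable because `upsert` takes `&mut self`
   let mut upsert_writer = table_upsert.create_writer()?;

   // Insert or update records, awaiting each call
   for (id, name, age) in [(1, "Alice", 25i64), (2, "Bob", 30), (3, "Charlie", 35)] {
       let mut row = GenericRow::new(3);
       row.set_field(0, id);
       row.set_field(1, name);
       row.set_field(2, age);
       upsert_writer.upsert(&row).await?;
   }
   upsert_writer.flush().await?;
   ```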



##########
docs/rust-client.md:
##########
@@ -0,0 +1,748 @@
+### Writing to Log Tables
+
+```rust
+use fluss::row::{GenericRow, InternalRow};
+
+let table = conn.get_table(&table_path).await?;
+let append_writer = table.new_append()?.create_writer()?;
+
+// Write a single row
+let mut row = GenericRow::new(3);
+row.set_field(0, 1);                    // event_id (int)
+row.set_field(1, "user_login");         // event_type (string)
+row.set_field(2, 1704067200000i64);     // timestamp (bigint)
+
+append_writer.append(&row)?;
+

Review Comment:
   `AppendWriter::append` is an `async fn` returning `Result<()>`, so the 
example needs to `await` it (and it doesn’t return a future handle). As 
written, `append_writer.append(&row)?;` won’t compile.
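   For example, assuming that signature, the calls in this hunk would become:
   ```rust
   // Each append is awaited and surfaces errors directly as `Result<()>`
   append_writer.append(&row).await?;
   append_writer.append(&row2).await?;

   // Flush to ensure data is persisted
   append_writer.flush().await?;
   ```
   The later fire-and-forget/`WriteResultFuture` paragraph and the per-record acknowledgment example would likely need the same adjustment.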



##########
docs/rust-client.md:
##########
@@ -0,0 +1,748 @@
+## Adding to Your Project
+
+The Fluss Rust client is published to [crates.io](https://crates.io/crates/fluss-rs) as `fluss-rs`. The crate's library name is `fluss`, so you import it with `use fluss::...`.
+
+```toml
+[dependencies]
+fluss-rs = "0.1"
+tokio = { version = "1", features = ["full"] }
+```

Review Comment:
   The dependency name in the Cargo snippet/document text doesn’t match the 
actual published crate name in this repo. `crates/fluss/Cargo.toml` declares 
`name = "fluss"` (no `fluss-rs` package), and there is no `[lib] name = 
"fluss"` override that would make a `fluss-rs` package import as `fluss`. 
Please align the docs (and link) with the real crates.io package name, or 
update the crate/package metadata if `fluss-rs` is the intended release name.
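   For illustration only, either direction could look roughly like this. Aligning the docs with the current package name:
   ```toml
   [dependencies]
   fluss = "0.1"
   tokio = { version = "1", features = ["full"] }
   ```
   Or, if `fluss-rs` is the intended release name, renaming the package while keeping the library name so `use fluss::...` still works:
   ```toml
   [package]
   name = "fluss-rs"

   [lib]
   name = "fluss"
   ```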



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

