leekeiabstraction commented on code in PR #242: URL: https://github.com/apache/fluss-rust/pull/242#discussion_r2781393650
########## docs/rust-client.md: ##########
@@ -0,0 +1,748 @@

<!--
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements. See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership. The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License. You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->

# Fluss Rust Client Guide

This guide covers how to use the Fluss Rust client for reading from and writing to log tables and primary key tables.

## Prerequisites

- Rust 1.85+
- Protobuf compiler (`protoc`) - only required when [building from source](#building-from-source)

## Adding to Your Project

The Fluss Rust client is published to [crates.io](https://crates.io/crates/fluss-rs) as `fluss-rs`. The crate's library name is `fluss`, so you import it with `use fluss::...`.

```toml
[dependencies]
fluss-rs = "0.1"
tokio = { version = "1", features = ["full"] }
```
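Because the library crate is named `fluss` even though the package is `fluss-rs`, imports in your code always go through the `fluss::` path. For example, the connection and configuration types used later in this guide are brought in like this:

```rust
// Package name on crates.io: `fluss-rs`; library crate name: `fluss`.
// All imports therefore use the `fluss::` path.
use fluss::client::FlussConnection;
use fluss::config::Config;
```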
### Feature Flags

The Fluss crate supports optional storage backends:

```toml
[dependencies]
# Default: memory and filesystem storage
fluss-rs = "0.1"

# With S3 storage support
fluss-rs = { version = "0.1", features = ["storage-s3"] }

# With OSS storage support
fluss-rs = { version = "0.1", features = ["storage-oss"] }

# All storage backends
fluss-rs = { version = "0.1", features = ["storage-all"] }
```

Available features:
- `storage-memory` (default) - In-memory storage
- `storage-fs` (default) - Filesystem storage
- `storage-s3` - Amazon S3 storage
- `storage-oss` - Alibaba OSS storage
- `storage-all` - All storage backends

### Alternative: Git or Path Dependency

For development against unreleased changes, you can depend on the Git repository or a local checkout:

```toml
[dependencies]
# From Git
fluss = { git = "https://github.com/apache/fluss-rust.git", package = "fluss-rs" }

# From local path
fluss = { path = "/path/to/fluss-rust/crates/fluss", package = "fluss-rs" }
```

> **Note:** When using `git` or `path` dependencies, the `package = "fluss-rs"` field is required so that Cargo resolves the correct package while still allowing `use fluss::...` imports.

## Building from Source

### 1. Clone the Repository

```bash
git clone https://github.com/apache/fluss-rust.git
cd fluss-rust
```

### 2. Install Dependencies

The Protobuf compiler (`protoc`) is required to build from source.

#### macOS

```bash
brew install protobuf
```

#### Ubuntu/Debian

```bash
sudo apt-get install protobuf-compiler
```

### 3. Build the Library

```bash
cargo build --workspace --all-targets
```

### 4. Run Tests

```bash
# Unit tests
cargo test --workspace

# Integration tests (requires running Fluss cluster)
RUST_TEST_THREADS=1 cargo test --features integration_tests --workspace
```

## Connection Setup

```rust
use fluss::client::FlussConnection;
use fluss::config::Config;
use fluss::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let mut config = Config::default();
    config.bootstrap_server = "127.0.0.1:9123".to_string();

    let conn = FlussConnection::new(config).await?;

    // Use the connection...

    Ok(())
}
```

### Configuration Options

| Option | Description | Default |
|--------|-------------|---------|
| `bootstrap_server` | Coordinator server address | `127.0.0.1:9123` |
| `request_max_size` | Maximum request size in bytes | 10 MB |
| `writer_acks` | Acknowledgment setting (`all` waits for all replicas) | `all` |
| `writer_retries` | Number of retries on failure | `i32::MAX` |
| `writer_batch_size` | Batch size for writes | 2 MB |
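Only `bootstrap_server` is set explicitly in the example above. As a sketch, assuming the remaining options in the table map to `Config` fields of the same name (verify the exact field names and types against the `Config` struct), tuning them would look like this:

```rust
use fluss::config::Config;

let mut config = Config::default();
config.bootstrap_server = "127.0.0.1:9123".to_string();

// The field names below are assumed from the option names in the table above;
// check the `Config` struct before relying on them.
// config.request_max_size = 10 * 1024 * 1024;   // 10 MB
// config.writer_acks = "all".to_string();       // wait for all replicas
// config.writer_retries = i32::MAX;             // retry indefinitely
// config.writer_batch_size = 2 * 1024 * 1024;   // 2 MB
```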
## Admin Operations

### Get Admin Interface

```rust
let admin = conn.get_admin().await?;
```

### Database Operations

```rust
// Create database
admin.create_database("my_database", true, None).await?;

// List all databases
let databases = admin.list_databases().await?;
println!("Databases: {:?}", databases);

// Check if database exists
let exists = admin.database_exists("my_database").await?;

// Get database information
let db_info = admin.get_database_info("my_database").await?;

// Drop database (with cascade option to drop all tables)
admin.drop_database("my_database", true, false).await?;
```

### Table Operations

```rust
use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};

// Define table schema
let table_descriptor = TableDescriptor::builder()
    .schema(
        Schema::builder()
            .column("id", DataTypes::int())
            .column("name", DataTypes::string())
            .column("amount", DataTypes::bigint())
            .build()?,
    )
    .build()?;

let table_path = TablePath::new("my_database", "my_table");

// Create table
admin.create_table(&table_path, &table_descriptor, true).await?;

// Get table information
let table_info = admin.get_table(&table_path).await?;
println!("Table: {}", table_info);

// List tables in database
let tables = admin.list_tables("my_database").await?;

// Check if table exists
let exists = admin.table_exists(&table_path).await?;

// Drop table
admin.drop_table(&table_path, true).await?;
```

### Partition Operations

```rust
use fluss::metadata::PartitionSpec;
use std::collections::HashMap;

// List all partitions
let partitions = admin.list_partition_infos(&table_path).await?;

// List partitions matching a spec
let mut filter = HashMap::new();
filter.insert("year", "2024");
let spec = PartitionSpec::new(filter);
let partitions = admin.list_partition_infos_with_spec(&table_path, Some(&spec)).await?;

// Create partition
admin.create_partition(&table_path, &spec, true).await?;

// Drop partition
admin.drop_partition(&table_path, &spec, true).await?;
```

### Offset Operations

```rust
use fluss::rpc::message::OffsetSpec;

let bucket_ids = vec![0, 1, 2];

// Get earliest offsets
let earliest = admin.list_offsets(&table_path, &bucket_ids, OffsetSpec::Earliest).await?;

// Get latest offsets
let latest = admin.list_offsets(&table_path, &bucket_ids, OffsetSpec::Latest).await?;

// Get offsets for a specific timestamp
let timestamp_ms = 1704067200000; // 2024-01-01 00:00:00 UTC
let offsets = admin.list_offsets(&table_path, &bucket_ids, OffsetSpec::Timestamp(timestamp_ms)).await?;

// Get offsets for a specific partition
let partition_offsets = admin.list_partition_offsets(
    &table_path,
    "partition_name",
    &bucket_ids,
    OffsetSpec::Latest,
).await?;
```

### Lake Snapshot

```rust
// Get latest lake snapshot for lakehouse integration
let snapshot = admin.get_latest_lake_snapshot(&table_path).await?;
println!("Snapshot ID: {}", snapshot.snapshot_id);
```

## Log Table Operations

Log tables are append-only tables without primary keys, suitable for event streaming.

### Creating a Log Table

```rust
let table_descriptor = TableDescriptor::builder()
    .schema(
        Schema::builder()
            .column("event_id", DataTypes::int())
            .column("event_type", DataTypes::string())
            .column("timestamp", DataTypes::bigint())
            .build()?,
    )
    .build()?;

let table_path = TablePath::new("fluss", "events");
admin.create_table(&table_path, &table_descriptor, true).await?;
```

### Writing to Log Tables

```rust
use fluss::row::{GenericRow, InternalRow};

let table = conn.get_table(&table_path).await?;
let append_writer = table.new_append()?.create_writer()?;

// Write a single row
let mut row = GenericRow::new(3);
row.set_field(0, 1);                 // event_id (int)
row.set_field(1, "user_login");      // event_type (string)
row.set_field(2, 1704067200000i64);  // timestamp (bigint)

append_writer.append(&row)?;

// Write multiple rows
let mut row2 = GenericRow::new(3);
row2.set_field(0, 2);
row2.set_field(1, "page_view");
row2.set_field(2, 1704067201000i64);

append_writer.append(&row2)?;

// Flush to ensure data is persisted
append_writer.flush().await?;
```

Write operations (`append`, `upsert`, `delete`) use a **fire-and-forget** pattern for efficient batching. Each call queues the write and returns a `WriteResultFuture` immediately. Call `flush()` to ensure all queued writes are sent to the server.

If you need per-record acknowledgment, you can await the returned future:

```rust
// Per-record acknowledgment (blocks until server confirms)
append_writer.append(&row)?.await?;
```

### Reading from Log Tables

```rust
use std::time::Duration;

let table = conn.get_table(&table_path).await?;
let log_scanner = table.new_scan().create_log_scanner()?;

// Subscribe to bucket 0 starting from offset 0
log_scanner.subscribe(0, 0).await?;

// Poll for records
let records = log_scanner.poll(Duration::from_secs(10)).await?;

for record in records {
    let row = record.row();
    println!(
        "event_id={}, event_type={}, timestamp={} @ offset={}",
        row.get_int(0),
        row.get_string(1),
        row.get_long(2),
        record.offset()
    );
}
```
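The example above performs a single `poll`. A typical consumer polls repeatedly; a minimal sketch of such a loop (bounded here so it terminates) looks like this:

```rust
use std::time::Duration;

// Poll in a loop; a real consumer would run until shutdown,
// this sketch stops after a fixed number of polls.
for _ in 0..10 {
    let records = log_scanner.poll(Duration::from_secs(10)).await?;
    for record in records {
        let row = record.row();
        println!(
            "event_id={}, event_type={} @ offset={}",
            row.get_int(0),
            row.get_string(1),
            record.offset()
        );
    }
}
```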
### Column Projection

```rust
// Project specific columns by index
let scanner = table.new_scan().project(&[0, 2])?.create_log_scanner()?;

// Or project by column names
let scanner = table.new_scan().project_by_name(&["event_id", "timestamp"])?.create_log_scanner()?;
```

### Subscribe from Specific Offsets

```rust
use fluss::client::{EARLIEST_OFFSET, LATEST_OFFSET};

// Subscribe from earliest available offset
log_scanner.subscribe(0, EARLIEST_OFFSET).await?;

// Subscribe from latest offset (only new records)
log_scanner.subscribe(0, LATEST_OFFSET).await?;

// Subscribe from a specific offset
log_scanner.subscribe(0, 42).await?;

// Subscribe to all buckets
let num_buckets = table.get_table_info().get_num_buckets();
for bucket_id in 0..num_buckets {
    log_scanner.subscribe(bucket_id, 0).await?;
}
```

### Subscribe to Multiple Buckets

```rust
use std::collections::HashMap;

// Subscribe to multiple buckets at once with specific offsets
let mut bucket_offsets = HashMap::new();
bucket_offsets.insert(0, 0i64);    // bucket 0 from offset 0
bucket_offsets.insert(1, 100i64);  // bucket 1 from offset 100
log_scanner.subscribe_buckets(&bucket_offsets).await?;
```
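Combining the two patterns above, a sketch that tails only new records on every bucket of the table with a single `subscribe_buckets` call (assuming `LATEST_OFFSET` has the offset type that `subscribe_buckets` expects) could look like this:

```rust
use std::collections::HashMap;
use fluss::client::LATEST_OFFSET;

// Start from the latest offset on every bucket of the table.
let num_buckets = table.get_table_info().get_num_buckets();
let mut bucket_offsets = HashMap::new();
for bucket_id in 0..num_buckets {
    bucket_offsets.insert(bucket_id, LATEST_OFFSET);
}
log_scanner.subscribe_buckets(&bucket_offsets).await?;
```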
### Unsubscribe from a Partition

```rust
// Unsubscribe from a specific partition bucket
log_scanner.unsubscribe_partition(partition_id, bucket_id).await?;
```

## Partitioned Log Tables

Partitioned tables distribute data across partitions based on partition column values, enabling efficient data organization and querying.

### Creating a Partitioned Log Table

```rust
use fluss::metadata::{DataTypes, LogFormat, Schema, TableDescriptor, TablePath};

let table_descriptor = TableDescriptor::builder()
    .schema(
        Schema::builder()
            .column("event_id", DataTypes::int())
            .column("event_type", DataTypes::string())
            .column("dt", DataTypes::string())      // partition column
            .column("region", DataTypes::string())  // partition column
            .build()?,
    )
    .partitioned_by(vec!["dt", "region"])  // Define partition columns
    .log_format(LogFormat::ARROW)
    .build()?;

let table_path = TablePath::new("fluss", "partitioned_events");
admin.create_table(&table_path, &table_descriptor, true).await?;
```

### Writing to Partitioned Log Tables

Writing works the same as for non-partitioned tables; include the partition column values in each row:

```rust
let table = conn.get_table(&table_path).await?;
let append_writer = table.new_append()?.create_writer()?;

// Partition column values determine which partition the record goes to
let mut row = GenericRow::new(4);
row.set_field(0, 1);             // event_id
row.set_field(1, "user_login");  // event_type
row.set_field(2, "2024-01-15");  // dt (partition column)
row.set_field(3, "US");          // region (partition column)

append_writer.append(&row)?;
append_writer.flush().await?;
```

### Reading from Partitioned Log Tables

For partitioned tables, use `subscribe_partition()` instead of `subscribe()`:

```rust
use std::time::Duration;

let table = conn.get_table(&table_path).await?;
let admin = conn.get_admin().await?;

// Get partition information
let partitions = admin.list_partition_infos(&table_path).await?;

let log_scanner = table.new_scan().create_log_scanner()?;

// Subscribe to each partition's buckets
for partition_info in &partitions {
    let partition_id = partition_info.get_partition_id();
    let num_buckets = table.get_table_info().get_num_buckets();

    for bucket_id in 0..num_buckets {
        log_scanner.subscribe_partition(partition_id, bucket_id, 0).await?;
    }
}

// Poll for records
let records = log_scanner.poll(Duration::from_secs(10)).await?;
for record in records {
    println!("Record from partition: {:?}", record.row());
}
```

Review Comment:
   This was raised because I only updated the doc while on older main branch. Rebased.
