This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new da9c112 feat: Introduce Kv table example (#181)
da9c112 is described below
commit da9c1125995a3c35f1eb0ee65a5ed4c13f9ebe92
Author: Keith Lee <[email protected]>
AuthorDate: Tue Jan 20 15:56:22 2026 +0000
feat: Introduce Kv table example (#181)
---
crates/examples/Cargo.toml | 6 +-
crates/examples/src/example_kv_table.rs | 116 ++++++++++++++++++++++++++++++++
crates/examples/src/example_table.rs | 2 +
crates/fluss/src/row/mod.rs | 1 +
4 files changed, 124 insertions(+), 1 deletion(-)
diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml
index e1fa531..117ceb2 100644
--- a/crates/examples/Cargo.toml
+++ b/crates/examples/Cargo.toml
@@ -29,4 +29,8 @@ tokio = { workspace = true }
clap = { workspace = true }
[[example]]
name = "example-table"
-path = "src/example_table.rs"
\ No newline at end of file
+path = "src/example_table.rs"
+
+[[example]]
+name = "example-upsert-lookup"
+path = "src/example_kv_table.rs"
\ No newline at end of file
diff --git a/crates/examples/src/example_kv_table.rs b/crates/examples/src/example_kv_table.rs
new file mode 100644
index 0000000..75821a3
--- /dev/null
+++ b/crates/examples/src/example_kv_table.rs
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use clap::Parser;
+use fluss::client::{FlussConnection, UpsertWriter};
+use fluss::config::Config;
+use fluss::error::Result;
+use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
+use fluss::row::{GenericRow, InternalRow};
+
+/// Walks through the life cycle of a primary-key (KV) table: create, upsert,
+/// lookup, update and delete.
+///
+/// Steps, in order:
+/// 1. Parse [`Config`] via `Config::parse()`, then force the bootstrap server
+///    to `127.0.0.1:9123`.
+/// 2. Create table `fluss.rust_upsert_lookup_example` with columns
+///    `id` (int, primary key), `name` (string) and `age` (bigint).
+/// 3. Upsert three rows, look up ids 1 and 2, overwrite id 1 with a new age,
+///    then delete id 2 and check that its lookup comes back empty.
+///
+/// # Errors
+///
+/// Any error returned by a client call is propagated with `?`.
+///
+/// # Panics
+///
+/// Panics if a lookup for an id that was just upserted returns no row.
+#[tokio::main]
+// NOTE(review): `example_table.rs` pulls this file in with `mod example_kv_table;`,
+// presumably leaving `main` unused in that build — hence the lint allowance. Confirm.
+#[allow(dead_code)]
+pub async fn main() -> Result<()> {
+    let mut config = Config::parse();
+    // The example always targets the loopback address, overriding the parsed value.
+    config.bootstrap_server = Some("127.0.0.1:9123".to_string());
+
+    let conn = FlussConnection::new(config).await?;
+
+    let table_descriptor = TableDescriptor::builder()
+        .schema(
+            Schema::builder()
+                .column("id", DataTypes::int())
+                .column("name", DataTypes::string())
+                .column("age", DataTypes::bigint())
+                .primary_key(vec!["id".to_string()])
+                .build()?,
+        )
+        .build()?;
+
+    let table_path = TablePath::new("fluss".to_owned(), "rust_upsert_lookup_example".to_owned());
+
+    let admin = conn.get_admin().await?;
+    // NOTE(review): the trailing `true` is presumably "ignore if the table already
+    // exists" — confirm against the admin API.
+    admin
+        .create_table(&table_path, &table_descriptor, true)
+        .await?;
+    println!(
+        "Created KV Table:\n {}\n",
+        admin.get_table(&table_path).await?
+    );
+
+    let table = conn.get_table(&table_path).await?;
+    let table_upsert = table.new_upsert()?;
+    let mut upsert_writer = table_upsert.create_writer()?;
+
+    println!("\n=== Upserting ===");
+    for (id, name, age) in [(1, "Verso", 32i64), (2, "Noco", 25), (3, "Esquie", 35)] {
+        // Field indices follow the schema's column order: id, name, age.
+        let mut row = GenericRow::new();
+        row.set_field(0, id);
+        row.set_field(1, name);
+        row.set_field(2, age);
+        upsert_writer.upsert(&row).await?;
+        println!("Upserted: {row:?}");
+    }
+
+    println!("\n=== Looking up ===");
+    let mut lookuper = table.new_lookup()?.create_lookuper()?;
+
+    for id in 1..=2 {
+        let result = lookuper.lookup(&make_key(id)).await?;
+        let row = result
+            .get_single_row()?
+            .expect("lookup returned no row for an id that was just upserted");
+        println!(
+            "Found id={id}: name={}, age={}",
+            row.get_string(1),
+            row.get_long(2)
+        );
+    }
+
+    println!("\n=== Updating ===");
+    // Upserting the same key again with a different age overwrites the row for id 1.
+    let mut row = GenericRow::new();
+    row.set_field(0, 1);
+    row.set_field(1, "Verso");
+    row.set_field(2, 33i64);
+    upsert_writer.upsert(&row).await?;
+    println!("Updated: {row:?}");
+
+    let result = lookuper.lookup(&make_key(1)).await?;
+    let row = result
+        .get_single_row()?
+        .expect("lookup returned no row for an id that was just upserted");
+    println!(
+        "Verified update: name={}, age={}",
+        row.get_string(1),
+        row.get_long(2)
+    );
+
+    println!("\n=== Deleting ===");
+    // Only the key column is populated for the delete request.
+    let mut row = GenericRow::new();
+    row.set_field(0, 2);
+    upsert_writer.delete(&row).await?;
+    println!("Deleted: {row:?}");
+
+    let result = lookuper.lookup(&make_key(2)).await?;
+    if result.get_single_row()?.is_none() {
+        println!("Verified deletion");
+    }
+
+    Ok(())
+}
+
+/// Builds the lookup key for `id`: a fresh [`GenericRow`] whose only populated
+/// field is index 0, set to `id`.
+fn make_key(id: i32) -> GenericRow<'static> {
+    let mut key = GenericRow::new();
+    key.set_field(0, id);
+    key
+}
diff --git a/crates/examples/src/example_table.rs b/crates/examples/src/example_table.rs
index 2d6ac53..7333056 100644
--- a/crates/examples/src/example_table.rs
+++ b/crates/examples/src/example_table.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+mod example_kv_table;
+
use clap::Parser;
use fluss::client::FlussConnection;
use fluss::config::Config;
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index bc8134d..81a4254 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -122,6 +122,7 @@ pub trait InternalRow {
}
}
+/// A row whose field values are held in a `Vec` of [`Datum`]s.
+///
+/// `Debug` is derived so that a row can be formatted with `{:?}`.
+#[derive(Debug)]
pub struct GenericRow<'a> {
pub values: Vec<Datum<'a>>,
}