This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new da9c112  feat: Introduce Kv table example (#181)
da9c112 is described below

commit da9c1125995a3c35f1eb0ee65a5ed4c13f9ebe92
Author: Keith Lee <[email protected]>
AuthorDate: Tue Jan 20 15:56:22 2026 +0000

    feat: Introduce Kv table example (#181)
---
 crates/examples/Cargo.toml              |   6 +-
 crates/examples/src/example_kv_table.rs | 116 ++++++++++++++++++++++++++++++++
 crates/examples/src/example_table.rs    |   2 +
 crates/fluss/src/row/mod.rs             |   1 +
 4 files changed, 124 insertions(+), 1 deletion(-)

diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml
index e1fa531..117ceb2 100644
--- a/crates/examples/Cargo.toml
+++ b/crates/examples/Cargo.toml
@@ -29,4 +29,8 @@ tokio = { workspace = true }
 clap = { workspace = true }
 [[example]]
 name = "example-table"
-path = "src/example_table.rs"
\ No newline at end of file
+path = "src/example_table.rs"
+
+[[example]]
+name = "example-upsert-lookup"
+path = "src/example_kv_table.rs"
\ No newline at end of file
diff --git a/crates/examples/src/example_kv_table.rs b/crates/examples/src/example_kv_table.rs
new file mode 100644
index 0000000..75821a3
--- /dev/null
+++ b/crates/examples/src/example_kv_table.rs
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use clap::Parser;
+use fluss::client::{FlussConnection, UpsertWriter};
+use fluss::config::Config;
+use fluss::error::Result;
+use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
+use fluss::row::{GenericRow, InternalRow};
+
+/// Walks through the life cycle of a primary-key (KV) table: create the
+/// table, upsert three rows, look two of them up by key, update one row and
+/// re-read it, then delete one row and check that it is gone.
+///
+/// The bootstrap server is always set to `127.0.0.1:9123`, replacing whatever
+/// value `Config::parse()` produced.
+///
+/// # Errors
+///
+/// Propagates, via `?`, any error returned by the Fluss client calls.
+///
+/// # Panics
+///
+/// Panics if a row that was just upserted is not returned by the lookup.
+#[tokio::main]
+// NOTE(review): `example_table.rs` also declares this file as a module, where
+// `main` is never called — presumably why the lint is silenced; confirm.
+#[allow(dead_code)]
+pub async fn main() -> Result<()> {
+    let mut config = Config::parse();
+    config.bootstrap_server = Some("127.0.0.1:9123".to_string());
+
+    let conn = FlussConnection::new(config).await?;
+
+    // Three columns, keyed on `id`.
+    let table_descriptor = TableDescriptor::builder()
+        .schema(
+            Schema::builder()
+                .column("id", DataTypes::int())
+                .column("name", DataTypes::string())
+                .column("age", DataTypes::bigint())
+                .primary_key(vec!["id".to_string()])
+                .build()?,
+        )
+        .build()?;
+
+    let table_path = TablePath::new("fluss".to_owned(), "rust_upsert_lookup_example".to_owned());
+
+    let admin = conn.get_admin().await?;
+    // NOTE(review): the trailing `true` presumably means "ignore if the table
+    // already exists" — confirm against the admin API.
+    admin
+        .create_table(&table_path, &table_descriptor, true)
+        .await?;
+    println!(
+        "Created KV Table:\n {}\n",
+        admin.get_table(&table_path).await?
+    );
+
+    let table = conn.get_table(&table_path).await?;
+    let table_upsert = table.new_upsert()?;
+    let mut upsert_writer = table_upsert.create_writer()?;
+
+    println!("\n=== Upserting ===");
+    for (id, name, age) in [(1, "Verso", 32i64), (2, "Noco", 25), (3, "Esquie", 35)] {
+        let mut row = GenericRow::new();
+        row.set_field(0, id);
+        row.set_field(1, name);
+        row.set_field(2, age);
+        upsert_writer.upsert(&row).await?;
+        println!("Upserted: {row:?}");
+    }
+
+    println!("\n=== Looking up ===");
+    let mut lookuper = table.new_lookup()?.create_lookuper()?;
+
+    for id in 1..=2 {
+        let result = lookuper.lookup(&make_key(id)).await?;
+        // The row was written above, so a miss is a bug worth a clear message.
+        let row = result
+            .get_single_row()?
+            .unwrap_or_else(|| panic!("no row found for id={id} right after upsert"));
+        println!(
+            "Found id={id}: name={}, age={}",
+            row.get_string(1),
+            row.get_long(2)
+        );
+    }
+
+    println!("\n=== Updating ===");
+    // Upserting an existing key again: same id and name, new age.
+    let mut row = GenericRow::new();
+    row.set_field(0, 1);
+    row.set_field(1, "Verso");
+    row.set_field(2, 33i64);
+    upsert_writer.upsert(&row).await?;
+    println!("Updated: {row:?}");
+
+    let result = lookuper.lookup(&make_key(1)).await?;
+    let row = result
+        .get_single_row()?
+        .expect("no row found for id=1 right after update");
+    println!(
+        "Verified update: name={}, age={}",
+        row.get_string(1),
+        row.get_long(2)
+    );
+
+    println!("\n=== Deleting ===");
+    // Only the key field is populated on the row passed to `delete`.
+    let mut row = GenericRow::new();
+    row.set_field(0, 2);
+    upsert_writer.delete(&row).await?;
+    println!("Deleted: {row:?}");
+
+    let result = lookuper.lookup(&make_key(2)).await?;
+    if result.get_single_row()?.is_none() {
+        println!("Verified deletion");
+    } else {
+        // Report the unexpected case instead of finishing silently.
+        println!("Deletion not verified: id=2 is still present");
+    }
+
+    Ok(())
+}
+
+/// Builds a lookup key: a fresh `GenericRow` with `id` stored in field 0 and
+/// no other fields set.
+fn make_key(id: i32) -> GenericRow<'static> {
+    let mut key = GenericRow::new();
+    key.set_field(0, id);
+    key
+}
diff --git a/crates/examples/src/example_table.rs b/crates/examples/src/example_table.rs
index 2d6ac53..7333056 100644
--- a/crates/examples/src/example_table.rs
+++ b/crates/examples/src/example_table.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod example_kv_table;
+
 use clap::Parser;
 use fluss::client::FlussConnection;
 use fluss::config::Config;
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index bc8134d..81a4254 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -122,6 +122,7 @@ pub trait InternalRow {
     }
 }
 
+#[derive(Debug)]
 pub struct GenericRow<'a> {
     pub values: Vec<Datum<'a>>,
 }

Reply via email to