Copilot commented on code in PR #220: URL: https://github.com/apache/fluss-rust/pull/220#discussion_r2737396203
########## crates/examples/src/example_partitioned_kv_table.rs: ########## @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::sync::Arc; +use clap::Parser; +use fluss::client::{FlussAdmin, FlussConnection, UpsertWriter}; +use fluss::config::Config; +use fluss::error::Result; +use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor, TablePath}; +use fluss::row::{GenericRow, InternalRow}; + +#[tokio::main] +#[allow(dead_code)] +pub async fn main() -> Result<()> { + let mut config = Config::parse(); + config.bootstrap_server = Some("127.0.0.1:9123".to_string()); + + let conn = FlussConnection::new(config).await?; + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("region", DataTypes::string()) + .column("zone", DataTypes::bigint()) + .column("score", DataTypes::bigint()) + .primary_key(vec!["id".to_string(), "region".to_string(), "zone".to_string()]) + .build()?, + ) + .partitioned_by(Arc::from(["region".to_string(), "zone".to_string()])) + .build()?; + + let table_path = TablePath::new("fluss".to_owned(), "partitioned_kv_example".to_owned()); + + let mut admin = conn.get_admin().await?; + admin + .create_table(&table_path, &table_descriptor, true) + .await?; + println!( + "Created KV Table:\n {}\n", + admin.get_table(&table_path).await? + ); + + create_partition(&table_path, &mut admin, "APAC", 1).await; + create_partition(&table_path, &mut admin, "EMEA", 2).await; + create_partition(&table_path, &mut admin, "US", 3).await; + + let table = conn.get_table(&table_path).await?; + let table_upsert = table.new_upsert()?; + let mut upsert_writer = table_upsert.create_writer()?; + + println!("\n=== Upserting ==="); + for (id, region, zone, score) in [(1001, "APAC", 1i64, 1234i64), (1002, "EMEA", 2, 2234), (1003, "US", 3, 3234)] { + let mut row = GenericRow::new(4); + row.set_field(0, id); + row.set_field(1, region); + row.set_field(2, zone); + row.set_field(3, score); + upsert_writer.upsert(&row).await?; + println!("Upserted: {row:?}"); + } + + println!("\n=== Looking up ==="); + let mut lookuper = table.new_lookup()?.create_lookuper()?; + + for (id, region, zone) in [(1, "APAC", 1i64), (2, "EMEA", 2), (3, "US", 3)] { Review Comment: The lookup is attempting to find records with IDs 1, 2, and 3, but the upserted records have IDs 1001, 1002, and 1003. This will cause the lookup to fail as these records don't exist. The IDs in the lookup loop should match the IDs used in the upsert operation. ########## crates/examples/src/example_partitioned_kv_table.rs: ########## @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::sync::Arc; +use clap::Parser; +use fluss::client::{FlussAdmin, FlussConnection, UpsertWriter}; +use fluss::config::Config; +use fluss::error::Result; +use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor, TablePath}; +use fluss::row::{GenericRow, InternalRow}; + +#[tokio::main] +#[allow(dead_code)] +pub async fn main() -> Result<()> { + let mut config = Config::parse(); + config.bootstrap_server = Some("127.0.0.1:9123".to_string()); + + let conn = FlussConnection::new(config).await?; + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("region", DataTypes::string()) + .column("zone", DataTypes::bigint()) + .column("score", DataTypes::bigint()) + .primary_key(vec!["id".to_string(), "region".to_string(), "zone".to_string()]) + .build()?, + ) + .partitioned_by(Arc::from(["region".to_string(), "zone".to_string()])) + .build()?; + + let table_path = TablePath::new("fluss".to_owned(), "partitioned_kv_example".to_owned()); + + let mut admin = conn.get_admin().await?; + admin + .create_table(&table_path, &table_descriptor, true) + .await?; + println!( + "Created KV Table:\n {}\n", + admin.get_table(&table_path).await? + ); + + create_partition(&table_path, &mut admin, "APAC", 1).await; + create_partition(&table_path, &mut admin, "EMEA", 2).await; + create_partition(&table_path, &mut admin, "US", 3).await; + + let table = conn.get_table(&table_path).await?; + let table_upsert = table.new_upsert()?; + let mut upsert_writer = table_upsert.create_writer()?; + + println!("\n=== Upserting ==="); + for (id, region, zone, score) in [(1001, "APAC", 1i64, 1234i64), (1002, "EMEA", 2, 2234), (1003, "US", 3, 3234)] { + let mut row = GenericRow::new(4); + row.set_field(0, id); + row.set_field(1, region); + row.set_field(2, zone); + row.set_field(3, score); + upsert_writer.upsert(&row).await?; + println!("Upserted: {row:?}"); + } + + println!("\n=== Looking up ==="); + let mut lookuper = table.new_lookup()?.create_lookuper()?; + + for (id, region, zone) in [(1, "APAC", 1i64), (2, "EMEA", 2), (3, "US", 3)] { + let result = lookuper.lookup(&make_key(id, region, zone)).await.expect("lookup"); + let row = result.get_single_row()?.unwrap(); + println!( + "Found id={id}: region={}, zone={}, score={}", + row.get_string(1), + row.get_long(2), + row.get_long(3) + ); + } + + println!("\n=== Updating ==="); + let mut row = GenericRow::new(4); + row.set_field(0, 1); Review Comment: Similar to the lookup loop, the update operation uses ID 1 instead of the actual upserted ID 1001. This will attempt to update a non-existent record. The ID should be 1001 to match the upserted data. ########## crates/examples/src/example_partitioned_kv_table.rs: ########## @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::sync::Arc; +use clap::Parser; +use fluss::client::{FlussAdmin, FlussConnection, UpsertWriter}; +use fluss::config::Config; +use fluss::error::Result; +use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor, TablePath}; +use fluss::row::{GenericRow, InternalRow}; + +#[tokio::main] +#[allow(dead_code)] +pub async fn main() -> Result<()> { + let mut config = Config::parse(); + config.bootstrap_server = Some("127.0.0.1:9123".to_string()); + + let conn = FlussConnection::new(config).await?; + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("region", DataTypes::string()) + .column("zone", DataTypes::bigint()) + .column("score", DataTypes::bigint()) + .primary_key(vec!["id".to_string(), "region".to_string(), "zone".to_string()]) + .build()?, + ) + .partitioned_by(Arc::from(["region".to_string(), "zone".to_string()])) + .build()?; + + let table_path = TablePath::new("fluss".to_owned(), "partitioned_kv_example".to_owned()); + + let mut admin = conn.get_admin().await?; + admin + .create_table(&table_path, &table_descriptor, true) + .await?; + println!( + "Created KV Table:\n {}\n", + admin.get_table(&table_path).await? + ); + + create_partition(&table_path, &mut admin, "APAC", 1).await; + create_partition(&table_path, &mut admin, "EMEA", 2).await; + create_partition(&table_path, &mut admin, "US", 3).await; + + let table = conn.get_table(&table_path).await?; + let table_upsert = table.new_upsert()?; + let mut upsert_writer = table_upsert.create_writer()?; + + println!("\n=== Upserting ==="); + for (id, region, zone, score) in [(1001, "APAC", 1i64, 1234i64), (1002, "EMEA", 2, 2234), (1003, "US", 3, 3234)] { + let mut row = GenericRow::new(4); + row.set_field(0, id); + row.set_field(1, region); + row.set_field(2, zone); + row.set_field(3, score); + upsert_writer.upsert(&row).await?; + println!("Upserted: {row:?}"); + } + + println!("\n=== Looking up ==="); + let mut lookuper = table.new_lookup()?.create_lookuper()?; + + for (id, region, zone) in [(1, "APAC", 1i64), (2, "EMEA", 2), (3, "US", 3)] { + let result = lookuper.lookup(&make_key(id, region, zone)).await.expect("lookup"); + let row = result.get_single_row()?.unwrap(); + println!( + "Found id={id}: region={}, zone={}, score={}", + row.get_string(1), + row.get_long(2), + row.get_long(3) + ); + } + + println!("\n=== Updating ==="); + let mut row = GenericRow::new(4); + row.set_field(0, 1); + row.set_field(1, "APAC"); + row.set_field(2, 1i64); + row.set_field(3, 4321i64); + upsert_writer.upsert(&row).await?; + println!("Updated: {row:?}"); + + let result = lookuper.lookup(&make_key(1, "APAC", 1)).await?; + let row = result.get_single_row()?.unwrap(); + println!( + "Verified update: region={}, zone={}", + row.get_string(1), + row.get_long(2) + ); + + println!("\n=== Deleting ==="); + let mut row = GenericRow::new(4); + row.set_field(0, 2); + row.set_field(1, "EMEA"); + row.set_field(2, 2i64); + upsert_writer.delete(&row).await?; + println!("Deleted: {row:?}"); + + let result = lookuper.lookup(&make_key(2, "EMEA", 2)).await?; Review Comment: The deletion operation uses ID 2 instead of the actual upserted ID 1002. This will attempt to delete a non-existent record. The ID should be 1002 to match the upserted data. ```suggestion row.set_field(0, 1002); row.set_field(1, "EMEA"); row.set_field(2, 2i64); upsert_writer.delete(&row).await?; println!("Deleted: {row:?}"); let result = lookuper.lookup(&make_key(1002, "EMEA", 2)).await?; ``` ########## crates/fluss/src/client/table/upsert.rs: ########## @@ -311,6 +321,15 @@ impl<RE: RowEncoder> UpsertWriterImpl<RE> { } self.row_encoder.finish_row() } + + fn get_physical_path<R: InternalRow>(&self, row: &R) -> PhysicalTablePath { + if let Some(partition_getter) = &self.partition_field_getter { + let partition = partition_getter.get_partition(row); + PhysicalTablePath::of_partitioned(Arc::clone(&self.table_path), partition.ok()) Review Comment: The error from partition extraction is being silently swallowed with `.ok()`. If `get_partition()` fails (e.g., due to a null partition value), this will create a PhysicalTablePath with `partition_name: None` for a partitioned table, which is incorrect and will lead to routing the data to the wrong location. The error should be propagated instead of being ignored. Consider changing the return type to `Result<PhysicalTablePath>` and propagating errors to the caller. ```suggestion match partition_getter.get_partition(row) { Ok(partition) => { PhysicalTablePath::of_partitioned(Arc::clone(&self.table_path), Some(partition)) } Err(e) => { panic!("Failed to extract partition for physical table path: {e}"); } } ``` ########## crates/examples/src/example_partitioned_kv_table.rs: ########## @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::sync::Arc; +use clap::Parser; +use fluss::client::{FlussAdmin, FlussConnection, UpsertWriter}; +use fluss::config::Config; +use fluss::error::Result; +use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor, TablePath}; +use fluss::row::{GenericRow, InternalRow}; + +#[tokio::main] +#[allow(dead_code)] +pub async fn main() -> Result<()> { + let mut config = Config::parse(); + config.bootstrap_server = Some("127.0.0.1:9123".to_string()); + + let conn = FlussConnection::new(config).await?; + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("region", DataTypes::string()) + .column("zone", DataTypes::bigint()) + .column("score", DataTypes::bigint()) + .primary_key(vec!["id".to_string(), "region".to_string(), "zone".to_string()]) + .build()?, + ) + .partitioned_by(Arc::from(["region".to_string(), "zone".to_string()])) + .build()?; + + let table_path = TablePath::new("fluss".to_owned(), "partitioned_kv_example".to_owned()); + + let mut admin = conn.get_admin().await?; + admin + .create_table(&table_path, &table_descriptor, true) + .await?; + println!( + "Created KV Table:\n {}\n", + admin.get_table(&table_path).await? + ); + + create_partition(&table_path, &mut admin, "APAC", 1).await; + create_partition(&table_path, &mut admin, "EMEA", 2).await; + create_partition(&table_path, &mut admin, "US", 3).await; + + let table = conn.get_table(&table_path).await?; + let table_upsert = table.new_upsert()?; + let mut upsert_writer = table_upsert.create_writer()?; + + println!("\n=== Upserting ==="); + for (id, region, zone, score) in [(1001, "APAC", 1i64, 1234i64), (1002, "EMEA", 2, 2234), (1003, "US", 3, 3234)] { + let mut row = GenericRow::new(4); + row.set_field(0, id); + row.set_field(1, region); + row.set_field(2, zone); + row.set_field(3, score); + upsert_writer.upsert(&row).await?; + println!("Upserted: {row:?}"); + } + + println!("\n=== Looking up ==="); + let mut lookuper = table.new_lookup()?.create_lookuper()?; + + for (id, region, zone) in [(1, "APAC", 1i64), (2, "EMEA", 2), (3, "US", 3)] { + let result = lookuper.lookup(&make_key(id, region, zone)).await.expect("lookup"); + let row = result.get_single_row()?.unwrap(); + println!( + "Found id={id}: region={}, zone={}, score={}", + row.get_string(1), + row.get_long(2), + row.get_long(3) + ); + } + + println!("\n=== Updating ==="); + let mut row = GenericRow::new(4); + row.set_field(0, 1); + row.set_field(1, "APAC"); + row.set_field(2, 1i64); + row.set_field(3, 4321i64); + upsert_writer.upsert(&row).await?; + println!("Updated: {row:?}"); + + let result = lookuper.lookup(&make_key(1, "APAC", 1)).await?; + let row = result.get_single_row()?.unwrap(); + println!( + "Verified update: region={}, zone={}", + row.get_string(1), + row.get_long(2) + ); + + println!("\n=== Deleting ==="); + let mut row = GenericRow::new(4); + row.set_field(0, 2); + row.set_field(1, "EMEA"); + row.set_field(2, 2i64); + upsert_writer.delete(&row).await?; + println!("Deleted: {row:?}"); + + let result = lookuper.lookup(&make_key(2, "EMEA", 2)).await?; + if result.get_single_row()?.is_none() { + println!("Verified deletion"); + } + + Ok(()) +} + +async fn create_partition(table_path: &TablePath, admin: &mut FlussAdmin, region: &str, zone: i64) { + let mut partition_values = HashMap::new(); + partition_values.insert("region".to_string(), region.to_string()); + partition_values.insert("zone".to_string(), zone.to_string()); + let partition_spec = PartitionSpec::new(partition_values); + + admin + .create_partition(&table_path, &partition_spec, false) + .await.unwrap_or_else(|a| ()); Review Comment: This error handler silently ignores all errors from partition creation, including potentially critical errors. Consider either propagating the error or at least logging it for debugging purposes rather than silently discarding it. ```suggestion if let Err(err) = admin .create_partition(&table_path, &partition_spec, false) .await { eprintln!( "Failed to create partition for region '{}' and zone {}: {}", region, zone, err ); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
