Copilot commented on code in PR #220:
URL: https://github.com/apache/fluss-rust/pull/220#discussion_r2737396203


##########
crates/examples/src/example_partitioned_kv_table.rs:
##########
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use clap::Parser;
+use fluss::client::{FlussAdmin, FlussConnection, UpsertWriter};
+use fluss::config::Config;
+use fluss::error::Result;
+use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor, 
TablePath};
+use fluss::row::{GenericRow, InternalRow};
+
+#[tokio::main]
+#[allow(dead_code)]
+pub async fn main() -> Result<()> {
+    let mut config = Config::parse();
+    config.bootstrap_server = Some("127.0.0.1:9123".to_string());
+
+    let conn = FlussConnection::new(config).await?;
+
+    let table_descriptor = TableDescriptor::builder()
+        .schema(
+            Schema::builder()
+                .column("id", DataTypes::int())
+                .column("region", DataTypes::string())
+                .column("zone", DataTypes::bigint())
+                .column("score", DataTypes::bigint())
+                .primary_key(vec!["id".to_string(), "region".to_string(), 
"zone".to_string()])
+                .build()?,
+        )
+        .partitioned_by(Arc::from(["region".to_string(), "zone".to_string()]))
+        .build()?;
+
+    let table_path = TablePath::new("fluss".to_owned(), 
"partitioned_kv_example".to_owned());
+
+    let mut admin = conn.get_admin().await?;
+    admin
+        .create_table(&table_path, &table_descriptor, true)
+        .await?;
+    println!(
+        "Created KV Table:\n {}\n",
+        admin.get_table(&table_path).await?
+    );
+
+    create_partition(&table_path, &mut admin, "APAC", 1).await;
+    create_partition(&table_path, &mut admin, "EMEA", 2).await;
+    create_partition(&table_path, &mut admin, "US", 3).await;
+
+    let table = conn.get_table(&table_path).await?;
+    let table_upsert = table.new_upsert()?;
+    let mut upsert_writer = table_upsert.create_writer()?;
+
+    println!("\n=== Upserting ===");
+    for (id, region, zone, score) in [(1001, "APAC", 1i64, 1234i64), (1002, 
"EMEA", 2, 2234), (1003, "US", 3, 3234)] {
+        let mut row = GenericRow::new(4);
+        row.set_field(0, id);
+        row.set_field(1, region);
+        row.set_field(2, zone);
+        row.set_field(3, score);
+        upsert_writer.upsert(&row).await?;
+        println!("Upserted: {row:?}");
+    }
+
+    println!("\n=== Looking up ===");
+    let mut lookuper = table.new_lookup()?.create_lookuper()?;
+
+    for (id, region, zone) in [(1, "APAC", 1i64), (2, "EMEA", 2), (3, "US", 
3)] {

Review Comment:
   The lookup is attempting to find records with IDs 1, 2, and 3, but the 
upserted records have IDs 1001, 1002, and 1003. This will cause the lookup to 
fail as these records don't exist. The IDs in the lookup loop should match the 
IDs used in the upsert operation.



##########
crates/examples/src/example_partitioned_kv_table.rs:
##########
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use clap::Parser;
+use fluss::client::{FlussAdmin, FlussConnection, UpsertWriter};
+use fluss::config::Config;
+use fluss::error::Result;
+use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor, 
TablePath};
+use fluss::row::{GenericRow, InternalRow};
+
+#[tokio::main]
+#[allow(dead_code)]
+pub async fn main() -> Result<()> {
+    let mut config = Config::parse();
+    config.bootstrap_server = Some("127.0.0.1:9123".to_string());
+
+    let conn = FlussConnection::new(config).await?;
+
+    let table_descriptor = TableDescriptor::builder()
+        .schema(
+            Schema::builder()
+                .column("id", DataTypes::int())
+                .column("region", DataTypes::string())
+                .column("zone", DataTypes::bigint())
+                .column("score", DataTypes::bigint())
+                .primary_key(vec!["id".to_string(), "region".to_string(), 
"zone".to_string()])
+                .build()?,
+        )
+        .partitioned_by(Arc::from(["region".to_string(), "zone".to_string()]))
+        .build()?;
+
+    let table_path = TablePath::new("fluss".to_owned(), 
"partitioned_kv_example".to_owned());
+
+    let mut admin = conn.get_admin().await?;
+    admin
+        .create_table(&table_path, &table_descriptor, true)
+        .await?;
+    println!(
+        "Created KV Table:\n {}\n",
+        admin.get_table(&table_path).await?
+    );
+
+    create_partition(&table_path, &mut admin, "APAC", 1).await;
+    create_partition(&table_path, &mut admin, "EMEA", 2).await;
+    create_partition(&table_path, &mut admin, "US", 3).await;
+
+    let table = conn.get_table(&table_path).await?;
+    let table_upsert = table.new_upsert()?;
+    let mut upsert_writer = table_upsert.create_writer()?;
+
+    println!("\n=== Upserting ===");
+    for (id, region, zone, score) in [(1001, "APAC", 1i64, 1234i64), (1002, 
"EMEA", 2, 2234), (1003, "US", 3, 3234)] {
+        let mut row = GenericRow::new(4);
+        row.set_field(0, id);
+        row.set_field(1, region);
+        row.set_field(2, zone);
+        row.set_field(3, score);
+        upsert_writer.upsert(&row).await?;
+        println!("Upserted: {row:?}");
+    }
+
+    println!("\n=== Looking up ===");
+    let mut lookuper = table.new_lookup()?.create_lookuper()?;
+
+    for (id, region, zone) in [(1, "APAC", 1i64), (2, "EMEA", 2), (3, "US", 
3)] {
+        let result = lookuper.lookup(&make_key(id, region, 
zone)).await.expect("lookup");
+        let row = result.get_single_row()?.unwrap();
+        println!(
+            "Found id={id}: region={}, zone={}, score={}",
+            row.get_string(1),
+            row.get_long(2),
+            row.get_long(3)
+        );
+    }
+
+    println!("\n=== Updating ===");
+    let mut row = GenericRow::new(4);
+    row.set_field(0, 1);

Review Comment:
   Similar to the lookup loop, the update operation uses ID 1 instead of the 
actual upserted ID 1001. This will attempt to update a non-existent record. The 
ID should be 1001 to match the upserted data.



##########
crates/examples/src/example_partitioned_kv_table.rs:
##########
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use clap::Parser;
+use fluss::client::{FlussAdmin, FlussConnection, UpsertWriter};
+use fluss::config::Config;
+use fluss::error::Result;
+use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor, 
TablePath};
+use fluss::row::{GenericRow, InternalRow};
+
+#[tokio::main]
+#[allow(dead_code)]
+pub async fn main() -> Result<()> {
+    let mut config = Config::parse();
+    config.bootstrap_server = Some("127.0.0.1:9123".to_string());
+
+    let conn = FlussConnection::new(config).await?;
+
+    let table_descriptor = TableDescriptor::builder()
+        .schema(
+            Schema::builder()
+                .column("id", DataTypes::int())
+                .column("region", DataTypes::string())
+                .column("zone", DataTypes::bigint())
+                .column("score", DataTypes::bigint())
+                .primary_key(vec!["id".to_string(), "region".to_string(), 
"zone".to_string()])
+                .build()?,
+        )
+        .partitioned_by(Arc::from(["region".to_string(), "zone".to_string()]))
+        .build()?;
+
+    let table_path = TablePath::new("fluss".to_owned(), 
"partitioned_kv_example".to_owned());
+
+    let mut admin = conn.get_admin().await?;
+    admin
+        .create_table(&table_path, &table_descriptor, true)
+        .await?;
+    println!(
+        "Created KV Table:\n {}\n",
+        admin.get_table(&table_path).await?
+    );
+
+    create_partition(&table_path, &mut admin, "APAC", 1).await;
+    create_partition(&table_path, &mut admin, "EMEA", 2).await;
+    create_partition(&table_path, &mut admin, "US", 3).await;
+
+    let table = conn.get_table(&table_path).await?;
+    let table_upsert = table.new_upsert()?;
+    let mut upsert_writer = table_upsert.create_writer()?;
+
+    println!("\n=== Upserting ===");
+    for (id, region, zone, score) in [(1001, "APAC", 1i64, 1234i64), (1002, 
"EMEA", 2, 2234), (1003, "US", 3, 3234)] {
+        let mut row = GenericRow::new(4);
+        row.set_field(0, id);
+        row.set_field(1, region);
+        row.set_field(2, zone);
+        row.set_field(3, score);
+        upsert_writer.upsert(&row).await?;
+        println!("Upserted: {row:?}");
+    }
+
+    println!("\n=== Looking up ===");
+    let mut lookuper = table.new_lookup()?.create_lookuper()?;
+
+    for (id, region, zone) in [(1, "APAC", 1i64), (2, "EMEA", 2), (3, "US", 
3)] {
+        let result = lookuper.lookup(&make_key(id, region, 
zone)).await.expect("lookup");
+        let row = result.get_single_row()?.unwrap();
+        println!(
+            "Found id={id}: region={}, zone={}, score={}",
+            row.get_string(1),
+            row.get_long(2),
+            row.get_long(3)
+        );
+    }
+
+    println!("\n=== Updating ===");
+    let mut row = GenericRow::new(4);
+    row.set_field(0, 1);
+    row.set_field(1, "APAC");
+    row.set_field(2, 1i64);
+    row.set_field(3, 4321i64);
+    upsert_writer.upsert(&row).await?;
+    println!("Updated: {row:?}");
+
+    let result = lookuper.lookup(&make_key(1, "APAC", 1)).await?;
+    let row = result.get_single_row()?.unwrap();
+    println!(
+        "Verified update: region={}, zone={}",
+        row.get_string(1),
+        row.get_long(2)
+    );
+
+    println!("\n=== Deleting ===");
+    let mut row = GenericRow::new(4);
+    row.set_field(0, 2);
+    row.set_field(1, "EMEA");
+    row.set_field(2, 2i64);
+    upsert_writer.delete(&row).await?;
+    println!("Deleted: {row:?}");
+
+    let result = lookuper.lookup(&make_key(2, "EMEA", 2)).await?;

Review Comment:
   The deletion operation uses ID 2 instead of the actual upserted ID 1002. 
This will attempt to delete a non-existent record. The ID should be 1002 to 
match the upserted data.
   ```suggestion
       row.set_field(0, 1002);
       row.set_field(1, "EMEA");
       row.set_field(2, 2i64);
       upsert_writer.delete(&row).await?;
       println!("Deleted: {row:?}");
   
       let result = lookuper.lookup(&make_key(1002, "EMEA", 2)).await?;
   ```



##########
crates/fluss/src/client/table/upsert.rs:
##########
@@ -311,6 +321,15 @@ impl<RE: RowEncoder> UpsertWriterImpl<RE> {
         }
         self.row_encoder.finish_row()
     }
+
+    fn get_physical_path<R: InternalRow>(&self, row: &R) -> PhysicalTablePath {
+        if let Some(partition_getter) = &self.partition_field_getter {
+            let partition = partition_getter.get_partition(row);
+            PhysicalTablePath::of_partitioned(Arc::clone(&self.table_path), 
partition.ok())

Review Comment:
   The error from partition extraction is being silently swallowed with 
`.ok()`. If `get_partition()` fails (e.g., due to a null partition value), this 
will create a PhysicalTablePath with `partition_name: None` for a partitioned 
table, which is incorrect and will lead to routing the data to the wrong 
location. The error should be propagated instead of being ignored. Consider 
changing the return type to `Result<PhysicalTablePath>` and propagating errors 
to the caller.
   ```suggestion
               match partition_getter.get_partition(row) {
                   Ok(partition) => {
                       
PhysicalTablePath::of_partitioned(Arc::clone(&self.table_path), Some(partition))
                   }
                   Err(e) => {
                       panic!("Failed to extract partition for physical table 
path: {e}");
                   }
               }
   ```



##########
crates/examples/src/example_partitioned_kv_table.rs:
##########
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use clap::Parser;
+use fluss::client::{FlussAdmin, FlussConnection, UpsertWriter};
+use fluss::config::Config;
+use fluss::error::Result;
+use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor, 
TablePath};
+use fluss::row::{GenericRow, InternalRow};
+
+#[tokio::main]
+#[allow(dead_code)]
+pub async fn main() -> Result<()> {
+    let mut config = Config::parse();
+    config.bootstrap_server = Some("127.0.0.1:9123".to_string());
+
+    let conn = FlussConnection::new(config).await?;
+
+    let table_descriptor = TableDescriptor::builder()
+        .schema(
+            Schema::builder()
+                .column("id", DataTypes::int())
+                .column("region", DataTypes::string())
+                .column("zone", DataTypes::bigint())
+                .column("score", DataTypes::bigint())
+                .primary_key(vec!["id".to_string(), "region".to_string(), 
"zone".to_string()])
+                .build()?,
+        )
+        .partitioned_by(Arc::from(["region".to_string(), "zone".to_string()]))
+        .build()?;
+
+    let table_path = TablePath::new("fluss".to_owned(), 
"partitioned_kv_example".to_owned());
+
+    let mut admin = conn.get_admin().await?;
+    admin
+        .create_table(&table_path, &table_descriptor, true)
+        .await?;
+    println!(
+        "Created KV Table:\n {}\n",
+        admin.get_table(&table_path).await?
+    );
+
+    create_partition(&table_path, &mut admin, "APAC", 1).await;
+    create_partition(&table_path, &mut admin, "EMEA", 2).await;
+    create_partition(&table_path, &mut admin, "US", 3).await;
+
+    let table = conn.get_table(&table_path).await?;
+    let table_upsert = table.new_upsert()?;
+    let mut upsert_writer = table_upsert.create_writer()?;
+
+    println!("\n=== Upserting ===");
+    for (id, region, zone, score) in [(1001, "APAC", 1i64, 1234i64), (1002, 
"EMEA", 2, 2234), (1003, "US", 3, 3234)] {
+        let mut row = GenericRow::new(4);
+        row.set_field(0, id);
+        row.set_field(1, region);
+        row.set_field(2, zone);
+        row.set_field(3, score);
+        upsert_writer.upsert(&row).await?;
+        println!("Upserted: {row:?}");
+    }
+
+    println!("\n=== Looking up ===");
+    let mut lookuper = table.new_lookup()?.create_lookuper()?;
+
+    for (id, region, zone) in [(1, "APAC", 1i64), (2, "EMEA", 2), (3, "US", 
3)] {
+        let result = lookuper.lookup(&make_key(id, region, 
zone)).await.expect("lookup");
+        let row = result.get_single_row()?.unwrap();
+        println!(
+            "Found id={id}: region={}, zone={}, score={}",
+            row.get_string(1),
+            row.get_long(2),
+            row.get_long(3)
+        );
+    }
+
+    println!("\n=== Updating ===");
+    let mut row = GenericRow::new(4);
+    row.set_field(0, 1);
+    row.set_field(1, "APAC");
+    row.set_field(2, 1i64);
+    row.set_field(3, 4321i64);
+    upsert_writer.upsert(&row).await?;
+    println!("Updated: {row:?}");
+
+    let result = lookuper.lookup(&make_key(1, "APAC", 1)).await?;
+    let row = result.get_single_row()?.unwrap();
+    println!(
+        "Verified update: region={}, zone={}",
+        row.get_string(1),
+        row.get_long(2)
+    );
+
+    println!("\n=== Deleting ===");
+    let mut row = GenericRow::new(4);
+    row.set_field(0, 2);
+    row.set_field(1, "EMEA");
+    row.set_field(2, 2i64);
+    upsert_writer.delete(&row).await?;
+    println!("Deleted: {row:?}");
+
+    let result = lookuper.lookup(&make_key(2, "EMEA", 2)).await?;
+    if result.get_single_row()?.is_none() {
+        println!("Verified deletion");
+    }
+
+    Ok(())
+}
+
+async fn create_partition(table_path: &TablePath, admin: &mut FlussAdmin, 
region: &str, zone: i64) {
+    let mut partition_values = HashMap::new();
+    partition_values.insert("region".to_string(), region.to_string());
+    partition_values.insert("zone".to_string(), zone.to_string());
+    let partition_spec = PartitionSpec::new(partition_values);
+
+    admin
+        .create_partition(&table_path, &partition_spec, false)
+        .await.unwrap_or_else(|a| ());

Review Comment:
   This error handler silently ignores all errors from partition creation, 
including potentially critical errors. Consider either propagating the error or 
at least logging it for debugging purposes rather than silently discarding it.
   ```suggestion
       if let Err(err) = admin
           .create_partition(&table_path, &partition_spec, false)
           .await
       {
           eprintln!(
               "Failed to create partition for region '{}' and zone {}: {}",
               region, zone, err
           );
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to