Copilot commented on code in PR #159:
URL: https://github.com/apache/fluss-rust/pull/159#discussion_r2690007979


##########
crates/fluss/src/proto/fluss_api.proto:
##########
@@ -317,4 +317,32 @@ message GetFileSystemSecurityTokenResponse {
   required bytes token = 2;
   optional int64 expiration_time = 3;
   repeated PbKeyValue addition_info = 4;
+}
+
+// lookup request and response
+message LookupRequest {
+  required int64 table_id = 1;
+  repeated PbLookupReqForBucket buckets_req = 2;
+}
+
+message LookupResponse {
+  repeated PbLookupRespForBucket buckets_resp = 1;
+}
+
+message PbLookupReqForBucket {
+  optional int64 partition_id = 1;
+  required int32 bucket_id = 2;
+  repeated bytes key = 3;
+}
+
+message PbLookupRespForBucket {
+  optional int64 partition_id = 1;
+  required int32 bucket_id = 2;
+  optional int32 error_code = 3;
+  optional string error_message = 4;
+  repeated PbValue values = 5;
+}
+
+message PbValue {
+  optional bytes values = 1;

Review Comment:
   The field name 'values' (plural) is misleading, since PbValue represents a 
single value. Consider renaming it to 'value' (singular) for clarity.
   ```suggestion
     optional bytes value = 1;
   ```



##########
crates/fluss/src/client/table/mod.rs:
##########
@@ -85,6 +86,88 @@ impl<'a> FlussTable<'a> {
     pub fn has_primary_key(&self) -> bool {
         self.has_primary_key
     }
+
+    /// Lookup values by primary key in a key-value table.
+    ///
+    /// This method performs a direct lookup to retrieve the value associated 
with the given key
+    /// in the specified bucket. The table must have a primary key (be a 
primary key table).
+    ///
+    /// # Arguments
+    /// * `bucket_id` - The bucket ID to look up the key in
+    /// * `key` - The encoded primary key bytes to look up
+    ///
+    /// # Returns
+    /// * `Ok(Some(Vec<u8>))` - The value bytes if the key exists
+    /// * `Ok(None)` - If the key does not exist
+    /// * `Err(Error)` - If the lookup fails or the table doesn't have a 
primary key
+    ///
+    /// # Example
+    /// ```ignore
+    /// let table = conn.get_table(&table_path).await?;
+    /// let key = /* encoded key bytes */;
+    /// if let Some(value) = table.lookup(0, key).await? {
+    ///     println!("Found value: {:?}", value);
+    /// }
+    /// ```
+    pub async fn lookup(&self, bucket_id: i32, key: Vec<u8>) -> 
Result<Option<Vec<u8>>> {
+        if !self.has_primary_key {
+            return Err(Error::UnsupportedOperation {
+                message: "Lookup is only supported for primary key 
tables".to_string(),
+            });
+        }
+
+        let table_id = self.table_info.get_table_id();
+        let table_bucket = TableBucket::new(table_id, bucket_id);
+
+        // Find the leader for this bucket
+        let cluster = self.metadata.get_cluster();
+        let leader =
+            cluster
+                .leader_for(&table_bucket)
+                .ok_or_else(|| Error::LeaderNotAvailable {
+                    message: format!("No leader found for table bucket: 
{table_bucket}"),
+                })?;
+
+        // Get connection to the tablet server
+        let tablet_server =
+            cluster
+                .get_tablet_server(leader.id())
+                .ok_or_else(|| Error::LeaderNotAvailable {
+                    message: format!(
+                        "Tablet server {} is not found in metadata cache",
+                        leader.id()
+                    ),
+                })?;
+
+        let connections = self.conn.get_connections();
+        let connection = connections.get_connection(tablet_server).await?;
+
+        // Send lookup request
+        let request = LookupRequest::new(table_id, None, bucket_id, vec![key]);
+        let response = connection.request(request).await?;
+
+        // Extract the value from response
+        if let Some(bucket_resp) = response.buckets_resp.into_iter().next() {
+            // Check for errors
+            if let Some(error_code) = bucket_resp.error_code {
+                if error_code != 0 {
+                    return Err(Error::FlussAPIError {
+                        api_error: ApiError {
+                            code: error_code,
+                            message: 
bucket_resp.error_message.unwrap_or_default(),
+                        },
+                    });
+                }
+            }
+
+            // Get the first value (we only requested one key)
+            if let Some(pb_value) = bucket_resp.values.into_iter().next() {
+                return Ok(pb_value.values);

Review Comment:
   The lookup functionality lacks test coverage. Consider adding tests that 
verify that a successful lookup returns the correct value, that a lookup of a 
non-existent key returns None, and that a lookup on a non-primary-key table 
returns an appropriate error.



##########
crates/fluss/src/client/table/mod.rs:
##########
@@ -85,6 +86,88 @@ impl<'a> FlussTable<'a> {
     pub fn has_primary_key(&self) -> bool {
         self.has_primary_key
     }
+
+    /// Lookup values by primary key in a key-value table.
+    ///
+    /// This method performs a direct lookup to retrieve the value associated 
with the given key
+    /// in the specified bucket. The table must have a primary key (be a 
primary key table).
+    ///
+    /// # Arguments
+    /// * `bucket_id` - The bucket ID to look up the key in
+    /// * `key` - The encoded primary key bytes to look up
+    ///
+    /// # Returns
+    /// * `Ok(Some(Vec<u8>))` - The value bytes if the key exists
+    /// * `Ok(None)` - If the key does not exist
+    /// * `Err(Error)` - If the lookup fails or the table doesn't have a 
primary key
+    ///
+    /// # Example
+    /// ```ignore
+    /// let table = conn.get_table(&table_path).await?;
+    /// let key = /* encoded key bytes */;

Review Comment:
   The placeholder comment '/* encoded key bytes */' should be replaced with a 
concrete example, such as 'vec![1, 2, 3]', to make the documentation more 
helpful.
   ```suggestion
       /// let key = vec![1, 2, 3]; // encoded primary key bytes
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to