Copilot commented on code in PR #100:
URL: https://github.com/apache/fluss-rust/pull/100#discussion_r2637103368


##########
bindings/cpp/src/lib.rs:
##########
@@ -375,6 +428,56 @@ impl Admin {
             },
         }
     }
+
+    fn list_offsets(
+        &self,
+        table_path: &ffi::FfiTablePath,
+        bucket_ids: Vec<i32>,
+        offset_query: &ffi::FfiOffsetQuery,
+    ) -> ffi::FfiListOffsetsResult {
+        use fcore::rpc::message::OffsetSpec;
+
+        let path = fcore::metadata::TablePath::new(
+            table_path.database_name.clone(),
+            table_path.table_name.clone(),
+        );
+
+        let offset_spec = match offset_query.offset_type {
+            0 => OffsetSpec::Earliest,
+            1 => OffsetSpec::Latest,
+            2 => OffsetSpec::Timestamp(offset_query.timestamp),
+            _ => {
+                return ffi::FfiListOffsetsResult {
+                    result: err_result(1, format!("Invalid offset_type: {}", 
offset_query.offset_type)),
+                    bucket_offsets: vec![],
+                };
+            }
+        };
+

Review Comment:
   Consider validating that `bucket_ids` is non-empty. When it is empty,
list_offsets still issues a request that cannot return any bucket offsets,
which is wasted work. Consider returning early, either with an empty successful
result (as in the suggestion below) or with an explicit error.
   ```suggestion
   
           if bucket_ids.is_empty() {
               return ffi::FfiListOffsetsResult {
                   result: ok_result(),
                   bucket_offsets: vec![],
               };
           }
   ```



##########
bindings/cpp/include/fluss.hpp:
##########
@@ -63,6 +63,24 @@ enum class DatumType {
     Bytes = 7,
 };
 

Review Comment:
   The constants EARLIEST_OFFSET and LATEST_OFFSET are defined but not used
anywhere in the code shown in this PR. Either document their intended use or
remove them if they are not needed. They could be useful for API callers to
pass to the Subscribe methods, but that purpose should be documented.
   ```suggestion
   
   // Special offset sentinel values exposed as part of the public API.
   // These can be used by callers that work directly with numeric offsets
   // (for example, when subscribing to a stream) to request:
   //   - EARLIEST_OFFSET: start from the beginning of the log
   //   - LATEST_OFFSET:   start from the most recent (tail) offset.
   // For higher-level C++ APIs, prefer using OffsetQuery::Earliest() /
   // OffsetQuery::Latest() instead of these raw constants.
   ```



##########
bindings/cpp/src/lib.rs:
##########
@@ -375,6 +428,56 @@ impl Admin {
             },
         }
     }
+
+    fn list_offsets(
+        &self,
+        table_path: &ffi::FfiTablePath,
+        bucket_ids: Vec<i32>,
+        offset_query: &ffi::FfiOffsetQuery,
+    ) -> ffi::FfiListOffsetsResult {
+        use fcore::rpc::message::OffsetSpec;
+
+        let path = fcore::metadata::TablePath::new(
+            table_path.database_name.clone(),
+            table_path.table_name.clone(),
+        );
+
+        let offset_spec = match offset_query.offset_type {
+            0 => OffsetSpec::Earliest,
+            1 => OffsetSpec::Latest,
+            2 => OffsetSpec::Timestamp(offset_query.timestamp),
+            _ => {
+                return ffi::FfiListOffsetsResult {
+                    result: err_result(1, format!("Invalid offset_type: {}", 
offset_query.offset_type)),
+                    bucket_offsets: vec![],
+                };
+            }
+        };

Review Comment:
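   The offset_type match uses hard-coded integers (0, 1, 2) without
documentation or named constants. Consider defining named constants in the FFI
module, or at least documenting the mapping (Earliest = 0, Latest = 1,
Timestamp = 2). This matters because admin.cpp produces the value with
`static_cast<int32_t>(offset_query.spec)`. The Rust match therefore depends
implicitly on the numeric values of the C++ `spec` type, so changing that
definition would break the mapping without any compile error.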



##########
bindings/cpp/src/lib.rs:
##########
@@ -511,6 +614,23 @@ impl LogScanner {
         }
     }
 
+    fn subscribe_batch(&self, subscriptions: Vec<ffi::FfiBucketSubscription>) 
-> ffi::FfiResult {
+        use std::collections::HashMap;
+        let mut bucket_offsets = HashMap::new();
+        for sub in subscriptions {
+            bucket_offsets.insert(sub.bucket_id, sub.offset);
+        }
+        
+        let result = RUNTIME.block_on(async { 
+            self.inner.subscribe_batch(bucket_offsets).await 

Review Comment:
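   Remove the trailing whitespace from these lines (the whitespace-only blank line and the spaces after `async {` and `.await`) for consistency with the code style; running `cargo fmt` should remove it.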
   ```suggestion
   
           let result = RUNTIME.block_on(async {
               self.inner.subscribe_batch(bucket_offsets).await
   ```



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -185,6 +185,22 @@ impl LogScanner {
         Ok(())
     }
 
+    pub async fn subscribe_batch(&self, bucket_offsets: HashMap<i32, i64>) -> 
Result<()> {
+        self.metadata
+            .check_and_update_table_metadata(from_ref(&self.table_path))
+            .await?;
+        
+        let mut scan_bucket_offsets = HashMap::new();
+        for (bucket_id, offset) in bucket_offsets {
+            let table_bucket = TableBucket::new(self.table_id, bucket_id);
+            scan_bucket_offsets.insert(table_bucket, offset);
+        }
+        

Review Comment:
   Remove the trailing whitespace from the whitespace-only blank lines in this block for consistency with the code style.
   ```suggestion
   
           let mut scan_bucket_offsets = HashMap::new();
           for (bucket_id, offset) in bucket_offsets {
               let table_bucket = TableBucket::new(self.table_id, bucket_id);
               scan_bucket_offsets.insert(table_bucket, offset);
           }
   ```



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -185,6 +185,22 @@ impl LogScanner {
         Ok(())
     }
 
+    pub async fn subscribe_batch(&self, bucket_offsets: HashMap<i32, i64>) -> 
Result<()> {
+        self.metadata
+            .check_and_update_table_metadata(from_ref(&self.table_path))
+            .await?;
+        
+        let mut scan_bucket_offsets = HashMap::new();
+        for (bucket_id, offset) in bucket_offsets {
+            let table_bucket = TableBucket::new(self.table_id, bucket_id);
+            scan_bucket_offsets.insert(table_bucket, offset);
+        }
+        

Review Comment:
   Remove the trailing whitespace from the whitespace-only blank lines in this block for consistency with the code style.
   ```suggestion
   
           let mut scan_bucket_offsets = HashMap::new();
           for (bucket_id, offset) in bucket_offsets {
               let table_bucket = TableBucket::new(self.table_id, bucket_id);
               scan_bucket_offsets.insert(table_bucket, offset);
           }
   ```



##########
bindings/cpp/src/admin.cpp:
##########
@@ -98,4 +108,36 @@ Result Admin::GetLatestLakeSnapshot(const TablePath& 
table_path, LakeSnapshot& o
     return result;
 }
 
+Result Admin::ListOffsets(const TablePath& table_path,
+                          const std::vector<int32_t>& bucket_ids,
+                          const OffsetQuery& offset_query,
+                          std::unordered_map<int32_t, int64_t>& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Admin not available");
+    }
+
+    auto ffi_path = utils::to_ffi_table_path(table_path);
+    

Review Comment:
   Remove the trailing whitespace from this whitespace-only blank line for consistency with the code style.



##########
bindings/cpp/src/admin.cpp:
##########
@@ -98,4 +108,36 @@ Result Admin::GetLatestLakeSnapshot(const TablePath& 
table_path, LakeSnapshot& o
     return result;
 }
 
+Result Admin::ListOffsets(const TablePath& table_path,
+                          const std::vector<int32_t>& bucket_ids,
+                          const OffsetQuery& offset_query,
+                          std::unordered_map<int32_t, int64_t>& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Admin not available");
+    }
+
+    auto ffi_path = utils::to_ffi_table_path(table_path);
+    
+    rust::Vec<int32_t> rust_bucket_ids;
+    for (int32_t id : bucket_ids) {
+        rust_bucket_ids.push_back(id);
+    }
+
+    ffi::FfiOffsetQuery ffi_query;
+    ffi_query.offset_type = static_cast<int32_t>(offset_query.spec);
+    ffi_query.timestamp = offset_query.timestamp;
+
+    auto ffi_result = admin_->list_offsets(ffi_path, 
std::move(rust_bucket_ids), ffi_query);
+    

Review Comment:
   Remove the trailing whitespace from this whitespace-only blank line for consistency with the code style.



##########
bindings/cpp/src/lib.rs:
##########
@@ -511,6 +614,23 @@ impl LogScanner {
         }
     }
 
+    fn subscribe_batch(&self, subscriptions: Vec<ffi::FfiBucketSubscription>) 
-> ffi::FfiResult {
+        use std::collections::HashMap;
+        let mut bucket_offsets = HashMap::new();
+        for sub in subscriptions {
+            bucket_offsets.insert(sub.bucket_id, sub.offset);
+        }
+        
+        let result = RUNTIME.block_on(async { 
+            self.inner.subscribe_batch(bucket_offsets).await 

Review Comment:
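   Remove the trailing whitespace from these lines (the whitespace-only blank line and the spaces after `async {` and `.await`) for consistency with the code style; running `cargo fmt` should remove it.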
   ```suggestion
   
           let result = RUNTIME.block_on(async {
               self.inner.subscribe_batch(bucket_offsets).await
   ```



##########
bindings/cpp/src/lib.rs:
##########
@@ -511,6 +614,23 @@ impl LogScanner {
         }
     }
 
+    fn subscribe_batch(&self, subscriptions: Vec<ffi::FfiBucketSubscription>) 
-> ffi::FfiResult {
+        use std::collections::HashMap;
+        let mut bucket_offsets = HashMap::new();
+        for sub in subscriptions {
+            bucket_offsets.insert(sub.bucket_id, sub.offset);
+        }
+        
+        let result = RUNTIME.block_on(async { 
+            self.inner.subscribe_batch(bucket_offsets).await 

Review Comment:
   Consider validating that `subscriptions` is non-empty. When it is empty,
subscribe_batch still refreshes the table metadata and then calls the inner
subscribe_batch with an empty map, which does no useful work. Consider
returning early, either with a success result (as in the suggestion below) or
with an explicit error.
   ```suggestion
           if subscriptions.is_empty() {
               return ok_result();
           }
   
           use std::collections::HashMap;
           let mut bucket_offsets = HashMap::new();
           for sub in subscriptions {
               bucket_offsets.insert(sub.bucket_id, sub.offset);
           }
   
           let result = RUNTIME.block_on(async {
               self.inner.subscribe_batch(bucket_offsets).await
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to