Copilot commented on code in PR #100:
URL: https://github.com/apache/fluss-rust/pull/100#discussion_r2637103368
##########
bindings/cpp/src/lib.rs:
##########
@@ -375,6 +428,56 @@ impl Admin {
},
}
}
+
+ fn list_offsets(
+ &self,
+ table_path: &ffi::FfiTablePath,
+ bucket_ids: Vec<i32>,
+ offset_query: &ffi::FfiOffsetQuery,
+ ) -> ffi::FfiListOffsetsResult {
+ use fcore::rpc::message::OffsetSpec;
+
+ let path = fcore::metadata::TablePath::new(
+ table_path.database_name.clone(),
+ table_path.table_name.clone(),
+ );
+
+ let offset_spec = match offset_query.offset_type {
+ 0 => OffsetSpec::Earliest,
+ 1 => OffsetSpec::Latest,
+ 2 => OffsetSpec::Timestamp(offset_query.timestamp),
+ _ => {
+ return ffi::FfiListOffsetsResult {
+ result: err_result(1, format!("Invalid offset_type: {}",
offset_query.offset_type)),
+ bucket_offsets: vec![],
+ };
+ }
+ };
+
Review Comment:
Consider validating an empty `bucket_ids` vector. When `bucket_ids` is
empty, `list_offsets` still makes a call that may not be meaningful and may be
inefficient. Consider returning early with either an explicit error or an empty
successful result (the suggestion below does the latter).
```suggestion
if bucket_ids.is_empty() {
return ffi::FfiListOffsetsResult {
result: ok_result(),
bucket_offsets: vec![],
};
}
```
##########
bindings/cpp/include/fluss.hpp:
##########
@@ -63,6 +63,24 @@ enum class DatumType {
Bytes = 7,
};
Review Comment:
The constants `EARLIEST_OFFSET` and `LATEST_OFFSET` are defined but not used
anywhere in the code shown in this diff. Consider documenting their intended use,
or removing them if they are not needed. They could be useful for API users to
pass to the `Subscribe` methods, but that purpose should be documented.
```suggestion
// Special offset sentinel values exposed as part of the public API.
// These can be used by callers that work directly with numeric offsets
// (for example, when subscribing to a stream) to request:
// - EARLIEST_OFFSET: start from the beginning of the log
// - LATEST_OFFSET: start from the most recent (tail) offset.
// For higher-level C++ APIs, prefer using OffsetQuery::Earliest() /
// OffsetQuery::Latest() instead of these raw constants.
```
##########
bindings/cpp/src/lib.rs:
##########
@@ -375,6 +428,56 @@ impl Admin {
},
}
}
+
+ fn list_offsets(
+ &self,
+ table_path: &ffi::FfiTablePath,
+ bucket_ids: Vec<i32>,
+ offset_query: &ffi::FfiOffsetQuery,
+ ) -> ffi::FfiListOffsetsResult {
+ use fcore::rpc::message::OffsetSpec;
+
+ let path = fcore::metadata::TablePath::new(
+ table_path.database_name.clone(),
+ table_path.table_name.clone(),
+ );
+
+ let offset_spec = match offset_query.offset_type {
+ 0 => OffsetSpec::Earliest,
+ 1 => OffsetSpec::Latest,
+ 2 => OffsetSpec::Timestamp(offset_query.timestamp),
+ _ => {
+ return ffi::FfiListOffsetsResult {
+ result: err_result(1, format!("Invalid offset_type: {}",
offset_query.offset_type)),
+ bucket_offsets: vec![],
+ };
+ }
+ };
Review Comment:
The `offset_type` match uses hard-coded integer values (0, 1, 2) without
documentation or named constants. Consider defining named constants in the FFI
module, or adding doc comments that spell out the mapping (Earliest = 0,
Latest = 1, Timestamp = 2). The C++ side produces these values with
`static_cast<int32_t>(offset_query.spec)`, so both sides depend on the same
mapping and must be kept in sync.
##########
bindings/cpp/src/lib.rs:
##########
@@ -511,6 +614,23 @@ impl LogScanner {
}
}
+ fn subscribe_batch(&self, subscriptions: Vec<ffi::FfiBucketSubscription>)
-> ffi::FfiResult {
+ use std::collections::HashMap;
+ let mut bucket_offsets = HashMap::new();
+ for sub in subscriptions {
+ bucket_offsets.insert(sub.bucket_id, sub.offset);
+ }
+
+ let result = RUNTIME.block_on(async {
+ self.inner.subscribe_batch(bucket_offsets).await
Review Comment:
Remove trailing whitespace from this line for consistency with code style.
```suggestion
let result = RUNTIME.block_on(async {
self.inner.subscribe_batch(bucket_offsets).await
```
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -185,6 +185,22 @@ impl LogScanner {
Ok(())
}
+ pub async fn subscribe_batch(&self, bucket_offsets: HashMap<i32, i64>) ->
Result<()> {
+ self.metadata
+ .check_and_update_table_metadata(from_ref(&self.table_path))
+ .await?;
+
+ let mut scan_bucket_offsets = HashMap::new();
+ for (bucket_id, offset) in bucket_offsets {
+ let table_bucket = TableBucket::new(self.table_id, bucket_id);
+ scan_bucket_offsets.insert(table_bucket, offset);
+ }
+
Review Comment:
Remove trailing whitespace from this line for consistency with code style.
```suggestion
let mut scan_bucket_offsets = HashMap::new();
for (bucket_id, offset) in bucket_offsets {
let table_bucket = TableBucket::new(self.table_id, bucket_id);
scan_bucket_offsets.insert(table_bucket, offset);
}
```
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -185,6 +185,22 @@ impl LogScanner {
Ok(())
}
+ pub async fn subscribe_batch(&self, bucket_offsets: HashMap<i32, i64>) ->
Result<()> {
+ self.metadata
+ .check_and_update_table_metadata(from_ref(&self.table_path))
+ .await?;
+
+ let mut scan_bucket_offsets = HashMap::new();
+ for (bucket_id, offset) in bucket_offsets {
+ let table_bucket = TableBucket::new(self.table_id, bucket_id);
+ scan_bucket_offsets.insert(table_bucket, offset);
+ }
+
Review Comment:
Remove trailing whitespace from this line for consistency with code style.
```suggestion
let mut scan_bucket_offsets = HashMap::new();
for (bucket_id, offset) in bucket_offsets {
let table_bucket = TableBucket::new(self.table_id, bucket_id);
scan_bucket_offsets.insert(table_bucket, offset);
}
```
##########
bindings/cpp/src/admin.cpp:
##########
@@ -98,4 +108,36 @@ Result Admin::GetLatestLakeSnapshot(const TablePath&
table_path, LakeSnapshot& o
return result;
}
+Result Admin::ListOffsets(const TablePath& table_path,
+ const std::vector<int32_t>& bucket_ids,
+ const OffsetQuery& offset_query,
+ std::unordered_map<int32_t, int64_t>& out) {
+ if (!Available()) {
+ return utils::make_error(1, "Admin not available");
+ }
+
+ auto ffi_path = utils::to_ffi_table_path(table_path);
+
Review Comment:
Remove trailing whitespace from this line for consistency with code style.
##########
bindings/cpp/src/admin.cpp:
##########
@@ -98,4 +108,36 @@ Result Admin::GetLatestLakeSnapshot(const TablePath&
table_path, LakeSnapshot& o
return result;
}
+Result Admin::ListOffsets(const TablePath& table_path,
+ const std::vector<int32_t>& bucket_ids,
+ const OffsetQuery& offset_query,
+ std::unordered_map<int32_t, int64_t>& out) {
+ if (!Available()) {
+ return utils::make_error(1, "Admin not available");
+ }
+
+ auto ffi_path = utils::to_ffi_table_path(table_path);
+
+ rust::Vec<int32_t> rust_bucket_ids;
+ for (int32_t id : bucket_ids) {
+ rust_bucket_ids.push_back(id);
+ }
+
+ ffi::FfiOffsetQuery ffi_query;
+ ffi_query.offset_type = static_cast<int32_t>(offset_query.spec);
+ ffi_query.timestamp = offset_query.timestamp;
+
+ auto ffi_result = admin_->list_offsets(ffi_path,
std::move(rust_bucket_ids), ffi_query);
+
Review Comment:
Remove trailing whitespace from this line for consistency with code style.
##########
bindings/cpp/src/lib.rs:
##########
@@ -511,6 +614,23 @@ impl LogScanner {
}
}
+ fn subscribe_batch(&self, subscriptions: Vec<ffi::FfiBucketSubscription>)
-> ffi::FfiResult {
+ use std::collections::HashMap;
+ let mut bucket_offsets = HashMap::new();
+ for sub in subscriptions {
+ bucket_offsets.insert(sub.bucket_id, sub.offset);
+ }
+
+ let result = RUNTIME.block_on(async {
+ self.inner.subscribe_batch(bucket_offsets).await
Review Comment:
Remove trailing whitespace from this line for consistency with code style.
```suggestion
let result = RUNTIME.block_on(async {
self.inner.subscribe_batch(bucket_offsets).await
```
##########
bindings/cpp/src/lib.rs:
##########
@@ -511,6 +614,23 @@ impl LogScanner {
}
}
+ fn subscribe_batch(&self, subscriptions: Vec<ffi::FfiBucketSubscription>)
-> ffi::FfiResult {
+ use std::collections::HashMap;
+ let mut bucket_offsets = HashMap::new();
+ for sub in subscriptions {
+ bucket_offsets.insert(sub.bucket_id, sub.offset);
+ }
+
+ let result = RUNTIME.block_on(async {
+ self.inner.subscribe_batch(bucket_offsets).await
Review Comment:
Consider validating an empty `subscriptions` vector. When `subscriptions` is
empty, `subscribe_batch` still makes a call that may not be meaningful.
Consider returning early with either an explicit error or a success result (the
suggestion below returns `ok_result()`).
```suggestion
if subscriptions.is_empty() {
return ok_result();
}
use std::collections::HashMap;
let mut bucket_offsets = HashMap::new();
for sub in subscriptions {
bucket_offsets.insert(sub.bucket_id, sub.offset);
}
let result = RUNTIME.block_on(async {
self.inner.subscribe_batch(bucket_offsets).await
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]