luoyuxia commented on code in PR #100:
URL: https://github.com/apache/fluss-rust/pull/100#discussion_r2637535765
##########
bindings/cpp/src/lib.rs:
##########
@@ -375,6 +428,56 @@ impl Admin {
},
}
}
+
+ fn list_offsets(
+ &self,
+ table_path: &ffi::FfiTablePath,
+ bucket_ids: Vec<i32>,
+ offset_query: &ffi::FfiOffsetQuery,
+ ) -> ffi::FfiListOffsetsResult {
+ use fcore::rpc::message::OffsetSpec;
+
+ let path = fcore::metadata::TablePath::new(
+ table_path.database_name.clone(),
+ table_path.table_name.clone(),
+ );
+
+ let offset_spec = match offset_query.offset_type {
+ 0 => OffsetSpec::Earliest,
+ 1 => OffsetSpec::Latest,
+ 2 => OffsetSpec::Timestamp(offset_query.timestamp),
+ _ => {
+ return ffi::FfiListOffsetsResult {
+ result: err_result(1, format!("Invalid offset_type: {}",
offset_query.offset_type)),
+ bucket_offsets: vec![],
+ };
+ }
+ };
+
Review Comment:
maybe we can optimize it in rust core
##########
bindings/cpp/src/lib.rs:
##########
@@ -511,6 +614,23 @@ impl LogScanner {
}
}
+ fn subscribe_batch(&self, subscriptions: Vec<ffi::FfiBucketSubscription>)
-> ffi::FfiResult {
+ use std::collections::HashMap;
+ let mut bucket_offsets = HashMap::new();
+ for sub in subscriptions {
+ bucket_offsets.insert(sub.bucket_id, sub.offset);
+ }
+
+ let result = RUNTIME.block_on(async {
+ self.inner.subscribe_batch(bucket_offsets).await
Review Comment:
maybe we can optimize it in rust core
##########
bindings/cpp/include/fluss.hpp:
##########
@@ -448,6 +479,7 @@ class LogScanner {
bool Available() const;
Result Subscribe(int32_t bucket_id, int64_t start_offset);
+ Result SubscribeBatch(const std::vector<BucketSubscription>&
bucket_offsets);
Review Comment:
Can `SubscribeBatch` be renamed to `Subscribe`?
##########
bindings/cpp/include/fluss.hpp:
##########
@@ -63,6 +63,24 @@ enum class DatumType {
Bytes = 7,
};
+constexpr int64_t EARLIEST_OFFSET = -2;
+constexpr int64_t LATEST_OFFSET = -1;
Review Comment:
I'm curious about `LATEST_OFFSET` is for what. I can't find it in java code
base.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]