This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new a05f996 feat: support list partition offset in cpp binding (#231)
a05f996 is described below
commit a05f996be2cccfd28c8c2c044fd10e88b0304e49
Author: yuxia Luo <[email protected]>
AuthorDate: Tue Feb 3 20:42:28 2026 +0800
feat: support list partition offset in cpp binding (#231)
---
.gitignore | 5 ++++-
bindings/cpp/include/fluss.hpp | 12 ++++++++++++
bindings/cpp/src/admin.cpp | 32 +++++++++++++++++++++++++++-----
bindings/cpp/src/lib.rs | 42 ++++++++++++++++++++++++++++++++++++++----
4 files changed, 81 insertions(+), 10 deletions(-)
diff --git a/.gitignore b/.gitignore
index 8202bbc..9c585d8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -26,4 +26,7 @@ __pycache__/
*.so
*.egg-info/
dist/
-build/
\ No newline at end of file
+build/
+
+# CPP
+*CMakeFiles/
\ No newline at end of file
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 479adf9..4ef3fe1 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -407,7 +407,19 @@ public:
const OffsetQuery& offset_query,
std::unordered_map<int32_t, int64_t>& out);
+ Result ListPartitionOffsets(const TablePath& table_path,
+ const std::string& partition_name,
+ const std::vector<int32_t>& bucket_ids,
+ const OffsetQuery& offset_query,
+ std::unordered_map<int32_t, int64_t>& out);
+
private:
+ Result DoListOffsets(const TablePath& table_path,
+ const std::vector<int32_t>& bucket_ids,
+ const OffsetQuery& offset_query,
+ std::unordered_map<int32_t, int64_t>& out,
+ const std::string* partition_name = nullptr);
+
friend class Connection;
Admin(ffi::Admin* admin) noexcept;
diff --git a/bindings/cpp/src/admin.cpp b/bindings/cpp/src/admin.cpp
index bf9c712..e410614 100644
--- a/bindings/cpp/src/admin.cpp
+++ b/bindings/cpp/src/admin.cpp
@@ -108,10 +108,12 @@ Result Admin::GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& o
return result;
}
-Result Admin::ListOffsets(const TablePath& table_path,
- const std::vector<int32_t>& bucket_ids,
- const OffsetQuery& offset_query,
- std::unordered_map<int32_t, int64_t>& out) {
+// function for common list offsets functionality
+Result Admin::DoListOffsets(const TablePath& table_path,
+ const std::vector<int32_t>& bucket_ids,
+ const OffsetQuery& offset_query,
+ std::unordered_map<int32_t, int64_t>& out,
+ const std::string* partition_name) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
}
@@ -127,7 +129,12 @@ Result Admin::ListOffsets(const TablePath& table_path,
ffi_query.offset_type = static_cast<int32_t>(offset_query.spec);
ffi_query.timestamp = offset_query.timestamp;
- auto ffi_result = admin_->list_offsets(ffi_path, std::move(rust_bucket_ids), ffi_query);
+ ffi::FfiListOffsetsResult ffi_result;
+ if (partition_name != nullptr) {
+ ffi_result = admin_->list_partition_offsets(ffi_path, rust::String(*partition_name), std::move(rust_bucket_ids), ffi_query);
+ } else {
+ ffi_result = admin_->list_offsets(ffi_path, std::move(rust_bucket_ids), ffi_query);
+ }
auto result = utils::from_ffi_result(ffi_result.result);
if (result.Ok()) {
@@ -140,4 +147,19 @@ Result Admin::ListOffsets(const TablePath& table_path,
return result;
}
+Result Admin::ListOffsets(const TablePath& table_path,
+ const std::vector<int32_t>& bucket_ids,
+ const OffsetQuery& offset_query,
+ std::unordered_map<int32_t, int64_t>& out) {
+ return DoListOffsets(table_path, bucket_ids, offset_query, out);
+}
+
+Result Admin::ListPartitionOffsets(const TablePath& table_path,
+ const std::string& partition_name,
+ const std::vector<int32_t>& bucket_ids,
+ const OffsetQuery& offset_query,
+ std::unordered_map<int32_t, int64_t>& out) {
+ return DoListOffsets(table_path, bucket_ids, offset_query, out, &partition_name);
+}
+
} // namespace fluss
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index bd38a03..b327ba5 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -193,6 +193,13 @@ mod ffi {
bucket_ids: Vec<i32>,
offset_query: &FfiOffsetQuery,
) -> FfiListOffsetsResult;
+ fn list_partition_offsets(
+ self: &Admin,
+ table_path: &FfiTablePath,
+ partition_name: String,
+ bucket_ids: Vec<i32>,
+ offset_query: &FfiOffsetQuery,
+ ) -> FfiListOffsetsResult;
// Table
unsafe fn delete_table(table: *mut Table);
@@ -431,9 +438,11 @@ impl Admin {
}
}
- fn list_offsets(
+ // Helper function for common list offsets functionality
+ fn do_list_offsets(
&self,
table_path: &ffi::FfiTablePath,
+ partition_name: Option<&str>,
bucket_ids: Vec<i32>,
offset_query: &ffi::FfiOffsetQuery,
) -> ffi::FfiListOffsetsResult {
@@ -460,9 +469,15 @@ impl Admin {
};
let result = RUNTIME.block_on(async {
- self.inner
- .list_offsets(&path, &bucket_ids, offset_spec)
- .await
+ if let Some(part_name) = partition_name {
+ self.inner
+ .list_partition_offsets(&path, part_name, &bucket_ids, offset_spec)
+ .await
+ } else {
+ self.inner
+ .list_offsets(&path, &bucket_ids, offset_spec)
+ .await
+ }
});
match result {
@@ -482,6 +497,25 @@ impl Admin {
},
}
}
+
+ fn list_offsets(
+ &self,
+ table_path: &ffi::FfiTablePath,
+ bucket_ids: Vec<i32>,
+ offset_query: &ffi::FfiOffsetQuery,
+ ) -> ffi::FfiListOffsetsResult {
+ self.do_list_offsets(table_path, None, bucket_ids, offset_query)
+ }
+
+ fn list_partition_offsets(
+ &self,
+ table_path: &ffi::FfiTablePath,
+ partition_name: String,
+ bucket_ids: Vec<i32>,
+ offset_query: &ffi::FfiOffsetQuery,
+ ) -> ffi::FfiListOffsetsResult {
+ self.do_list_offsets(table_path, Some(&partition_name), bucket_ids, offset_query)
+ }
}
// Table implementation