This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch support-list-partition-offset-in-cpp in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
commit e61bdcd246d07eaa167e3c6fd27083295cc80bb6 Author: luoyuxia <[email protected]> AuthorDate: Mon Feb 2 21:42:06 2026 +0800 feat: support list partition offset in cpp binding --- .gitignore | 5 ++++- bindings/cpp/include/fluss.hpp | 12 ++++++++++++ bindings/cpp/src/admin.cpp | 32 +++++++++++++++++++++++++++----- bindings/cpp/src/lib.rs | 42 ++++++++++++++++++++++++++++++++++++++---- 4 files changed, 81 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index 8202bbc..9c585d8 100644 --- a/.gitignore +++ b/.gitignore @@ -26,4 +26,7 @@ __pycache__/ *.so *.egg-info/ dist/ -build/ \ No newline at end of file +build/ + +# CPP +*CMakeFiles/ \ No newline at end of file diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index 479adf9..4ef3fe1 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -407,7 +407,19 @@ public: const OffsetQuery& offset_query, std::unordered_map<int32_t, int64_t>& out); + Result ListPartitionOffsets(const TablePath& table_path, + const std::string& partition_name, + const std::vector<int32_t>& bucket_ids, + const OffsetQuery& offset_query, + std::unordered_map<int32_t, int64_t>& out); + private: + Result DoListOffsets(const TablePath& table_path, + const std::vector<int32_t>& bucket_ids, + const OffsetQuery& offset_query, + std::unordered_map<int32_t, int64_t>& out, + const std::string* partition_name = nullptr); + friend class Connection; Admin(ffi::Admin* admin) noexcept; diff --git a/bindings/cpp/src/admin.cpp b/bindings/cpp/src/admin.cpp index bf9c712..e410614 100644 --- a/bindings/cpp/src/admin.cpp +++ b/bindings/cpp/src/admin.cpp @@ -108,10 +108,12 @@ Result Admin::GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& o return result; } -Result Admin::ListOffsets(const TablePath& table_path, - const std::vector<int32_t>& bucket_ids, - const OffsetQuery& offset_query, - std::unordered_map<int32_t, int64_t>& out) { +// function for common list offsets 
functionality +Result Admin::DoListOffsets(const TablePath& table_path, + const std::vector<int32_t>& bucket_ids, + const OffsetQuery& offset_query, + std::unordered_map<int32_t, int64_t>& out, + const std::string* partition_name) { if (!Available()) { return utils::make_error(1, "Admin not available"); } @@ -127,7 +129,12 @@ Result Admin::ListOffsets(const TablePath& table_path, ffi_query.offset_type = static_cast<int32_t>(offset_query.spec); ffi_query.timestamp = offset_query.timestamp; - auto ffi_result = admin_->list_offsets(ffi_path, std::move(rust_bucket_ids), ffi_query); + ffi::FfiListOffsetsResult ffi_result; + if (partition_name != nullptr) { + ffi_result = admin_->list_partition_offsets(ffi_path, rust::String(*partition_name), std::move(rust_bucket_ids), ffi_query); + } else { + ffi_result = admin_->list_offsets(ffi_path, std::move(rust_bucket_ids), ffi_query); + } auto result = utils::from_ffi_result(ffi_result.result); if (result.Ok()) { @@ -140,4 +147,19 @@ Result Admin::ListOffsets(const TablePath& table_path, return result; } +Result Admin::ListOffsets(const TablePath& table_path, + const std::vector<int32_t>& bucket_ids, + const OffsetQuery& offset_query, + std::unordered_map<int32_t, int64_t>& out) { + return DoListOffsets(table_path, bucket_ids, offset_query, out); +} + +Result Admin::ListPartitionOffsets(const TablePath& table_path, + const std::string& partition_name, + const std::vector<int32_t>& bucket_ids, + const OffsetQuery& offset_query, + std::unordered_map<int32_t, int64_t>& out) { + return DoListOffsets(table_path, bucket_ids, offset_query, out, &partition_name); +} + } // namespace fluss diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index bd38a03..b327ba5 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -193,6 +193,13 @@ mod ffi { bucket_ids: Vec<i32>, offset_query: &FfiOffsetQuery, ) -> FfiListOffsetsResult; + fn list_partition_offsets( + self: &Admin, + table_path: &FfiTablePath, + 
partition_name: String, + bucket_ids: Vec<i32>, + offset_query: &FfiOffsetQuery, + ) -> FfiListOffsetsResult; // Table unsafe fn delete_table(table: *mut Table); @@ -431,9 +438,11 @@ impl Admin { } } - fn list_offsets( + // Helper function for common list offsets functionality + fn do_list_offsets( &self, table_path: &ffi::FfiTablePath, + partition_name: Option<&str>, bucket_ids: Vec<i32>, offset_query: &ffi::FfiOffsetQuery, ) -> ffi::FfiListOffsetsResult { @@ -460,9 +469,15 @@ impl Admin { }; let result = RUNTIME.block_on(async { - self.inner - .list_offsets(&path, &bucket_ids, offset_spec) - .await + if let Some(part_name) = partition_name { + self.inner + .list_partition_offsets(&path, part_name, &bucket_ids, offset_spec) + .await + } else { + self.inner + .list_offsets(&path, &bucket_ids, offset_spec) + .await + } }); match result { @@ -482,6 +497,25 @@ impl Admin { }, } } + + fn list_offsets( + &self, + table_path: &ffi::FfiTablePath, + bucket_ids: Vec<i32>, + offset_query: &ffi::FfiOffsetQuery, + ) -> ffi::FfiListOffsetsResult { + self.do_list_offsets(table_path, None, bucket_ids, offset_query) + } + + fn list_partition_offsets( + &self, + table_path: &ffi::FfiTablePath, + partition_name: String, + bucket_ids: Vec<i32>, + offset_query: &ffi::FfiOffsetQuery, + ) -> ffi::FfiListOffsetsResult { + self.do_list_offsets(table_path, Some(&partition_name), bucket_ids, offset_query) + } } // Table implementation
