This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 9877993 feat: support subscribe partition in cpp binding (#236)
9877993 is described below
commit 98779935d6ffb29ab93642231fc743b8b6344893
Author: yuxia Luo <[email protected]>
AuthorDate: Wed Feb 4 14:38:10 2026 +0800
feat: support subscribe partition in cpp binding (#236)
---
bindings/cpp/include/fluss.hpp | 1 +
bindings/cpp/src/lib.rs | 48 +++++++++++++++++++++++++++++++++++++-----
bindings/cpp/src/table.cpp | 9 ++++++++
3 files changed, 53 insertions(+), 5 deletions(-)
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 968cb06..d35ece2 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -544,6 +544,7 @@ public:
Result Subscribe(int32_t bucket_id, int64_t start_offset);
Result Subscribe(const std::vector<BucketSubscription>& bucket_offsets);
+ Result SubscribePartition(int64_t partition_id, int32_t bucket_id, int64_t start_offset);
Result Poll(int64_t timeout_ms, ScanRecords& out);
Result PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& out);
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 11e9105..7ae6416 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -21,6 +21,7 @@ use std::sync::{Arc, LazyLock};
use std::time::Duration;
use fluss as fcore;
+use fluss::PartitionId;
static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
tokio::runtime::Builder::new_multi_thread()
@@ -248,6 +249,12 @@ mod ffi {
self: &LogScanner,
subscriptions: Vec<FfiBucketSubscription>,
) -> FfiResult;
+ fn subscribe_partition(
+ self: &LogScanner,
+ partition_id: i64,
+ bucket_id: i32,
+ start_offset: i64,
+ ) -> FfiResult;
fn poll(self: &LogScanner, timeout_ms: i64) -> FfiScanRecordsResult;
fn poll_record_batch(self: &LogScanner, timeout_ms: i64) -> FfiArrowRecordBatchesResult;
fn free_arrow_ffi_structures(array_ptr: usize, schema_ptr: usize);
@@ -743,17 +750,39 @@ pub extern "C" fn free_arrow_ffi_structures(array_ptr: usize, schema_ptr: usize)
impl LogScanner {
fn subscribe(&self, bucket_id: i32, start_offset: i64) -> ffi::FfiResult {
- if let Some(ref inner) = self.inner {
- let result = RUNTIME.block_on(async { inner.subscribe(bucket_id, start_offset).await });
+ self.do_subscribe(None, bucket_id, start_offset)
+ }
+ fn do_subscribe(
+ &self,
+ partition_id: Option<PartitionId>,
+ bucket_id: i32,
+ start_offset: i64,
+ ) -> ffi::FfiResult {
+ if let Some(ref inner) = self.inner {
+ let result = RUNTIME.block_on(async {
+ if let Some(partition_id) = partition_id {
+ inner
+ .subscribe_partition(partition_id, bucket_id, start_offset)
+ .await
+ } else {
+ inner.subscribe(bucket_id, start_offset).await
+ }
+ });
match result {
Ok(_) => ok_result(),
Err(e) => err_result(1, e.to_string()),
}
} else if let Some(ref inner_batch) = self.inner_batch {
- let result =
- RUNTIME.block_on(async { inner_batch.subscribe(bucket_id, start_offset).await });
-
+ let result = RUNTIME.block_on(async {
+ if let Some(partition_id) = partition_id {
+ inner_batch
+ .subscribe_partition(partition_id, bucket_id, start_offset)
+ .await
+ } else {
+ inner_batch.subscribe(bucket_id, start_offset).await
+ }
+ });
match result {
Ok(_) => ok_result(),
Err(e) => err_result(1, e.to_string()),
@@ -790,6 +819,15 @@ impl LogScanner {
}
}
+ fn subscribe_partition(
+ &self,
+ partition_id: PartitionId,
+ bucket_id: i32,
+ start_offset: i64,
+ ) -> ffi::FfiResult {
+ self.do_subscribe(Some(partition_id), bucket_id, start_offset)
+ }
+
fn poll(&self, timeout_ms: i64) -> ffi::FfiScanRecordsResult {
if let Some(ref inner) = self.inner {
let timeout = Duration::from_millis(timeout_ms as u64);
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index 118ca3c..b327dba 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -265,6 +265,15 @@ Result LogScanner::Subscribe(const std::vector<BucketSubscription>& bucket_offse
return utils::from_ffi_result(ffi_result);
}
+Result LogScanner::SubscribePartition(int64_t partition_id, int32_t bucket_id, int64_t start_offset) {
+ if (!Available()) {
+ return utils::make_error(1, "LogScanner not available");
+ }
+
+ auto ffi_result = scanner_->subscribe_partition(partition_id, bucket_id, start_offset);
+ return utils::from_ffi_result(ffi_result);
+}
+
Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& out) {
if (!Available()) {
return utils::make_error(1, "LogScanner not available");