This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 9877993  feat: support subscribe partition in cpp binding (#236)
9877993 is described below

commit 98779935d6ffb29ab93642231fc743b8b6344893
Author: yuxia Luo <[email protected]>
AuthorDate: Wed Feb 4 14:38:10 2026 +0800

    feat: support subscribe partition in cpp binding (#236)
---
 bindings/cpp/include/fluss.hpp |  1 +
 bindings/cpp/src/lib.rs        | 48 +++++++++++++++++++++++++++++++++++++-----
 bindings/cpp/src/table.cpp     |  9 ++++++++
 3 files changed, 53 insertions(+), 5 deletions(-)

diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 968cb06..d35ece2 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -544,6 +544,7 @@ public:
 
     Result Subscribe(int32_t bucket_id, int64_t start_offset);
     Result Subscribe(const std::vector<BucketSubscription>& bucket_offsets);
+    Result SubscribePartition(int64_t partition_id, int32_t bucket_id, int64_t 
start_offset);
     Result Poll(int64_t timeout_ms, ScanRecords& out);
     Result PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& out);
 
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 11e9105..7ae6416 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -21,6 +21,7 @@ use std::sync::{Arc, LazyLock};
 use std::time::Duration;
 
 use fluss as fcore;
+use fluss::PartitionId;
 
 static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
     tokio::runtime::Builder::new_multi_thread()
@@ -248,6 +249,12 @@ mod ffi {
             self: &LogScanner,
             subscriptions: Vec<FfiBucketSubscription>,
         ) -> FfiResult;
+        fn subscribe_partition(
+            self: &LogScanner,
+            partition_id: i64,
+            bucket_id: i32,
+            start_offset: i64,
+        ) -> FfiResult;
         fn poll(self: &LogScanner, timeout_ms: i64) -> FfiScanRecordsResult;
         fn poll_record_batch(self: &LogScanner, timeout_ms: i64) -> 
FfiArrowRecordBatchesResult;
         fn free_arrow_ffi_structures(array_ptr: usize, schema_ptr: usize);
@@ -743,17 +750,39 @@ pub extern "C" fn free_arrow_ffi_structures(array_ptr: 
usize, schema_ptr: usize)
 
 impl LogScanner {
     fn subscribe(&self, bucket_id: i32, start_offset: i64) -> ffi::FfiResult {
-        if let Some(ref inner) = self.inner {
-            let result = RUNTIME.block_on(async { inner.subscribe(bucket_id, 
start_offset).await });
+        self.do_subscribe(None, bucket_id, start_offset)
+    }
 
+    fn do_subscribe(
+        &self,
+        partition_id: Option<PartitionId>,
+        bucket_id: i32,
+        start_offset: i64,
+    ) -> ffi::FfiResult {
+        if let Some(ref inner) = self.inner {
+            let result = RUNTIME.block_on(async {
+                if let Some(partition_id) = partition_id {
+                    inner
+                        .subscribe_partition(partition_id, bucket_id, 
start_offset)
+                        .await
+                } else {
+                    inner.subscribe(bucket_id, start_offset).await
+                }
+            });
             match result {
                 Ok(_) => ok_result(),
                 Err(e) => err_result(1, e.to_string()),
             }
         } else if let Some(ref inner_batch) = self.inner_batch {
-            let result =
-                RUNTIME.block_on(async { inner_batch.subscribe(bucket_id, 
start_offset).await });
-
+            let result = RUNTIME.block_on(async {
+                if let Some(partition_id) = partition_id {
+                    inner_batch
+                        .subscribe_partition(partition_id, bucket_id, 
start_offset)
+                        .await
+                } else {
+                    inner_batch.subscribe(bucket_id, start_offset).await
+                }
+            });
             match result {
                 Ok(_) => ok_result(),
                 Err(e) => err_result(1, e.to_string()),
@@ -790,6 +819,15 @@ impl LogScanner {
         }
     }
 
+    fn subscribe_partition(
+        &self,
+        partition_id: PartitionId,
+        bucket_id: i32,
+        start_offset: i64,
+    ) -> ffi::FfiResult {
+        self.do_subscribe(Some(partition_id), bucket_id, start_offset)
+    }
+
     fn poll(&self, timeout_ms: i64) -> ffi::FfiScanRecordsResult {
         if let Some(ref inner) = self.inner {
             let timeout = Duration::from_millis(timeout_ms as u64);
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index 118ca3c..b327dba 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -265,6 +265,15 @@ Result LogScanner::Subscribe(const 
std::vector<BucketSubscription>& bucket_offse
     return utils::from_ffi_result(ffi_result);
 }
 
+Result LogScanner::SubscribePartition(int64_t partition_id, int32_t bucket_id, 
int64_t start_offset) {
+    if (!Available()) {
+        return utils::make_error(1, "LogScanner not available");
+    }
+
+    auto ffi_result = scanner_->subscribe_partition(partition_id, bucket_id, 
start_offset);
+    return utils::from_ffi_result(ffi_result);
+}
+
 Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& out) {
     if (!Available()) {
         return utils::make_error(1, "LogScanner not available");

Reply via email to