This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 52dca9f chore: rename subscribe_batch to subscribe_buckets (#255)
52dca9f is described below
commit 52dca9fc7700f74bf2e2eee84c267f8ec4f074fe
Author: yuxia Luo <[email protected]>
AuthorDate: Fri Feb 6 07:09:59 2026 +0800
chore: rename subscribe_batch to subscribe_buckets (#255)
---
bindings/cpp/examples/example.cpp | 2 +-
bindings/cpp/src/lib.rs | 8 ++++----
bindings/cpp/src/table.cpp | 2 +-
crates/fluss/src/client/table/scanner.rs | 12 ++++++------
crates/fluss/tests/integration/log_table.rs | 2 +-
5 files changed, 13 insertions(+), 13 deletions(-)
diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp
index f35f37e..7022cad 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -235,7 +235,7 @@ int main() {
<< ", offset=" << offset << std::endl;
}
- check("subscribe_batch", batch_scanner.Subscribe(subscriptions));
+ check("subscribe_buckets", batch_scanner.Subscribe(subscriptions));
std::cout << "Batch subscribed to " << subscriptions.size() << " buckets"
<< std::endl;
// 8.5) Poll and verify bucket_id in records
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index ab02c8d..b834865 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -245,7 +245,7 @@ mod ffi {
// LogScanner
unsafe fn delete_log_scanner(scanner: *mut LogScanner);
fn subscribe(self: &LogScanner, bucket_id: i32, start_offset: i64) -> FfiResult;
- fn subscribe_batch(
+ fn subscribe_buckets(
self: &LogScanner,
subscriptions: Vec<FfiBucketSubscription>,
) -> FfiResult;
@@ -789,7 +789,7 @@ impl LogScanner {
}
}
- fn subscribe_batch(&self, subscriptions: Vec<ffi::FfiBucketSubscription>) -> ffi::FfiResult {
+ fn subscribe_buckets(&self, subscriptions: Vec<ffi::FfiBucketSubscription>) -> ffi::FfiResult {
use std::collections::HashMap;
let mut bucket_offsets = HashMap::new();
for sub in subscriptions {
@@ -797,7 +797,7 @@ impl LogScanner {
}
if let Some(ref inner) = self.inner {
- let result = RUNTIME.block_on(async { inner.subscribe_batch(&bucket_offsets).await });
+ let result = RUNTIME.block_on(async { inner.subscribe_buckets(&bucket_offsets).await });
match result {
Ok(_) => ok_result(),
@@ -805,7 +805,7 @@ impl LogScanner {
}
} else if let Some(ref inner_batch) = self.inner_batch {
let result =
- RUNTIME.block_on(async { inner_batch.subscribe_batch(&bucket_offsets).await });
+ RUNTIME.block_on(async { inner_batch.subscribe_buckets(&bucket_offsets).await });
match result {
Ok(_) => ok_result(),
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index f943790..ab26038 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -251,7 +251,7 @@ Result LogScanner::Subscribe(const std::vector<BucketSubscription>& bucket_offse
rust_subs.push_back(ffi_sub);
}
- auto ffi_result = scanner_->subscribe_batch(std::move(rust_subs));
+ auto ffi_result = scanner_->subscribe_buckets(std::move(rust_subs));
return utils::from_ffi_result(ffi_result);
}
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index aa9fca4..ef68fb4 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -357,11 +357,11 @@ impl LogScannerInner {
Ok(())
}
- async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
+ async fn subscribe_buckets(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
if self.is_partitioned_table {
return Err(Error::UnsupportedOperation {
message:
- "The table is a partitioned table, subscribe_batch is not supported currently."
+ "The table is a partitioned table, subscribe_buckets is not supported currently."
.to_string(),
});
}
@@ -473,8 +473,8 @@ impl LogScanner {
self.inner.subscribe(bucket, offset).await
}
- pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
- self.inner.subscribe_batch(bucket_offsets).await
+ pub async fn subscribe_buckets(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
+ self.inner.subscribe_buckets(bucket_offsets).await
}
pub async fn subscribe_partition(
@@ -500,8 +500,8 @@ impl RecordBatchLogScanner {
self.inner.subscribe(bucket, offset).await
}
- pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
- self.inner.subscribe_batch(bucket_offsets).await
+ pub async fn subscribe_buckets(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
+ self.inner.subscribe_buckets(bucket_offsets).await
}
pub async fn subscribe_partition(
diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs
index cbfcbe5..3f7dd6e 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -426,7 +426,7 @@ mod table_test {
let mut bucket_offsets = HashMap::new();
bucket_offsets.insert(0, 0);
log_scanner
- .subscribe_batch(&bucket_offsets)
+ .subscribe_buckets(&bucket_offsets)
.await
.expect("Failed to subscribe");