This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new a447db9 chore: refacotr config for CPP and connect -> create in cpp
binding(#298)
a447db9 is described below
commit a447db9f7f59f76808a313214bb67647858992b5
Author: Anton Borisov <[email protected]>
AuthorDate: Wed Feb 11 07:54:21 2026 +0000
chore: refacotr config for CPP and connect -> create in cpp binding(#298)
---
bindings/cpp/examples/admin_example.cpp | 6 ++++--
bindings/cpp/examples/example.cpp | 5 ++++-
bindings/cpp/examples/kv_example.cpp | 7 ++++---
bindings/cpp/include/fluss.hpp | 20 +++++++++++++++++++-
bindings/cpp/src/connection.cpp | 5 +++--
bindings/cpp/src/ffi_converter.hpp | 12 ++++++++++++
bindings/cpp/src/lib.rs | 23 +++++++++++++++++++----
7 files changed, 65 insertions(+), 13 deletions(-)
diff --git a/bindings/cpp/examples/admin_example.cpp
b/bindings/cpp/examples/admin_example.cpp
index 196fe97..c51062c 100644
--- a/bindings/cpp/examples/admin_example.cpp
+++ b/bindings/cpp/examples/admin_example.cpp
@@ -31,13 +31,15 @@ static void check(const char* step, const fluss::Result& r)
{
}
int main() {
- const std::string bootstrap = "127.0.0.1:9123";
const std::string db_name = "admin_example_db";
const std::string table_name = "admin_example_table";
// 1) Connect and get Admin
+ fluss::Configuration config;
+ config.bootstrap_server = "127.0.0.1:9123";
+
fluss::Connection conn;
- check("connect", fluss::Connection::Connect(bootstrap, conn));
+ check("create", fluss::Connection::Create(config, conn));
fluss::Admin admin;
check("get_admin", conn.GetAdmin(admin));
diff --git a/bindings/cpp/examples/example.cpp
b/bindings/cpp/examples/example.cpp
index 59f1ed0..14fbeb2 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -34,8 +34,11 @@ static void check(const char* step, const fluss::Result& r) {
int main() {
// 1) Connect
+ fluss::Configuration config;
+ config.bootstrap_server = "127.0.0.1:9123";
+
fluss::Connection conn;
- check("connect", fluss::Connection::Connect("127.0.0.1:9123", conn));
+ check("create", fluss::Connection::Create(config, conn));
// 2) Admin
fluss::Admin admin;
diff --git a/bindings/cpp/examples/kv_example.cpp
b/bindings/cpp/examples/kv_example.cpp
index 2a40db3..3839e0f 100644
--- a/bindings/cpp/examples/kv_example.cpp
+++ b/bindings/cpp/examples/kv_example.cpp
@@ -30,11 +30,12 @@ static void check(const char* step, const fluss::Result& r)
{
}
int main() {
- const std::string bootstrap = "127.0.0.1:9123";
-
// 1) Connect and get Admin
+ fluss::Configuration config;
+ config.bootstrap_server = "127.0.0.1:9123";
+
fluss::Connection conn;
- check("connect", fluss::Connection::Connect(bootstrap, conn));
+ check("create", fluss::Connection::Create(config, conn));
fluss::Admin admin;
check("get_admin", conn.GetAdmin(admin));
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 1806616..6bd6c2f 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -21,6 +21,7 @@
#include <chrono>
#include <cstdint>
+#include <limits>
#include <memory>
#include <stdexcept>
#include <string>
@@ -809,6 +810,23 @@ class TableUpsert;
class TableLookup;
class TableScan;
+struct Configuration {
+ // Coordinator server address
+ std::string bootstrap_server{"127.0.0.1:9123"};
+ // Max request size in bytes (10 MB)
+ int32_t request_max_size{10 * 1024 * 1024};
+ // Writer acknowledgment mode: "all", "0", "1", or "-1"
+ std::string writer_acks{"all"};
+ // Max number of writer retries
+ int32_t writer_retries{std::numeric_limits<int32_t>::max()};
+ // Writer batch size in bytes (2 MB)
+ int32_t writer_batch_size{2 * 1024 * 1024};
+ // Number of remote log batches to prefetch during scanning
+ size_t scanner_remote_log_prefetch_num{4};
+ // Number of threads for downloading remote log data
+ size_t scanner_remote_log_download_threads{3};
+};
+
class Connection {
public:
Connection() noexcept;
@@ -819,7 +837,7 @@ class Connection {
Connection(Connection&& other) noexcept;
Connection& operator=(Connection&& other) noexcept;
- static Result Connect(const std::string& bootstrap_server, Connection&
out);
+ static Result Create(const Configuration& config, Connection& out);
bool Available() const;
diff --git a/bindings/cpp/src/connection.cpp b/bindings/cpp/src/connection.cpp
index 4fbfafb..bceb264 100644
--- a/bindings/cpp/src/connection.cpp
+++ b/bindings/cpp/src/connection.cpp
@@ -46,9 +46,10 @@ Connection& Connection::operator=(Connection&& other)
noexcept {
return *this;
}
-Result Connection::Connect(const std::string& bootstrap_server, Connection&
out) {
+Result Connection::Create(const Configuration& config, Connection& out) {
try {
- out.conn_ = ffi::new_connection(bootstrap_server);
+ auto ffi_config = utils::to_ffi_config(config);
+ out.conn_ = ffi::new_connection(ffi_config);
return utils::make_ok();
} catch (const rust::Error& e) {
return utils::make_error(1, e.what());
diff --git a/bindings/cpp/src/ffi_converter.hpp
b/bindings/cpp/src/ffi_converter.hpp
index 40676e5..3e6cdcd 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -123,6 +123,18 @@ inline ffi::FfiTablePath to_ffi_table_path(const
TablePath& path) {
return ffi_path;
}
+inline ffi::FfiConfig to_ffi_config(const Configuration& config) {
+ ffi::FfiConfig ffi_config;
+ ffi_config.bootstrap_server = rust::String(config.bootstrap_server);
+ ffi_config.request_max_size = config.request_max_size;
+ ffi_config.writer_acks = rust::String(config.writer_acks);
+ ffi_config.writer_retries = config.writer_retries;
+ ffi_config.writer_batch_size = config.writer_batch_size;
+ ffi_config.scanner_remote_log_prefetch_num =
config.scanner_remote_log_prefetch_num;
+ ffi_config.scanner_remote_log_download_threads =
config.scanner_remote_log_download_threads;
+ return ffi_config;
+}
+
inline ffi::FfiColumn to_ffi_column(const Column& col) {
ffi::FfiColumn ffi_col;
ffi_col.name = rust::String(col.name);
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 9b1b5ef..d99209f 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -37,6 +37,16 @@ mod ffi {
value: String,
}
+ struct FfiConfig {
+ bootstrap_server: String,
+ request_max_size: i32,
+ writer_acks: String,
+ writer_retries: i32,
+ writer_batch_size: i32,
+ scanner_remote_log_prefetch_num: usize,
+ scanner_remote_log_download_threads: usize,
+ }
+
struct FfiResult {
error_code: i32,
error_message: String,
@@ -252,7 +262,7 @@ mod ffi {
type Lookuper;
// Connection
- fn new_connection(bootstrap_server: &str) -> Result<*mut Connection>;
+ fn new_connection(config: &FfiConfig) -> Result<*mut Connection>;
unsafe fn delete_connection(conn: *mut Connection);
fn get_admin(self: &Connection) -> Result<*mut Admin>;
fn get_table(self: &Connection, table_path: &FfiTablePath) ->
Result<*mut Table>;
@@ -442,10 +452,15 @@ fn err_result(code: i32, msg: String) -> ffi::FfiResult {
}
// Connection implementation
-fn new_connection(bootstrap_server: &str) -> Result<*mut Connection, String> {
+fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> {
let config = fluss::config::Config {
- bootstrap_server: bootstrap_server.to_string(),
- ..Default::default()
+ bootstrap_server: config.bootstrap_server.to_string(),
+ request_max_size: config.request_max_size,
+ writer_acks: config.writer_acks.to_string(),
+ writer_retries: config.writer_retries,
+ writer_batch_size: config.writer_batch_size,
+ scanner_remote_log_prefetch_num:
config.scanner_remote_log_prefetch_num,
+ scanner_remote_log_download_threads:
config.scanner_remote_log_download_threads,
};
let conn = RUNTIME.block_on(async {
fcore::client::FlussConnection::new(config).await });