This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new a447db9  chore: refacotr config for CPP and connect -> create  in cpp 
binding(#298)
a447db9 is described below

commit a447db9f7f59f76808a313214bb67647858992b5
Author: Anton Borisov <[email protected]>
AuthorDate: Wed Feb 11 07:54:21 2026 +0000

    chore: refacotr config for CPP and connect -> create  in cpp binding(#298)
---
 bindings/cpp/examples/admin_example.cpp |  6 ++++--
 bindings/cpp/examples/example.cpp       |  5 ++++-
 bindings/cpp/examples/kv_example.cpp    |  7 ++++---
 bindings/cpp/include/fluss.hpp          | 20 +++++++++++++++++++-
 bindings/cpp/src/connection.cpp         |  5 +++--
 bindings/cpp/src/ffi_converter.hpp      | 12 ++++++++++++
 bindings/cpp/src/lib.rs                 | 23 +++++++++++++++++++----
 7 files changed, 65 insertions(+), 13 deletions(-)

diff --git a/bindings/cpp/examples/admin_example.cpp 
b/bindings/cpp/examples/admin_example.cpp
index 196fe97..c51062c 100644
--- a/bindings/cpp/examples/admin_example.cpp
+++ b/bindings/cpp/examples/admin_example.cpp
@@ -31,13 +31,15 @@ static void check(const char* step, const fluss::Result& r) 
{
 }
 
 int main() {
-    const std::string bootstrap = "127.0.0.1:9123";
     const std::string db_name = "admin_example_db";
     const std::string table_name = "admin_example_table";
 
     // 1) Connect and get Admin
+    fluss::Configuration config;
+    config.bootstrap_server = "127.0.0.1:9123";
+
     fluss::Connection conn;
-    check("connect", fluss::Connection::Connect(bootstrap, conn));
+    check("create", fluss::Connection::Create(config, conn));
 
     fluss::Admin admin;
     check("get_admin", conn.GetAdmin(admin));
diff --git a/bindings/cpp/examples/example.cpp 
b/bindings/cpp/examples/example.cpp
index 59f1ed0..14fbeb2 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -34,8 +34,11 @@ static void check(const char* step, const fluss::Result& r) {
 
 int main() {
     // 1) Connect
+    fluss::Configuration config;
+    config.bootstrap_server = "127.0.0.1:9123";
+
     fluss::Connection conn;
-    check("connect", fluss::Connection::Connect("127.0.0.1:9123", conn));
+    check("create", fluss::Connection::Create(config, conn));
 
     // 2) Admin
     fluss::Admin admin;
diff --git a/bindings/cpp/examples/kv_example.cpp 
b/bindings/cpp/examples/kv_example.cpp
index 2a40db3..3839e0f 100644
--- a/bindings/cpp/examples/kv_example.cpp
+++ b/bindings/cpp/examples/kv_example.cpp
@@ -30,11 +30,12 @@ static void check(const char* step, const fluss::Result& r) 
{
 }
 
 int main() {
-    const std::string bootstrap = "127.0.0.1:9123";
-
     // 1) Connect and get Admin
+    fluss::Configuration config;
+    config.bootstrap_server = "127.0.0.1:9123";
+
     fluss::Connection conn;
-    check("connect", fluss::Connection::Connect(bootstrap, conn));
+    check("create", fluss::Connection::Create(config, conn));
 
     fluss::Admin admin;
     check("get_admin", conn.GetAdmin(admin));
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 1806616..6bd6c2f 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -21,6 +21,7 @@
 
 #include <chrono>
 #include <cstdint>
+#include <limits>
 #include <memory>
 #include <stdexcept>
 #include <string>
@@ -809,6 +810,23 @@ class TableUpsert;
 class TableLookup;
 class TableScan;
 
+struct Configuration {
+    // Coordinator server address
+    std::string bootstrap_server{"127.0.0.1:9123"};
+    // Max request size in bytes (10 MB)
+    int32_t request_max_size{10 * 1024 * 1024};
+    // Writer acknowledgment mode: "all", "0", "1", or "-1"
+    std::string writer_acks{"all"};
+    // Max number of writer retries
+    int32_t writer_retries{std::numeric_limits<int32_t>::max()};
+    // Writer batch size in bytes (2 MB)
+    int32_t writer_batch_size{2 * 1024 * 1024};
+    // Number of remote log batches to prefetch during scanning
+    size_t scanner_remote_log_prefetch_num{4};
+    // Number of threads for downloading remote log data
+    size_t scanner_remote_log_download_threads{3};
+};
+
 class Connection {
    public:
     Connection() noexcept;
@@ -819,7 +837,7 @@ class Connection {
     Connection(Connection&& other) noexcept;
     Connection& operator=(Connection&& other) noexcept;
 
-    static Result Connect(const std::string& bootstrap_server, Connection& 
out);
+    static Result Create(const Configuration& config, Connection& out);
 
     bool Available() const;
 
diff --git a/bindings/cpp/src/connection.cpp b/bindings/cpp/src/connection.cpp
index 4fbfafb..bceb264 100644
--- a/bindings/cpp/src/connection.cpp
+++ b/bindings/cpp/src/connection.cpp
@@ -46,9 +46,10 @@ Connection& Connection::operator=(Connection&& other) 
noexcept {
     return *this;
 }
 
-Result Connection::Connect(const std::string& bootstrap_server, Connection& 
out) {
+Result Connection::Create(const Configuration& config, Connection& out) {
     try {
-        out.conn_ = ffi::new_connection(bootstrap_server);
+        auto ffi_config = utils::to_ffi_config(config);
+        out.conn_ = ffi::new_connection(ffi_config);
         return utils::make_ok();
     } catch (const rust::Error& e) {
         return utils::make_error(1, e.what());
diff --git a/bindings/cpp/src/ffi_converter.hpp 
b/bindings/cpp/src/ffi_converter.hpp
index 40676e5..3e6cdcd 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -123,6 +123,18 @@ inline ffi::FfiTablePath to_ffi_table_path(const 
TablePath& path) {
     return ffi_path;
 }
 
+inline ffi::FfiConfig to_ffi_config(const Configuration& config) {
+    ffi::FfiConfig ffi_config;
+    ffi_config.bootstrap_server = rust::String(config.bootstrap_server);
+    ffi_config.request_max_size = config.request_max_size;
+    ffi_config.writer_acks = rust::String(config.writer_acks);
+    ffi_config.writer_retries = config.writer_retries;
+    ffi_config.writer_batch_size = config.writer_batch_size;
+    ffi_config.scanner_remote_log_prefetch_num = 
config.scanner_remote_log_prefetch_num;
+    ffi_config.scanner_remote_log_download_threads = 
config.scanner_remote_log_download_threads;
+    return ffi_config;
+}
+
 inline ffi::FfiColumn to_ffi_column(const Column& col) {
     ffi::FfiColumn ffi_col;
     ffi_col.name = rust::String(col.name);
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 9b1b5ef..d99209f 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -37,6 +37,16 @@ mod ffi {
         value: String,
     }
 
+    struct FfiConfig {
+        bootstrap_server: String,
+        request_max_size: i32,
+        writer_acks: String,
+        writer_retries: i32,
+        writer_batch_size: i32,
+        scanner_remote_log_prefetch_num: usize,
+        scanner_remote_log_download_threads: usize,
+    }
+
     struct FfiResult {
         error_code: i32,
         error_message: String,
@@ -252,7 +262,7 @@ mod ffi {
         type Lookuper;
 
         // Connection
-        fn new_connection(bootstrap_server: &str) -> Result<*mut Connection>;
+        fn new_connection(config: &FfiConfig) -> Result<*mut Connection>;
         unsafe fn delete_connection(conn: *mut Connection);
         fn get_admin(self: &Connection) -> Result<*mut Admin>;
         fn get_table(self: &Connection, table_path: &FfiTablePath) -> 
Result<*mut Table>;
@@ -442,10 +452,15 @@ fn err_result(code: i32, msg: String) -> ffi::FfiResult {
 }
 
 // Connection implementation
-fn new_connection(bootstrap_server: &str) -> Result<*mut Connection, String> {
+fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> {
     let config = fluss::config::Config {
-        bootstrap_server: bootstrap_server.to_string(),
-        ..Default::default()
+        bootstrap_server: config.bootstrap_server.to_string(),
+        request_max_size: config.request_max_size,
+        writer_acks: config.writer_acks.to_string(),
+        writer_retries: config.writer_retries,
+        writer_batch_size: config.writer_batch_size,
+        scanner_remote_log_prefetch_num: 
config.scanner_remote_log_prefetch_num,
+        scanner_remote_log_download_threads: 
config.scanner_remote_log_download_threads,
     };
 
     let conn = RUNTIME.block_on(async { 
fcore::client::FlussConnection::new(config).await });

Reply via email to