This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new a34f1c1  feat: introduce cpp bindings (#83)
a34f1c1 is described below

commit a34f1c1d438534bb9ec3de5f1830d600aec09839
Author: AlexZhao <[email protected]>
AuthorDate: Sat Dec 13 15:14:05 2025 +0800

    feat: introduce cpp bindings (#83)
    
    ---------
    
    Co-authored-by: 赵海源 <[email protected]>
---
 Cargo.toml                                         |   2 +-
 Cargo.toml => bindings/cpp/.clang-format           |  25 +-
 bindings/cpp/.gitignore                            |   7 +
 bindings/cpp/CMakeLists.txt                        | 107 +++++
 Cargo.toml => bindings/cpp/Cargo.toml              |  32 +-
 .../fluss/src/config.rs => bindings/cpp/build.rs   |  25 +-
 bindings/cpp/examples/example.cpp                  | 166 +++++++
 bindings/cpp/include/fluss.hpp                     | 461 ++++++++++++++++++
 bindings/cpp/src/admin.cpp                         | 101 ++++
 bindings/cpp/src/connection.cpp                    |  95 ++++
 bindings/cpp/src/ffi_converter.hpp                 | 256 ++++++++++
 bindings/cpp/src/lib.rs                            | 523 +++++++++++++++++++++
 bindings/cpp/src/table.cpp                         | 228 +++++++++
 bindings/cpp/src/types.rs                          | 485 +++++++++++++++++++
 crates/fluss/src/config.rs                         |  14 +-
 15 files changed, 2467 insertions(+), 60 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 15bcb79..b4ac03b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,7 +28,7 @@ rust-version = "1.85"
 
 [workspace]
 resolver = "2"
-members = ["crates/fluss", "crates/examples", "bindings/python"]
+members = ["crates/fluss", "crates/examples", "bindings/python", "bindings/cpp"]
 
 [workspace.dependencies]
 fluss = { version = "0.1.0", path = "./crates/fluss" }
diff --git a/Cargo.toml b/bindings/cpp/.clang-format
similarity index 54%
copy from Cargo.toml
copy to bindings/cpp/.clang-format
index 15bcb79..1c31900 100644
--- a/Cargo.toml
+++ b/bindings/cpp/.clang-format
@@ -15,24 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-[workspace.package]
-categories = ["command-line-utilities"]
-description = "The rust implementation of fluss"
-repository = "https://github.com/apache/fluss-rust"
-name = "fluss"
-edition = "2024"
-version = "0.1.0"
-license = "Apache-2.0"
-rust-version = "1.85"
-
-
-[workspace]
-resolver = "2"
-members = ["crates/fluss", "crates/examples", "bindings/python"]
-
-[workspace.dependencies]
-fluss = { version = "0.1.0", path = "./crates/fluss" }
-tokio = { version = "1.44.2", features = ["full"] }
-clap = { version = "4.5.37", features = ["derive"] }
-arrow = { version = "57.0.0", features = ["ipc_compression"] }
-chrono = { version = "0.4", features = ["clock", "std", "wasmbind"] }
+---
+BasedOnStyle: Google
+ColumnLimit: 100
+IndentWidth: 4
diff --git a/bindings/cpp/.gitignore b/bindings/cpp/.gitignore
new file mode 100644
index 0000000..6836e70
--- /dev/null
+++ b/bindings/cpp/.gitignore
@@ -0,0 +1,7 @@
+build/
+cmake-build-*/
+.idea/
+*.o
+*.a
+*.so
+*.dylib
diff --git a/bindings/cpp/CMakeLists.txt b/bindings/cpp/CMakeLists.txt
new file mode 100644
index 0000000..629f3f0
--- /dev/null
+++ b/bindings/cpp/CMakeLists.txt
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cmake_minimum_required(VERSION 3.22)
+
+if (POLICY CMP0135)
+    cmake_policy(SET CMP0135 NEW)
+endif()
+
+project(fluss-cpp LANGUAGES CXX)
+
+include(FetchContent)
+set(FLUSS_GOOGLETEST_VERSION 1.15.2 CACHE STRING "version of GoogleTest")
+set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
+
+find_package(Threads REQUIRED)
+
+# Default to a Debug build when the caller did not choose a build type.
+if (NOT CMAKE_BUILD_TYPE)
+    set(CMAKE_BUILD_TYPE Debug)
+endif()
+
+set(CMAKE_CXX_STANDARD 17)
+set(CMAKE_CXX_STANDARD_REQUIRED ON)
+
+option(FLUSS_ENABLE_ADDRESS_SANITIZER "Enable address sanitizer" OFF)
+option(FLUSS_ENABLE_TESTING "Enable building test binary for fluss" OFF)
+option(FLUSS_DEV "Enable dev mode" OFF)
+
+# Dev mode switches on both the sanitizer and the testing option.
+if (FLUSS_DEV)
+    set(FLUSS_ENABLE_ADDRESS_SANITIZER ON)
+    set(FLUSS_ENABLE_TESTING ON)
+endif()
+
+# Locate the Cargo workspace manifest and derive the target directory from it.
+# NOTE: this assumes the default <workspace>/target location (no CARGO_TARGET_DIR override).
+execute_process(COMMAND cargo locate-project --workspace --message-format plain
+    OUTPUT_VARIABLE CARGO_WORKSPACE_MANIFEST
+    RESULT_VARIABLE CARGO_LOCATE_RESULT
+    OUTPUT_STRIP_TRAILING_WHITESPACE
+    WORKING_DIRECTORY ${PROJECT_SOURCE_DIR})
+# Fail at configure time instead of silently producing paths rooted at "/".
+if (NOT CARGO_LOCATE_RESULT EQUAL 0)
+    message(FATAL_ERROR "cargo locate-project failed (${CARGO_LOCATE_RESULT}); is cargo installed and on PATH?")
+endif()
+get_filename_component(CARGO_WORKSPACE_DIR "${CARGO_WORKSPACE_MANIFEST}" DIRECTORY)
+set(CARGO_TARGET_DIR "${CARGO_WORKSPACE_DIR}/target")
+
+set(CARGO_MANIFEST ${PROJECT_SOURCE_DIR}/Cargo.toml)
+set(RUST_SOURCE_FILE ${PROJECT_SOURCE_DIR}/src/lib.rs)
+set(RUST_BRIDGE_CPP ${CARGO_TARGET_DIR}/cxxbridge/fluss-cpp/src/lib.rs.cc)
+set(RUST_HEADER_FILE ${CARGO_TARGET_DIR}/cxxbridge/fluss-cpp/src/lib.rs.h)
+
+# Debug builds use cargo's debug profile; every other build type uses --release.
+if (CMAKE_BUILD_TYPE STREQUAL "Debug")
+    set(CARGO_PROFILE_DIR debug)
+else()
+    set(CARGO_PROFILE_DIR release)
+    list(APPEND CARGO_BUILD_FLAGS "--release")
+endif()
+set(RUST_LIB ${CARGO_TARGET_DIR}/${CARGO_PROFILE_DIR}/${CMAKE_STATIC_LIBRARY_PREFIX}fluss_cpp${CMAKE_STATIC_LIBRARY_SUFFIX})
+
+set(CPP_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/include
+                    ${PROJECT_SOURCE_DIR}/src
+                    ${CARGO_TARGET_DIR}/cxxbridge
+                    ${CARGO_TARGET_DIR}/cxxbridge/fluss-cpp/src)
+
+file(GLOB CPP_SOURCE_FILE "src/*.cpp")
+file(GLOB CPP_HEADER_FILE "include/*.hpp")
+
+# Runs cargo on every build; cargo itself decides whether anything needs recompiling.
+add_custom_target(cargo_build
+    COMMAND cargo build --manifest-path ${CARGO_MANIFEST} ${CARGO_BUILD_FLAGS}
+    BYPRODUCTS ${RUST_BRIDGE_CPP} ${RUST_LIB} ${RUST_HEADER_FILE}
+    DEPENDS ${RUST_SOURCE_FILE}
+    USES_TERMINAL
+    COMMENT "Running cargo..."
+)
+
+add_library(fluss_cpp STATIC ${CPP_SOURCE_FILE} ${RUST_BRIDGE_CPP})
+target_sources(fluss_cpp PUBLIC ${CPP_HEADER_FILE})
+target_sources(fluss_cpp PRIVATE ${RUST_HEADER_FILE})
+target_include_directories(fluss_cpp PUBLIC ${CPP_INCLUDE_DIR})
+target_link_libraries(fluss_cpp PUBLIC ${RUST_LIB})
+target_link_libraries(fluss_cpp PRIVATE ${CMAKE_DL_LIBS} Threads::Threads)
+if(APPLE)
+    target_link_libraries(fluss_cpp PUBLIC "-framework CoreFoundation" "-framework Security")
+endif()
+
+add_executable(fluss_cpp_example examples/example.cpp)
+target_link_libraries(fluss_cpp_example fluss_cpp)
+target_include_directories(fluss_cpp_example PUBLIC ${CPP_INCLUDE_DIR})
+
+set_target_properties(fluss_cpp
+    PROPERTIES ADDITIONAL_CLEAN_FILES ${CARGO_TARGET_DIR}
+)
+# The bridge sources and the Rust static library must exist before fluss_cpp is compiled.
+add_dependencies(fluss_cpp cargo_build)
+
+if (FLUSS_ENABLE_ADDRESS_SANITIZER)
+    target_compile_options(fluss_cpp PRIVATE -fsanitize=leak,address,undefined -fno-omit-frame-pointer -fno-common -O1)
+    # fluss_cpp is a static library, which is archived rather than linked, so PRIVATE link
+    # options would never be used. PUBLIC forwards the sanitizer runtime flags to every
+    # target that links against fluss_cpp (e.g. fluss_cpp_example).
+    target_link_options(fluss_cpp PUBLIC -fsanitize=leak,address,undefined)
+endif()
\ No newline at end of file
diff --git a/Cargo.toml b/bindings/cpp/Cargo.toml
similarity index 55%
copy from Cargo.toml
copy to bindings/cpp/Cargo.toml
index 15bcb79..2d3d913 100644
--- a/Cargo.toml
+++ b/bindings/cpp/Cargo.toml
@@ -15,24 +15,22 @@
 # specific language governing permissions and limitations
 # under the License.
 
-[workspace.package]
-categories = ["command-line-utilities"]
-description = "The rust implementation of fluss"
-repository = "https://github.com/apache/fluss-rust"
-name = "fluss"
-edition = "2024"
+[package]
+name = "fluss-cpp"
 version = "0.1.0"
-license = "Apache-2.0"
-rust-version = "1.85"
+edition.workspace = true
+rust-version.workspace = true
+publish = false
 
+[lib]
+crate-type = ["staticlib"]
 
-[workspace]
-resolver = "2"
-members = ["crates/fluss", "crates/examples", "bindings/python"]
+[dependencies]
+anyhow = "1.0"
+arrow = { workspace = true }
+cxx = "1.0"
+fluss = { path = "../../crates/fluss" }
+tokio = { version = "1.27", features = ["rt-multi-thread", "macros"] }
 
-[workspace.dependencies]
-fluss = { version = "0.1.0", path = "./crates/fluss" }
-tokio = { version = "1.44.2", features = ["full"] }
-clap = { version = "4.5.37", features = ["derive"] }
-arrow = { version = "57.0.0", features = ["ipc_compression"] }
-chrono = { version = "0.4", features = ["clock", "std", "wasmbind"] }
+[build-dependencies]
+cxx-build = "1.0"
diff --git a/crates/fluss/src/config.rs b/bindings/cpp/build.rs
similarity index 55%
copy from crates/fluss/src/config.rs
copy to bindings/cpp/build.rs
index 0857496..ec75e24 100644
--- a/crates/fluss/src/config.rs
+++ b/bindings/cpp/build.rs
@@ -15,25 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use clap::Parser;
-use serde::{Deserialize, Serialize};
+// Build script: generates the cxx bridge from `src/lib.rs` and compiles it as C++17
+// under the name `fluss-cpp-bridge`.
+fn main() {
+    cxx_build::bridge("src/lib.rs")
+        .std("c++17")
+        .compile("fluss-cpp-bridge");
 
-#[derive(Parser, Debug, Clone, Deserialize, Serialize, Default)]
-#[command(author, version, about, long_about = None)]
-pub struct Config {
-    #[arg(long)]
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub bootstrap_server: Option<String>,
-
-    #[arg(long, default_value_t = 10 * 1024 * 1024)]
-    pub request_max_size: i32,
-
-    #[arg(long, default_value_t = String::from("all"))]
-    pub writer_acks: String,
-
-    #[arg(long, default_value_t = i32::MAX)]
-    pub writer_retries: i32,
-
-    #[arg(long, default_value_t = 2 * 1024 * 1024)]
-    pub writer_batch_size: i32,
+    // Ask Cargo to re-run this build script when the bridge definition changes.
+    println!("cargo:rerun-if-changed=src/lib.rs");
 }
diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp
new file mode 100644
index 0000000..5146f28
--- /dev/null
+++ b/bindings/cpp/examples/example.cpp
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "fluss.hpp"
+
+#include <iostream>
+#include <vector>
+
+static void check(const char* step, const fluss::Result& r) {
+    if (!r.Ok()) {
+        std::cerr << step << " failed: code=" << r.error_code
+                  << " msg=" << r.error_message << std::endl;
+        std::exit(1);
+    }
+}
+
+// End-to-end walkthrough of the C++ binding: connect, create a table, append three rows,
+// scan them back, then scan again with a two-column projection and verify the result.
+// Returns 0 on success; any failing step terminates the process with status 1.
+int main() {
+    // 1) Connect
+    fluss::Connection conn;
+    check("connect", fluss::Connection::Connect("127.0.0.1:9123", conn));
+
+    // 2) Admin
+    fluss::Admin admin;
+    check("get_admin", conn.GetAdmin(admin));
+
+    // 3) Schema & descriptor
+    auto schema = fluss::Schema::NewBuilder()
+                        .AddColumn("id", fluss::DataType::Int)
+                        .AddColumn("name", fluss::DataType::String)
+                        .AddColumn("score", fluss::DataType::Float)
+                        .AddColumn("age", fluss::DataType::Int)
+                        .Build();
+
+    auto descriptor = fluss::TableDescriptor::NewBuilder()
+                          .SetSchema(schema)
+                          .SetBucketCount(1)
+                          .SetProperty("table.log.arrow.compression.type", "NONE")
+                          .SetComment("cpp example table")
+                          .Build();
+
+    fluss::TablePath table_path("fluss", "sample_table_cpp_v1");
+    // ignore_if_exists=true to allow re-run
+    check("create_table", admin.CreateTable(table_path, descriptor, true));
+
+    // 4) Get table
+    fluss::Table table;
+    check("get_table", conn.GetTable(table_path, table));
+
+    // 5) Writer
+    fluss::AppendWriter writer;
+    check("new_append_writer", table.NewAppendWriter(writer));
+
+    struct RowData {
+        int id;
+        const char* name;
+        float score;
+        int age;
+    };
+
+    std::vector<RowData> rows = {
+        {1, "Alice", 95.2f, 25},
+        {2, "Bob", 87.2f, 30},
+        {3, "Charlie", 92.1f, 35},
+    };
+
+    for (const auto& r : rows) {
+        fluss::GenericRow row;
+        row.SetInt32(0, r.id);
+        row.SetString(1, r.name);
+        row.SetFloat32(2, r.score);
+        row.SetInt32(3, r.age);
+        check("append", writer.Append(row));
+    }
+    check("flush", writer.Flush());
+    std::cout << "Wrote " << rows.size() << " rows" << std::endl;
+
+    // 6) Scan every bucket from offset 0
+    fluss::LogScanner scanner;
+    check("new_log_scanner", table.NewLogScanner(scanner));
+
+    auto info = table.GetTableInfo();
+    int buckets = info.num_buckets;
+    for (int b = 0; b < buckets; ++b) {
+        check("subscribe", scanner.Subscribe(b, 0));
+    }
+
+    fluss::ScanRecords records;
+    check("poll", scanner.Poll(5000, records));
+
+    std::cout << "Scanned records: " << records.records.size() << std::endl;
+    for (const auto& rec : records.records) {
+        const auto& fields = rec.row.fields;
+        // The schema above has four columns; skip a shorter row instead of indexing
+        // past the end of the vector.
+        if (fields.size() < 4) {
+            std::cerr << " offset=" << rec.offset << " has only " << fields.size()
+                      << " fields, expected 4" << std::endl;
+            continue;
+        }
+        std::cout << " offset=" << rec.offset << " id=" << fields[0].i32_val
+                  << " name=" << fields[1].string_val << " score=" << fields[2].f32_val
+                  << " age=" << fields[3].i32_val << " ts=" << rec.timestamp << std::endl;
+    }
+
+    // 7) Project only id (0) and name (1) columns
+    std::vector<size_t> projected_columns = {0, 1};
+    fluss::LogScanner projected_scanner;
+    check("new_log_scanner_with_projection",
+          table.NewLogScannerWithProjection(projected_columns, projected_scanner));
+
+    for (int b = 0; b < buckets; ++b) {
+        check("subscribe_projected", projected_scanner.Subscribe(b, 0));
+    }
+
+    fluss::ScanRecords projected_records;
+    check("poll_projected", projected_scanner.Poll(5000, projected_records));
+
+    std::cout << "Projected records: " << projected_records.records.size() << std::endl;
+
+    bool projection_verified = true;
+    for (size_t i = 0; i < projected_records.records.size(); ++i) {
+        const auto& rec = projected_records.records[i];
+        const auto& row = rec.row;
+
+        // A projected row must carry exactly the requested columns; the checks below
+        // index fields[0] and fields[1], so bail out of this record on a size mismatch.
+        if (row.fields.size() != projected_columns.size()) {
+            std::cerr << "ERROR: Record " << i << " has " << row.fields.size()
+                      << " fields, expected " << projected_columns.size() << std::endl;
+            projection_verified = false;
+            continue;
+        }
+
+        // Verify field types match expected columns
+        // Column 0 (id) should be Int32, Column 1 (name) should be String
+        if (row.fields[0].type != fluss::DatumType::Int32) {
+            std::cerr << "ERROR: Record " << i << " field 0 type mismatch, expected Int32" << std::endl;
+            projection_verified = false;
+        }
+        if (row.fields[1].type != fluss::DatumType::String) {
+            std::cerr << "ERROR: Record " << i << " field 1 type mismatch, expected String" << std::endl;
+            projection_verified = false;
+        }
+
+        // Print projected data
+        if (row.fields[0].type == fluss::DatumType::Int32 &&
+            row.fields[1].type == fluss::DatumType::String) {
+            std::cout << "  Record " << i << ": id=" << row.fields[0].i32_val
+                      << ", name=" << row.fields[1].string_val << std::endl;
+        }
+    }
+
+    if (projection_verified) {
+        std::cout << "Column pruning verification passed!" << std::endl;
+    } else {
+        std::cerr << "Column pruning verification failed!" << std::endl;
+        std::exit(1);
+    }
+
+    return 0;
+}
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
new file mode 100644
index 0000000..002f806
--- /dev/null
+++ b/bindings/cpp/include/fluss.hpp
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+namespace fluss {
+
+namespace ffi {
+    struct Connection;
+    struct Admin;
+    struct Table;
+    struct AppendWriter;
+    struct LogScanner;
+}  // namespace ffi
+
+// Column data types that can be passed to Schema::Builder::AddColumn.
+// The numeric values are fixed explicitly and start at 1.
+// NOTE(review): presumably these must stay in sync with the Rust side of the bridge — confirm.
+enum class DataType {
+    Boolean = 1,
+    TinyInt = 2,
+    SmallInt = 3,
+    Int = 4,
+    BigInt = 5,
+    Float = 6,
+    Double = 7,
+    String = 8,
+    Bytes = 9,
+    Date = 10,
+    Time = 11,
+    Timestamp = 12,
+    TimestampLtz = 13,
+};
+
+// Tag stored in Datum::type that tells which value member of a Datum is in use.
+// Null (0) is the tag of a default-constructed Datum.
+enum class DatumType {
+    Null = 0,
+    Bool = 1,
+    Int32 = 2,
+    Int64 = 3,
+    Float32 = 4,
+    Float64 = 5,
+    String = 6,
+    Bytes = 7,
+};
+
+// Outcome of an API call: an error code plus a human-readable message.
+struct Result {
+    // 0 means success; any other value is a failure.
+    int32_t error_code{0};
+    std::string error_message;
+
+    // True when error_code is 0.
+    bool Ok() const { return error_code == 0; }
+};
+
+// Identifies a table by database name and table name.
+struct TablePath {
+    std::string database_name;
+    std::string table_name;
+
+    TablePath() = default;
+    // Takes both names by value and moves them into the members.
+    TablePath(std::string db, std::string tbl)
+        : database_name(std::move(db)), table_name(std::move(tbl)) {}
+
+    // Returns "<database_name>.<table_name>".
+    std::string ToString() const { return database_name + "." + table_name; }
+};
+
+// One schema column: its name, data type and a free-form comment.
+struct Column {
+    std::string name;
+    DataType data_type;
+    std::string comment;
+};
+
+// Table schema: an ordered list of columns plus the names of the primary-key columns.
+struct Schema {
+    std::vector<Column> columns;
+    std::vector<std::string> primary_keys;
+
+    // Fluent builder; every setter returns *this so calls can be chained.
+    class Builder {
+    public:
+        // Appends a column; columns keep the order in which they were added.
+        Builder& AddColumn(std::string name, DataType type,
+                           std::string comment = "") {
+            columns_.push_back({std::move(name), type, std::move(comment)});
+            return *this;
+        }
+
+        // Replaces the whole primary-key list.
+        Builder& SetPrimaryKeys(std::vector<std::string> keys) {
+            primary_keys_ = std::move(keys);
+            return *this;
+        }
+
+        // Moves the accumulated state into a Schema, so the builder should not be
+        // reused for a second Build() afterwards.
+        Schema Build() {
+            return Schema{std::move(columns_), std::move(primary_keys_)};
+        }
+
+    private:
+        std::vector<Column> columns_;
+        std::vector<std::string> primary_keys_;
+    };
+
+    // Entry point for the fluent builder.
+    static Builder NewBuilder() { return Builder(); }
+};
+
+// Everything needed to describe a table for creation: schema, partition and bucket
+// layout, string properties and a comment.
+struct TableDescriptor {
+    Schema schema;
+    std::vector<std::string> partition_keys;
+    int32_t bucket_count{0};
+    std::vector<std::string> bucket_keys;
+    std::unordered_map<std::string, std::string> properties;
+    std::string comment;
+
+    // Fluent builder; every setter returns *this so calls can be chained.
+    class Builder {
+    public:
+        Builder& SetSchema(Schema s) {
+            schema_ = std::move(s);
+            return *this;
+        }
+
+        Builder& SetPartitionKeys(std::vector<std::string> keys) {
+            partition_keys_ = std::move(keys);
+            return *this;
+        }
+
+        Builder& SetBucketCount(int32_t count) {
+            bucket_count_ = count;
+            return *this;
+        }
+
+        Builder& SetBucketKeys(std::vector<std::string> keys) {
+            bucket_keys_ = std::move(keys);
+            return *this;
+        }
+
+        // Inserts the property, or overwrites the value if the key is already present.
+        Builder& SetProperty(std::string key, std::string value) {
+            properties_[std::move(key)] = std::move(value);
+            return *this;
+        }
+
+        Builder& SetComment(std::string comment) {
+            comment_ = std::move(comment);
+            return *this;
+        }
+
+        // Moves the accumulated state into a TableDescriptor, so the builder should not
+        // be reused for a second Build() afterwards.
+        TableDescriptor Build() {
+            return TableDescriptor{std::move(schema_),
+                                   std::move(partition_keys_),
+                                   bucket_count_,
+                                   std::move(bucket_keys_),
+                                   std::move(properties_),
+                                   std::move(comment_)};
+        }
+
+    private:
+        Schema schema_;
+        std::vector<std::string> partition_keys_;
+        int32_t bucket_count_{0};
+        std::vector<std::string> bucket_keys_;
+        std::unordered_map<std::string, std::string> properties_;
+        std::string comment_;
+    };
+
+    // Entry point for the fluent builder.
+    static Builder NewBuilder() { return Builder(); }
+};
+
+// Metadata describing an existing table.
+// Scalar members carry default initializers so that a default-constructed TableInfo
+// (for example an out-parameter left untouched by a failed call) never holds
+// indeterminate values.
+struct TableInfo {
+    int64_t table_id{0};
+    int32_t schema_id{0};
+    TablePath table_path;
+    int64_t created_time{0};
+    int64_t modified_time{0};
+    std::vector<std::string> primary_keys;
+    std::vector<std::string> bucket_keys;
+    std::vector<std::string> partition_keys;
+    int32_t num_buckets{0};
+    bool has_primary_key{false};
+    bool is_partitioned{false};
+    std::unordered_map<std::string, std::string> properties;
+    std::string comment;
+    Schema schema;
+};
+
+// A single tagged value. `type` says which of the value members is meaningful; each
+// factory below sets `type` and the one matching member and leaves the others at their
+// defaults.
+struct Datum {
+    DatumType type{DatumType::Null};
+    bool bool_val{false};
+    int32_t i32_val{0};
+    int64_t i64_val{0};
+    float f32_val{0.0F};
+    double f64_val{0.0};
+    std::string string_val;
+    std::vector<uint8_t> bytes_val;
+
+    // A default-constructed Datum already has type Null.
+    static Datum Null() { return {}; }
+    static Datum Bool(bool v) {
+        Datum d;
+        d.type = DatumType::Bool;
+        d.bool_val = v;
+        return d;
+    }
+    static Datum Int32(int32_t v) {
+        Datum d;
+        d.type = DatumType::Int32;
+        d.i32_val = v;
+        return d;
+    }
+    static Datum Int64(int64_t v) {
+        Datum d;
+        d.type = DatumType::Int64;
+        d.i64_val = v;
+        return d;
+    }
+    static Datum Float32(float v) {
+        Datum d;
+        d.type = DatumType::Float32;
+        d.f32_val = v;
+        return d;
+    }
+    static Datum Float64(double v) {
+        Datum d;
+        d.type = DatumType::Float64;
+        d.f64_val = v;
+        return d;
+    }
+    // Takes the string by value and moves it into the datum.
+    static Datum String(std::string v) {
+        Datum d;
+        d.type = DatumType::String;
+        d.string_val = std::move(v);
+        return d;
+    }
+    // Takes the byte vector by value and moves it into the datum.
+    static Datum Bytes(std::vector<uint8_t> v) {
+        Datum d;
+        d.type = DatumType::Bytes;
+        d.bytes_val = std::move(v);
+        return d;
+    }
+};
+
+// A row of positional fields. Every setter grows `fields` on demand so that index `idx`
+// exists (new slots are default-constructed, i.e. Null) and then overwrites that slot.
+struct GenericRow {
+    std::vector<Datum> fields;
+
+    void SetNull(size_t idx) { Slot(idx) = Datum::Null(); }
+
+    void SetBool(size_t idx, bool v) { Slot(idx) = Datum::Bool(v); }
+
+    void SetInt32(size_t idx, int32_t v) { Slot(idx) = Datum::Int32(v); }
+
+    void SetInt64(size_t idx, int64_t v) { Slot(idx) = Datum::Int64(v); }
+
+    void SetFloat32(size_t idx, float v) { Slot(idx) = Datum::Float32(v); }
+
+    void SetFloat64(size_t idx, double v) { Slot(idx) = Datum::Float64(v); }
+
+    void SetString(size_t idx, std::string v) { Slot(idx) = Datum::String(std::move(v)); }
+
+    void SetBytes(size_t idx, std::vector<uint8_t> v) { Slot(idx) = Datum::Bytes(std::move(v)); }
+
+private:
+    // Returns the field at `idx`, resizing the vector first when it is too short.
+    Datum& Slot(size_t idx) {
+        if (idx >= fields.size()) {
+            fields.resize(idx + 1);
+        }
+        return fields[idx];
+    }
+};
+
+// One scanned record: its log offset, its timestamp and the row payload.
+// The scalars are zero-initialized so a default-constructed record is well defined.
+struct ScanRecord {
+    int64_t offset{0};
+    int64_t timestamp{0};
+    GenericRow row;
+};
+
+// A batch of scanned records with read-only, container-like accessors.
+struct ScanRecords {
+    std::vector<ScanRecord> records;
+
+    size_t Size() const { return records.size(); }
+    bool Empty() const { return records.empty(); }
+    // Unchecked access, like std::vector::operator[]; `idx` must be less than Size().
+    const ScanRecord& operator[](size_t idx) const { return records[idx]; }
+
+    // begin()/end() make the batch usable in a range-based for loop.
+    auto begin() const { return records.begin(); }
+    auto end() const { return records.end(); }
+};
+
+// An offset within one bucket, identified by table id, partition id and bucket id.
+// All members are zero-initialized so a default-constructed value is well defined.
+struct BucketOffset {
+    int64_t table_id{0};
+    int64_t partition_id{0};
+    int32_t bucket_id{0};
+    int64_t offset{0};
+};
+
+// A lake snapshot: its id and the per-bucket offsets that belong to it.
+// snapshot_id is zero-initialized so an out-parameter left untouched by a failed call
+// never holds an indeterminate value.
+struct LakeSnapshot {
+    int64_t snapshot_id{0};
+    std::vector<BucketOffset> bucket_offsets;
+};
+
+class AppendWriter;
+class LogScanner;
+class Admin;
+class Table;
+
+// Move-only wrapper around a raw ffi::Connection handle.
+// A default-constructed Connection holds no handle (conn_ is nullptr).
+class Connection {
+public:
+    Connection() noexcept;
+    ~Connection() noexcept;
+
+    // Copying is disabled; the handle can only be moved.
+    Connection(const Connection&) = delete;
+    Connection& operator=(const Connection&) = delete;
+    Connection(Connection&& other) noexcept;
+    Connection& operator=(Connection&& other) noexcept;
+
+    // Connects to `bootstrap_server` and, on success, stores the new handle in `out`.
+    // Failures are reported through the returned Result.
+    static Result Connect(const std::string& bootstrap_server, Connection& out);
+
+    // True when this object holds a handle.
+    bool Available() const;
+
+    // Both calls write their result into `out` and report failures via the Result.
+    Result GetAdmin(Admin& out);
+    Result GetTable(const TablePath& table_path, Table& out);
+
+private:
+    // Releases the handle, if any, and resets conn_ to nullptr.
+    void Destroy() noexcept;
+    ffi::Connection* conn_{nullptr};
+};
+
+// Move-only wrapper around a raw ffi::Admin handle, used for table administration.
+// A default-constructed Admin holds no handle; Connection (a friend) can construct one
+// from a raw pointer.
+class Admin {
+public:
+    Admin() noexcept;
+    ~Admin() noexcept;
+
+    // Copying is disabled; the handle can only be moved.
+    Admin(const Admin&) = delete;
+    Admin& operator=(const Admin&) = delete;
+    Admin(Admin&& other) noexcept;
+    Admin& operator=(Admin&& other) noexcept;
+
+    // True when this object holds a handle.
+    bool Available() const;
+
+    // Creates the table described by `descriptor` at `table_path`.
+    // NOTE(review): `ignore_if_exists` presumably turns "already exists" into success — confirm.
+    Result CreateTable(const TablePath& table_path,
+                       const TableDescriptor& descriptor,
+                       bool ignore_if_exists = false);
+
+    // Writes the table's metadata into `out`; `out` is only assigned when the Result is OK.
+    Result GetTable(const TablePath& table_path, TableInfo& out);
+
+    // Writes the latest lake snapshot into `out`; `out` is only assigned when the Result is OK.
+    Result GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& out);
+
+private:
+    friend class Connection;
+    Admin(ffi::Admin* admin) noexcept;
+
+    // Releases the handle, if any, and resets admin_ to nullptr.
+    void Destroy() noexcept;
+    ffi::Admin* admin_{nullptr};
+};
+
+// Move-only wrapper around a raw ffi::Table handle. Writers and scanners for the table
+// are created from it. Connection (a friend) can construct one from a raw pointer.
+class Table {
+public:
+    Table() noexcept;
+    ~Table() noexcept;
+
+    // Copying is disabled; the handle can only be moved.
+    Table(const Table&) = delete;
+    Table& operator=(const Table&) = delete;
+    Table(Table&& other) noexcept;
+    Table& operator=(Table&& other) noexcept;
+
+    // NOTE(review): presumably true when a handle is held, as for Connection — confirm.
+    bool Available() const;
+
+    // Each factory writes the new object into `out` and reports failures via the Result.
+    Result NewAppendWriter(AppendWriter& out);
+    Result NewLogScanner(LogScanner& out);
+    // `column_indices` selects the columns by position in the table schema.
+    Result NewLogScannerWithProjection(const std::vector<size_t>& column_indices, LogScanner& out);
+
+    TableInfo GetTableInfo() const;
+    TablePath GetTablePath() const;
+    bool HasPrimaryKey() const;
+
+private:
+    friend class Connection;
+    Table(ffi::Table* table) noexcept;
+
+    void Destroy() noexcept;
+    ffi::Table* table_{nullptr};
+};
+
+// Move-only wrapper around a raw ffi::AppendWriter handle, used to append rows to a
+// table. Table (a friend) can construct one from a raw pointer.
+class AppendWriter {
+public:
+    AppendWriter() noexcept;
+    ~AppendWriter() noexcept;
+
+    // Copying is disabled; the handle can only be moved.
+    AppendWriter(const AppendWriter&) = delete;
+    AppendWriter& operator=(const AppendWriter&) = delete;
+    AppendWriter(AppendWriter&& other) noexcept;
+    AppendWriter& operator=(AppendWriter&& other) noexcept;
+
+    // NOTE(review): presumably true when a handle is held — confirm against table.cpp.
+    bool Available() const;
+
+    // Appends one row; failures are reported via the Result.
+    Result Append(const GenericRow& row);
+    // NOTE(review): presumably blocks until previously appended rows are sent — confirm.
+    Result Flush();
+
+private:
+    friend class Table;
+    AppendWriter(ffi::AppendWriter* writer) noexcept;
+
+    void Destroy() noexcept;
+    ffi::AppendWriter* writer_{nullptr};
+};
+
+// Move-only wrapper around a raw ffi::LogScanner handle, used to read records from a
+// table's log. Table (a friend) can construct one from a raw pointer.
+class LogScanner {
+public:
+    LogScanner() noexcept;
+    ~LogScanner() noexcept;
+
+    // Copying is disabled; the handle can only be moved.
+    LogScanner(const LogScanner&) = delete;
+    LogScanner& operator=(const LogScanner&) = delete;
+    LogScanner(LogScanner&& other) noexcept;
+    LogScanner& operator=(LogScanner&& other) noexcept;
+
+    // NOTE(review): presumably true when a handle is held — confirm against table.cpp.
+    bool Available() const;
+
+    // Subscribes to bucket `bucket_id`, starting at `start_offset`.
+    Result Subscribe(int32_t bucket_id, int64_t start_offset);
+    // Polls for records and writes them into `out`; `timeout_ms` is in milliseconds.
+    Result Poll(int64_t timeout_ms, ScanRecords& out);
+
+private:
+    friend class Table;
+    LogScanner(ffi::LogScanner* scanner) noexcept;
+
+    void Destroy() noexcept;
+    ffi::LogScanner* scanner_{nullptr};
+};
+
+}  // namespace fluss
diff --git a/bindings/cpp/src/admin.cpp b/bindings/cpp/src/admin.cpp
new file mode 100644
index 0000000..f6997a6
--- /dev/null
+++ b/bindings/cpp/src/admin.cpp
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "fluss.hpp"
+#include "lib.rs.h"
+#include "ffi_converter.hpp"
+#include "rust/cxx.h"
+
+namespace fluss {
+
+// Creates an Admin that holds no handle.
+Admin::Admin() noexcept = default;
+
+// Takes ownership of `admin`; the handle is released in Destroy().
+Admin::Admin(ffi::Admin* admin) noexcept : admin_(admin) {}
+
+Admin::~Admin() noexcept { Destroy(); }
+
+// Releases the handle, if any, and resets the pointer so a second call is a no-op.
+void Admin::Destroy() noexcept {
+    if (admin_) {
+        ffi::delete_admin(admin_);
+        admin_ = nullptr;
+    }
+}
+
+// Steals the handle from `other`, leaving it without one.
+Admin::Admin(Admin&& other) noexcept : admin_(other.admin_) {
+    other.admin_ = nullptr;
+}
+
+// Releases the current handle, then steals the one held by `other`.
+Admin& Admin::operator=(Admin&& other) noexcept {
+    if (this != &other) {
+        Destroy();
+        admin_ = other.admin_;
+        other.admin_ = nullptr;
+    }
+    return *this;
+}
+
+// True when a handle is held.
+bool Admin::Available() const { return admin_ != nullptr; }
+
+// Converts the arguments to their FFI form and creates the table.
+// Returns error code 1 ("Admin not available") when no handle is held.
+Result Admin::CreateTable(const TablePath& table_path,
+                          const TableDescriptor& descriptor,
+                          bool ignore_if_exists) {
+    if (!Available()) {
+        return utils::make_error(1, "Admin not available");
+    }
+
+    auto ffi_path = utils::to_ffi_table_path(table_path);
+    auto ffi_desc = utils::to_ffi_table_descriptor(descriptor);
+
+    auto ffi_result = admin_->create_table(ffi_path, ffi_desc, ignore_if_exists);
+    return utils::from_ffi_result(ffi_result);
+}
+
+// Looks up the table's metadata. `out` is assigned only when the call succeeds.
+// Returns error code 1 ("Admin not available") when no handle is held.
+Result Admin::GetTable(const TablePath& table_path, TableInfo& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Admin not available");
+    }
+
+    auto ffi_path = utils::to_ffi_table_path(table_path);
+    auto ffi_result = admin_->get_table_info(ffi_path);
+
+    auto result = utils::from_ffi_result(ffi_result.result);
+    if (result.Ok()) {
+        out = utils::from_ffi_table_info(ffi_result.table_info);
+    }
+
+    return result;
+}
+
+// Fetches the latest lake snapshot. `out` is assigned only when the call succeeds.
+// Returns error code 1 ("Admin not available") when no handle is held.
+Result Admin::GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Admin not available");
+    }
+
+    auto ffi_path = utils::to_ffi_table_path(table_path);
+    auto ffi_result = admin_->get_latest_lake_snapshot(ffi_path);
+
+    auto result = utils::from_ffi_result(ffi_result.result);
+    if (result.Ok()) {
+        out = utils::from_ffi_lake_snapshot(ffi_result.lake_snapshot);
+    }
+
+    return result;
+}
+
+}  // namespace fluss
diff --git a/bindings/cpp/src/connection.cpp b/bindings/cpp/src/connection.cpp
new file mode 100644
index 0000000..ea884cd
--- /dev/null
+++ b/bindings/cpp/src/connection.cpp
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "fluss.hpp"
+#include "lib.rs.h"
+#include "ffi_converter.hpp"
+#include "rust/cxx.h"
+
+namespace fluss {
+
+// A default-constructed Connection uses the members' default initializers.
+Connection::Connection() noexcept = default;
+
+// Releases the native connection handle, if any.
+Connection::~Connection() noexcept { Destroy(); }
+
+// Hands the native handle back to the Rust side for deletion and forgets it.
+// Calling this on an empty object does nothing.
+void Connection::Destroy() noexcept {
+    if (conn_ == nullptr) {
+        return;
+    }
+    ffi::delete_connection(conn_);
+    conn_ = nullptr;
+}
+
+// Move construction: adopt the handle and leave `other` empty.
+Connection::Connection(Connection&& other) noexcept : conn_(nullptr) {
+    conn_ = other.conn_;
+    other.conn_ = nullptr;
+}
+
+// Move assignment: release the current handle, then adopt the one held by
+// `other`. Self-assignment is a no-op.
+Connection& Connection::operator=(Connection&& other) noexcept {
+    if (this == &other) {
+        return *this;
+    }
+    Destroy();
+    conn_ = other.conn_;
+    other.conn_ = nullptr;
+    return *this;
+}
+
+// Opens a connection to `bootstrap_server` and stores its native handle in
+// `out`.
+//
+// On success any handle `out` already owned is released first, so reusing a
+// Connection object does not leak the previous connection. On failure `out`
+// is left unchanged and an error Result (code 1) carrying the exception
+// message is returned.
+Result Connection::Connect(const std::string& bootstrap_server, Connection& out) {
+    try {
+        // Create the new connection before touching `out`, so a failed attempt
+        // keeps the previously held connection intact.
+        auto* new_conn = ffi::new_connection(bootstrap_server);
+        out.Destroy();
+        out.conn_ = new_conn;
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+// True while this object holds a native connection handle.
+bool Connection::Available() const { return conn_ != nullptr; }
+
+// Obtains an admin client from this connection and stores its native handle
+// in `out`.
+//
+// On success any handle `out` already owned is released first, so reusing an
+// Admin object does not leak. On failure `out` is left unchanged and an error
+// Result (code 1) is returned.
+Result Connection::GetAdmin(Admin& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Connection not available");
+    }
+
+    try {
+        // Acquire the new handle first; only replace the old one on success.
+        auto* new_admin = conn_->get_admin();
+        out.Destroy();
+        out.admin_ = new_admin;
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+// Opens the table at `table_path` through this connection and stores its
+// native handle in `out`.
+//
+// On success any handle `out` already owned is released first, so reusing a
+// Table object does not leak. On failure `out` is left unchanged and an error
+// Result (code 1) is returned.
+Result Connection::GetTable(const TablePath& table_path, Table& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Connection not available");
+    }
+
+    try {
+        auto ffi_path = utils::to_ffi_table_path(table_path);
+        // Acquire the new handle first; only replace the old one on success.
+        auto* new_table = conn_->get_table(ffi_path);
+        out.Destroy();
+        out.table_ = new_table;
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+}  // namespace fluss
diff --git a/bindings/cpp/src/ffi_converter.hpp 
b/bindings/cpp/src/ffi_converter.hpp
new file mode 100644
index 0000000..52dd7fe
--- /dev/null
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include "fluss.hpp"
+#include "lib.rs.h"
+
+namespace fluss {
+namespace utils {
+
+inline Result make_error(int32_t code, std::string msg) {
+    return Result{code, std::move(msg)};
+}
+
+inline Result make_ok() {
+    return Result{0, {}};
+}
+
+inline Result from_ffi_result(const ffi::FfiResult& ffi_result) {
+    return Result{ffi_result.error_code, 
std::string(ffi_result.error_message)};
+}
+
+// Converts a C++ TablePath into its FFI form (database name, then table name).
+inline ffi::FfiTablePath to_ffi_table_path(const TablePath& path) {
+    return ffi::FfiTablePath{
+        rust::String(path.database_name),
+        rust::String(path.table_name)};
+}
+
+// Converts a C++ Column into its FFI form. The data type crosses the bridge
+// as the enum's underlying value cast to int32_t.
+inline ffi::FfiColumn to_ffi_column(const Column& col) {
+    ffi::FfiColumn converted;
+    converted.name = rust::String(col.name);
+    converted.data_type = static_cast<int32_t>(col.data_type);
+    converted.comment = rust::String(col.comment);
+    return converted;
+}
+
+// Converts a C++ Schema (columns plus primary-key names) into its FFI form,
+// preserving the order of both lists.
+inline ffi::FfiSchema to_ffi_schema(const Schema& schema) {
+    ffi::FfiSchema converted;
+
+    // Append straight into the (initially empty) destination vectors.
+    for (const auto& column : schema.columns) {
+        converted.columns.push_back(to_ffi_column(column));
+    }
+
+    for (const auto& key : schema.primary_keys) {
+        converted.primary_keys.push_back(rust::String(key));
+    }
+
+    return converted;
+}
+
+// Converts a C++ TableDescriptor into its FFI form. The properties map is
+// flattened into a list of key/value entries in the map's iteration order.
+inline ffi::FfiTableDescriptor to_ffi_table_descriptor(const TableDescriptor& desc) {
+    ffi::FfiTableDescriptor converted;
+
+    converted.schema = to_ffi_schema(desc.schema);
+
+    // Append straight into the (initially empty) destination vectors.
+    for (const auto& key : desc.partition_keys) {
+        converted.partition_keys.push_back(rust::String(key));
+    }
+
+    converted.bucket_count = desc.bucket_count;
+
+    for (const auto& key : desc.bucket_keys) {
+        converted.bucket_keys.push_back(rust::String(key));
+    }
+
+    for (const auto& [name, value] : desc.properties) {
+        ffi::HashMapValue entry;
+        entry.key = rust::String(name);
+        entry.value = rust::String(value);
+        converted.properties.push_back(std::move(entry));
+    }
+
+    converted.comment = rust::String(desc.comment);
+
+    return converted;
+}
+
+// Converts a C++ Datum into its FFI form. Every value slot is copied,
+// regardless of which one the type tag marks as active.
+inline ffi::FfiDatum to_ffi_datum(const Datum& datum) {
+    ffi::FfiDatum converted;
+    converted.datum_type = static_cast<int32_t>(datum.type);
+    converted.bool_val = datum.bool_val;
+    converted.i32_val = datum.i32_val;
+    converted.i64_val = datum.i64_val;
+    converted.f32_val = datum.f32_val;
+    converted.f64_val = datum.f64_val;
+    converted.string_val = rust::String(datum.string_val);
+
+    // Copy the byte payload one element at a time into the Rust vector.
+    for (auto byte : datum.bytes_val) {
+        converted.bytes_val.push_back(byte);
+    }
+
+    return converted;
+}
+
+// Converts a C++ GenericRow into its FFI form, field by field and in order.
+inline ffi::FfiGenericRow to_ffi_generic_row(const GenericRow& row) {
+    ffi::FfiGenericRow converted;
+
+    for (const auto& datum : row.fields) {
+        converted.fields.push_back(to_ffi_datum(datum));
+    }
+
+    return converted;
+}
+
+// Converts an FFI column back into a C++ Column. The integer type code is
+// cast back to DataType without validation.
+inline Column from_ffi_column(const ffi::FfiColumn& ffi_col) {
+    std::string name(ffi_col.name);
+    const auto type = static_cast<DataType>(ffi_col.data_type);
+    std::string comment(ffi_col.comment);
+    return Column{std::move(name), type, std::move(comment)};
+}
+
+// Converts an FFI schema back into a C++ Schema, preserving the order of the
+// columns and of the primary-key names.
+inline Schema from_ffi_schema(const ffi::FfiSchema& ffi_schema) {
+    Schema converted;
+
+    for (const auto& ffi_col : ffi_schema.columns) {
+        converted.columns.push_back(from_ffi_column(ffi_col));
+    }
+
+    for (const auto& key : ffi_schema.primary_keys) {
+        converted.primary_keys.push_back(std::string(key));
+    }
+
+    return converted;
+}
+
+// Converts the FFI table metadata into a C++ TableInfo by copying every field.
+inline TableInfo from_ffi_table_info(const ffi::FfiTableInfo& ffi_info) {
+    TableInfo converted;
+
+    // Identity and timestamps.
+    converted.table_id = ffi_info.table_id;
+    converted.schema_id = ffi_info.schema_id;
+    converted.table_path = TablePath{
+        std::string(ffi_info.table_path.database_name),
+        std::string(ffi_info.table_path.table_name)};
+    converted.created_time = ffi_info.created_time;
+    converted.modified_time = ffi_info.modified_time;
+
+    // Key lists, copied in order.
+    for (const auto& key : ffi_info.primary_keys) {
+        converted.primary_keys.push_back(std::string(key));
+    }
+
+    for (const auto& key : ffi_info.bucket_keys) {
+        converted.bucket_keys.push_back(std::string(key));
+    }
+
+    for (const auto& key : ffi_info.partition_keys) {
+        converted.partition_keys.push_back(std::string(key));
+    }
+
+    // Layout flags.
+    converted.num_buckets = ffi_info.num_buckets;
+    converted.has_primary_key = ffi_info.has_primary_key;
+    converted.is_partitioned = ffi_info.is_partitioned;
+
+    // Rebuild the properties map; a repeated key keeps the last value seen.
+    for (const auto& entry : ffi_info.properties) {
+        converted.properties[std::string(entry.key)] = std::string(entry.value);
+    }
+
+    converted.comment = std::string(ffi_info.comment);
+    converted.schema = from_ffi_schema(ffi_info.schema);
+
+    return converted;
+}
+
+// Converts an FFI datum back into a C++ Datum. Every value slot is copied and
+// the integer tag is cast back to DatumType without validation.
+inline Datum from_ffi_datum(const ffi::FfiDatum& ffi_datum) {
+    Datum converted;
+    converted.type = static_cast<DatumType>(ffi_datum.datum_type);
+    converted.bool_val = ffi_datum.bool_val;
+    converted.i32_val = ffi_datum.i32_val;
+    converted.i64_val = ffi_datum.i64_val;
+    converted.f32_val = ffi_datum.f32_val;
+    converted.f64_val = ffi_datum.f64_val;
+    // todo: avoid copy string
+    converted.string_val = std::string(ffi_datum.string_val);
+
+    // Copy the byte payload element by element.
+    for (auto byte : ffi_datum.bytes_val) {
+        converted.bytes_val.push_back(byte);
+    }
+
+    return converted;
+}
+
+// Converts an FFI row back into a C++ GenericRow, field by field and in order.
+inline GenericRow from_ffi_generic_row(const ffi::FfiGenericRow& ffi_row) {
+    GenericRow converted;
+
+    for (const auto& ffi_datum : ffi_row.fields) {
+        converted.fields.push_back(from_ffi_datum(ffi_datum));
+    }
+
+    return converted;
+}
+
+// Converts one FFI scan record (offset, timestamp, row) into a C++ ScanRecord.
+inline ScanRecord from_ffi_scan_record(const ffi::FfiScanRecord& ffi_record) {
+    GenericRow row = from_ffi_generic_row(ffi_record.row);
+    return ScanRecord{ffi_record.offset, ffi_record.timestamp, std::move(row)};
+}
+
+// Converts a batch of FFI scan records into C++ ScanRecords, keeping order.
+inline ScanRecords from_ffi_scan_records(const ffi::FfiScanRecords& ffi_records) {
+    ScanRecords converted;
+
+    for (const auto& ffi_record : ffi_records.records) {
+        converted.records.push_back(from_ffi_scan_record(ffi_record));
+    }
+
+    return converted;
+}
+
+// Converts an FFI lake snapshot into a C++ LakeSnapshot: the snapshot id plus
+// one BucketOffset per entry, in the order received.
+inline LakeSnapshot from_ffi_lake_snapshot(const ffi::FfiLakeSnapshot& ffi_snapshot) {
+    LakeSnapshot converted;
+    converted.snapshot_id = ffi_snapshot.snapshot_id;
+
+    for (const auto& entry : ffi_snapshot.bucket_offsets) {
+        BucketOffset bucket_offset{
+            entry.table_id,
+            entry.partition_id,
+            entry.bucket_id,
+            entry.offset};
+        converted.bucket_offsets.push_back(bucket_offset);
+    }
+
+    return converted;
+}
+
+}  // namespace utils
+}  // namespace fluss
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
new file mode 100644
index 0000000..3e883e2
--- /dev/null
+++ b/bindings/cpp/src/lib.rs
@@ -0,0 +1,523 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod types;
+
+use std::sync::{Arc, LazyLock};
+use std::time::Duration;
+
+use fluss as fcore;
+
+/// Process-wide multi-threaded Tokio runtime used by every FFI entry point in
+/// this file to drive the async Fluss client to completion via `block_on`.
+///
+/// It is built lazily on first use. Failing to build it is unrecoverable for
+/// the bindings, so construction panics with an explanatory message.
+static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
+    tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .build()
+        .expect("failed to build the Tokio runtime backing the Fluss C++ bindings")
+});
+
+// cxx bridge shared with the C++ side under the `fluss::ffi` namespace.
+// The structs below are plain data carriers that cross the language boundary
+// by value; the `extern "Rust"` block lists the opaque Rust types and the
+// functions/methods C++ may call on them.
+#[cxx::bridge(namespace = "fluss::ffi")]
+mod ffi {
+    // One key/value entry; property maps cross the bridge as a Vec of these.
+    struct HashMapValue {
+        key: String,
+        value: String,
+    }
+
+    // Status of a call. `ok_result` in this file uses error_code 0 with an
+    // empty message for success.
+    struct FfiResult {
+        error_code: i32,
+        error_message: String,
+    }
+
+    // Database name plus table name identifying a table.
+    struct FfiTablePath {
+        database_name: String,
+        table_name: String,
+    }
+
+    // One column of a schema. `data_type` is an integer type code
+    // (presumably one of the DATA_TYPE_* constants in types.rs - confirm).
+    struct FfiColumn {
+        name: String,
+        data_type: i32,
+        comment: String,
+    }
+
+    // Ordered columns plus the names of the primary-key columns.
+    struct FfiSchema {
+        columns: Vec<FfiColumn>,
+        primary_keys: Vec<String>,
+    }
+
+    // Everything needed to create a table; input to `Admin::create_table`.
+    struct FfiTableDescriptor {
+        schema: FfiSchema,
+        partition_keys: Vec<String>,
+        bucket_count: i32,
+        bucket_keys: Vec<String>,
+        properties: Vec<HashMapValue>,
+        comment: String,
+    }
+
+    // Metadata describing an existing table.
+    struct FfiTableInfo {
+        table_id: i64,
+        schema_id: i32,
+        table_path: FfiTablePath,
+        created_time: i64,
+        modified_time: i64,
+        primary_keys: Vec<String>,
+        bucket_keys: Vec<String>,
+        partition_keys: Vec<String>,
+        num_buckets: i32,
+        has_primary_key: bool,
+        is_partitioned: bool,
+        properties: Vec<HashMapValue>,
+        comment: String,
+        schema: FfiSchema,
+    }
+
+    // Status plus payload of `Admin::get_table_info`; on error the payload is
+    // the placeholder built by `types::empty_table_info`.
+    struct FfiTableInfoResult {
+        result: FfiResult,
+        table_info: FfiTableInfo,
+    }
+
+    // A single field value carried as a type tag plus one slot per supported
+    // representation (presumably `datum_type` is one of the DATUM_TYPE_*
+    // constants in types.rs and selects the meaningful slot - confirm).
+    struct FfiDatum {
+        datum_type: i32,
+        bool_val: bool,
+        i32_val: i32,
+        i64_val: i64,
+        f32_val: f32,
+        f64_val: f64,
+        string_val: String,
+        bytes_val: Vec<u8>,
+    }
+
+    // A row as an ordered list of field values.
+    struct FfiGenericRow {
+        fields: Vec<FfiDatum>,
+    }
+
+    // One record returned by a log scan.
+    struct FfiScanRecord {
+        offset: i64,
+        timestamp: i64,
+        row: FfiGenericRow,
+    }
+
+    // A batch of scan records.
+    struct FfiScanRecords {
+        records: Vec<FfiScanRecord>,
+    }
+
+    // Status plus payload of `LogScanner::poll`; on error `scan_records` is
+    // empty.
+    struct FfiScanRecordsResult {
+        result: FfiResult,
+        scan_records: FfiScanRecords,
+    }
+
+    // A lake snapshot id together with per-bucket offsets.
+    struct FfiLakeSnapshot {
+        snapshot_id: i64,
+        bucket_offsets: Vec<FfiBucketOffset>,
+    }
+
+    // Offset of one bucket, identified by table, partition and bucket ids.
+    struct FfiBucketOffset {
+        table_id: i64,
+        partition_id: i64,
+        bucket_id: i32,
+        offset: i64,
+    }
+
+    // Status plus payload of `Admin::get_latest_lake_snapshot`; on error the
+    // snapshot has id -1 and no bucket offsets.
+    struct FfiLakeSnapshotResult {
+        result: FfiResult,
+        lake_snapshot: FfiLakeSnapshot,
+    }
+
+    extern "Rust" {
+        // Opaque Rust types; C++ only ever holds raw pointers to them.
+        type Connection;
+        type Admin;
+        type Table;
+        type AppendWriter;
+        type LogScanner;
+
+        // Functions returning `Result<*mut T>` hand ownership of a boxed
+        // object to C++; it must be released with the matching `delete_*`.
+
+        // Connection
+        fn new_connection(bootstrap_server: &str) -> Result<*mut Connection>;
+        unsafe fn delete_connection(conn: *mut Connection);
+        fn get_admin(self: &Connection) -> Result<*mut Admin>;
+        fn get_table(self: &Connection, table_path: &FfiTablePath) -> 
Result<*mut Table>;
+
+        // Admin
+        unsafe fn delete_admin(admin: *mut Admin);
+        fn create_table(
+            self: &Admin,
+            table_path: &FfiTablePath,
+            descriptor: &FfiTableDescriptor,
+            ignore_if_exists: bool,
+        ) -> FfiResult;
+        fn get_table_info(self: &Admin, table_path: &FfiTablePath) -> 
FfiTableInfoResult;
+        fn get_latest_lake_snapshot(
+            self: &Admin,
+            table_path: &FfiTablePath,
+        ) -> FfiLakeSnapshotResult;
+
+        // Table
+        unsafe fn delete_table(table: *mut Table);
+        fn new_append_writer(self: &Table) -> Result<*mut AppendWriter>;
+        fn new_log_scanner(self: &Table) -> Result<*mut LogScanner>;
+        fn new_log_scanner_with_projection(
+            self: &Table,
+            column_indices: Vec<usize>,
+        ) -> Result<*mut LogScanner>;
+        fn get_table_info_from_table(self: &Table) -> FfiTableInfo;
+        fn get_table_path(self: &Table) -> FfiTablePath;
+        fn has_primary_key(self: &Table) -> bool;
+
+        // AppendWriter
+        unsafe fn delete_append_writer(writer: *mut AppendWriter);
+        fn append(self: &mut AppendWriter, row: &FfiGenericRow) -> FfiResult;
+        fn flush(self: &mut AppendWriter) -> FfiResult;
+
+        // LogScanner
+        unsafe fn delete_log_scanner(scanner: *mut LogScanner);
+        fn subscribe(self: &LogScanner, bucket_id: i32, start_offset: i64) -> 
FfiResult;
+        fn poll(self: &LogScanner, timeout_ms: i64) -> FfiScanRecordsResult;
+    }
+}
+
+/// Opaque handle exposed to C++ that owns a shared Fluss client connection.
+pub struct Connection {
+    /// The underlying connection; `get_table` clones this `Arc` into every
+    /// `Table` it creates.
+    inner: Arc<fcore::client::FlussConnection>,
+    /// Currently unused: `new_connection` always stores `None` here.
+    #[allow(dead_code)]
+    metadata: Option<Arc<fcore::client::Metadata>>,
+}
+
+/// Opaque handle exposed to C++ that wraps the Fluss admin client.
+pub struct Admin {
+    /// The admin client returned by `FlussConnection::get_admin`.
+    inner: fcore::client::FlussAdmin,
+}
+
+/// Opaque handle exposed to C++ that holds what is needed to rebuild a
+/// `FlussTable` on demand, plus values captured when the table was opened.
+pub struct Table {
+    /// Shared connection the table was opened through.
+    connection: Arc<fcore::client::FlussConnection>,
+    /// Metadata handle captured from the opened table.
+    metadata: Arc<fcore::client::Metadata>,
+    /// Table metadata captured when the table was opened.
+    table_info: fcore::metadata::TableInfo,
+    /// Path of the table, captured when the table was opened.
+    table_path: fcore::metadata::TablePath,
+    /// Cached result of `has_primary_key()` at open time.
+    has_pk: bool,
+}
+
+/// Opaque handle exposed to C++ that wraps the Fluss append writer.
+pub struct AppendWriter {
+    /// The writer created from the table's append builder.
+    inner: fcore::client::AppendWriter,
+}
+
+/// Opaque handle exposed to C++ that wraps the Fluss log scanner.
+pub struct LogScanner {
+    /// The scanner created from the table's scan builder.
+    inner: fcore::client::LogScanner,
+}
+
+fn ok_result() -> ffi::FfiResult {
+    ffi::FfiResult {
+        error_code: 0,
+        error_message: String::new(),
+    }
+}
+
+/// Builds a failure status carrying `code` and the message `msg`.
+fn err_result(code: i32, msg: String) -> ffi::FfiResult {
+    let (error_code, error_message) = (code, msg);
+    ffi::FfiResult {
+        error_code,
+        error_message,
+    }
+}
+
+// Connection implementation
+/// Connects to the cluster at `bootstrap_server`, blocking the calling thread
+/// until the connection attempt completes.
+///
+/// On success returns a raw pointer to a heap-allocated [`Connection`] that
+/// the caller owns and must release with `delete_connection`.
+///
+/// # Errors
+/// Returns `"Failed to connect: ..."` when the connection cannot be created.
+fn new_connection(bootstrap_server: &str) -> Result<*mut Connection, String> {
+    let config = fluss::config::Config {
+        bootstrap_server: Some(bootstrap_server.to_string()),
+        ..Default::default()
+    };
+
+    RUNTIME
+        .block_on(fcore::client::FlussConnection::new(config))
+        .map(|c| {
+            Box::into_raw(Box::new(Connection {
+                inner: Arc::new(c),
+                metadata: None,
+            }))
+        })
+        .map_err(|e| format!("Failed to connect: {}", e))
+}
+
+/// Frees a [`Connection`] previously handed out by `new_connection`.
+/// A null pointer is ignored.
+///
+/// # Safety
+/// `conn` must be null or a pointer obtained from `Box::into_raw` that has not
+/// been freed yet; it must not be used after this call.
+unsafe fn delete_connection(conn: *mut Connection) {
+    if conn.is_null() {
+        return;
+    }
+    // SAFETY: `conn` is non-null and, per this function's contract, came from
+    // `Box::into_raw` and has not been freed.
+    let owned = unsafe { Box::from_raw(conn) };
+    drop(owned);
+}
+
+impl Connection {
+    /// Obtains an admin client, blocking until the request completes.
+    ///
+    /// On success returns a raw pointer to a heap-allocated [`Admin`] owned by
+    /// the caller, to be released with `delete_admin`.
+    ///
+    /// # Errors
+    /// Returns `"Failed to get admin: ..."` when the client cannot be created.
+    fn get_admin(&self) -> Result<*mut Admin, String> {
+        RUNTIME
+            .block_on(self.inner.get_admin())
+            .map(|admin| Box::into_raw(Box::new(Admin { inner: admin })))
+            .map_err(|e| format!("Failed to get admin: {}", e))
+    }
+
+    /// Opens the table at `table_path`, blocking until the lookup completes.
+    ///
+    /// On success returns a raw pointer to a heap-allocated [`Table`] owned by
+    /// the caller, to be released with `delete_table`. The handle keeps its
+    /// own clone of the connection and copies of the table's metadata.
+    ///
+    /// # Errors
+    /// Returns `"Failed to get table: ..."` when the table cannot be opened.
+    fn get_table(&self, table_path: &ffi::FfiTablePath) -> Result<*mut Table, String> {
+        let path = fcore::metadata::TablePath::new(
+            table_path.database_name.clone(),
+            table_path.table_name.clone(),
+        );
+
+        RUNTIME
+            .block_on(self.inner.get_table(&path))
+            .map(|t| {
+                Box::into_raw(Box::new(Table {
+                    connection: self.inner.clone(),
+                    metadata: t.metadata().clone(),
+                    table_info: t.table_info().clone(),
+                    table_path: t.table_path().clone(),
+                    has_pk: t.has_primary_key(),
+                }))
+            })
+            .map_err(|e| format!("Failed to get table: {}", e))
+    }
+}
+
+// Admin implementation
+/// Frees an [`Admin`] previously handed out by `Connection::get_admin`.
+/// A null pointer is ignored.
+///
+/// # Safety
+/// `admin` must be null or a pointer obtained from `Box::into_raw` that has
+/// not been freed yet; it must not be used after this call.
+unsafe fn delete_admin(admin: *mut Admin) {
+    if admin.is_null() {
+        return;
+    }
+    // SAFETY: `admin` is non-null and, per this function's contract, came from
+    // `Box::into_raw` and has not been freed.
+    let owned = unsafe { Box::from_raw(admin) };
+    drop(owned);
+}
+
+impl Admin {
+    /// Creates a table, blocking until the request completes.
+    ///
+    /// Returns the success status, error code 1 when `descriptor` cannot be
+    /// converted to the core descriptor, or error code 2 when the create
+    /// request itself fails.
+    fn create_table(
+        &self,
+        table_path: &ffi::FfiTablePath,
+        descriptor: &ffi::FfiTableDescriptor,
+        ignore_if_exists: bool,
+    ) -> ffi::FfiResult {
+        let path = fcore::metadata::TablePath::new(
+            table_path.database_name.clone(),
+            table_path.table_name.clone(),
+        );
+
+        let core_descriptor = match types::ffi_descriptor_to_core(descriptor) {
+            Err(e) => return err_result(1, e.to_string()),
+            Ok(d) => d,
+        };
+
+        RUNTIME
+            .block_on(
+                self.inner
+                    .create_table(&path, &core_descriptor, ignore_if_exists),
+            )
+            .map_or_else(|e| err_result(2, e.to_string()), |_| ok_result())
+    }
+
+    /// Fetches the metadata of the table at `table_path`, blocking until the
+    /// request completes.
+    ///
+    /// On failure the status carries error code 1 and `table_info` is the
+    /// placeholder from `types::empty_table_info`.
+    fn get_table_info(&self, table_path: &ffi::FfiTablePath) -> ffi::FfiTableInfoResult {
+        let path = fcore::metadata::TablePath::new(
+            table_path.database_name.clone(),
+            table_path.table_name.clone(),
+        );
+
+        let (result, table_info) = match RUNTIME.block_on(self.inner.get_table(&path)) {
+            Ok(info) => (ok_result(), types::core_table_info_to_ffi(&info)),
+            Err(e) => (err_result(1, e.to_string()), types::empty_table_info()),
+        };
+
+        ffi::FfiTableInfoResult { result, table_info }
+    }
+
+    /// Fetches the latest lake snapshot of the table at `table_path`,
+    /// blocking until the request completes.
+    ///
+    /// On failure the status carries error code 1 and the snapshot is a
+    /// placeholder with id -1 and no bucket offsets.
+    fn get_latest_lake_snapshot(
+        &self,
+        table_path: &ffi::FfiTablePath,
+    ) -> ffi::FfiLakeSnapshotResult {
+        let path = fcore::metadata::TablePath::new(
+            table_path.database_name.clone(),
+            table_path.table_name.clone(),
+        );
+
+        let (result, lake_snapshot) =
+            match RUNTIME.block_on(self.inner.get_latest_lake_snapshot(&path)) {
+                Ok(snapshot) => (ok_result(), types::core_lake_snapshot_to_ffi(&snapshot)),
+                Err(e) => (
+                    err_result(1, e.to_string()),
+                    ffi::FfiLakeSnapshot {
+                        snapshot_id: -1,
+                        bucket_offsets: vec![],
+                    },
+                ),
+            };
+
+        ffi::FfiLakeSnapshotResult {
+            result,
+            lake_snapshot,
+        }
+    }
+}
+
+// Table implementation
+/// Frees a [`Table`] previously handed out by `Connection::get_table`.
+/// A null pointer is ignored.
+///
+/// # Safety
+/// `table` must be null or a pointer obtained from `Box::into_raw` that has
+/// not been freed yet; it must not be used after this call.
+unsafe fn delete_table(table: *mut Table) {
+    if table.is_null() {
+        return;
+    }
+    // SAFETY: `table` is non-null and, per this function's contract, came from
+    // `Box::into_raw` and has not been freed.
+    let owned = unsafe { Box::from_raw(table) };
+    drop(owned);
+}
+
+impl Table {
+    /// Creates an append writer for this table.
+    ///
+    /// The Tokio runtime context is entered for the duration of the call.
+    /// On success returns a raw pointer to a heap-allocated [`AppendWriter`]
+    /// owned by the caller, to be released with `delete_append_writer`.
+    ///
+    /// # Errors
+    /// Returns `"Failed to create append: ..."` when the append builder
+    /// cannot be created.
+    fn new_append_writer(&self) -> Result<*mut AppendWriter, String> {
+        let _enter = RUNTIME.enter();
+
+        let fluss_table = fcore::client::FlussTable::new(
+            &self.connection,
+            self.metadata.clone(),
+            self.table_info.clone(),
+        );
+
+        let table_append = fluss_table
+            .new_append()
+            .map_err(|e| format!("Failed to create append: {}", e))?;
+
+        Ok(Box::into_raw(Box::new(AppendWriter {
+            inner: table_append.create_writer(),
+        })))
+    }
+
+    /// Creates a log scanner reading all columns of this table.
+    ///
+    /// Returns a raw pointer to a heap-allocated [`LogScanner`] owned by the
+    /// caller, to be released with `delete_log_scanner`. This never returns
+    /// `Err` as written.
+    // NOTE(review): unlike `new_append_writer`, this does not enter the Tokio
+    // runtime before constructing the scanner - confirm that is intentional.
+    fn new_log_scanner(&self) -> Result<*mut LogScanner, String> {
+        let fluss_table = fcore::client::FlussTable::new(
+            &self.connection,
+            self.metadata.clone(),
+            self.table_info.clone(),
+        );
+
+        let inner = fluss_table.new_scan().create_log_scanner();
+        Ok(Box::into_raw(Box::new(LogScanner { inner })))
+    }
+
+    /// Creates a log scanner restricted to the columns at `column_indices`.
+    ///
+    /// Returns a raw pointer to a heap-allocated [`LogScanner`] owned by the
+    /// caller, to be released with `delete_log_scanner`.
+    ///
+    /// # Errors
+    /// Returns `"Failed to project columns: ..."` when the projection is
+    /// rejected.
+    // NOTE(review): unlike `new_append_writer`, this does not enter the Tokio
+    // runtime before constructing the scanner - confirm that is intentional.
+    fn new_log_scanner_with_projection(
+        &self,
+        column_indices: Vec<usize>,
+    ) -> Result<*mut LogScanner, String> {
+        let fluss_table = fcore::client::FlussTable::new(
+            &self.connection,
+            self.metadata.clone(),
+            self.table_info.clone(),
+        );
+
+        let inner = fluss_table
+            .new_scan()
+            .project(&column_indices)
+            .map_err(|e| format!("Failed to project columns: {}", e))?
+            .create_log_scanner();
+        Ok(Box::into_raw(Box::new(LogScanner { inner })))
+    }
+
+    /// Returns the table metadata captured when this handle was created.
+    fn get_table_info_from_table(&self) -> ffi::FfiTableInfo {
+        types::core_table_info_to_ffi(&self.table_info)
+    }
+
+    /// Returns the database and table name of this table.
+    fn get_table_path(&self) -> ffi::FfiTablePath {
+        let database_name = self.table_path.database().to_string();
+        let table_name = self.table_path.table().to_string();
+        ffi::FfiTablePath {
+            database_name,
+            table_name,
+        }
+    }
+
+    /// Returns the primary-key flag cached when this handle was created.
+    fn has_primary_key(&self) -> bool {
+        self.has_pk
+    }
+}
+
+// AppendWriter implementation
+/// Frees an [`AppendWriter`] previously handed out by
+/// `Table::new_append_writer`. A null pointer is ignored.
+///
+/// # Safety
+/// `writer` must be null or a pointer obtained from `Box::into_raw` that has
+/// not been freed yet; it must not be used after this call.
+unsafe fn delete_append_writer(writer: *mut AppendWriter) {
+    if writer.is_null() {
+        return;
+    }
+    // SAFETY: `writer` is non-null and, per this function's contract, came
+    // from `Box::into_raw` and has not been freed.
+    let owned = unsafe { Box::from_raw(writer) };
+    drop(owned);
+}
+
+impl AppendWriter {
+    /// Converts `row` to the core row type and appends it, blocking until the
+    /// append call completes. Failures are reported with error code 1.
+    fn append(&mut self, row: &ffi::FfiGenericRow) -> ffi::FfiResult {
+        let generic_row = types::ffi_row_to_core(row);
+
+        RUNTIME
+            .block_on(self.inner.append(generic_row))
+            .map_or_else(|e| err_result(1, e.to_string()), |_| ok_result())
+    }
+
+    /// Flushes the writer, blocking until the flush completes. Failures are
+    /// reported with error code 1.
+    fn flush(&mut self) -> ffi::FfiResult {
+        RUNTIME
+            .block_on(self.inner.flush())
+            .map_or_else(|e| err_result(1, e.to_string()), |_| ok_result())
+    }
+}
+
+// LogScanner implementation
+/// Frees a [`LogScanner`] previously handed out by one of the
+/// `Table::new_log_scanner*` methods. A null pointer is ignored.
+///
+/// # Safety
+/// `scanner` must be null or a pointer obtained from `Box::into_raw` that has
+/// not been freed yet; it must not be used after this call.
+unsafe fn delete_log_scanner(scanner: *mut LogScanner) {
+    if scanner.is_null() {
+        return;
+    }
+    // SAFETY: `scanner` is non-null and, per this function's contract, came
+    // from `Box::into_raw` and has not been freed.
+    let owned = unsafe { Box::from_raw(scanner) };
+    drop(owned);
+}
+
+impl LogScanner {
+    /// Subscribes the scanner to `bucket_id` starting at `start_offset`,
+    /// blocking until the call completes. Failures are reported with error
+    /// code 1.
+    fn subscribe(&self, bucket_id: i32, start_offset: i64) -> ffi::FfiResult {
+        let result =
+            RUNTIME.block_on(async { self.inner.subscribe(bucket_id, start_offset).await });
+
+        match result {
+            Ok(_) => ok_result(),
+            Err(e) => err_result(1, e.to_string()),
+        }
+    }
+
+    /// Polls for records, waiting at most `timeout_ms` milliseconds.
+    ///
+    /// A negative `timeout_ms` is rejected with error code 1 rather than being
+    /// reinterpreted as an enormous unsigned timeout. On any failure
+    /// `scan_records` is empty.
+    fn poll(&self, timeout_ms: i64) -> ffi::FfiScanRecordsResult {
+        // A plain `as u64` cast would wrap negative values to a huge duration.
+        let Ok(timeout_millis) = u64::try_from(timeout_ms) else {
+            return ffi::FfiScanRecordsResult {
+                result: err_result(
+                    1,
+                    format!("Invalid poll timeout: {} ms (must be >= 0)", timeout_ms),
+                ),
+                scan_records: ffi::FfiScanRecords { records: vec![] },
+            };
+        };
+
+        let timeout = Duration::from_millis(timeout_millis);
+        let result = RUNTIME.block_on(async { self.inner.poll(timeout).await });
+
+        match result {
+            Ok(records) => ffi::FfiScanRecordsResult {
+                result: ok_result(),
+                scan_records: types::core_scan_records_to_ffi(&records),
+            },
+            Err(e) => ffi::FfiScanRecordsResult {
+                result: err_result(1, e.to_string()),
+                scan_records: ffi::FfiScanRecords { records: vec![] },
+            },
+        }
+    }
+}
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
new file mode 100644
index 0000000..b28b783
--- /dev/null
+++ b/bindings/cpp/src/table.cpp
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "fluss.hpp"
+#include "lib.rs.h"
+#include "ffi_converter.hpp"
+#include "rust/cxx.h"
+
+namespace fluss {
+
+// A default-constructed Table uses the members' default initializers.
+Table::Table() noexcept = default;
+
+// Adopts an existing native table handle; the object becomes its owner.
+Table::Table(ffi::Table* table) noexcept : table_(table) {}
+
+// Releases the native table handle, if any.
+Table::~Table() noexcept { Destroy(); }
+
+// Hands the native handle back to the Rust side for deletion and forgets it.
+// Calling this on an empty object does nothing.
+void Table::Destroy() noexcept {
+    if (table_ == nullptr) {
+        return;
+    }
+    ffi::delete_table(table_);
+    table_ = nullptr;
+}
+
+// Move construction: adopt the handle and leave `other` empty.
+Table::Table(Table&& other) noexcept : table_(nullptr) {
+    table_ = other.table_;
+    other.table_ = nullptr;
+}
+
+// Move assignment: release the current handle, then adopt the one held by
+// `other`. Self-assignment is a no-op.
+Table& Table::operator=(Table&& other) noexcept {
+    if (this == &other) {
+        return *this;
+    }
+    Destroy();
+    table_ = other.table_;
+    other.table_ = nullptr;
+    return *this;
+}
+
+// True while this object holds a native table handle.
+bool Table::Available() const { return table_ != nullptr; }
+
+// Creates an append writer for this table and stores its native handle in
+// `out`.
+//
+// On success any handle `out` already owned is released first, so reusing an
+// AppendWriter object does not leak. On failure `out` is left unchanged and
+// an error Result (code 1) is returned.
+Result Table::NewAppendWriter(AppendWriter& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        // Acquire the new handle first; only replace the old one on success.
+        auto* new_writer = table_->new_append_writer();
+        out.Destroy();
+        out.writer_ = new_writer;
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+// Creates a log scanner over all columns of this table and stores its native
+// handle in `out`.
+//
+// On success any handle `out` already owned is released first, so reusing a
+// LogScanner object does not leak. On failure `out` is left unchanged and an
+// error Result (code 1) is returned.
+Result Table::NewLogScanner(LogScanner& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        // Acquire the new handle first; only replace the old one on success.
+        auto* new_scanner = table_->new_log_scanner();
+        out.Destroy();
+        out.scanner_ = new_scanner;
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+// Creates a log scanner restricted to the columns at `column_indices` and
+// stores its native handle in `out`.
+//
+// On success any handle `out` already owned is released first, so reusing a
+// LogScanner object does not leak. On failure `out` is left unchanged and an
+// error Result (code 1) is returned.
+Result Table::NewLogScannerWithProjection(const std::vector<size_t>& column_indices,
+                                          LogScanner& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        // The Rust side takes the index list by value as a rust::Vec.
+        rust::Vec<size_t> rust_indices;
+        for (size_t idx : column_indices) {
+            rust_indices.push_back(idx);
+        }
+        // Acquire the new handle first; only replace the old one on success.
+        auto* new_scanner = table_->new_log_scanner_with_projection(std::move(rust_indices));
+        out.Destroy();
+        out.scanner_ = new_scanner;
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+// Returns the table's metadata, or a default-constructed TableInfo when no
+// native handle is held.
+TableInfo Table::GetTableInfo() const {
+    if (Available()) {
+        auto ffi_info = table_->get_table_info_from_table();
+        return utils::from_ffi_table_info(ffi_info);
+    }
+    return TableInfo{};
+}
+
+// Returns the table's database and table name, or a default-constructed
+// TablePath when no native handle is held.
+TablePath Table::GetTablePath() const {
+    if (Available()) {
+        auto ffi_path = table_->get_table_path();
+        std::string database(ffi_path.database_name);
+        std::string table(ffi_path.table_name);
+        return TablePath{std::move(database), std::move(table)};
+    }
+    return TablePath{};
+}
+
+// Reports whether the table has a primary key; false when no native handle is
+// held.
+bool Table::HasPrimaryKey() const {
+    return Available() && table_->has_primary_key();
+}
+
+// AppendWriter implementation
+// A default-constructed AppendWriter uses the members' default initializers.
+AppendWriter::AppendWriter() noexcept = default;
+
+// Adopts an existing native writer handle; the object becomes its owner.
+AppendWriter::AppendWriter(ffi::AppendWriter* writer) noexcept : writer_(writer) {}
+
+// Releases the native writer handle, if any.
+AppendWriter::~AppendWriter() noexcept { Destroy(); }
+
+// Hands the native handle back to the Rust side for deletion and forgets it.
+// Calling this on an empty object does nothing.
+void AppendWriter::Destroy() noexcept {
+    if (writer_ == nullptr) {
+        return;
+    }
+    ffi::delete_append_writer(writer_);
+    writer_ = nullptr;
+}
+
+// Move construction: adopt the handle and leave `other` empty.
+AppendWriter::AppendWriter(AppendWriter&& other) noexcept : writer_(nullptr) {
+    writer_ = other.writer_;
+    other.writer_ = nullptr;
+}
+
+// Move assignment: release the current handle, then adopt the one held by
+// `other`. Self-assignment is a no-op.
+AppendWriter& AppendWriter::operator=(AppendWriter&& other) noexcept {
+    if (this == &other) {
+        return *this;
+    }
+    Destroy();
+    writer_ = other.writer_;
+    other.writer_ = nullptr;
+    return *this;
+}
+
+// True while this object holds a native writer handle.
+bool AppendWriter::Available() const { return writer_ != nullptr; }
+
+// Appends `row` through the native writer.
+//
+// Returns an error Result (code 1) when no writer handle is held or when
+// converting the row to its FFI form throws; otherwise returns the Result
+// reported by the Rust side.
+Result AppendWriter::Append(const GenericRow& row) {
+    if (!Available()) {
+        return utils::make_error(1, "AppendWriter not available");
+    }
+
+    // The row conversion builds rust::String values, which can throw (for
+    // example on text that is not valid UTF-8). Report that through Result
+    // instead of letting the exception escape.
+    try {
+        auto ffi_row = utils::to_ffi_generic_row(row);
+        auto ffi_result = writer_->append(ffi_row);
+        return utils::from_ffi_result(ffi_result);
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+// Flushes the native writer and returns the Result reported by the Rust side,
+// or an error Result when no writer handle is held.
+Result AppendWriter::Flush() {
+    if (Available()) {
+        auto ffi_result = writer_->flush();
+        return utils::from_ffi_result(ffi_result);
+    }
+    return utils::make_error(1, "AppendWriter not available");
+}
+
+// LogScanner implementation
+// A default-constructed LogScanner uses the members' default initializers.
+LogScanner::LogScanner() noexcept = default;
+
+// Adopts an existing native scanner handle; the object becomes its owner.
+LogScanner::LogScanner(ffi::LogScanner* scanner) noexcept : scanner_(scanner) {}
+
+// Releases the native scanner handle, if any.
+LogScanner::~LogScanner() noexcept { Destroy(); }
+
+// Hands the native handle back to the Rust side for deletion and forgets it.
+// Calling this on an empty object does nothing.
+void LogScanner::Destroy() noexcept {
+    if (scanner_ == nullptr) {
+        return;
+    }
+    ffi::delete_log_scanner(scanner_);
+    scanner_ = nullptr;
+}
+
+// Move construction: adopt the handle and leave `other` empty.
+LogScanner::LogScanner(LogScanner&& other) noexcept : scanner_(nullptr) {
+    scanner_ = other.scanner_;
+    other.scanner_ = nullptr;
+}
+
+// Move assignment: release the current handle, then adopt the one held by
+// `other`. Self-assignment is a no-op.
+LogScanner& LogScanner::operator=(LogScanner&& other) noexcept {
+    if (this == &other) {
+        return *this;
+    }
+    Destroy();
+    scanner_ = other.scanner_;
+    other.scanner_ = nullptr;
+    return *this;
+}
+
+// True while this object holds a native scanner handle.
+bool LogScanner::Available() const { return scanner_ != nullptr; }
+
+// Subscribes the scanner to `bucket_id` starting at `start_offset` and returns
+// the Result reported by the Rust side, or an error Result when no scanner
+// handle is held.
+Result LogScanner::Subscribe(int32_t bucket_id, int64_t start_offset) {
+    if (Available()) {
+        auto ffi_result = scanner_->subscribe(bucket_id, start_offset);
+        return utils::from_ffi_result(ffi_result);
+    }
+    return utils::make_error(1, "LogScanner not available");
+}
+
+// Polls the scanner with a timeout of `timeout_ms` milliseconds. On success
+// the fetched records are stored in `out`; on failure `out` is left untouched
+// and the error Result is returned.
+Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& out) {
+    if (!Available()) {
+        return utils::make_error(1, "LogScanner not available");
+    }
+
+    auto ffi_result = scanner_->poll(timeout_ms);
+    auto status = utils::from_ffi_result(ffi_result.result);
+    if (status.Ok()) {
+        out = utils::from_ffi_scan_records(ffi_result.scan_records);
+        return utils::make_ok();
+    }
+
+    return status;
+}
+
+}  // namespace fluss
diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
new file mode 100644
index 0000000..d3bab38
--- /dev/null
+++ b/bindings/cpp/src/types.rs
@@ -0,0 +1,485 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ffi;
+use anyhow::{Result, anyhow};
+use arrow::array::{
+    Date32Array, LargeBinaryArray, LargeStringArray, Time32MillisecondArray, 
Time32SecondArray,
+    Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
+    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
+};
+use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
+use fcore::row::InternalRow;
+use fluss as fcore;
+
+// Numeric codes identifying a column data type across the FFI boundary.
+// `ffi_data_type_to_core` accepts exactly these values, and
+// `core_data_type_to_ffi` produces them (returning 0 for any core type that
+// has no code here).
+pub const DATA_TYPE_BOOLEAN: i32 = 1;
+pub const DATA_TYPE_TINYINT: i32 = 2;
+pub const DATA_TYPE_SMALLINT: i32 = 3;
+pub const DATA_TYPE_INT: i32 = 4;
+pub const DATA_TYPE_BIGINT: i32 = 5;
+pub const DATA_TYPE_FLOAT: i32 = 6;
+pub const DATA_TYPE_DOUBLE: i32 = 7;
+pub const DATA_TYPE_STRING: i32 = 8;
+pub const DATA_TYPE_BYTES: i32 = 9;
+pub const DATA_TYPE_DATE: i32 = 10;
+pub const DATA_TYPE_TIME: i32 = 11;
+pub const DATA_TYPE_TIMESTAMP: i32 = 12;
+pub const DATA_TYPE_TIMESTAMP_LTZ: i32 = 13;
+
+// Tags stored in `ffi::FfiDatum::datum_type`. The tag tells which value slot
+// of the datum (`bool_val`, `i32_val`, `i64_val`, `f32_val`, `f64_val`,
+// `string_val`, `bytes_val`) carries the value; `DATUM_TYPE_NULL` means none.
+pub const DATUM_TYPE_NULL: i32 = 0;
+pub const DATUM_TYPE_BOOL: i32 = 1;
+pub const DATUM_TYPE_INT32: i32 = 2;
+pub const DATUM_TYPE_INT64: i32 = 3;
+pub const DATUM_TYPE_FLOAT32: i32 = 4;
+pub const DATUM_TYPE_FLOAT64: i32 = 5;
+pub const DATUM_TYPE_STRING: i32 = 6;
+pub const DATUM_TYPE_BYTES: i32 = 7;
+
+/// Translates a numeric FFI data type code (one of the `DATA_TYPE_*`
+/// constants) into the corresponding core data type.
+///
+/// # Errors
+///
+/// Returns an error when `dt` is not one of the known codes.
+fn ffi_data_type_to_core(dt: i32) -> Result<fcore::metadata::DataType> {
+    use fcore::metadata::DataTypes;
+
+    let core_type = match dt {
+        DATA_TYPE_BOOLEAN => DataTypes::boolean(),
+        DATA_TYPE_TINYINT => DataTypes::tinyint(),
+        DATA_TYPE_SMALLINT => DataTypes::smallint(),
+        DATA_TYPE_INT => DataTypes::int(),
+        DATA_TYPE_BIGINT => DataTypes::bigint(),
+        DATA_TYPE_FLOAT => DataTypes::float(),
+        DATA_TYPE_DOUBLE => DataTypes::double(),
+        DATA_TYPE_STRING => DataTypes::string(),
+        DATA_TYPE_BYTES => DataTypes::bytes(),
+        DATA_TYPE_DATE => DataTypes::date(),
+        DATA_TYPE_TIME => DataTypes::time(),
+        DATA_TYPE_TIMESTAMP => DataTypes::timestamp(),
+        DATA_TYPE_TIMESTAMP_LTZ => DataTypes::timestamp_ltz(),
+        unknown => return Err(anyhow!("Unknown data type: {}", unknown)),
+    };
+
+    Ok(core_type)
+}
+
+/// Maps a core data type to its numeric FFI code (one of the `DATA_TYPE_*`
+/// constants).
+///
+/// Core types without a dedicated code map to `0`, which is not a value that
+/// `ffi_data_type_to_core` accepts.
+fn core_data_type_to_ffi(dt: &fcore::metadata::DataType) -> i32 {
+    use fcore::metadata::DataType;
+
+    match dt {
+        DataType::Boolean(_) => DATA_TYPE_BOOLEAN,
+        DataType::TinyInt(_) => DATA_TYPE_TINYINT,
+        DataType::SmallInt(_) => DATA_TYPE_SMALLINT,
+        DataType::Int(_) => DATA_TYPE_INT,
+        DataType::BigInt(_) => DATA_TYPE_BIGINT,
+        DataType::Float(_) => DATA_TYPE_FLOAT,
+        DataType::Double(_) => DATA_TYPE_DOUBLE,
+        DataType::String(_) => DATA_TYPE_STRING,
+        DataType::Bytes(_) => DATA_TYPE_BYTES,
+        DataType::Date(_) => DATA_TYPE_DATE,
+        DataType::Time(_) => DATA_TYPE_TIME,
+        DataType::Timestamp(_) => DATA_TYPE_TIMESTAMP,
+        DataType::TimestampLTz(_) => DATA_TYPE_TIMESTAMP_LTZ,
+        // Every other core type has no FFI code.
+        _ => 0,
+    }
+}
+
+/// Builds a core `TableDescriptor` from its FFI representation.
+///
+/// Columns are added in order, each with its comment when the comment is
+/// non-empty. Primary keys and the table comment are applied only when
+/// non-empty. A `bucket_count` that is not positive is passed on as "no
+/// explicit bucket count".
+///
+/// # Errors
+///
+/// Returns an error when a column carries an unknown data type code, or when
+/// building the schema or the table descriptor fails.
+pub fn ffi_descriptor_to_core(
+    descriptor: &ffi::FfiTableDescriptor,
+) -> Result<fcore::metadata::TableDescriptor> {
+    let ffi_schema = &descriptor.schema;
+
+    let mut schema_builder = fcore::metadata::Schema::builder();
+    for column in &ffi_schema.columns {
+        let core_type = ffi_data_type_to_core(column.data_type)?;
+        schema_builder = schema_builder.column(&column.name, core_type);
+        if !column.comment.is_empty() {
+            schema_builder = schema_builder.with_comment(&column.comment);
+        }
+    }
+    if !ffi_schema.primary_keys.is_empty() {
+        schema_builder = schema_builder.primary_key(ffi_schema.primary_keys.clone());
+    }
+    let schema = schema_builder.build()?;
+
+    // Only a positive bucket count is forwarded; anything else becomes `None`.
+    let bucket_count = (descriptor.bucket_count > 0).then_some(descriptor.bucket_count);
+
+    let mut table_builder = fcore::metadata::TableDescriptor::builder()
+        .schema(schema)
+        .partitioned_by(descriptor.partition_keys.clone())
+        .distributed_by(bucket_count, descriptor.bucket_keys.clone());
+
+    for prop in &descriptor.properties {
+        table_builder = table_builder.property(&prop.key, &prop.value);
+    }
+    if !descriptor.comment.is_empty() {
+        table_builder = table_builder.comment(&descriptor.comment);
+    }
+
+    let table_descriptor = table_builder.build()?;
+    Ok(table_descriptor)
+}
+
+/// Copies a core `TableInfo` into its FFI representation.
+///
+/// Missing column comments and a missing table comment become empty strings.
+/// The schema's primary key list is empty when the schema has no primary key.
+pub fn core_table_info_to_ffi(info: &fcore::metadata::TableInfo) -> ffi::FfiTableInfo {
+    let core_schema = info.get_schema();
+
+    let schema = ffi::FfiSchema {
+        columns: core_schema
+            .columns()
+            .iter()
+            .map(|column| ffi::FfiColumn {
+                name: column.name().to_string(),
+                data_type: core_data_type_to_ffi(column.data_type()),
+                comment: column.comment().unwrap_or("").to_string(),
+            })
+            .collect(),
+        primary_keys: core_schema
+            .primary_key()
+            .map(|pk| pk.column_names().to_vec())
+            .unwrap_or_default(),
+    };
+
+    let path = info.get_table_path();
+    let table_path = ffi::FfiTablePath {
+        database_name: path.database().to_string(),
+        table_name: path.table().to_string(),
+    };
+
+    let properties = info
+        .get_properties()
+        .iter()
+        .map(|(key, value)| ffi::HashMapValue {
+            key: key.clone(),
+            value: value.clone(),
+        })
+        .collect();
+
+    ffi::FfiTableInfo {
+        table_id: info.get_table_id(),
+        schema_id: info.get_schema_id(),
+        table_path,
+        created_time: info.get_created_time(),
+        modified_time: info.get_modified_time(),
+        primary_keys: info.get_primary_keys().clone(),
+        bucket_keys: info.get_bucket_keys().to_vec(),
+        partition_keys: info.get_partition_keys().to_vec(),
+        num_buckets: info.get_num_buckets(),
+        has_primary_key: info.has_primary_key(),
+        is_partitioned: info.is_partitioned(),
+        properties,
+        comment: info.get_comment().unwrap_or("").to_string(),
+        schema,
+    }
+}
+
+/// Returns an `FfiTableInfo` with every field zeroed, `false`, or empty.
+pub fn empty_table_info() -> ffi::FfiTableInfo {
+    let table_path = ffi::FfiTablePath {
+        database_name: String::new(),
+        table_name: String::new(),
+    };
+    let schema = ffi::FfiSchema {
+        columns: Vec::new(),
+        primary_keys: Vec::new(),
+    };
+
+    ffi::FfiTableInfo {
+        table_id: 0,
+        schema_id: 0,
+        table_path,
+        created_time: 0,
+        modified_time: 0,
+        primary_keys: Vec::new(),
+        bucket_keys: Vec::new(),
+        partition_keys: Vec::new(),
+        num_buckets: 0,
+        has_primary_key: false,
+        is_partitioned: false,
+        properties: Vec::new(),
+        comment: String::new(),
+        schema,
+    }
+}
+
+/// Converts an FFI row into a core `GenericRow`, one datum per field, keeping
+/// field positions.
+///
+/// String values are borrowed from `row`; byte values are copied. A field
+/// whose tag is `DATUM_TYPE_NULL`, or any tag not listed below, becomes
+/// `Datum::Null`.
+pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> fcore::row::GenericRow<'_> {
+    use fcore::row::Datum;
+
+    let mut core_row = fcore::row::GenericRow::new();
+
+    for (position, ffi_datum) in row.fields.iter().enumerate() {
+        let value = match ffi_datum.datum_type {
+            DATUM_TYPE_BOOL => Datum::Bool(ffi_datum.bool_val),
+            DATUM_TYPE_INT32 => Datum::Int32(ffi_datum.i32_val),
+            DATUM_TYPE_INT64 => Datum::Int64(ffi_datum.i64_val),
+            DATUM_TYPE_FLOAT32 => Datum::Float32(ffi_datum.f32_val.into()),
+            DATUM_TYPE_FLOAT64 => Datum::Float64(ffi_datum.f64_val.into()),
+            DATUM_TYPE_STRING => Datum::String(ffi_datum.string_val.as_str()),
+            // todo: avoid copy bytes for blob
+            DATUM_TYPE_BYTES => Datum::Blob(ffi_datum.bytes_val.clone().into()),
+            // Covers DATUM_TYPE_NULL as well as any unrecognized tag.
+            _ => Datum::Null,
+        };
+        core_row.set_field(position, value);
+    }
+
+    core_row
+}
+
+/// Flattens the per-bucket scan records into a single FFI record list.
+///
+/// Buckets are visited in the order yielded by `records_by_buckets().values()`
+/// and, within a bucket, records keep their order. Each record carries its
+/// offset, its timestamp and its row converted to FFI datums.
+pub fn core_scan_records_to_ffi(records: &fcore::record::ScanRecords) -> ffi::FfiScanRecords {
+    let flattened = records
+        .records_by_buckets()
+        .values()
+        .flatten()
+        .map(|record| ffi::FfiScanRecord {
+            offset: record.offset(),
+            timestamp: record.timestamp(),
+            row: ffi::FfiGenericRow {
+                fields: core_row_to_ffi_fields(record.row()),
+            },
+        })
+        .collect();
+
+    ffi::FfiScanRecords { records: flattened }
+}
+
+/// Converts one columnar row into a vector of FFI datums, one per schema
+/// field and in schema order.
+///
+/// Null cells become `DATUM_TYPE_NULL` datums. Every other cell is read
+/// according to the Arrow data type of its column and stored in the matching
+/// value slot of `ffi::FfiDatum`.
+///
+/// # Panics
+///
+/// Panics on an Arrow data type or time unit that has no arm below, and when
+/// a column cannot be downcast to the array type implied by its data type.
+fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec<ffi::FfiDatum> 
{
+    // Builds a datum carrying the given tag with every value slot zeroed or
+    // empty; callers then fill in the one slot that matches the tag.
+    fn new_datum(datum_type: i32) -> ffi::FfiDatum {
+        ffi::FfiDatum {
+            datum_type,
+            bool_val: false,
+            i32_val: 0,
+            i64_val: 0,
+            f32_val: 0.0,
+            f64_val: 0.0,
+            string_val: String::new(),
+            bytes_val: vec![],
+        }
+    }
+
+    let record_batch = row.get_record_batch();
+    let schema = record_batch.schema();
+    // Row index used for the arms that read the Arrow arrays directly.
+    let row_id = row.get_row_id();
+
+    let mut fields = Vec::with_capacity(schema.fields().len());
+
+    for (i, field) in schema.fields().iter().enumerate() {
+        // Nulls are handled up front so the typed arms only see real values.
+        if row.is_null_at(i) {
+            fields.push(new_datum(DATUM_TYPE_NULL));
+            continue;
+        }
+
+        let datum = match field.data_type() {
+            ArrowDataType::Boolean => {
+                let mut datum = new_datum(DATUM_TYPE_BOOL);
+                datum.bool_val = row.get_boolean(i);
+                datum
+            }
+            // 8-bit and 16-bit integers are widened into the i32 slot; the
+            // FFI datum has no narrower integer slot.
+            ArrowDataType::Int8 => {
+                let mut datum = new_datum(DATUM_TYPE_INT32);
+                datum.i32_val = row.get_byte(i) as i32;
+                datum
+            }
+            ArrowDataType::Int16 => {
+                let mut datum = new_datum(DATUM_TYPE_INT32);
+                datum.i32_val = row.get_short(i) as i32;
+                datum
+            }
+            ArrowDataType::Int32 => {
+                let mut datum = new_datum(DATUM_TYPE_INT32);
+                datum.i32_val = row.get_int(i);
+                datum
+            }
+            ArrowDataType::Int64 => {
+                let mut datum = new_datum(DATUM_TYPE_INT64);
+                datum.i64_val = row.get_long(i);
+                datum
+            }
+            ArrowDataType::Float32 => {
+                let mut datum = new_datum(DATUM_TYPE_FLOAT32);
+                datum.f32_val = row.get_float(i);
+                datum
+            }
+            ArrowDataType::Float64 => {
+                let mut datum = new_datum(DATUM_TYPE_FLOAT64);
+                datum.f64_val = row.get_double(i);
+                datum
+            }
+            ArrowDataType::Utf8 => {
+                let mut datum = new_datum(DATUM_TYPE_STRING);
+                // todo: avoid copy string
+                datum.string_val = row.get_string(i).to_string();
+                datum
+            }
+            // The remaining string/binary variants and all temporal types are
+            // read straight from the downcast Arrow array at `row_id`.
+            ArrowDataType::LargeUtf8 => {
+                let array = record_batch
+                    .column(i)
+                    .as_any()
+                    .downcast_ref::<LargeStringArray>()
+                    .expect("LargeUtf8 column expected");
+                let mut datum = new_datum(DATUM_TYPE_STRING);
+                datum.string_val = array.value(row_id).to_string();
+                datum
+            }
+            ArrowDataType::Binary => {
+                let mut datum = new_datum(DATUM_TYPE_BYTES);
+                // todo: avoid copy bytes for blob
+                datum.bytes_val = row.get_bytes(i);
+                datum
+            }
+            ArrowDataType::FixedSizeBinary(len) => {
+                let mut datum = new_datum(DATUM_TYPE_BYTES);
+                datum.bytes_val = row.get_binary(i, *len as usize);
+                datum
+            }
+            ArrowDataType::LargeBinary => {
+                let array = record_batch
+                    .column(i)
+                    .as_any()
+                    .downcast_ref::<LargeBinaryArray>()
+                    .expect("LargeBinary column expected");
+                let mut datum = new_datum(DATUM_TYPE_BYTES);
+                datum.bytes_val = array.value(row_id).to_vec();
+                datum
+            }
+            // Dates are passed through as the raw 32-bit array value.
+            ArrowDataType::Date32 => {
+                let array = record_batch
+                    .column(i)
+                    .as_any()
+                    .downcast_ref::<Date32Array>()
+                    .expect("Date32 column expected");
+                let mut datum = new_datum(DATUM_TYPE_INT32);
+                datum.i32_val = array.value(row_id);
+                datum
+            }
+            // Timestamps are passed through as the raw 64-bit array value in
+            // the column's own unit; the unit and any time zone are not
+            // recorded in the datum.
+            ArrowDataType::Timestamp(unit, _) => match unit {
+                TimeUnit::Second => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<TimestampSecondArray>()
+                        .expect("Timestamp(second) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT64);
+                    datum.i64_val = array.value(row_id);
+                    datum
+                }
+                TimeUnit::Millisecond => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<TimestampMillisecondArray>()
+                        .expect("Timestamp(millisecond) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT64);
+                    datum.i64_val = array.value(row_id);
+                    datum
+                }
+                TimeUnit::Microsecond => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<TimestampMicrosecondArray>()
+                        .expect("Timestamp(microsecond) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT64);
+                    datum.i64_val = array.value(row_id);
+                    datum
+                }
+                TimeUnit::Nanosecond => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<TimestampNanosecondArray>()
+                        .expect("Timestamp(nanosecond) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT64);
+                    datum.i64_val = array.value(row_id);
+                    datum
+                }
+            },
+            // Time32 values go into the i32 slot; only the second and
+            // millisecond units are handled, any other unit panics.
+            ArrowDataType::Time32(unit) => match unit {
+                TimeUnit::Second => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<Time32SecondArray>()
+                        .expect("Time32(second) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT32);
+                    datum.i32_val = array.value(row_id);
+                    datum
+                }
+                TimeUnit::Millisecond => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<Time32MillisecondArray>()
+                        .expect("Time32(millisecond) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT32);
+                    datum.i32_val = array.value(row_id);
+                    datum
+                }
+                _ => panic!(
+                    "Will never come here. Unsupported Time32 unit for column 
{}",
+                    i
+                ),
+            },
+            // Time64 values go into the i64 slot; only the microsecond and
+            // nanosecond units are handled, any other unit panics.
+            ArrowDataType::Time64(unit) => match unit {
+                TimeUnit::Microsecond => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<Time64MicrosecondArray>()
+                        .expect("Time64(microsecond) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT64);
+                    datum.i64_val = array.value(row_id);
+                    datum
+                }
+                TimeUnit::Nanosecond => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<Time64NanosecondArray>()
+                        .expect("Time64(nanosecond) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT64);
+                    datum.i64_val = array.value(row_id);
+                    datum
+                }
+                _ => panic!(
+                    "Will never come here. Unsupported Time64 unit for column 
{}",
+                    i
+                ),
+            },
+            // Any Arrow type without an arm above is treated as a bug.
+            other => panic!(
+                "Will never come here. Unsupported Arrow data type for column 
{}: {:?}",
+                i, other
+            ),
+        };
+
+        fields.push(datum);
+    }
+
+    fields
+}
+
+/// Copies a core `LakeSnapshot` into its FFI representation.
+///
+/// Every (bucket, offset) entry becomes one `FfiBucketOffset`. A bucket
+/// without a partition id is reported with `partition_id` set to `-1`.
+pub fn core_lake_snapshot_to_ffi(snapshot: &fcore::metadata::LakeSnapshot) -> ffi::FfiLakeSnapshot {
+    let mut bucket_offsets: Vec<ffi::FfiBucketOffset> = Vec::new();
+
+    for (bucket, offset) in snapshot.table_buckets_offset.iter() {
+        bucket_offsets.push(ffi::FfiBucketOffset {
+            table_id: bucket.table_id(),
+            partition_id: bucket.partition_id().unwrap_or(-1),
+            bucket_id: bucket.bucket_id(),
+            offset: *offset,
+        });
+    }
+
+    ffi::FfiLakeSnapshot {
+        snapshot_id: snapshot.snapshot_id,
+        bucket_offsets,
+    }
+}
diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs
index 0857496..92f600e 100644
--- a/crates/fluss/src/config.rs
+++ b/crates/fluss/src/config.rs
@@ -18,7 +18,7 @@
 use clap::Parser;
 use serde::{Deserialize, Serialize};
 
-#[derive(Parser, Debug, Clone, Deserialize, Serialize, Default)]
+#[derive(Parser, Debug, Clone, Deserialize, Serialize)]
 #[command(author, version, about, long_about = None)]
 pub struct Config {
     #[arg(long)]
@@ -37,3 +37,15 @@ pub struct Config {
     #[arg(long, default_value_t = 2 * 1024 * 1024)]
     pub writer_batch_size: i32,
 }
+
+/// Hand-written `Default` that replaces the previously derived one, so
+/// `Config::default()` carries the explicit values below instead of the
+/// per-type defaults a derive would produce.
+impl Default for Config {
+    fn default() -> Self {
+        Self {
+            bootstrap_server: None,
+            // 10 MiB.
+            // NOTE(review): presumably mirrors the clap default declared on
+            // this field — confirm against the struct definition.
+            request_max_size: 10 * 1024 * 1024,
+            writer_acks: String::from("all"),
+            writer_retries: i32::MAX,
+            // 2 MiB, the same value as the clap `default_value_t` on
+            // `writer_batch_size`.
+            writer_batch_size: 2 * 1024 * 1024,
+        }
+    }
+}

Reply via email to