This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new f39d68c  chore: introduce c++ integration test (#352)
f39d68c is described below

commit f39d68ced231c34487978bb9f7e842aba7b0fccc
Author: Keith Lee <[email protected]>
AuthorDate: Sun Feb 22 02:55:42 2026 +0000

    chore: introduce c++ integration test (#352)
---
 .github/workflows/build_and_test_cpp.yml           |  78 ++
 .github/workflows/build_and_test_python.yml        |  81 ++
 .github/workflows/build_and_test_rust.yml          |  85 +++
 .../{docs-check.yml => check_documentation.yml}    |   4 +-
 .github/workflows/check_license_and_formatting.yml |  60 ++
 .github/workflows/ci.yml                           | 187 -----
 bindings/cpp/CMakeLists.txt                        |  23 +
 bindings/cpp/test/test_admin.cpp                   | 331 ++++++++
 bindings/cpp/test/test_kv_table.cpp                | 733 ++++++++++++++++++
 bindings/cpp/test/test_log_table.cpp               | 831 +++++++++++++++++++++
 bindings/cpp/test/test_main.cpp                    |  31 +
 bindings/cpp/test/test_utils.h                     | 315 ++++++++
 bindings/python/test/test_log_table.py             |   4 +-
 crates/fluss/tests/integration/log_table.rs        |  59 +-
 14 files changed, 2601 insertions(+), 221 deletions(-)

diff --git a/.github/workflows/build_and_test_cpp.yml b/.github/workflows/build_and_test_cpp.yml
new file mode 100644
index 0000000..5cdd14d
--- /dev/null
+++ b/.github/workflows/build_and_test_cpp.yml
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: C++ Build and Tests
+
+on:
+  push:
+    branches:
+      - main
+    paths-ignore:
+      - 'website/**'
+      - '**/*.md'
+  pull_request:
+    branches:
+      - main
+    paths-ignore:
+      - 'website/**'
+      - '**/*.md'
+      - 'bindings/python/**'
+  workflow_dispatch:
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
+  cancel-in-progress: true
+
+jobs:
+  build-and-test-cpp:
+    timeout-minutes: 60
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Install protoc
+        run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
+
+      - name: Install Apache Arrow C++
+        run: |
+          sudo apt-get install -y -V ca-certificates lsb-release wget
+          wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
+          sudo apt-get install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
+          sudo apt-get update
+          sudo apt-get install -y -V libarrow-dev
+
+      - name: Rust Cache
+        uses: actions/cache@v4
+        with:
+          path: |
+            ~/.cargo/registry
+            ~/.cargo/git
+            target
+          key: cpp-test-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }}
+
+      - name: Build C++ bindings and tests
+        working-directory: bindings/cpp
+        run: |
+          cmake -B build -DFLUSS_ENABLE_TESTING=ON -DCMAKE_BUILD_TYPE=Debug
+          cmake --build build --parallel
+
+      - name: Run C++ integration tests
+        working-directory: bindings/cpp
+        run: cd build && ctest --output-on-failure --timeout 300
+        env:
+          RUST_LOG: DEBUG
+          RUST_BACKTRACE: full
diff --git a/.github/workflows/build_and_test_python.yml b/.github/workflows/build_and_test_python.yml
new file mode 100644
index 0000000..efb5caa
--- /dev/null
+++ b/.github/workflows/build_and_test_python.yml
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Python Build and Tests
+
+on:
+  push:
+    branches:
+      - main
+    paths-ignore:
+      - 'website/**'
+      - '**/*.md'
+  pull_request:
+    branches:
+      - main
+    paths-ignore:
+      - 'website/**'
+      - '**/*.md'
+      - 'bindings/cpp/**'
+  workflow_dispatch:
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
+  cancel-in-progress: true
+
+jobs:
+  build-and-test-python:
+    timeout-minutes: 60
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        python: ["3.9", "3.10", "3.11", "3.12"]
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Set up Python
+        uses: actions/setup-python@v5
+        with:
+          python-version: ${{ matrix.python }}
+
+      - name: Install uv
+        uses: astral-sh/setup-uv@v4
+
+      - name: Install protoc
+        run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
+
+      - name: Rust Cache
+        uses: actions/cache@v4
+        with:
+          path: |
+            ~/.cargo/registry
+            ~/.cargo/git
+            target
+          key: python-test-${{ runner.os }}-${{ matrix.python }}-${{ hashFiles('**/Cargo.lock') }}
+
+      - name: Build Python bindings
+        working-directory: bindings/python
+        run: |
+          uv sync --extra dev
+          uv run maturin develop
+
+      - name: Run Python integration tests
+        working-directory: bindings/python
+        run: uv run pytest test/ -v
+        env:
+          RUST_LOG: DEBUG
+          RUST_BACKTRACE: full
diff --git a/.github/workflows/build_and_test_rust.yml b/.github/workflows/build_and_test_rust.yml
new file mode 100644
index 0000000..c9e05b7
--- /dev/null
+++ b/.github/workflows/build_and_test_rust.yml
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Rust Build and Tests
+
+on:
+  push:
+    branches:
+      - main
+    paths-ignore:
+      - 'website/**'
+      - '**/*.md'
+  pull_request:
+    branches:
+      - main
+    paths-ignore:
+      - 'website/**'
+      - '**/*.md'
+      - 'bindings/python/**'
+      - 'bindings/cpp/**'
+  workflow_dispatch:
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
+  cancel-in-progress: true
+
+jobs:
+  build-and-test-rust:
+    timeout-minutes: 60
+    runs-on: ${{ matrix.os }}
+    strategy:
+      matrix:
+        os:
+          - ubuntu-latest
+          - macos-latest
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Install protoc
+        run: |
+          if [ "$RUNNER_OS" = "Linux" ]; then
+            sudo apt-get update && sudo apt-get install -y protobuf-compiler
+          elif [ "$RUNNER_OS" = "macOS" ]; then
+            brew install protobuf
+          fi
+
+      - name: Rust Cache
+        uses: actions/cache@v4
+        with:
+          path: |
+            ~/.cargo/registry
+            ~/.cargo/git
+            target
+          key: rust-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }}
+
+      - name: Build
+        run: cargo build --workspace --all-targets --exclude fluss_python --exclude fluss-cpp
+
+      - name: Unit Test
+        run: cargo test --all-targets --workspace --exclude fluss_python --exclude fluss-cpp
+        env:
+          RUST_LOG: DEBUG
+          RUST_BACKTRACE: full
+
+      - name: Integration Test (Linux only)
+        if: runner.os == 'Linux'
+        run: |
+          RUST_TEST_THREADS=1 cargo test --features integration_tests --all-targets --workspace --exclude fluss_python --exclude fluss-cpp -- --nocapture
+        env:
+          RUST_LOG: DEBUG
+          RUST_BACKTRACE: full
diff --git a/.github/workflows/docs-check.yml b/.github/workflows/check_documentation.yml
similarity index 96%
rename from .github/workflows/docs-check.yml
rename to .github/workflows/check_documentation.yml
index 6408c54..70e6a43 100644
--- a/.github/workflows/docs-check.yml
+++ b/.github/workflows/check_documentation.yml
@@ -17,7 +17,7 @@
 ################################################################################
 
 # This workflow is meant for checking broken links in the documentation.
-name: Check Documentation
+name: Documentation Check
 permissions:
   contents: read
 on:
@@ -31,7 +31,7 @@ on:
       - 'website/**'
 
 jobs:
-  test-deploy:
+  check-documentation:
     runs-on: ubuntu-latest
     defaults:
       run:
diff --git a/.github/workflows/check_license_and_formatting.yml b/.github/workflows/check_license_and_formatting.yml
new file mode 100644
index 0000000..1b83b74
--- /dev/null
+++ b/.github/workflows/check_license_and_formatting.yml
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: License and Formatting Check
+
+on:
+  push:
+    branches:
+      - main
+    paths-ignore:
+      - 'website/**'
+      - '**/*.md'
+  pull_request:
+    branches:
+      - main
+  workflow_dispatch:
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
+  cancel-in-progress: true
+
+jobs:
+  check-license-and-formatting:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Check License Header
+        uses: apache/skywalking-eyes/[email protected]
+
+      - name: Install cargo-deny
+        uses: taiki-e/install-action@v2
+        with:
+          tool: [email protected]
+
+      - name: Check dependency licenses (Apache-compatible)
+        run: cargo deny check licenses
+
+      - name: Install protoc
+        run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
+
+      - name: Format
+        run: cargo fmt --all -- --check
+
+      - name: Clippy
+        run: cargo clippy --all-targets --workspace -- -D warnings
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
deleted file mode 100644
index d51e3c0..0000000
--- a/.github/workflows/ci.yml
+++ /dev/null
@@ -1,187 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-name: CI
-
-on:
-  push:
-    branches:
-      - main
-    paths-ignore:
-      - 'website/**'
-      - '**/*.md'
-  pull_request:
-    branches:
-      - main
-    paths-ignore:
-      - 'website/**'
-      - '**/*.md'
-
-concurrency:
-  group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
-  cancel-in-progress: true
-
-jobs:
-  check:
-    runs-on: ubuntu-latest
-    steps:
-      - uses: actions/checkout@v4
-
-      - name: Check License Header
-        uses: apache/skywalking-eyes/[email protected]
-
-      - name: Install cargo-deny
-        uses: taiki-e/install-action@v2
-        with:
-          tool: [email protected]
-
-      - name: Check dependency licenses (Apache-compatible)
-        run: cargo deny check licenses
-
-      - name: Install protoc
-        run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
-
-      - name: Format
-        run: cargo fmt --all -- --check
-
-      - name: Clippy
-        run: cargo clippy --all-targets --workspace -- -D warnings
-
-  build:
-    runs-on: ${{ matrix.os }}
-    strategy:
-      matrix:
-        os:
-          - ubuntu-latest
-          - macos-latest
-        python: ["3.11", "3.12", "3.13"]
-    steps:
-      - uses: actions/checkout@v4
-
-      - name: Set up Python
-        uses: actions/setup-python@v5
-        with:
-          python-version: ${{ matrix.python }}
-
-      - name: Install protoc
-        run: |
-          if [ "$RUNNER_OS" = "Linux" ]; then
-            sudo apt-get update && sudo apt-get install -y protobuf-compiler
-          elif [ "$RUNNER_OS" = "macOS" ]; then
-            brew install protobuf
-          fi
-
-      - name: Rust Cache
-        uses: actions/cache@v4
-        with:
-          path: |
-            ~/.cargo/registry
-            ~/.cargo/git
-            target
-          key: build-${{ runner.os }}-${{ matrix.python }}-${{ hashFiles('**/Cargo.lock') }}
-
-      - name: Build
-        run: cargo build --workspace --all-targets
-
-  test:
-    runs-on: ${{ matrix.os }}
-    strategy:
-      matrix:
-        os:
-          - ubuntu-latest
-          - macos-latest
-        python: ["3.11", "3.12", "3.13"]
-    steps:
-      - uses: actions/checkout@v4
-
-      - name: Set up Python
-        uses: actions/setup-python@v5
-        with:
-          python-version: ${{ matrix.python }}
-
-      - name: Install protoc
-        run: |
-          if [ "$RUNNER_OS" = "Linux" ]; then
-            sudo apt-get update && sudo apt-get install -y protobuf-compiler
-          elif [ "$RUNNER_OS" = "macOS" ]; then
-            brew install protobuf
-          fi
-
-      - name: Rust Cache
-        uses: actions/cache@v4
-        with:
-          path: |
-            ~/.cargo/registry
-            ~/.cargo/git
-            target
-          key: test-${{ runner.os }}-${{ matrix.python }}-${{ hashFiles('**/Cargo.lock') }}
-
-      - name: Unit Test
-        run: cargo test --all-targets --workspace
-        env:
-          RUST_LOG: DEBUG
-          RUST_BACKTRACE: full
-
-      - name: Integration Test (Linux only)
-        if: runner.os == 'Linux'
-        run: |
-          RUST_TEST_THREADS=1 cargo test --features integration_tests --all-targets --workspace -- --nocapture
-        env:
-          RUST_LOG: DEBUG
-          RUST_BACKTRACE: full
-
-  python-integration-test:
-    timeout-minutes: 60
-    runs-on: ubuntu-latest
-    strategy:
-      matrix:
-        python: ["3.9", "3.10", "3.11", "3.12"]
-    steps:
-      - uses: actions/checkout@v4
-
-      - name: Set up Python
-        uses: actions/setup-python@v5
-        with:
-          python-version: ${{ matrix.python }}
-
-      - name: Install uv
-        uses: astral-sh/setup-uv@v4
-
-      - name: Install protoc
-        run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
-
-      - name: Rust Cache
-        uses: actions/cache@v4
-        with:
-          path: |
-            ~/.cargo/registry
-            ~/.cargo/git
-            target
-          key: python-test-${{ runner.os }}-${{ matrix.python }}-${{ hashFiles('**/Cargo.lock') }}
-
-      - name: Build Python bindings
-        working-directory: bindings/python
-        run: |
-          uv sync --extra dev
-          uv run maturin develop
-
-      - name: Run Python integration tests
-        working-directory: bindings/python
-        run: uv run pytest test/ -v
-        env:
-          RUST_LOG: DEBUG
-          RUST_BACKTRACE: full
diff --git a/bindings/cpp/CMakeLists.txt b/bindings/cpp/CMakeLists.txt
index a8f527e..6bd9fc7 100644
--- a/bindings/cpp/CMakeLists.txt
+++ b/bindings/cpp/CMakeLists.txt
@@ -123,3 +123,26 @@ if (FLUSS_ENABLE_ADDRESS_SANITIZER)
     target_compile_options(fluss_cpp PRIVATE -fsanitize=leak,address,undefined -fno-omit-frame-pointer -fno-common -O1)
     target_link_options(fluss_cpp PRIVATE -fsanitize=leak,address,undefined)
 endif()
+
+if (FLUSS_ENABLE_TESTING)
+    FetchContent_Declare(
+        googletest
+        URL https://github.com/google/googletest/archive/refs/tags/v${FLUSS_GOOGLETEST_VERSION}.tar.gz
+    )
+    set(gtest_force_shared_crt ON CACHE BOOL "" FORCE)
+    FetchContent_MakeAvailable(googletest)
+
+    enable_testing()
+
+    file(GLOB TEST_SOURCE_FILES "test/*.cpp")
+    add_executable(fluss_cpp_test ${TEST_SOURCE_FILES})
+    target_link_libraries(fluss_cpp_test PRIVATE fluss_cpp GTest::gtest)
+    target_link_libraries(fluss_cpp_test PRIVATE Arrow::arrow_shared)
+    target_compile_definitions(fluss_cpp_test PRIVATE ARROW_FOUND)
+    target_include_directories(fluss_cpp_test PRIVATE
+        ${CPP_INCLUDE_DIR}
+        ${PROJECT_SOURCE_DIR}/test
+    )
+
+    add_test(NAME fluss_cpp_integration_tests COMMAND fluss_cpp_test)
+endif()
diff --git a/bindings/cpp/test/test_admin.cpp b/bindings/cpp/test/test_admin.cpp
new file mode 100644
index 0000000..b6bb25b
--- /dev/null
+++ b/bindings/cpp/test/test_admin.cpp
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "test_utils.h"
+
+class AdminTest : public ::testing::Test {
+   protected:
+    fluss::Admin& admin() { return fluss_test::FlussTestEnvironment::Instance()->GetAdmin(); }
+};
+
+TEST_F(AdminTest, CreateDatabase) {
+    auto& adm = admin();
+
+    std::string db_name = "test_create_database_cpp";
+
+    // Database should not exist initially
+    bool exists = true;
+    ASSERT_OK(adm.DatabaseExists(db_name, exists));
+    ASSERT_FALSE(exists);
+
+    // Create database with descriptor
+    fluss::DatabaseDescriptor descriptor;
+    descriptor.comment = "test_db";
+    descriptor.properties = {{"k1", "v1"}, {"k2", "v2"}};
+    ASSERT_OK(adm.CreateDatabase(db_name, descriptor, false));
+
+    // Database should exist now
+    ASSERT_OK(adm.DatabaseExists(db_name, exists));
+    ASSERT_TRUE(exists);
+
+    // Get database info
+    fluss::DatabaseInfo db_info;
+    ASSERT_OK(adm.GetDatabaseInfo(db_name, db_info));
+    EXPECT_EQ(db_info.database_name, db_name);
+    EXPECT_EQ(db_info.comment, "test_db");
+    EXPECT_EQ(db_info.properties.at("k1"), "v1");
+    EXPECT_EQ(db_info.properties.at("k2"), "v2");
+
+    // Drop database
+    ASSERT_OK(adm.DropDatabase(db_name, false, true));
+
+    // Database should not exist now
+    ASSERT_OK(adm.DatabaseExists(db_name, exists));
+    ASSERT_FALSE(exists);
+}
+
+TEST_F(AdminTest, CreateTable) {
+    auto& adm = admin();
+
+    std::string db_name = "test_create_table_cpp_db";
+    fluss::DatabaseDescriptor db_desc;
+    db_desc.comment = "Database for test_create_table";
+
+    bool exists = false;
+    ASSERT_OK(adm.DatabaseExists(db_name, exists));
+    ASSERT_FALSE(exists);
+
+    ASSERT_OK(adm.CreateDatabase(db_name, db_desc, false));
+
+    std::string table_name = "test_user_table";
+    fluss::TablePath table_path(db_name, table_name);
+
+    // Build schema
+    auto schema = fluss::Schema::NewBuilder()
+                      .AddColumn("id", fluss::DataType::Int())
+                      .AddColumn("name", fluss::DataType::String())
+                      .AddColumn("age", fluss::DataType::Int(), "User's age (optional)")
+                      .AddColumn("email", fluss::DataType::String())
+                      .SetPrimaryKeys({"id"})
+                      .Build();
+
+    // Build table descriptor
+    auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+                                .SetSchema(schema)
+                                .SetComment("Test table for user data (id, name, age, email)")
+                                .SetBucketCount(3)
+                                .SetBucketKeys({"id"})
+                                .SetProperty("table.replication.factor", "1")
+                                .SetLogFormat("arrow")
+                                .SetKvFormat("indexed")
+                                .Build();
+
+    // Create table
+    ASSERT_OK(adm.CreateTable(table_path, table_descriptor, false));
+
+    // Table should exist
+    ASSERT_OK(adm.TableExists(table_path, exists));
+    ASSERT_TRUE(exists);
+
+    // List tables
+    std::vector<std::string> tables;
+    ASSERT_OK(adm.ListTables(db_name, tables));
+    ASSERT_EQ(tables.size(), 1u);
+    EXPECT_TRUE(std::find(tables.begin(), tables.end(), table_name) != tables.end());
+
+    // Get table info
+    fluss::TableInfo table_info;
+    ASSERT_OK(adm.GetTableInfo(table_path, table_info));
+
+    EXPECT_EQ(table_info.comment, "Test table for user data (id, name, age, email)");
+    EXPECT_EQ(table_info.primary_keys, std::vector<std::string>{"id"});
+    EXPECT_EQ(table_info.num_buckets, 3);
+    EXPECT_EQ(table_info.bucket_keys, std::vector<std::string>{"id"});
+
+    // Drop table
+    ASSERT_OK(adm.DropTable(table_path, false));
+    ASSERT_OK(adm.TableExists(table_path, exists));
+    ASSERT_FALSE(exists);
+
+    // Drop database
+    ASSERT_OK(adm.DropDatabase(db_name, false, true));
+    ASSERT_OK(adm.DatabaseExists(db_name, exists));
+    ASSERT_FALSE(exists);
+}
+
+TEST_F(AdminTest, PartitionApis) {
+    auto& adm = admin();
+
+    std::string db_name = "test_partition_apis_cpp_db";
+    fluss::DatabaseDescriptor db_desc;
+    db_desc.comment = "Database for test_partition_apis";
+    ASSERT_OK(adm.CreateDatabase(db_name, db_desc, true));
+
+    fluss::TablePath table_path(db_name, "partitioned_table");
+
+    auto schema = fluss::Schema::NewBuilder()
+                      .AddColumn("id", fluss::DataType::Int())
+                      .AddColumn("name", fluss::DataType::String())
+                      .AddColumn("dt", fluss::DataType::String())
+                      .AddColumn("region", fluss::DataType::String())
+                      .SetPrimaryKeys({"id", "dt", "region"})
+                      .Build();
+
+    auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+                                .SetSchema(schema)
+                                .SetBucketCount(3)
+                                .SetBucketKeys({"id"})
+                                .SetPartitionKeys({"dt", "region"})
+                                .SetProperty("table.replication.factor", "1")
+                                .SetLogFormat("arrow")
+                                .SetKvFormat("compacted")
+                                .Build();
+
+    ASSERT_OK(adm.CreateTable(table_path, table_descriptor, true));
+
+    // No partitions initially
+    std::vector<fluss::PartitionInfo> partitions;
+    ASSERT_OK(adm.ListPartitionInfos(table_path, partitions));
+    ASSERT_TRUE(partitions.empty());
+
+    // Create a partition
+    std::unordered_map<std::string, std::string> partition_spec = {
+        {"dt", "2024-01-15"}, {"region", "EMEA"}};
+    ASSERT_OK(adm.CreatePartition(table_path, partition_spec, false));
+
+    // Should have one partition
+    ASSERT_OK(adm.ListPartitionInfos(table_path, partitions));
+    ASSERT_EQ(partitions.size(), 1u);
+    EXPECT_EQ(partitions[0].partition_name, "2024-01-15$EMEA");
+
+    // List with partial spec filter - should find the partition
+    std::unordered_map<std::string, std::string> partial_spec = {{"dt", "2024-01-15"}};
+    std::vector<fluss::PartitionInfo> partitions_with_spec;
+    ASSERT_OK(adm.ListPartitionInfos(table_path, partial_spec, partitions_with_spec));
+    ASSERT_EQ(partitions_with_spec.size(), 1u);
+    EXPECT_EQ(partitions_with_spec[0].partition_name, "2024-01-15$EMEA");
+
+    // List with non-matching spec - should find no partitions
+    std::unordered_map<std::string, std::string> non_matching_spec = {{"dt", "2024-01-16"}};
+    std::vector<fluss::PartitionInfo> empty_partitions;
+    ASSERT_OK(adm.ListPartitionInfos(table_path, non_matching_spec, empty_partitions));
+    ASSERT_TRUE(empty_partitions.empty());
+
+    // Drop partition
+    ASSERT_OK(adm.DropPartition(table_path, partition_spec, false));
+
+    ASSERT_OK(adm.ListPartitionInfos(table_path, partitions));
+    ASSERT_TRUE(partitions.empty());
+
+    // Cleanup
+    ASSERT_OK(adm.DropTable(table_path, true));
+    ASSERT_OK(adm.DropDatabase(db_name, true, true));
+}
+
+TEST_F(AdminTest, FlussErrorResponse) {
+    auto& adm = admin();
+
+    fluss::TablePath table_path("fluss", "not_exist_cpp");
+
+    fluss::TableInfo info;
+    auto result = adm.GetTableInfo(table_path, info);
+    ASSERT_FALSE(result.Ok());
+    EXPECT_EQ(result.error_code, fluss::ErrorCode::TABLE_NOT_EXIST);
+}
+
+TEST_F(AdminTest, ErrorDatabaseNotExist) {
+    auto& adm = admin();
+
+    // get_database_info for non-existent database
+    fluss::DatabaseInfo info;
+    auto result = adm.GetDatabaseInfo("no_such_db_cpp", info);
+    ASSERT_FALSE(result.Ok());
+    EXPECT_EQ(result.error_code, fluss::ErrorCode::DATABASE_NOT_EXIST);
+
+    // drop_database without ignore flag
+    result = adm.DropDatabase("no_such_db_cpp", false, false);
+    ASSERT_FALSE(result.Ok());
+    EXPECT_EQ(result.error_code, fluss::ErrorCode::DATABASE_NOT_EXIST);
+
+    // list_tables for non-existent database
+    std::vector<std::string> tables;
+    result = adm.ListTables("no_such_db_cpp", tables);
+    ASSERT_FALSE(result.Ok());
+    EXPECT_EQ(result.error_code, fluss::ErrorCode::DATABASE_NOT_EXIST);
+}
+
+TEST_F(AdminTest, ErrorDatabaseAlreadyExist) {
+    auto& adm = admin();
+
+    std::string db_name = "test_error_db_already_exist_cpp";
+    fluss::DatabaseDescriptor descriptor;
+
+    ASSERT_OK(adm.CreateDatabase(db_name, descriptor, false));
+
+    // Create same database again without ignore flag
+    auto result = adm.CreateDatabase(db_name, descriptor, false);
+    ASSERT_FALSE(result.Ok());
+    EXPECT_EQ(result.error_code, fluss::ErrorCode::DATABASE_ALREADY_EXIST);
+
+    // With ignore flag should succeed
+    ASSERT_OK(adm.CreateDatabase(db_name, descriptor, true));
+
+    // Cleanup
+    ASSERT_OK(adm.DropDatabase(db_name, true, true));
+}
+
+TEST_F(AdminTest, ErrorTableAlreadyExist) {
+    auto& adm = admin();
+
+    std::string db_name = "test_error_tbl_already_exist_cpp_db";
+    fluss::DatabaseDescriptor db_desc;
+    ASSERT_OK(adm.CreateDatabase(db_name, db_desc, true));
+
+    fluss::TablePath table_path(db_name, "my_table");
+    auto schema = fluss::Schema::NewBuilder()
+                      .AddColumn("id", fluss::DataType::Int())
+                      .AddColumn("name", fluss::DataType::String())
+                      .Build();
+    auto table_desc = fluss::TableDescriptor::NewBuilder()
+                          .SetSchema(schema)
+                          .SetBucketCount(1)
+                          .SetProperty("table.replication.factor", "1")
+                          .Build();
+
+    ASSERT_OK(adm.CreateTable(table_path, table_desc, false));
+
+    // Create same table again without ignore flag
+    auto result = adm.CreateTable(table_path, table_desc, false);
+    ASSERT_FALSE(result.Ok());
+    EXPECT_EQ(result.error_code, fluss::ErrorCode::TABLE_ALREADY_EXIST);
+
+    // With ignore flag should succeed
+    ASSERT_OK(adm.CreateTable(table_path, table_desc, true));
+
+    // Cleanup
+    ASSERT_OK(adm.DropTable(table_path, true));
+    ASSERT_OK(adm.DropDatabase(db_name, true, true));
+}
+
+TEST_F(AdminTest, ErrorTableNotExist) {
+    auto& adm = admin();
+
+    fluss::TablePath table_path("fluss", "no_such_table_cpp");
+
+    // Drop without ignore flag
+    auto result = adm.DropTable(table_path, false);
+    ASSERT_FALSE(result.Ok());
+    EXPECT_EQ(result.error_code, fluss::ErrorCode::TABLE_NOT_EXIST);
+
+    // Drop with ignore flag should succeed
+    ASSERT_OK(adm.DropTable(table_path, true));
+}
+
+TEST_F(AdminTest, ErrorTableNotPartitioned) {
+    auto& adm = admin();
+
+    std::string db_name = "test_error_not_partitioned_cpp_db";
+    fluss::DatabaseDescriptor db_desc;
+    ASSERT_OK(adm.CreateDatabase(db_name, db_desc, true));
+
+    fluss::TablePath table_path(db_name, "non_partitioned_table");
+    auto schema = fluss::Schema::NewBuilder()
+                      .AddColumn("id", fluss::DataType::Int())
+                      .AddColumn("name", fluss::DataType::String())
+                      .Build();
+    auto table_desc = fluss::TableDescriptor::NewBuilder()
+                          .SetSchema(schema)
+                          .SetBucketCount(1)
+                          .SetProperty("table.replication.factor", "1")
+                          .Build();
+
+    ASSERT_OK(adm.CreateTable(table_path, table_desc, false));
+
+    // list_partition_infos on non-partitioned table
+    std::vector<fluss::PartitionInfo> partitions;
+    auto result = adm.ListPartitionInfos(table_path, partitions);
+    ASSERT_FALSE(result.Ok());
+    EXPECT_EQ(result.error_code, fluss::ErrorCode::TABLE_NOT_PARTITIONED_EXCEPTION);
+
+    // Cleanup
+    ASSERT_OK(adm.DropTable(table_path, true));
+    ASSERT_OK(adm.DropDatabase(db_name, true, true));
+}
diff --git a/bindings/cpp/test/test_kv_table.cpp b/bindings/cpp/test/test_kv_table.cpp
new file mode 100644
index 0000000..9c4f7a0
--- /dev/null
+++ b/bindings/cpp/test/test_kv_table.cpp
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "test_utils.h"
+
+class KvTableTest : public ::testing::Test {  // fixture for the primary-key table tests
+   protected:
+    fluss::Admin& admin() { return fluss_test::FlussTestEnvironment::Instance()->GetAdmin(); }
+
+    fluss::Connection& connection() {
+        return fluss_test::FlussTestEnvironment::Instance()->GetConnection();
+    }
+};
+
+TEST_F(KvTableTest, UpsertDeleteAndLookup) {  // upsert, lookup, update, delete on a PK table
+    auto& adm = admin();
+    auto& conn = connection();
+
+    fluss::TablePath table_path("fluss", "test_upsert_and_lookup_cpp");
+
+    auto schema = fluss::Schema::NewBuilder()
+                      .AddColumn("id", fluss::DataType::Int())
+                      .AddColumn("name", fluss::DataType::String())
+                      .AddColumn("age", fluss::DataType::BigInt())
+                      .SetPrimaryKeys({"id"})
+                      .Build();
+
+    auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+                                .SetSchema(schema)
+                                .SetProperty("table.replication.factor", "1")
+                                .Build();
+
+    fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+    fluss::Table table;
+    ASSERT_OK(conn.GetTable(table_path, table));
+
+    // Create upsert writer
+    auto table_upsert = table.NewUpsert();
+    fluss::UpsertWriter upsert_writer;
+    ASSERT_OK(table_upsert.CreateWriter(upsert_writer));
+
+    // Upsert 3 rows (fire-and-forget, then flush)
+    struct TestData {
+        int32_t id;
+        std::string name;
+        int64_t age;
+    };
+    std::vector<TestData> test_data = {{1, "Verso", 32}, {2, "Noco", 25}, {3, "Esquie", 35}};
+
+    for (const auto& d : test_data) {
+        fluss::GenericRow row(3);
+        row.SetInt32(0, d.id);
+        row.SetString(1, d.name);
+        row.SetInt64(2, d.age);
+        ASSERT_OK(upsert_writer.Upsert(row));
+    }
+    ASSERT_OK(upsert_writer.Flush());
+
+    // Create lookuper
+    fluss::Lookuper lookuper;
+    ASSERT_OK(table.NewLookup().CreateLookuper(lookuper));
+
+    // Verify lookup results (only the primary-key field of each key row is populated)
+    for (const auto& d : test_data) {
+        fluss::GenericRow key(3);
+        key.SetInt32(0, d.id);
+
+        fluss::LookupResult result;
+        ASSERT_OK(lookuper.Lookup(key, result));
+        ASSERT_TRUE(result.Found()) << "Row with id=" << d.id << " should exist";
+
+        EXPECT_EQ(result.GetInt32(0), d.id) << "id mismatch";
+        EXPECT_EQ(result.GetString(1), d.name) << "name mismatch";
+        EXPECT_EQ(result.GetInt64(2), d.age) << "age mismatch";
+    }
+
+    // Update record with id=1 (await acknowledgment)
+    {
+        fluss::GenericRow updated_row(3);
+        updated_row.SetInt32(0, 1);
+        updated_row.SetString(1, "Verso");
+        updated_row.SetInt64(2, 33);
+        fluss::WriteResult wr;
+        ASSERT_OK(upsert_writer.Upsert(updated_row, wr));
+        ASSERT_OK(wr.Wait());
+    }
+
+    // Verify the update
+    {
+        fluss::GenericRow key(3);
+        key.SetInt32(0, 1);
+        fluss::LookupResult result;
+        ASSERT_OK(lookuper.Lookup(key, result));
+        ASSERT_TRUE(result.Found());
+        EXPECT_EQ(result.GetInt64(2), 33) << "Age should be updated";
+        EXPECT_EQ(result.GetString(1), "Verso") << "Name should remain unchanged";
+    }
+
+    // Delete record with id=1 (await acknowledgment)
+    {
+        fluss::GenericRow delete_row(3);
+        delete_row.SetInt32(0, 1);
+        fluss::WriteResult wr;
+        ASSERT_OK(upsert_writer.Delete(delete_row, wr));
+        ASSERT_OK(wr.Wait());
+    }
+
+    // Verify deletion
+    {
+        fluss::GenericRow key(3);
+        key.SetInt32(0, 1);
+        fluss::LookupResult result;
+        ASSERT_OK(lookuper.Lookup(key, result));
+        ASSERT_FALSE(result.Found()) << "Record 1 should not exist after delete";
+    }
+
+    // Verify other records still exist
+    for (int id : {2, 3}) {
+        fluss::GenericRow key(3);
+        key.SetInt32(0, id);
+        fluss::LookupResult result;
+        ASSERT_OK(lookuper.Lookup(key, result));
+        ASSERT_TRUE(result.Found()) << "Record " << id
+                                    << " should still exist after deleting record 1";
+    }
+
+    // Lookup non-existent key
+    {
+        fluss::GenericRow key(3);
+        key.SetInt32(0, 999);
+        fluss::LookupResult result;
+        ASSERT_OK(lookuper.Lookup(key, result));
+        ASSERT_FALSE(result.Found()) << "Non-existent key should return not found";
+    }
+
+    ASSERT_OK(adm.DropTable(table_path, false));
+}
+
+TEST_F(KvTableTest, CompositePrimaryKeys) {  // lookups keyed by (region, user_id)
+    auto& adm = admin();
+    auto& conn = connection();
+
+    fluss::TablePath table_path("fluss", "test_composite_pk_cpp");
+
+    auto schema = fluss::Schema::NewBuilder()
+                      .AddColumn("region", fluss::DataType::String())
+                      .AddColumn("score", fluss::DataType::BigInt())
+                      .AddColumn("user_id", fluss::DataType::Int())
+                      .SetPrimaryKeys({"region", "user_id"})
+                      .Build();
+
+    auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+                                .SetSchema(schema)
+                                .SetProperty("table.replication.factor", "1")
+                                .Build();
+
+    fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+    fluss::Table table;
+    ASSERT_OK(conn.GetTable(table_path, table));
+
+    auto table_upsert = table.NewUpsert();
+    fluss::UpsertWriter upsert_writer;
+    ASSERT_OK(table_upsert.CreateWriter(upsert_writer));
+
+    // Insert records with composite keys (fields are set by column name, not position)
+    struct TestData {
+        std::string region;
+        int32_t user_id;
+        int64_t score;
+    };
+    std::vector<TestData> test_data = {
+        {"US", 1, 100}, {"US", 2, 200}, {"EU", 1, 150}, {"EU", 2, 250}};
+
+    for (const auto& d : test_data) {
+        auto row = table.NewRow();
+        row.Set("region", d.region);
+        row.Set("score", d.score);
+        row.Set("user_id", d.user_id);
+        ASSERT_OK(upsert_writer.Upsert(row));
+    }
+    ASSERT_OK(upsert_writer.Flush());
+
+    // Create lookuper
+    fluss::Lookuper lookuper;
+    ASSERT_OK(table.NewLookup().CreateLookuper(lookuper));
+
+    // Lookup (US, 1) - should return score 100
+    {
+        auto key = table.NewRow();
+        key.Set("region", "US");
+        key.Set("user_id", 1);
+        fluss::LookupResult result;
+        ASSERT_OK(lookuper.Lookup(key, result));
+        ASSERT_TRUE(result.Found());
+        EXPECT_EQ(result.GetInt64("score"), 100) << "Score for (US, 1) should be 100";
+    }
+
+    // Lookup (EU, 2) - should return score 250
+    {
+        auto key = table.NewRow();
+        key.Set("region", "EU");
+        key.Set("user_id", 2);
+        fluss::LookupResult result;
+        ASSERT_OK(lookuper.Lookup(key, result));
+        ASSERT_TRUE(result.Found());
+        EXPECT_EQ(result.GetInt64("score"), 250) << "Score for (EU, 2) should be 250";
+    }
+
+    // Update (US, 1) score (await acknowledgment)
+    {
+        auto update_row = table.NewRow();
+        update_row.Set("region", "US");
+        update_row.Set("user_id", 1);
+        update_row.Set("score", static_cast<int64_t>(500));
+        fluss::WriteResult wr;
+        ASSERT_OK(upsert_writer.Upsert(update_row, wr));
+        ASSERT_OK(wr.Wait());
+    }
+
+    // Verify update
+    {
+        auto key = table.NewRow();
+        key.Set("region", "US");
+        key.Set("user_id", 1);
+        fluss::LookupResult result;
+        ASSERT_OK(lookuper.Lookup(key, result));
+        ASSERT_TRUE(result.Found());
+        EXPECT_EQ(result.GetInt64("score"), 500) << "Row score should be updated";
+    }
+
+    ASSERT_OK(adm.DropTable(table_path, false));
+}
+
+TEST_F(KvTableTest, PartialUpdate) {  // partial upsert selected by column name
+    auto& adm = admin();
+    auto& conn = connection();
+
+    fluss::TablePath table_path("fluss", "test_partial_update_cpp");
+
+    auto schema = fluss::Schema::NewBuilder()
+                      .AddColumn("id", fluss::DataType::Int())
+                      .AddColumn("name", fluss::DataType::String())
+                      .AddColumn("age", fluss::DataType::BigInt())
+                      .AddColumn("score", fluss::DataType::BigInt())
+                      .SetPrimaryKeys({"id"})
+                      .Build();
+
+    auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+                                .SetSchema(schema)
+                                .SetProperty("table.replication.factor", "1")
+                                .Build();
+
+    fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+    fluss::Table table;
+    ASSERT_OK(conn.GetTable(table_path, table));
+
+    // Insert initial record with all columns
+    auto table_upsert = table.NewUpsert();
+    fluss::UpsertWriter upsert_writer;
+    ASSERT_OK(table_upsert.CreateWriter(upsert_writer));
+
+    {
+        fluss::GenericRow row(4);
+        row.SetInt32(0, 1);
+        row.SetString(1, "Verso");
+        row.SetInt64(2, 32);
+        row.SetInt64(3, 6942);
+        fluss::WriteResult wr;
+        ASSERT_OK(upsert_writer.Upsert(row, wr));
+        ASSERT_OK(wr.Wait());
+    }
+
+    // Verify initial record
+    fluss::Lookuper lookuper;
+    ASSERT_OK(table.NewLookup().CreateLookuper(lookuper));
+
+    {
+        fluss::GenericRow key(4);
+        key.SetInt32(0, 1);
+        fluss::LookupResult result;
+        ASSERT_OK(lookuper.Lookup(key, result));
+        ASSERT_TRUE(result.Found());
+        EXPECT_EQ(result.GetInt32(0), 1);
+        EXPECT_EQ(result.GetString(1), "Verso");
+        EXPECT_EQ(result.GetInt64(2), 32);
+        EXPECT_EQ(result.GetInt64(3), 6942);
+    }
+
+    // Create partial update writer targeting only the "id" and "score" columns
+    auto partial_upsert = table.NewUpsert();
+    partial_upsert.PartialUpdateByName({"id", "score"});
+    fluss::UpsertWriter partial_writer;
+    ASSERT_OK(partial_upsert.CreateWriter(partial_writer));
+
+    // Update only the score column (await acknowledgment)
+    {
+        fluss::GenericRow partial_row(4);
+        partial_row.SetInt32(0, 1);
+        partial_row.SetNull(1);  // not in partial update
+        partial_row.SetNull(2);  // not in partial update
+        partial_row.SetInt64(3, 420);
+        fluss::WriteResult wr;
+        ASSERT_OK(partial_writer.Upsert(partial_row, wr));
+        ASSERT_OK(wr.Wait());
+    }
+
+    // Verify partial update - name and age should remain unchanged
+    {
+        fluss::GenericRow key(4);
+        key.SetInt32(0, 1);
+        fluss::LookupResult result;
+        ASSERT_OK(lookuper.Lookup(key, result));
+        ASSERT_TRUE(result.Found());
+        EXPECT_EQ(result.GetInt32(0), 1) << "id should remain 1";
+        EXPECT_EQ(result.GetString(1), "Verso") << "name should remain unchanged";
+        EXPECT_EQ(result.GetInt64(2), 32) << "age should remain unchanged";
+        EXPECT_EQ(result.GetInt64(3), 420) << "score should be updated to 420";
+    }
+
+    ASSERT_OK(adm.DropTable(table_path, false));
+}
+
+TEST_F(KvTableTest, PartialUpdateByIndex) {  // partial upsert selected by column index
+    auto& adm = admin();
+    auto& conn = connection();
+
+    fluss::TablePath table_path("fluss", "test_partial_update_by_index_cpp");
+
+    auto schema = fluss::Schema::NewBuilder()
+                      .AddColumn("id", fluss::DataType::Int())
+                      .AddColumn("name", fluss::DataType::String())
+                      .AddColumn("age", fluss::DataType::BigInt())
+                      .AddColumn("score", fluss::DataType::BigInt())
+                      .SetPrimaryKeys({"id"})
+                      .Build();
+
+    auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+                                .SetSchema(schema)
+                                .SetProperty("table.replication.factor", "1")
+                                .Build();
+
+    fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+    fluss::Table table;
+    ASSERT_OK(conn.GetTable(table_path, table));
+
+    // Insert initial record with all columns
+    auto table_upsert = table.NewUpsert();
+    fluss::UpsertWriter upsert_writer;
+    ASSERT_OK(table_upsert.CreateWriter(upsert_writer));
+
+    {
+        fluss::GenericRow row(4);
+        row.SetInt32(0, 1);
+        row.SetString(1, "Verso");
+        row.SetInt64(2, 32);
+        row.SetInt64(3, 6942);
+        fluss::WriteResult wr;
+        ASSERT_OK(upsert_writer.Upsert(row, wr));
+        ASSERT_OK(wr.Wait());
+    }
+
+    // Verify initial record
+    fluss::Lookuper lookuper;
+    ASSERT_OK(table.NewLookup().CreateLookuper(lookuper));
+
+    {
+        fluss::GenericRow key(4);
+        key.SetInt32(0, 1);
+        fluss::LookupResult result;
+        ASSERT_OK(lookuper.Lookup(key, result));
+        ASSERT_TRUE(result.Found());
+        EXPECT_EQ(result.GetInt32(0), 1);
+        EXPECT_EQ(result.GetString(1), "Verso");
+        EXPECT_EQ(result.GetInt64(2), 32);
+        EXPECT_EQ(result.GetInt64(3), 6942);
+    }
+
+    // Create partial update writer using column indices: 0 (id) and 3 (score)
+    auto partial_upsert = table.NewUpsert();
+    partial_upsert.PartialUpdateByIndex({0, 3});
+    fluss::UpsertWriter partial_writer;
+    ASSERT_OK(partial_upsert.CreateWriter(partial_writer));
+
+    // Update only the score column (await acknowledgment)
+    {
+        fluss::GenericRow partial_row(4);
+        partial_row.SetInt32(0, 1);
+        partial_row.SetNull(1);  // not in partial update
+        partial_row.SetNull(2);  // not in partial update
+        partial_row.SetInt64(3, 420);
+        fluss::WriteResult wr;
+        ASSERT_OK(partial_writer.Upsert(partial_row, wr));
+        ASSERT_OK(wr.Wait());
+    }
+
+    // Verify partial update - name and age should remain unchanged
+    {
+        fluss::GenericRow key(4);
+        key.SetInt32(0, 1);
+        fluss::LookupResult result;
+        ASSERT_OK(lookuper.Lookup(key, result));
+        ASSERT_TRUE(result.Found());
+        EXPECT_EQ(result.GetInt32(0), 1) << "id should remain 1";
+        EXPECT_EQ(result.GetString(1), "Verso") << "name should remain unchanged";
+        EXPECT_EQ(result.GetInt64(2), 32) << "age should remain unchanged";
+        EXPECT_EQ(result.GetInt64(3), 420) << "score should be updated to 420";
+    }
+
+    ASSERT_OK(adm.DropTable(table_path, false));
+}
+
+TEST_F(KvTableTest, PartitionedTableUpsertAndLookup) {  // KV table partitioned by region
+    auto& adm = admin();
+    auto& conn = connection();
+
+    fluss::TablePath table_path("fluss", "test_partitioned_kv_table_cpp");
+
+    // Create a partitioned KV table with region as partition key
+    auto schema = fluss::Schema::NewBuilder()
+                      .AddColumn("region", fluss::DataType::String())
+                      .AddColumn("user_id", fluss::DataType::Int())
+                      .AddColumn("name", fluss::DataType::String())
+                      .AddColumn("score", fluss::DataType::BigInt())
+                      .SetPrimaryKeys({"region", "user_id"})
+                      .Build();
+
+    auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+                                .SetSchema(schema)
+                                .SetPartitionKeys({"region"})
+                                .SetProperty("table.replication.factor", "1")
+                                .Build();
+
+    fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+    // Create one partition per region value used by the test data below
+    fluss_test::CreatePartitions(adm, table_path, "region", {"US", "EU", "APAC"});
+
+    fluss::Table table;
+    ASSERT_OK(conn.GetTable(table_path, table));
+
+    auto table_upsert = table.NewUpsert();
+    fluss::UpsertWriter upsert_writer;
+    ASSERT_OK(table_upsert.CreateWriter(upsert_writer));
+
+    // Insert records with different partitions
+    struct TestData {
+        std::string region;
+        int32_t user_id;
+        std::string name;
+        int64_t score;
+    };
+    std::vector<TestData> test_data = {{"US", 1, "Gustave", 100}, {"US", 2, "Lune", 200},
+                                       {"EU", 1, "Sciel", 150},   {"EU", 2, "Maelle", 250},
+                                       {"APAC", 1, "Noco", 300}};
+
+    for (const auto& d : test_data) {
+        fluss::GenericRow row(4);
+        row.SetString(0, d.region);
+        row.SetInt32(1, d.user_id);
+        row.SetString(2, d.name);
+        row.SetInt64(3, d.score);
+        ASSERT_OK(upsert_writer.Upsert(row));
+    }
+    ASSERT_OK(upsert_writer.Flush());
+
+    // Create lookuper
+    fluss::Lookuper lookuper;
+    ASSERT_OK(table.NewLookup().CreateLookuper(lookuper));
+
+    // Lookup records (key rows populate only the two primary-key fields)
+    for (const auto& d : test_data) {
+        fluss::GenericRow key(4);
+        key.SetString(0, d.region);
+        key.SetInt32(1, d.user_id);
+
+        fluss::LookupResult result;
+        ASSERT_OK(lookuper.Lookup(key, result));
+        ASSERT_TRUE(result.Found());
+
+        EXPECT_EQ(std::string(result.GetString(0)), d.region) << "region mismatch";
+        EXPECT_EQ(result.GetInt32(1), d.user_id) << "user_id mismatch";
+        EXPECT_EQ(std::string(result.GetString(2)), d.name) << "name mismatch";
+        EXPECT_EQ(result.GetInt64(3), d.score) << "score mismatch";
+    }
+
+    // Update within a partition (await acknowledgment)
+    {
+        fluss::GenericRow updated_row(4);
+        updated_row.SetString(0, "US");
+        updated_row.SetInt32(1, 1);
+        updated_row.SetString(2, "Gustave Updated");
+        updated_row.SetInt64(3, 999);
+        fluss::WriteResult wr;
+        ASSERT_OK(upsert_writer.Upsert(updated_row, wr));
+        ASSERT_OK(wr.Wait());
+    }
+
+    // Verify the update
+    {
+        fluss::GenericRow key(4);
+        key.SetString(0, "US");
+        key.SetInt32(1, 1);
+        fluss::LookupResult result;
+        ASSERT_OK(lookuper.Lookup(key, result));
+        ASSERT_TRUE(result.Found());
+        EXPECT_EQ(std::string(result.GetString(2)), "Gustave Updated");
+        EXPECT_EQ(result.GetInt64(3), 999);
+    }
+
+    // Lookup in non-existent partition should return not found
+    {
+        fluss::GenericRow key(4);
+        key.SetString(0, "UNKNOWN_REGION");
+        key.SetInt32(1, 1);
+        fluss::LookupResult result;
+        ASSERT_OK(lookuper.Lookup(key, result));
+        ASSERT_FALSE(result.Found()) << "Lookup in non-existent partition should return not found";
+    }
+
+    // Delete a record within a partition (await acknowledgment)
+    {
+        fluss::GenericRow delete_key(4);
+        delete_key.SetString(0, "EU");
+        delete_key.SetInt32(1, 1);
+        fluss::WriteResult wr;
+        ASSERT_OK(upsert_writer.Delete(delete_key, wr));
+        ASSERT_OK(wr.Wait());
+    }
+
+    // Verify deletion
+    {
+        fluss::GenericRow key(4);
+        key.SetString(0, "EU");
+        key.SetInt32(1, 1);
+        fluss::LookupResult result;
+        ASSERT_OK(lookuper.Lookup(key, result));
+        ASSERT_FALSE(result.Found()) << "Deleted record should not exist";
+    }
+
+    // Verify other records in same partition still exist
+    {
+        fluss::GenericRow key(4);
+        key.SetString(0, "EU");
+        key.SetInt32(1, 2);
+        fluss::LookupResult result;
+        ASSERT_OK(lookuper.Lookup(key, result));
+        ASSERT_TRUE(result.Found());
+        EXPECT_EQ(std::string(result.GetString(2)), "Maelle");
+    }
+
+    ASSERT_OK(adm.DropTable(table_path, false));
+}
+
+TEST_F(KvTableTest, AllSupportedDatatypes) {  // round-trips every column type, then nulls
+    auto& adm = admin();
+    auto& conn = connection();
+
+    fluss::TablePath table_path("fluss", "test_all_datatypes_cpp");
+
+    // Create a table with all supported datatypes
+    auto schema = fluss::Schema::NewBuilder()
+                      .AddColumn("pk_int", fluss::DataType::Int())
+                      .AddColumn("col_boolean", fluss::DataType::Boolean())
+                      .AddColumn("col_tinyint", fluss::DataType::TinyInt())
+                      .AddColumn("col_smallint", fluss::DataType::SmallInt())
+                      .AddColumn("col_int", fluss::DataType::Int())
+                      .AddColumn("col_bigint", fluss::DataType::BigInt())
+                      .AddColumn("col_float", fluss::DataType::Float())
+                      .AddColumn("col_double", fluss::DataType::Double())
+                      .AddColumn("col_char", fluss::DataType::Char(10))
+                      .AddColumn("col_string", fluss::DataType::String())
+                      .AddColumn("col_decimal", fluss::DataType::Decimal(10, 2))
+                      .AddColumn("col_date", fluss::DataType::Date())
+                      .AddColumn("col_time", fluss::DataType::Time())
+                      .AddColumn("col_timestamp", fluss::DataType::Timestamp())
+                      .AddColumn("col_timestamp_ltz", fluss::DataType::TimestampLtz())
+                      .AddColumn("col_bytes", fluss::DataType::Bytes())
+                      .AddColumn("col_binary", fluss::DataType::Binary(20))
+                      .SetPrimaryKeys({"pk_int"})
+                      .Build();
+
+    auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+                                .SetSchema(schema)
+                                .SetProperty("table.replication.factor", "1")
+                                .Build();
+
+    fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+    fluss::Table table;
+    ASSERT_OK(conn.GetTable(table_path, table));
+
+    auto table_upsert = table.NewUpsert();
+    fluss::UpsertWriter upsert_writer;
+    ASSERT_OK(table_upsert.CreateWriter(upsert_writer));
+
+    // Test data (integer columns use the maximum value of their declared width)
+    int32_t pk_int = 1;
+    bool col_boolean = true;
+    int32_t col_tinyint = 127;
+    int32_t col_smallint = 32767;
+    int32_t col_int = 2147483647;
+    int64_t col_bigint = 9223372036854775807LL;
+    float col_float = 3.14f;
+    double col_double = 2.718281828459045;
+    std::string col_char = "hello";
+    std::string col_string = "world of fluss rust client";
+    std::string col_decimal = "123.45";
+    auto col_date = fluss::Date::FromDays(20476);  // 2026-01-23
+    auto col_time = fluss::Time::FromMillis(36827000);  // 10:13:47
+    auto col_timestamp = fluss::Timestamp::FromMillis(1769163227123);  // 2026-01-23 10:13:47.123
+    auto col_timestamp_ltz = fluss::Timestamp::FromMillis(1769163227123);
+    std::vector<uint8_t> col_bytes = {'b', 'i', 'n', 'a', 'r', 'y', ' ', 'd', 'a', 't', 'a'};
+    std::vector<uint8_t> col_binary = {'f', 'i', 'x', 'e', 'd', ' ', 'b', 'i', 'n', 'a',
+                                       'r', 'y', ' ', 'd', 'a', 't', 'a', '!', '!', '!'};
+
+    // Upsert a row with all datatypes
+    {
+        fluss::GenericRow row(17);
+        row.SetInt32(0, pk_int);
+        row.SetBool(1, col_boolean);
+        row.SetInt32(2, col_tinyint);
+        row.SetInt32(3, col_smallint);
+        row.SetInt32(4, col_int);
+        row.SetInt64(5, col_bigint);
+        row.SetFloat32(6, col_float);
+        row.SetFloat64(7, col_double);
+        row.SetString(8, col_char);
+        row.SetString(9, col_string);
+        row.SetDecimal(10, col_decimal);
+        row.SetDate(11, col_date);
+        row.SetTime(12, col_time);
+        row.SetTimestampNtz(13, col_timestamp);
+        row.SetTimestampLtz(14, col_timestamp_ltz);
+        row.SetBytes(15, col_bytes);
+        row.SetBytes(16, col_binary);
+        fluss::WriteResult wr;
+        ASSERT_OK(upsert_writer.Upsert(row, wr));
+        ASSERT_OK(wr.Wait());
+    }
+
+    // Lookup the record
+    fluss::Lookuper lookuper;
+    ASSERT_OK(table.NewLookup().CreateLookuper(lookuper));
+
+    {
+        fluss::GenericRow key(17);
+        key.SetInt32(0, pk_int);
+
+        fluss::LookupResult result;
+        ASSERT_OK(lookuper.Lookup(key, result));
+        ASSERT_TRUE(result.Found());
+
+        // Verify all datatypes
+        EXPECT_EQ(result.GetInt32(0), pk_int) << "pk_int mismatch";
+        EXPECT_EQ(result.GetBool(1), col_boolean) << "col_boolean mismatch";
+        EXPECT_EQ(result.GetInt32(2), col_tinyint) << "col_tinyint mismatch";
+        EXPECT_EQ(result.GetInt32(3), col_smallint) << "col_smallint mismatch";
+        EXPECT_EQ(result.GetInt32(4), col_int) << "col_int mismatch";
+        EXPECT_EQ(result.GetInt64(5), col_bigint) << "col_bigint mismatch";
+        EXPECT_NEAR(result.GetFloat32(6), col_float, 1e-6f) << "col_float mismatch";
+        EXPECT_NEAR(result.GetFloat64(7), col_double, 1e-15) << "col_double mismatch";
+        EXPECT_EQ(result.GetString(8), col_char) << "col_char mismatch";
+        EXPECT_EQ(result.GetString(9), col_string) << "col_string mismatch";
+        EXPECT_EQ(result.GetDecimalString(10), col_decimal) << "col_decimal mismatch";
+        EXPECT_EQ(result.GetDate(11).days_since_epoch, col_date.days_since_epoch) << "col_date mismatch";
+        EXPECT_EQ(result.GetTime(12).millis_since_midnight, col_time.millis_since_midnight) << "col_time mismatch";
+        EXPECT_EQ(result.GetTimestamp(13).epoch_millis, col_timestamp.epoch_millis)
+            << "col_timestamp mismatch";
+        EXPECT_EQ(result.GetTimestamp(14).epoch_millis, col_timestamp_ltz.epoch_millis)
+            << "col_timestamp_ltz mismatch";
+
+        auto [bytes_ptr, bytes_len] = result.GetBytes(15);
+        ASSERT_EQ(bytes_len, col_bytes.size()) << "col_bytes length mismatch";  // guards memcmp
+        EXPECT_TRUE(std::memcmp(bytes_ptr, col_bytes.data(), bytes_len) == 0)
+            << "col_bytes mismatch";
+
+        auto [binary_ptr, binary_len] = result.GetBytes(16);
+        ASSERT_EQ(binary_len, col_binary.size()) << "col_binary length mismatch";  // guards memcmp
+        EXPECT_TRUE(std::memcmp(binary_ptr, col_binary.data(), binary_len) == 0)
+            << "col_binary mismatch";
+    }
+
+    // Test with null values for nullable columns
+    {
+        fluss::GenericRow row_with_nulls(17);
+        row_with_nulls.SetInt32(0, 2);  // pk_int = 2
+        for (size_t i = 1; i < 17; ++i) {
+            row_with_nulls.SetNull(i);
+        }
+        fluss::WriteResult wr;
+        ASSERT_OK(upsert_writer.Upsert(row_with_nulls, wr));
+        ASSERT_OK(wr.Wait());
+    }
+
+    // Lookup row with nulls
+    {
+        fluss::GenericRow key(17);
+        key.SetInt32(0, 2);
+
+        fluss::LookupResult result;
+        ASSERT_OK(lookuper.Lookup(key, result));
+        ASSERT_TRUE(result.Found());
+
+        EXPECT_EQ(result.GetInt32(0), 2) << "pk_int mismatch";
+        for (size_t i = 1; i < 17; ++i) {
+            EXPECT_TRUE(result.IsNull(i)) << "column " << i << " should be null";
+        }
+    }
+
+    ASSERT_OK(adm.DropTable(table_path, false));
+}
diff --git a/bindings/cpp/test/test_log_table.cpp b/bindings/cpp/test/test_log_table.cpp
new file mode 100644
index 0000000..47ab6f2
--- /dev/null
+++ b/bindings/cpp/test/test_log_table.cpp
@@ -0,0 +1,831 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/api.h>
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <chrono>
+#include <cstdint>
+#include <cstring>
+#include <string>
+#include <thread>
+#include <tuple>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "test_utils.h"
+
+// Fixture shared by the log-table integration tests. It keeps no state of its own; both accessors
+// forward to the object returned by fluss_test::FlussTestEnvironment::Instance().
+class LogTableTest : public ::testing::Test {
+   protected:
+    // Admin client taken from the test environment.
+    fluss::Admin& admin() {
+        auto env = fluss_test::FlussTestEnvironment::Instance();
+        return env->GetAdmin();
+    }
+
+    // Connection taken from the test environment.
+    fluss::Connection& connection() {
+        auto env = fluss_test::FlussTestEnvironment::Instance();
+        return env->GetConnection();
+    }
+};
+
+// Writes two Arrow record batches (three rows each) into a 3-bucket log table keyed on c1, then
+// reads the six rows back twice: once through PollRecords and once by walking every poll result
+// bucket by bucket. Ends by calling Unsubscribe and checking that UnsubscribePartition is rejected
+// on this non-partitioned table.
+TEST_F(LogTableTest, AppendRecordBatchAndScan) {
+    auto& adm = admin();
+    auto& conn = connection();
+
+    fluss::TablePath table_path("fluss", "test_append_record_batch_and_scan_cpp");
+
+    auto schema = fluss::Schema::NewBuilder()
+                      .AddColumn("c1", fluss::DataType::Int())
+                      .AddColumn("c2", fluss::DataType::String())
+                      .Build();
+
+    auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+                                .SetSchema(schema)
+                                .SetBucketCount(3)
+                                .SetBucketKeys({"c1"})
+                                .SetProperty("table.replication.factor", "1")
+                                .Build();
+
+    fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+    fluss::Table table;
+    ASSERT_OK(conn.GetTable(table_path, table));
+
+    // Create append writer
+    auto table_append = table.NewAppend();
+    fluss::AppendWriter append_writer;
+    ASSERT_OK(table_append.CreateWriter(append_writer));
+
+    // Append Arrow record batches. The builder statuses are asserted instead of being discarded,
+    // so a failed append cannot silently yield a batch with missing values.
+    {
+        auto c1 = arrow::Int32Builder();
+        ASSERT_TRUE(c1.AppendValues({1, 2, 3}).ok()) << "failed to append c1 values";
+        auto c2 = arrow::StringBuilder();
+        ASSERT_TRUE(c2.AppendValues({"a1", "a2", "a3"}).ok()) << "failed to append c2 values";
+
+        auto batch = arrow::RecordBatch::Make(
+            arrow::schema({arrow::field("c1", arrow::int32()), arrow::field("c2", arrow::utf8())}),
+            3, {c1.Finish().ValueOrDie(), c2.Finish().ValueOrDie()});
+
+        ASSERT_OK(append_writer.AppendArrowBatch(batch));
+    }
+
+    {
+        auto c1 = arrow::Int32Builder();
+        ASSERT_TRUE(c1.AppendValues({4, 5, 6}).ok()) << "failed to append c1 values";
+        auto c2 = arrow::StringBuilder();
+        ASSERT_TRUE(c2.AppendValues({"a4", "a5", "a6"}).ok()) << "failed to append c2 values";
+
+        auto batch = arrow::RecordBatch::Make(
+            arrow::schema({arrow::field("c1", arrow::int32()), arrow::field("c2", arrow::utf8())}),
+            3, {c1.Finish().ValueOrDie(), c2.Finish().ValueOrDie()});
+
+        ASSERT_OK(append_writer.AppendArrowBatch(batch));
+    }
+
+    ASSERT_OK(append_writer.Flush());
+
+    // Create scanner and subscribe to all 3 buckets
+    fluss::Table scan_table;
+    ASSERT_OK(conn.GetTable(table_path, scan_table));
+    int32_t num_buckets = scan_table.GetTableInfo().num_buckets;
+    ASSERT_EQ(num_buckets, 3) << "Table should have 3 buckets";
+
+    auto table_scan = scan_table.NewScan();
+    fluss::LogScanner log_scanner;
+    ASSERT_OK(table_scan.CreateLogScanner(log_scanner));
+
+    for (int32_t bucket_id = 0; bucket_id < num_buckets; ++bucket_id) {
+        ASSERT_OK(log_scanner.Subscribe(bucket_id, fluss::EARLIEST_OFFSET));
+    }
+
+    // Poll for records across all buckets
+    std::vector<std::pair<int32_t, std::string>> records;
+    fluss_test::PollRecords(
+        log_scanner, 6,
+        [](const fluss::ScanRecord& rec) {
+            return std::make_pair(rec.row.GetInt32(0), std::string(rec.row.GetString(1)));
+        },
+        records);
+    ASSERT_EQ(records.size(), 6u) << "Expected 6 records";
+    // Arrival order across buckets is not fixed, so compare in sorted order.
+    std::sort(records.begin(), records.end());
+
+    std::vector<std::pair<int32_t, std::string>> expected = {
+        {1, "a1"}, {2, "a2"}, {3, "a3"}, {4, "a4"}, {5, "a5"}, {6, "a6"}};
+    EXPECT_EQ(records, expected);
+
+    // Verify per-bucket iteration via BucketView
+    {
+        fluss::Table bucket_table;
+        ASSERT_OK(conn.GetTable(table_path, bucket_table));
+        auto bucket_scan = bucket_table.NewScan();
+        fluss::LogScanner bucket_scanner;
+        ASSERT_OK(bucket_scan.CreateLogScanner(bucket_scanner));
+
+        for (int32_t bid = 0; bid < num_buckets; ++bid) {
+            ASSERT_OK(bucket_scanner.Subscribe(bid, fluss::EARLIEST_OFFSET));
+        }
+
+        std::vector<std::pair<int32_t, std::string>> bucket_records;
+        // Give up after 10 seconds so a stalled scan fails the size assertion instead of hanging.
+        auto bucket_deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10);
+        // NOTE(review): this counter is bumped once per non-empty bucket view per poll, so one
+        // bucket that delivers data in two separate polls is counted twice. Confirm whether a
+        // count of distinct buckets was intended for the EXPECT_GT below.
+        size_t buckets_with_data = 0;
+        while (bucket_records.size() < 6 && std::chrono::steady_clock::now() < bucket_deadline) {
+            fluss::ScanRecords scan_records;
+            ASSERT_OK(bucket_scanner.Poll(500, scan_records));
+
+            // Iterate by bucket
+            for (size_t b = 0; b < scan_records.BucketCount(); ++b) {
+                auto bucket_view = scan_records.BucketAt(b);
+                if (!bucket_view.Empty()) {
+                    buckets_with_data++;
+                }
+                for (auto rec : bucket_view) {
+                    bucket_records.emplace_back(rec.row.GetInt32(0),
+                                                std::string(rec.row.GetString(1)));
+                }
+            }
+        }
+
+        ASSERT_EQ(bucket_records.size(), 6u) << "Expected 6 records via per-bucket iteration";
+        EXPECT_GT(buckets_with_data, 1u) << "Records should be distributed across multiple buckets";
+
+        std::sort(bucket_records.begin(), bucket_records.end());
+        EXPECT_EQ(bucket_records, expected);
+    }
+
+    // Test unsubscribe
+    ASSERT_OK(log_scanner.Unsubscribe(0));
+
+    // Verify unsubscribe_partition fails on a non-partitioned table
+    auto unsub_result = log_scanner.UnsubscribePartition(0, 0);
+    ASSERT_FALSE(unsub_result.Ok())
+        << "unsubscribe_partition should fail on a non-partitioned table";
+
+    ASSERT_OK(adm.DropTable(table_path, false));
+}
+
+// Checks Admin::ListOffsets for bucket 0 of a log table: earliest and latest offsets on the empty
+// table, the same two after appending three rows, and timestamp lookups using wall-clock times
+// captured before and after the append.
+//
+// Each result map is asserted to contain bucket 0 before its value is read. Reading with
+// operator[] would default-insert 0 for a missing bucket and let the "== 0" expectations pass
+// without the server ever having returned an offset.
+TEST_F(LogTableTest, ListOffsets) {
+    auto& adm = admin();
+    auto& conn = connection();
+
+    fluss::TablePath table_path("fluss", "test_list_offsets_cpp");
+
+    auto schema = fluss::Schema::NewBuilder()
+                      .AddColumn("id", fluss::DataType::Int())
+                      .AddColumn("name", fluss::DataType::String())
+                      .Build();
+
+    auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+                                .SetSchema(schema)
+                                .SetProperty("table.replication.factor", "1")
+                                .Build();
+
+    fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+    // Wait for table initialization
+    std::this_thread::sleep_for(std::chrono::seconds(2));
+
+    // Earliest offset should be 0 for empty table
+    std::unordered_map<int32_t, int64_t> earliest_offsets;
+    ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Earliest(), earliest_offsets));
+    ASSERT_EQ(earliest_offsets.count(0), 1u) << "Earliest offsets should contain bucket 0";
+    EXPECT_EQ(earliest_offsets.at(0), 0) << "Earliest offset should be 0 for bucket 0";
+
+    // Latest offset should be 0 for empty table
+    std::unordered_map<int32_t, int64_t> latest_offsets;
+    ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Latest(), latest_offsets));
+    ASSERT_EQ(latest_offsets.count(0), 1u) << "Latest offsets should contain bucket 0";
+    EXPECT_EQ(latest_offsets.at(0), 0) << "Latest offset should be 0 for empty table";
+
+    // Wall-clock time (ms since epoch) taken before any row is written.
+    auto before_append_ms =
+        std::chrono::duration_cast<std::chrono::milliseconds>(
+            std::chrono::system_clock::now().time_since_epoch())
+            .count();
+
+    // Append records
+    fluss::Table table;
+    ASSERT_OK(conn.GetTable(table_path, table));
+    auto table_append = table.NewAppend();
+    fluss::AppendWriter append_writer;
+    ASSERT_OK(table_append.CreateWriter(append_writer));
+
+    {
+        // Builder statuses are asserted so a failed append cannot go unnoticed.
+        auto id_builder = arrow::Int32Builder();
+        ASSERT_TRUE(id_builder.AppendValues({1, 2, 3}).ok()) << "failed to append id values";
+        auto name_builder = arrow::StringBuilder();
+        ASSERT_TRUE(name_builder.AppendValues({"alice", "bob", "charlie"}).ok())
+            << "failed to append name values";
+
+        auto batch = arrow::RecordBatch::Make(
+            arrow::schema(
+                {arrow::field("id", arrow::int32()), arrow::field("name", arrow::utf8())}),
+            3, {id_builder.Finish().ValueOrDie(), name_builder.Finish().ValueOrDie()});
+
+        ASSERT_OK(append_writer.AppendArrowBatch(batch));
+    }
+    ASSERT_OK(append_writer.Flush());
+
+    // Put a gap between the append and the "after" timestamp taken below.
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+
+    auto after_append_ms =
+        std::chrono::duration_cast<std::chrono::milliseconds>(
+            std::chrono::system_clock::now().time_since_epoch())
+            .count();
+
+    // Latest offset after appending should be 3
+    std::unordered_map<int32_t, int64_t> latest_after;
+    ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Latest(), latest_after));
+    ASSERT_EQ(latest_after.count(0), 1u) << "Latest offsets should contain bucket 0";
+    EXPECT_EQ(latest_after.at(0), 3) << "Latest offset should be 3 after appending 3 records";
+
+    // Earliest offset should still be 0
+    std::unordered_map<int32_t, int64_t> earliest_after;
+    ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Earliest(), earliest_after));
+    ASSERT_EQ(earliest_after.count(0), 1u) << "Earliest offsets should contain bucket 0";
+    EXPECT_EQ(earliest_after.at(0), 0) << "Earliest offset should still be 0";
+
+    // Timestamp before append should resolve to offset 0
+    std::unordered_map<int32_t, int64_t> ts_offsets;
+    ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Timestamp(before_append_ms),
+                              ts_offsets));
+    ASSERT_EQ(ts_offsets.count(0), 1u) << "Timestamp offsets should contain bucket 0";
+    EXPECT_EQ(ts_offsets.at(0), 0) << "Timestamp before append should resolve to offset 0";
+
+    // Timestamp after append should resolve to offset 3
+    std::unordered_map<int32_t, int64_t> ts_after_offsets;
+    ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Timestamp(after_append_ms),
+                              ts_after_offsets));
+    ASSERT_EQ(ts_after_offsets.count(0), 1u) << "Timestamp offsets should contain bucket 0";
+    EXPECT_EQ(ts_after_offsets.at(0), 3) << "Timestamp after append should resolve to offset 3";
+
+    ASSERT_OK(adm.DropTable(table_path, false));
+}
+
+// Appends three rows to a (col_a, col_b, col_c) log table and scans bucket 0 twice with a column
+// projection: first by name (col_b, col_c), then by index ({1, 0}, i.e. col_b followed by col_a).
+// In both cases the projected row is read as a string at position 0 and an int at position 1.
+TEST_F(LogTableTest, TestProject) {
+    auto& adm = admin();
+    auto& conn = connection();
+
+    fluss::TablePath table_path("fluss", "test_project_cpp");
+
+    auto schema = fluss::Schema::NewBuilder()
+                      .AddColumn("col_a", fluss::DataType::Int())
+                      .AddColumn("col_b", fluss::DataType::String())
+                      .AddColumn("col_c", fluss::DataType::Int())
+                      .Build();
+
+    auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+                                .SetSchema(schema)
+                                .SetProperty("table.replication.factor", "1")
+                                .Build();
+
+    fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+    fluss::Table table;
+    ASSERT_OK(conn.GetTable(table_path, table));
+
+    // Append 3 records
+    auto table_append = table.NewAppend();
+    fluss::AppendWriter append_writer;
+    ASSERT_OK(table_append.CreateWriter(append_writer));
+
+    {
+        // Builder statuses are asserted so a failed append cannot go unnoticed.
+        auto col_a_builder = arrow::Int32Builder();
+        ASSERT_TRUE(col_a_builder.AppendValues({1, 2, 3}).ok()) << "failed to append col_a values";
+        auto col_b_builder = arrow::StringBuilder();
+        ASSERT_TRUE(col_b_builder.AppendValues({"x", "y", "z"}).ok())
+            << "failed to append col_b values";
+        auto col_c_builder = arrow::Int32Builder();
+        ASSERT_TRUE(col_c_builder.AppendValues({10, 20, 30}).ok())
+            << "failed to append col_c values";
+
+        auto batch = arrow::RecordBatch::Make(
+            arrow::schema({arrow::field("col_a", arrow::int32()),
+                           arrow::field("col_b", arrow::utf8()),
+                           arrow::field("col_c", arrow::int32())}),
+            3,
+            {col_a_builder.Finish().ValueOrDie(), col_b_builder.Finish().ValueOrDie(),
+             col_c_builder.Finish().ValueOrDie()});
+
+        ASSERT_OK(append_writer.AppendArrowBatch(batch));
+    }
+    ASSERT_OK(append_writer.Flush());
+
+    // Test project_by_name: select col_b and col_c only
+    {
+        fluss::Table proj_table;
+        ASSERT_OK(conn.GetTable(table_path, proj_table));
+        auto scan = proj_table.NewScan();
+        scan.ProjectByName({"col_b", "col_c"});
+        fluss::LogScanner scanner;
+        ASSERT_OK(scan.CreateLogScanner(scanner));
+
+        ASSERT_OK(scanner.Subscribe(0, 0));
+
+        fluss::ScanRecords records;
+        ASSERT_OK(scanner.Poll(10000, records));
+
+        ASSERT_EQ(records.Count(), 3u) << "Should have 3 records with project_by_name";
+
+        std::vector<std::string> expected_col_b = {"x", "y", "z"};
+        std::vector<int32_t> expected_col_c = {10, 20, 30};
+
+        // Collect and sort by col_c to get deterministic order
+        std::vector<std::pair<std::string, int32_t>> collected;
+        for (auto rec : records) {
+            collected.emplace_back(std::string(rec.row.GetString(0)), rec.row.GetInt32(1));
+        }
+        std::sort(collected.begin(), collected.end(),
+                  [](const auto& a, const auto& b) { return a.second < b.second; });
+
+        // The ASSERT_EQ on Count() above guarantees three entries in `collected`.
+        for (size_t i = 0; i < 3; ++i) {
+            EXPECT_EQ(collected[i].first, expected_col_b[i]) << "col_b mismatch at index " << i;
+            EXPECT_EQ(collected[i].second, expected_col_c[i]) << "col_c mismatch at index " << i;
+        }
+    }
+
+    // Test project by column indices: select col_b (1) and col_a (0) in that order
+    {
+        fluss::Table proj_table;
+        ASSERT_OK(conn.GetTable(table_path, proj_table));
+        auto scan = proj_table.NewScan();
+        scan.ProjectByIndex({1, 0});
+        fluss::LogScanner scanner;
+        ASSERT_OK(scan.CreateLogScanner(scanner));
+
+        ASSERT_OK(scanner.Subscribe(0, 0));
+
+        fluss::ScanRecords records;
+        ASSERT_OK(scanner.Poll(10000, records));
+
+        ASSERT_EQ(records.Count(), 3u);
+
+        std::vector<std::string> expected_col_b = {"x", "y", "z"};
+        std::vector<int32_t> expected_col_a = {1, 2, 3};
+
+        // Collect and sort by col_a to get deterministic order
+        std::vector<std::pair<std::string, int32_t>> collected;
+        for (auto rec : records) {
+            collected.emplace_back(std::string(rec.row.GetString(0)), rec.row.GetInt32(1));
+        }
+        std::sort(collected.begin(), collected.end(),
+                  [](const auto& a, const auto& b) { return a.second < b.second; });
+
+        for (size_t i = 0; i < 3; ++i) {
+            EXPECT_EQ(collected[i].first, expected_col_b[i]) << "col_b mismatch at index " << i;
+            EXPECT_EQ(collected[i].second, expected_col_a[i]) << "col_a mismatch at index " << i;
+        }
+    }
+
+    ASSERT_OK(adm.DropTable(table_path, false));
+}
+
+// Exercises the record-batch scanner on bucket 0 of an (id, name) log table in five steps:
+// polling an empty table, reading six appended rows, reading two further rows from the same
+// scanner, subscribing a fresh scanner at offset 3, and scanning with a one-column projection.
+TEST_F(LogTableTest, TestPollBatches) {
+    auto& adm = admin();
+    auto& conn = connection();
+
+    fluss::TablePath table_path("fluss", "test_poll_batches_cpp");
+
+    auto schema = fluss::Schema::NewBuilder()
+                      .AddColumn("id", fluss::DataType::Int())
+                      .AddColumn("name", fluss::DataType::String())
+                      .Build();
+
+    auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+                                .SetSchema(schema)
+                                .SetProperty("table.replication.factor", "1")
+                                .Build();
+
+    fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+
+    fluss::Table table;
+    ASSERT_OK(conn.GetTable(table_path, table));
+
+    auto scan = table.NewScan();
+    fluss::LogScanner scanner;
+    ASSERT_OK(scan.CreateRecordBatchLogScanner(scanner));
+    ASSERT_OK(scanner.Subscribe(0, 0));
+
+    // Test 1: Empty table should return empty result
+    {
+        fluss::ArrowRecordBatches batches;
+        ASSERT_OK(scanner.PollRecordBatch(500, batches));
+        ASSERT_TRUE(batches.Empty());
+    }
+
+    // Append data
+    auto table_append = table.NewAppend();
+    fluss::AppendWriter writer;
+    ASSERT_OK(table_append.CreateWriter(writer));
+
+    // Builds an (id, name) Arrow batch from parallel vectors. The lambda returns a value, so the
+    // builder statuses are checked with non-fatal EXPECT_TRUE (ASSERT_* needs a void function).
+    auto make_batch = [](const std::vector<int32_t>& ids, const std::vector<std::string>& names) {
+        auto id_builder = arrow::Int32Builder();
+        EXPECT_TRUE(id_builder.AppendValues(ids).ok()) << "failed to append id values";
+        auto name_builder = arrow::StringBuilder();
+        EXPECT_TRUE(name_builder.AppendValues(names).ok()) << "failed to append name values";
+        return arrow::RecordBatch::Make(
+            arrow::schema(
+                {arrow::field("id", arrow::int32()), arrow::field("name", arrow::utf8())}),
+            static_cast<int64_t>(ids.size()),
+            {id_builder.Finish().ValueOrDie(), name_builder.Finish().ValueOrDie()});
+    };
+
+    ASSERT_OK(writer.AppendArrowBatch(make_batch({1, 2}, {"a", "b"})));
+    ASSERT_OK(writer.AppendArrowBatch(make_batch({3, 4}, {"c", "d"})));
+    ASSERT_OK(writer.AppendArrowBatch(make_batch({5, 6}, {"e", "f"})));
+    ASSERT_OK(writer.Flush());
+
+    // Extract ids from Arrow batches (column 0 of every batch, in iteration order)
+    auto extract_ids = [](const fluss::ArrowRecordBatches& batches) {
+        std::vector<int32_t> ids;
+        for (const auto& batch : batches) {
+            auto arr = std::static_pointer_cast<arrow::Int32Array>(
+                batch->GetArrowRecordBatch()->column(0));
+            for (int64_t i = 0; i < arr->length(); ++i) {
+                ids.push_back(arr->Value(i));
+            }
+        }
+        return ids;
+    };
+
+    // Test 2: Poll until we get all 6 records
+    std::vector<int32_t> all_ids;
+    fluss_test::PollRecordBatches(scanner, 6, extract_ids, all_ids);
+    ASSERT_EQ(all_ids, (std::vector<int32_t>{1, 2, 3, 4, 5, 6}));
+
+    // Test 3: Append more and verify offset continuation (no duplicates)
+    ASSERT_OK(writer.AppendArrowBatch(make_batch({7, 8}, {"g", "h"})));
+    ASSERT_OK(writer.Flush());
+
+    std::vector<int32_t> new_ids;
+    fluss_test::PollRecordBatches(scanner, 2, extract_ids, new_ids);
+    ASSERT_EQ(new_ids, (std::vector<int32_t>{7, 8}));
+
+    // Test 4: Subscribing from mid-offset should truncate batch
+    {
+        fluss::Table trunc_table;
+        ASSERT_OK(conn.GetTable(table_path, trunc_table));
+        auto trunc_scan = trunc_table.NewScan();
+        fluss::LogScanner trunc_scanner;
+        ASSERT_OK(trunc_scan.CreateRecordBatchLogScanner(trunc_scanner));
+        ASSERT_OK(trunc_scanner.Subscribe(0, 3));
+
+        std::vector<int32_t> trunc_ids;
+        fluss_test::PollRecordBatches(trunc_scanner, 5, extract_ids, trunc_ids);
+        ASSERT_EQ(trunc_ids, (std::vector<int32_t>{4, 5, 6, 7, 8}));
+    }
+
+    // Test 5: Projection should only return requested columns
+    {
+        fluss::Table proj_table;
+        ASSERT_OK(conn.GetTable(table_path, proj_table));
+        auto proj_scan = proj_table.NewScan();
+        proj_scan.ProjectByName({"id"});
+        fluss::LogScanner proj_scanner;
+        ASSERT_OK(proj_scan.CreateRecordBatchLogScanner(proj_scanner));
+        ASSERT_OK(proj_scanner.Subscribe(0, 0));
+
+        fluss::ArrowRecordBatches proj_batches;
+        ASSERT_OK(proj_scanner.PollRecordBatch(10000, proj_batches));
+
+        // Guard the index access below.
+        ASSERT_FALSE(proj_batches.Empty());
+        EXPECT_EQ(proj_batches[0]->GetArrowRecordBatch()->num_columns(), 1)
+            << "Projected batch should have 1 column (id), not 2";
+    }
+
+    ASSERT_OK(adm.DropTable(table_path, false));
+}
+
+// Round-trips two rows through a 16-column log table that uses one column per data type: the
+// first row sets every column, the second sets every column to null. Both rows are scanned back
+// from bucket 0 and compared field by field.
+TEST_F(LogTableTest, AllSupportedDatatypes) {
+    auto& adm = admin();
+    auto& conn = connection();
+
+    fluss::TablePath table_path("fluss", "test_log_all_datatypes_cpp");
+
+    // Create a log table with all supported datatypes
+    auto schema =
+        fluss::Schema::NewBuilder()
+            .AddColumn("col_tinyint", fluss::DataType::TinyInt())
+            .AddColumn("col_smallint", fluss::DataType::SmallInt())
+            .AddColumn("col_int", fluss::DataType::Int())
+            .AddColumn("col_bigint", fluss::DataType::BigInt())
+            .AddColumn("col_float", fluss::DataType::Float())
+            .AddColumn("col_double", fluss::DataType::Double())
+            .AddColumn("col_boolean", fluss::DataType::Boolean())
+            .AddColumn("col_char", fluss::DataType::Char(10))
+            .AddColumn("col_string", fluss::DataType::String())
+            .AddColumn("col_decimal", fluss::DataType::Decimal(10, 2))
+            .AddColumn("col_date", fluss::DataType::Date())
+            .AddColumn("col_time", fluss::DataType::Time())
+            .AddColumn("col_timestamp", fluss::DataType::Timestamp())
+            .AddColumn("col_timestamp_ltz", fluss::DataType::TimestampLtz())
+            .AddColumn("col_bytes", fluss::DataType::Bytes())
+            .AddColumn("col_binary", fluss::DataType::Binary(4))
+            .Build();
+
+    auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+                                .SetSchema(schema)
+                                .SetProperty("table.replication.factor", "1")
+                                .Build();
+
+    fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+    fluss::Table table;
+    ASSERT_OK(conn.GetTable(table_path, table));
+
+    // Row width is taken from the table's own schema rather than hard-coded.
+    size_t field_count = table.GetTableInfo().schema.columns.size();
+
+    auto table_append = table.NewAppend();
+    fluss::AppendWriter append_writer;
+    ASSERT_OK(table_append.CreateWriter(append_writer));
+
+    // Test data
+    int32_t col_tinyint = 127;
+    int32_t col_smallint = 32767;
+    int32_t col_int = 2147483647;
+    int64_t col_bigint = 9223372036854775807LL;
+    float col_float = 3.14f;
+    double col_double = 2.718281828459045;
+    bool col_boolean = true;
+    std::string col_char = "hello";
+    std::string col_string = "world of fluss rust client";
+    std::string col_decimal = "123.45";
+    auto col_date = fluss::Date::FromDays(20476);       // 2026-01-23
+    auto col_time = fluss::Time::FromMillis(36827000);  // 10:13:47
+    auto col_timestamp = fluss::Timestamp::FromMillisNanos(1769163227123, 456000);
+    auto col_timestamp_ltz = fluss::Timestamp::FromMillisNanos(1769163227123, 456000);
+    std::vector<uint8_t> col_bytes = {'b', 'i', 'n', 'a', 'r', 'y', ' ', 'd', 'a', 't', 'a'};
+    std::vector<uint8_t> col_binary = {0xDE, 0xAD, 0xBE, 0xEF};
+
+    // Append a row with all datatypes
+    {
+        fluss::GenericRow row(field_count);
+        row.SetInt32(0, col_tinyint);
+        row.SetInt32(1, col_smallint);
+        row.SetInt32(2, col_int);
+        row.SetInt64(3, col_bigint);
+        row.SetFloat32(4, col_float);
+        row.SetFloat64(5, col_double);
+        row.SetBool(6, col_boolean);
+        row.SetString(7, col_char);
+        row.SetString(8, col_string);
+        row.SetDecimal(9, col_decimal);
+        row.SetDate(10, col_date);
+        row.SetTime(11, col_time);
+        row.SetTimestampNtz(12, col_timestamp);
+        row.SetTimestampLtz(13, col_timestamp_ltz);
+        row.SetBytes(14, col_bytes);
+        row.SetBytes(15, col_binary);
+        ASSERT_OK(append_writer.Append(row));
+    }
+
+    // Append a row with null values
+    {
+        fluss::GenericRow row_with_nulls(field_count);
+        for (size_t i = 0; i < field_count; ++i) {
+            row_with_nulls.SetNull(i);
+        }
+        ASSERT_OK(append_writer.Append(row_with_nulls));
+    }
+
+    ASSERT_OK(append_writer.Flush());
+
+    // Scan the records
+    fluss::Table scan_table;
+    ASSERT_OK(conn.GetTable(table_path, scan_table));
+    auto table_scan = scan_table.NewScan();
+    fluss::LogScanner log_scanner;
+    ASSERT_OK(table_scan.CreateLogScanner(log_scanner));
+    ASSERT_OK(log_scanner.Subscribe(0, 0));
+
+    // Poll until we get 2 records
+    std::vector<fluss::ScanRecord> all_records;
+    fluss_test::PollRecords(
+        log_scanner, 2, [](const fluss::ScanRecord& rec) { return rec; }, all_records);
+    ASSERT_EQ(all_records.size(), 2u) << "Expected 2 records";
+
+    // Verify first record (all values)
+    auto& row = all_records[0].row;
+
+    EXPECT_EQ(row.GetInt32(0), col_tinyint) << "col_tinyint mismatch";
+    EXPECT_EQ(row.GetInt32(1), col_smallint) << "col_smallint mismatch";
+    EXPECT_EQ(row.GetInt32(2), col_int) << "col_int mismatch";
+    EXPECT_EQ(row.GetInt64(3), col_bigint) << "col_bigint mismatch";
+    EXPECT_NEAR(row.GetFloat32(4), col_float, 1e-6f) << "col_float mismatch";
+    EXPECT_NEAR(row.GetFloat64(5), col_double, 1e-15) << "col_double mismatch";
+    EXPECT_EQ(row.GetBool(6), col_boolean) << "col_boolean mismatch";
+    EXPECT_EQ(row.GetString(7), col_char) << "col_char mismatch";
+    EXPECT_EQ(row.GetString(8), col_string) << "col_string mismatch";
+    EXPECT_EQ(row.GetDecimalString(9), col_decimal) << "col_decimal mismatch";
+    EXPECT_EQ(row.GetDate(10).days_since_epoch, col_date.days_since_epoch) << "col_date mismatch";
+    EXPECT_EQ(row.GetTime(11).millis_since_midnight, col_time.millis_since_midnight)
+        << "col_time mismatch";
+    EXPECT_EQ(row.GetTimestamp(12).epoch_millis, col_timestamp.epoch_millis)
+        << "col_timestamp millis mismatch";
+    EXPECT_EQ(row.GetTimestamp(12).nano_of_millisecond, col_timestamp.nano_of_millisecond)
+        << "col_timestamp nanos mismatch";
+    EXPECT_EQ(row.GetTimestamp(13).epoch_millis, col_timestamp_ltz.epoch_millis)
+        << "col_timestamp_ltz millis mismatch";
+    EXPECT_EQ(row.GetTimestamp(13).nano_of_millisecond, col_timestamp_ltz.nano_of_millisecond)
+        << "col_timestamp_ltz nanos mismatch";
+
+    // The length check is non-fatal, so the content comparison is guarded by length equality:
+    // memcmp must never read past the end of the expected buffer when the lengths disagree.
+    auto [bytes_ptr, bytes_len] = row.GetBytes(14);
+    EXPECT_EQ(bytes_len, col_bytes.size()) << "col_bytes length mismatch";
+    EXPECT_TRUE(bytes_len == col_bytes.size() &&
+                std::memcmp(bytes_ptr, col_bytes.data(), bytes_len) == 0)
+        << "col_bytes mismatch";
+
+    auto [binary_ptr, binary_len] = row.GetBytes(15);
+    EXPECT_EQ(binary_len, col_binary.size()) << "col_binary length mismatch";
+    EXPECT_TRUE(binary_len == col_binary.size() &&
+                std::memcmp(binary_ptr, col_binary.data(), binary_len) == 0)
+        << "col_binary mismatch";
+
+    // Verify second record (all nulls)
+    auto& null_row = all_records[1].row;
+    for (size_t i = 0; i < field_count; ++i) {
+        EXPECT_TRUE(null_row.IsNull(i)) << "column " << i << " should be null";
+    }
+
+    ASSERT_OK(adm.DropTable(table_path, false));
+}
+
+// Covers a log table partitioned by "region" with partitions US and EU: appends four rows via
+// GenericRow and four via Arrow batches, checks the latest offset of bucket 0 in each partition,
+// then scans with three subscription styles (one call per partition, subscribe-then-unsubscribe
+// EU, and a single batched subscription call).
+TEST_F(LogTableTest, PartitionedTableAppendScan) {
+    auto& adm = admin();
+    auto& conn = connection();
+
+    fluss::TablePath table_path("fluss", "test_partitioned_log_append_cpp");
+
+    // Create a partitioned log table
+    auto schema = fluss::Schema::NewBuilder()
+                      .AddColumn("id", fluss::DataType::Int())
+                      .AddColumn("region", fluss::DataType::String())
+                      .AddColumn("value", fluss::DataType::BigInt())
+                      .Build();
+
+    auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+                                .SetSchema(schema)
+                                .SetPartitionKeys({"region"})
+                                .SetProperty("table.replication.factor", "1")
+                                .Build();
+
+    fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+    // Create partitions
+    fluss_test::CreatePartitions(adm, table_path, "region", {"US", "EU"});
+
+    // Wait for partitions
+    std::this_thread::sleep_for(std::chrono::seconds(2));
+
+    fluss::Table table;
+    ASSERT_OK(conn.GetTable(table_path, table));
+
+    auto table_append = table.NewAppend();
+    fluss::AppendWriter append_writer;
+    ASSERT_OK(table_append.CreateWriter(append_writer));
+
+    // Append rows
+    struct TestData {
+        int32_t id;
+        std::string region;
+        int64_t value;
+    };
+    std::vector<TestData> test_data = {
+        {1, "US", 100}, {2, "US", 200}, {3, "EU", 300}, {4, "EU", 400}};
+
+    for (const auto& d : test_data) {
+        fluss::GenericRow row(3);
+        row.SetInt32(0, d.id);
+        row.SetString(1, d.region);
+        row.SetInt64(2, d.value);
+        ASSERT_OK(append_writer.Append(row));
+    }
+    ASSERT_OK(append_writer.Flush());
+
+    // Append arrow batches per partition. Builder statuses are asserted so a failed append cannot
+    // go unnoticed.
+    {
+        auto id_builder = arrow::Int32Builder();
+        ASSERT_TRUE(id_builder.AppendValues({5, 6}).ok()) << "failed to append id values";
+        auto region_builder = arrow::StringBuilder();
+        ASSERT_TRUE(region_builder.AppendValues({"US", "US"}).ok())
+            << "failed to append region values";
+        auto value_builder = arrow::Int64Builder();
+        ASSERT_TRUE(value_builder.AppendValues({500, 600}).ok())
+            << "failed to append value values";
+
+        auto batch = arrow::RecordBatch::Make(
+            arrow::schema({arrow::field("id", arrow::int32()),
+                           arrow::field("region", arrow::utf8()),
+                           arrow::field("value", arrow::int64())}),
+            2,
+            {id_builder.Finish().ValueOrDie(), region_builder.Finish().ValueOrDie(),
+             value_builder.Finish().ValueOrDie()});
+
+        ASSERT_OK(append_writer.AppendArrowBatch(batch));
+    }
+
+    {
+        auto id_builder = arrow::Int32Builder();
+        ASSERT_TRUE(id_builder.AppendValues({7, 8}).ok()) << "failed to append id values";
+        auto region_builder = arrow::StringBuilder();
+        ASSERT_TRUE(region_builder.AppendValues({"EU", "EU"}).ok())
+            << "failed to append region values";
+        auto value_builder = arrow::Int64Builder();
+        ASSERT_TRUE(value_builder.AppendValues({700, 800}).ok())
+            << "failed to append value values";
+
+        auto batch = arrow::RecordBatch::Make(
+            arrow::schema({arrow::field("id", arrow::int32()),
+                           arrow::field("region", arrow::utf8()),
+                           arrow::field("value", arrow::int64())}),
+            2,
+            {id_builder.Finish().ValueOrDie(), region_builder.Finish().ValueOrDie(),
+             value_builder.Finish().ValueOrDie()});
+
+        ASSERT_OK(append_writer.AppendArrowBatch(batch));
+    }
+    ASSERT_OK(append_writer.Flush());
+
+    // Test list partition offsets. Presence of bucket 0 is asserted first so a missing entry is
+    // reported as such instead of surfacing as a default-inserted 0 from operator[].
+    std::unordered_map<int32_t, int64_t> us_offsets;
+    ASSERT_OK(adm.ListPartitionOffsets(table_path, "US", {0}, fluss::OffsetSpec::Latest(),
+                                       us_offsets));
+    ASSERT_EQ(us_offsets.count(0), 1u) << "US offsets should contain bucket 0";
+    EXPECT_EQ(us_offsets.at(0), 4) << "US partition should have 4 records";
+
+    std::unordered_map<int32_t, int64_t> eu_offsets;
+    ASSERT_OK(adm.ListPartitionOffsets(table_path, "EU", {0}, fluss::OffsetSpec::Latest(),
+                                       eu_offsets));
+    ASSERT_EQ(eu_offsets.count(0), 1u) << "EU offsets should contain bucket 0";
+    EXPECT_EQ(eu_offsets.at(0), 4) << "EU partition should have 4 records";
+
+    // Subscribe to all partitions and scan
+    fluss::Table scan_table;
+    ASSERT_OK(conn.GetTable(table_path, scan_table));
+    auto table_scan = scan_table.NewScan();
+    fluss::LogScanner log_scanner;
+    ASSERT_OK(table_scan.CreateLogScanner(log_scanner));
+
+    std::vector<fluss::PartitionInfo> partition_infos;
+    ASSERT_OK(adm.ListPartitionInfos(table_path, partition_infos));
+
+    for (const auto& pi : partition_infos) {
+        ASSERT_OK(log_scanner.SubscribePartitionBuckets(pi.partition_id, 0, 0));
+    }
+
+    // Collect all records
+    using Record = std::tuple<int32_t, std::string, int64_t>;
+    auto extract_record = [](const fluss::ScanRecord& rec) -> Record {
+        return {rec.row.GetInt32(0), std::string(rec.row.GetString(1)), rec.row.GetInt64(2)};
+    };
+    std::vector<Record> collected;
+    fluss_test::PollRecords(log_scanner, 8, extract_record, collected);
+
+    ASSERT_EQ(collected.size(), 8u) << "Expected 8 records total";
+    // Sorting by tuple order (id first) makes the comparison independent of arrival order.
+    std::sort(collected.begin(), collected.end());
+
+    std::vector<Record> expected = {{1, "US", 100}, {2, "US", 200}, {3, "EU", 300},
+                                    {4, "EU", 400}, {5, "US", 500}, {6, "US", 600},
+                                    {7, "EU", 700}, {8, "EU", 800}};
+    EXPECT_EQ(collected, expected);
+
+    // Test unsubscribe_partition: unsubscribe EU, should only get US data
+    {
+        fluss::Table unsub_table;
+        ASSERT_OK(conn.GetTable(table_path, unsub_table));
+        auto unsub_scan = unsub_table.NewScan();
+        fluss::LogScanner unsub_scanner;
+        ASSERT_OK(unsub_scan.CreateLogScanner(unsub_scanner));
+
+        // -1 marks "EU not found"; the ASSERT_GE below turns that into a test failure.
+        int64_t eu_partition_id = -1;
+        for (const auto& pi : partition_infos) {
+            ASSERT_OK(unsub_scanner.SubscribePartitionBuckets(pi.partition_id, 0, 0));
+            if (pi.partition_name == "EU") {
+                eu_partition_id = pi.partition_id;
+            }
+        }
+        ASSERT_GE(eu_partition_id, 0) << "EU partition should exist";
+
+        ASSERT_OK(unsub_scanner.UnsubscribePartition(eu_partition_id, 0));
+
+        std::vector<Record> us_only;
+        fluss_test::PollRecords(unsub_scanner, 4, extract_record, us_only);
+
+        ASSERT_EQ(us_only.size(), 4u) << "Should receive exactly 4 US records";
+        for (const auto& rec : us_only) {
+            // Element 1 of the tuple is the region column.
+            EXPECT_EQ(std::get<1>(rec), "US") << "After unsubscribe EU, only US data should be read";
+        }
+    }
+
+    // Test subscribe_partition_buckets (batch subscribe)
+    {
+        fluss::Table batch_table;
+        ASSERT_OK(conn.GetTable(table_path, batch_table));
+        auto batch_scan = batch_table.NewScan();
+        fluss::LogScanner batch_scanner;
+        ASSERT_OK(batch_scan.CreateLogScanner(batch_scanner));
+
+        std::vector<fluss::PartitionBucketSubscription> subs;
+        for (const auto& pi : partition_infos) {
+            subs.push_back({pi.partition_id, 0, 0});
+        }
+        ASSERT_OK(batch_scanner.SubscribePartitionBuckets(subs));
+
+        std::vector<Record> batch_collected;
+        fluss_test::PollRecords(batch_scanner, 8, extract_record, batch_collected);
+        ASSERT_EQ(batch_collected.size(), 8u);
+        std::sort(batch_collected.begin(), batch_collected.end());
+        EXPECT_EQ(batch_collected, expected);
+    }
+
+    ASSERT_OK(adm.DropTable(table_path, false));
+}
diff --git a/bindings/cpp/test/test_main.cpp b/bindings/cpp/test/test_main.cpp
new file mode 100644
index 0000000..8c2e2d9
--- /dev/null
+++ b/bindings/cpp/test/test_main.cpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "test_utils.h"
+
+// Test binary entry point: initialise GoogleTest, install the shared Fluss environment,
+// then run every registered test.
+int main(int argc, char** argv) {
+    ::testing::InitGoogleTest(&argc, argv);
+
+    // The global environment manages the Fluss cluster lifecycle around the whole test run.
+    auto* fluss_environment = fluss_test::FlussTestEnvironment::Instance();
+    ::testing::AddGlobalTestEnvironment(fluss_environment);
+
+    const int exit_status = RUN_ALL_TESTS();
+    return exit_status;
+}
diff --git a/bindings/cpp/test/test_utils.h b/bindings/cpp/test/test_utils.h
new file mode 100644
index 0000000..bae5237
--- /dev/null
+++ b/bindings/cpp/test/test_utils.h
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <chrono>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <string>
+#include <thread>
+#include <vector>
+
+#ifdef _WIN32
+#include <winsock2.h>
+#include <ws2tcpip.h>
+#pragma comment(lib, "ws2_32.lib")
+#else
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <unistd.h>
+#endif
+
+#include "fluss.hpp"
+
+// Macro to assert Result is OK and print error message on failure
+#define ASSERT_OK(result) ASSERT_TRUE((result).Ok()) << (result).error_message
+#define EXPECT_OK(result) EXPECT_TRUE((result).Ok()) << (result).error_message
+
+namespace fluss_test {
+
+// Tag of the fluss/fluss Docker image started for the coordinator and tablet server.
+static constexpr const char* kFlussVersion = "0.7.0";
+// Docker network that every test container is attached to.
+static constexpr const char* kNetworkName = "fluss-cpp-test-network";
+// Container names; they are also used as host names inside the Docker network
+// (ZooKeeper address and listener addresses in the server properties).
+static constexpr const char* kZookeeperName = "zookeeper-cpp-test";
+static constexpr const char* kCoordinatorName = "coordinator-server-cpp-test";
+static constexpr const char* kTabletServerName = "tablet-server-cpp-test";
+// Ports on the test host that are probed on 127.0.0.1 to decide whether each server is up.
+static constexpr int kCoordinatorPort = 9123;
+static constexpr int kTabletServerPort = 9124;
+
+/// Run `cmd` through the host command processor and return the status reported by
+/// std::system. Callers in this file treat 0 as success.
+inline int RunCommand(const std::string& cmd) {
+    const int status = std::system(cmd.c_str());
+    return status;
+}
+
+/// Wait until a TCP port is accepting connections, or timeout.
+///
+/// @param host             IPv4 address in dotted-decimal text form; it is parsed with
+///                         inet_pton(AF_INET, ...), so host names are not accepted.
+/// @param port             TCP port to probe.
+/// @param timeout_seconds  Upper bound on how long new attempts keep being started.
+/// @return true as soon as one connect() succeeds; false if `host` cannot be parsed or no
+///         attempt succeeded before the deadline.
+inline bool WaitForPort(const std::string& host, int port, int timeout_seconds = 60) {
+    // The destination never changes between attempts, so build and validate it once.
+    // An unparseable host used to leave the address zeroed and was then polled until the
+    // timeout expired; fail fast instead.
+    struct sockaddr_in addr {};
+    addr.sin_family = AF_INET;
+    addr.sin_port = htons(static_cast<uint16_t>(port));
+    if (inet_pton(AF_INET, host.c_str(), &addr.sin_addr) != 1) {
+        return false;
+    }
+
+    const auto retry_interval = std::chrono::milliseconds(500);
+    const auto deadline =
+        std::chrono::steady_clock::now() + std::chrono::seconds(timeout_seconds);
+
+    while (std::chrono::steady_clock::now() < deadline) {
+        int sock = socket(AF_INET, SOCK_STREAM, 0);
+        if (sock >= 0) {
+            int result =
+                connect(sock, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr));
+            // The probe socket is only needed for the connect attempt; always release it.
+#ifdef _WIN32
+            closesocket(sock);
+#else
+            close(sock);
+#endif
+            if (result == 0) {
+                return true;
+            }
+        }
+
+        // Either the socket could not be created or the port is not open yet: back off, retry.
+        std::this_thread::sleep_for(retry_interval);
+    }
+    return false;
+}
+
+/// Manages a Docker-based Fluss cluster for integration testing.
+///
+/// Start() either adopts an already running cluster named by the FLUSS_BOOTSTRAP_SERVERS
+/// environment variable, or launches ZooKeeper, a coordinator server and a tablet server as
+/// Docker containers on a dedicated network. Stop() removes those containers and the network;
+/// it does nothing for an adopted external cluster.
+class FlussTestCluster {
+   public:
+    FlussTestCluster() = default;
+
+    /// Bring the cluster up.
+    ///
+    /// @return true when a cluster is available (external or freshly started). On any failure
+    ///         after the Docker network was created, everything started so far is torn down
+    ///         via Stop() and false is returned.
+    bool Start() {
+        const char* env_servers = std::getenv("FLUSS_BOOTSTRAP_SERVERS");
+        if (env_servers && std::strlen(env_servers) > 0) {
+            bootstrap_servers_ = env_servers;
+            external_cluster_ = true;
+            std::cout << "Using external cluster: " << bootstrap_servers_ << std::endl;
+            return true;
+        }
+
+        std::cout << "Starting Fluss cluster via Docker..." << std::endl;
+
+        // Create network (errors such as "already exists" are deliberately ignored).
+        RunCommand(std::string("docker network create ") + kNetworkName + " 2>/dev/null || true");
+
+        // Start ZooKeeper
+        std::string zk_cmd = std::string("docker run -d --rm") +
+                              " --name " + kZookeeperName +
+                              " --network " + kNetworkName +
+                              " zookeeper:3.9.2";
+        if (RunCommand(zk_cmd) != 0) {
+            // Clean up like every other failure path so the network is not left behind.
+            return AbortStart("Failed to start ZooKeeper");
+        }
+
+        // Wait for ZooKeeper to be ready before starting Fluss servers
+        std::this_thread::sleep_for(std::chrono::seconds(5));
+
+        // Start Coordinator Server.
+        // The properties are expanded by the shell's printf inside the command line, so each
+        // "\\n" (a literal backslash-n in the command) separates two property lines.
+        std::string coord_props =
+            "zookeeper.address: " + std::string(kZookeeperName) + ":2181\\n"
+            "bind.listeners: INTERNAL://" + std::string(kCoordinatorName) + ":0, CLIENT://" +
+            std::string(kCoordinatorName) + ":9123\\n"
+            "advertised.listeners: CLIENT://localhost:" + std::to_string(kCoordinatorPort) + "\\n"
+            "internal.listener.name: INTERNAL\\n"
+            "netty.server.num-network-threads: 1\\n"
+            "netty.server.num-worker-threads: 3";
+
+        // Host port kCoordinatorPort is published onto the container's CLIENT listener (9123).
+        std::string coord_cmd = std::string("docker run -d --rm") +
+                                " --name " + kCoordinatorName +
+                                " --network " + kNetworkName +
+                                " -p " + std::to_string(kCoordinatorPort) + ":9123" +
+                                " -e FLUSS_PROPERTIES=\"$(printf '" + coord_props + "')\"" +
+                                " fluss/fluss:" + kFlussVersion +
+                                " coordinatorServer";
+        if (RunCommand(coord_cmd) != 0) {
+            return AbortStart("Failed to start Coordinator Server");
+        }
+
+        // Wait for coordinator to be ready
+        if (!WaitForPort("127.0.0.1", kCoordinatorPort)) {
+            return AbortStart("Coordinator Server did not become ready");
+        }
+
+        // Start Tablet Server
+        std::string ts_props =
+            "zookeeper.address: " + std::string(kZookeeperName) + ":2181\\n"
+            "bind.listeners: INTERNAL://" + std::string(kTabletServerName) + ":0, CLIENT://" +
+            std::string(kTabletServerName) + ":9123\\n"
+            "advertised.listeners: CLIENT://localhost:" + std::to_string(kTabletServerPort) + "\\n"
+            "internal.listener.name: INTERNAL\\n"
+            "tablet-server.id: 0\\n"
+            "netty.server.num-network-threads: 1\\n"
+            "netty.server.num-worker-threads: 3";
+
+        std::string ts_cmd = std::string("docker run -d --rm") +
+                             " --name " + kTabletServerName +
+                             " --network " + kNetworkName +
+                             " -p " + std::to_string(kTabletServerPort) + ":9123" +
+                             " -e FLUSS_PROPERTIES=\"$(printf '" + ts_props + "')\"" +
+                             " fluss/fluss:" + kFlussVersion +
+                             " tabletServer";
+        if (RunCommand(ts_cmd) != 0) {
+            return AbortStart("Failed to start Tablet Server");
+        }
+
+        // Wait for tablet server to be ready
+        if (!WaitForPort("127.0.0.1", kTabletServerPort)) {
+            return AbortStart("Tablet Server did not become ready");
+        }
+
+        bootstrap_servers_ = "127.0.0.1:" + std::to_string(kCoordinatorPort);
+        std::cout << "Fluss cluster started successfully." << std::endl;
+        return true;
+    }
+
+    /// Stop the containers (tablet server, coordinator, ZooKeeper, in that order) and remove
+    /// the network. Failures of the individual docker commands are ignored, so calling this
+    /// on a partially started cluster is fine. Does nothing for an external cluster.
+    void Stop() {
+        if (external_cluster_) return;
+
+        std::cout << "Stopping Fluss cluster..." << std::endl;
+        for (const char* container : {kTabletServerName, kCoordinatorName, kZookeeperName}) {
+            RunCommand(std::string("docker stop ") + container + " 2>/dev/null || true");
+        }
+        RunCommand(std::string("docker network rm ") + kNetworkName + " 2>/dev/null || true");
+        std::cout << "Fluss cluster stopped." << std::endl;
+    }
+
+    /// Bootstrap address for clients; empty until Start() has succeeded.
+    const std::string& GetBootstrapServers() const { return bootstrap_servers_; }
+
+   private:
+    /// Report a start-up failure on stderr, tear down whatever was created, and return false.
+    bool AbortStart(const char* message) {
+        std::cerr << message << std::endl;
+        Stop();
+        return false;
+    }
+
+    std::string bootstrap_servers_;
+    // True when FLUSS_BOOTSTRAP_SERVERS supplied the cluster; such a cluster is never stopped here.
+    bool external_cluster_{false};
+};
+
+/// GoogleTest global environment: owns the Fluss test cluster together with one shared
+/// Connection and Admin that the tests obtain through the accessors below.
+class FlussTestEnvironment : public ::testing::Environment {
+   public:
+    /// The single environment object, allocated on first call and not deleted by this class.
+    static FlussTestEnvironment* Instance() {
+        static FlussTestEnvironment* const instance = new FlussTestEnvironment();
+        return instance;
+    }
+
+    /// Start (or adopt) the cluster, then keep trying for up to 60 seconds to create the
+    /// connection and admin. If either step fails the test run is skipped, not failed.
+    void SetUp() override {
+        if (!cluster_.Start()) {
+            GTEST_SKIP() << "Failed to start Fluss cluster. Skipping integration tests.";
+        }
+
+        fluss::Configuration config;
+        config.bootstrap_servers = cluster_.GetBootstrapServers();
+
+        // An open port does not mean the coordinator is fully initialized, hence the retries.
+        using Clock = std::chrono::steady_clock;
+        const auto give_up_at = Clock::now() + std::chrono::seconds(60);
+        while (Clock::now() < give_up_at) {
+            auto created = fluss::Connection::Create(config, connection_);
+            // GetAdmin is attempted only once the connection itself was created.
+            const bool ready = created.Ok() && connection_.GetAdmin(admin_).Ok();
+            if (ready) {
+                std::cout << "Connected to Fluss cluster." << std::endl;
+                return;
+            }
+            std::cout << "Waiting for Fluss cluster to be ready..." << std::endl;
+            std::this_thread::sleep_for(std::chrono::seconds(2));
+        }
+        GTEST_SKIP() << "Fluss cluster did not become ready within timeout.";
+    }
+
+    /// Shut the cluster down again.
+    void TearDown() override {
+        cluster_.Stop();
+    }
+
+    fluss::Connection& GetConnection() { return connection_; }
+    fluss::Admin& GetAdmin() { return admin_; }
+    const std::string& GetBootstrapServers() { return cluster_.GetBootstrapServers(); }
+
+   private:
+    // Construction goes through Instance() only.
+    FlussTestEnvironment() = default;
+
+    FlussTestCluster cluster_;
+    fluss::Connection connection_;
+    fluss::Admin admin_;
+};
+
+/// Helper: (re)create a table. A table already present at `path` is dropped first, then the
+/// creation itself is asserted. The assertion is fatal for this helper only: on failure it
+/// returns early and the calling test continues unless it checks for failures itself.
+inline void CreateTable(fluss::Admin& admin, const fluss::TablePath& path,
+                        const fluss::TableDescriptor& descriptor) {
+    // Best-effort cleanup (ignore if not exists); the outcome is intentionally discarded.
+    (void)admin.DropTable(path, true);
+
+    auto create_result = admin.CreateTable(path, descriptor, false);
+    ASSERT_OK(create_result);
+}
+
+/// Helper: create one partition per entry of `values` on a partitioned table, each with the
+/// single-column spec {partition_column: value}. Stops at the first creation that fails.
+inline void CreatePartitions(fluss::Admin& admin, const fluss::TablePath& path,
+                             const std::string& partition_column,
+                             const std::vector<std::string>& values) {
+    for (const std::string& partition_value : values) {
+        std::unordered_map<std::string, std::string> spec{{partition_column, partition_value}};
+        auto create_result = admin.CreatePartition(path, spec, true);
+        ASSERT_OK(create_result);
+    }
+}
+
+/// Poll a LogScanner for ScanRecords until `expected_count` items are collected or timeout.
+/// `extract_fn` is called for each ScanRecord and should return a value of type T.
+///
+/// Polling stops once `out` holds at least `expected_count` items or 10 seconds have passed,
+/// whichever comes first; `out` may end up with more items than requested because a whole
+/// poll result is always appended. A failed poll raises a fatal assertion and ends the helper.
+template <typename T, typename ExtractFn>
+void PollRecords(fluss::LogScanner& scanner, size_t expected_count,
+                 ExtractFn extract_fn, std::vector<T>& out) {
+    const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10);
+    while (out.size() < expected_count && std::chrono::steady_clock::now() < deadline) {
+        fluss::ScanRecords records;
+        // Run the poll exactly once and assert on the stored result, so a failing poll is
+        // never executed again while the assertion message is being built.
+        auto poll_result = scanner.Poll(1000, records);
+        ASSERT_OK(poll_result);
+        for (auto rec : records) {
+            out.push_back(extract_fn(rec));
+        }
+    }
+}
+
+/// Poll a LogScanner for ArrowRecordBatches until `expected_count` items are collected or timeout.
+/// `extract_fn` is called with the full ArrowRecordBatches and should return a std::vector<T>.
+///
+/// Polling stops once `out` holds at least `expected_count` items or 10 seconds have passed,
+/// whichever comes first; everything extracted from a poll is appended, so `out` may exceed
+/// `expected_count`. A failed poll raises a fatal assertion and ends the helper.
+template <typename T, typename ExtractFn>
+void PollRecordBatches(fluss::LogScanner& scanner, size_t expected_count,
+                       ExtractFn extract_fn, std::vector<T>& out) {
+    const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10);
+    while (out.size() < expected_count && std::chrono::steady_clock::now() < deadline) {
+        fluss::ArrowRecordBatches batches;
+        // Run the poll exactly once and assert on the stored result, so a failing poll is
+        // never executed again while the assertion message is being built.
+        auto poll_result = scanner.PollRecordBatch(1000, batches);
+        ASSERT_OK(poll_result);
+        auto items = extract_fn(batches);
+        out.insert(out.end(), items.begin(), items.end());
+    }
+}
+
+}  // namespace fluss_test
diff --git a/bindings/python/test/test_log_table.py 
b/bindings/python/test/test_log_table.py
index bfa9789..dd1a4d4 100644
--- a/bindings/python/test/test_log_table.py
+++ b/bindings/python/test/test_log_table.py
@@ -36,7 +36,9 @@ async def test_append_and_scan(connection, admin):
     schema = fluss.Schema(
         pa.schema([pa.field("c1", pa.int32()), pa.field("c2", pa.string())])
     )
-    table_descriptor = fluss.TableDescriptor(schema)
+    table_descriptor = fluss.TableDescriptor(
+        schema, bucket_count=3, bucket_keys=["c1"]
+    )
     await admin.create_table(table_path, table_descriptor, 
ignore_if_exists=False)
 
     table = await connection.get_table(table_path)
diff --git a/crates/fluss/tests/integration/log_table.rs 
b/crates/fluss/tests/integration/log_table.rs
index 7642067..eac72e5 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -39,7 +39,7 @@ mod table_test {
     };
     use arrow::array::record_batch;
     use fluss::client::{EARLIEST_OFFSET, FlussTable, TableScan};
-    use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, 
TablePath};
+    use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
     use fluss::record::ScanRecord;
     use fluss::row::InternalRow;
     use fluss::rpc::message::OffsetSpec;
@@ -79,6 +79,7 @@ mod table_test {
                     .build()
                     .expect("Failed to build schema"),
             )
+            .distributed_by(Some(3), vec!["c1".to_string()])
             .build()
             .expect("Failed to build table");
 
@@ -127,38 +128,34 @@ mod table_test {
                 .expect("Failed to subscribe with EARLIEST_OFFSET");
         }
 
-        // Poll for records
-        let scan_records = log_scanner
-            .poll(tokio::time::Duration::from_secs(10))
-            .await
-            .expect("Failed to poll records");
-
-        // Verify the scanned records
-        let table_bucket = TableBucket::new(table.get_table_info().table_id, 
0);
-        let records = scan_records.records(&table_bucket);
-
-        assert_eq!(records.len(), 6, "Expected 6 records");
-
-        // Verify record contents match what was appended
-        let expected_c1_values = vec![1, 2, 3, 4, 5, 6];
-        let expected_c2_values = vec!["a1", "a2", "a3", "a4", "a5", "a6"];
-
-        for (i, record) in records.iter().enumerate() {
-            let row = record.row();
-            assert_eq!(
-                row.get_int(0),
-                expected_c1_values[i],
-                "c1 value mismatch at row {}",
-                i
-            );
-            assert_eq!(
-                row.get_string(1),
-                expected_c2_values[i],
-                "c2 value mismatch at row {}",
-                i
-            );
+        // Poll for records across all buckets
+        let mut collected: Vec<(i32, String)> = Vec::new();
+        let start_time = std::time::Instant::now();
+        while collected.len() < 6 && start_time.elapsed() < 
Duration::from_secs(10) {
+            let scan_records = log_scanner
+                .poll(Duration::from_millis(500))
+                .await
+                .expect("Failed to poll records");
+            for rec in scan_records {
+                let row = rec.row();
+                collected.push((row.get_int(0), 
row.get_string(1).to_string()));
+            }
         }
 
+        assert_eq!(collected.len(), 6, "Expected 6 records");
+
+        // Sort and verify record contents
+        collected.sort();
+        let expected: Vec<(i32, String)> = vec![
+            (1, "a1".to_string()),
+            (2, "a2".to_string()),
+            (3, "a3".to_string()),
+            (4, "a4".to_string()),
+            (5, "a5".to_string()),
+            (6, "a6".to_string()),
+        ];
+        assert_eq!(collected, expected);
+
         // Test unsubscribe: unsubscribe from bucket 0, verify no error
         log_scanner
             .unsubscribe(0)

Reply via email to