This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new f39d68c chore: introduce c++ integration test (#352)
f39d68c is described below
commit f39d68ced231c34487978bb9f7e842aba7b0fccc
Author: Keith Lee <[email protected]>
AuthorDate: Sun Feb 22 02:55:42 2026 +0000
chore: introduce c++ integration test (#352)
---
.github/workflows/build_and_test_cpp.yml | 78 ++
.github/workflows/build_and_test_python.yml | 81 ++
.github/workflows/build_and_test_rust.yml | 85 +++
.../{docs-check.yml => check_documentation.yml} | 4 +-
.github/workflows/check_license_and_formatting.yml | 60 ++
.github/workflows/ci.yml | 187 -----
bindings/cpp/CMakeLists.txt | 23 +
bindings/cpp/test/test_admin.cpp | 331 ++++++++
bindings/cpp/test/test_kv_table.cpp | 733 ++++++++++++++++++
bindings/cpp/test/test_log_table.cpp | 831 +++++++++++++++++++++
bindings/cpp/test/test_main.cpp | 31 +
bindings/cpp/test/test_utils.h | 315 ++++++++
bindings/python/test/test_log_table.py | 4 +-
crates/fluss/tests/integration/log_table.rs | 59 +-
14 files changed, 2601 insertions(+), 221 deletions(-)
diff --git a/.github/workflows/build_and_test_cpp.yml b/.github/workflows/build_and_test_cpp.yml
new file mode 100644
index 0000000..5cdd14d
--- /dev/null
+++ b/.github/workflows/build_and_test_cpp.yml
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: C++ Build and Tests
+
+on:
+ push:
+ branches:
+ - main
+ paths-ignore:
+ - 'website/**'
+ - '**/*.md'
+ pull_request:
+ branches:
+ - main
+ paths-ignore:
+ - 'website/**'
+ - '**/*.md'
+ - 'bindings/python/**'
+ workflow_dispatch:
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
+ cancel-in-progress: true
+
+jobs:
+ build-and-test-cpp:
+ timeout-minutes: 60
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Install protoc
+ run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
+
+ - name: Install Apache Arrow C++
+ run: |
+ sudo apt-get install -y -V ca-certificates lsb-release wget
+ wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
+ sudo apt-get install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
+ sudo apt-get update
+ sudo apt-get install -y -V libarrow-dev
+
+ - name: Rust Cache
+ uses: actions/cache@v4
+ with:
+ path: |
+ ~/.cargo/registry
+ ~/.cargo/git
+ target
+ key: cpp-test-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }}
+
+ - name: Build C++ bindings and tests
+ working-directory: bindings/cpp
+ run: |
+ cmake -B build -DFLUSS_ENABLE_TESTING=ON -DCMAKE_BUILD_TYPE=Debug
+ cmake --build build --parallel
+
+ - name: Run C++ integration tests
+ working-directory: bindings/cpp
+ run: cd build && ctest --output-on-failure --timeout 300
+ env:
+ RUST_LOG: DEBUG
+ RUST_BACKTRACE: full
diff --git a/.github/workflows/build_and_test_python.yml b/.github/workflows/build_and_test_python.yml
new file mode 100644
index 0000000..efb5caa
--- /dev/null
+++ b/.github/workflows/build_and_test_python.yml
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Python Build and Tests
+
+on:
+ push:
+ branches:
+ - main
+ paths-ignore:
+ - 'website/**'
+ - '**/*.md'
+ pull_request:
+ branches:
+ - main
+ paths-ignore:
+ - 'website/**'
+ - '**/*.md'
+ - 'bindings/cpp/**'
+ workflow_dispatch:
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
+ cancel-in-progress: true
+
+jobs:
+ build-and-test-python:
+ timeout-minutes: 60
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ python: ["3.9", "3.10", "3.11", "3.12"]
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Set up Python
+ uses: actions/setup-python@v5
+ with:
+ python-version: ${{ matrix.python }}
+
+ - name: Install uv
+ uses: astral-sh/setup-uv@v4
+
+ - name: Install protoc
+ run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
+
+ - name: Rust Cache
+ uses: actions/cache@v4
+ with:
+ path: |
+ ~/.cargo/registry
+ ~/.cargo/git
+ target
+ key: python-test-${{ runner.os }}-${{ matrix.python }}-${{ hashFiles('**/Cargo.lock') }}
+
+ - name: Build Python bindings
+ working-directory: bindings/python
+ run: |
+ uv sync --extra dev
+ uv run maturin develop
+
+ - name: Run Python integration tests
+ working-directory: bindings/python
+ run: uv run pytest test/ -v
+ env:
+ RUST_LOG: DEBUG
+ RUST_BACKTRACE: full
diff --git a/.github/workflows/build_and_test_rust.yml b/.github/workflows/build_and_test_rust.yml
new file mode 100644
index 0000000..c9e05b7
--- /dev/null
+++ b/.github/workflows/build_and_test_rust.yml
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Rust Build and Tests
+
+on:
+ push:
+ branches:
+ - main
+ paths-ignore:
+ - 'website/**'
+ - '**/*.md'
+ pull_request:
+ branches:
+ - main
+ paths-ignore:
+ - 'website/**'
+ - '**/*.md'
+ - 'bindings/python/**'
+ - 'bindings/cpp/**'
+ workflow_dispatch:
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
+ cancel-in-progress: true
+
+jobs:
+ build-and-test-rust:
+ timeout-minutes: 60
+ runs-on: ${{ matrix.os }}
+ strategy:
+ matrix:
+ os:
+ - ubuntu-latest
+ - macos-latest
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Install protoc
+ run: |
+ if [ "$RUNNER_OS" = "Linux" ]; then
+ sudo apt-get update && sudo apt-get install -y protobuf-compiler
+ elif [ "$RUNNER_OS" = "macOS" ]; then
+ brew install protobuf
+ fi
+
+ - name: Rust Cache
+ uses: actions/cache@v4
+ with:
+ path: |
+ ~/.cargo/registry
+ ~/.cargo/git
+ target
+ key: rust-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }}
+
+ - name: Build
+ run: cargo build --workspace --all-targets --exclude fluss_python --exclude fluss-cpp
+
+ - name: Unit Test
+ run: cargo test --all-targets --workspace --exclude fluss_python --exclude fluss-cpp
+ env:
+ RUST_LOG: DEBUG
+ RUST_BACKTRACE: full
+
+ - name: Integration Test (Linux only)
+ if: runner.os == 'Linux'
+ run: |
+ RUST_TEST_THREADS=1 cargo test --features integration_tests --all-targets --workspace --exclude fluss_python --exclude fluss-cpp -- --nocapture
+ env:
+ RUST_LOG: DEBUG
+ RUST_BACKTRACE: full
diff --git a/.github/workflows/docs-check.yml b/.github/workflows/check_documentation.yml
similarity index 96%
rename from .github/workflows/docs-check.yml
rename to .github/workflows/check_documentation.yml
index 6408c54..70e6a43 100644
--- a/.github/workflows/docs-check.yml
+++ b/.github/workflows/check_documentation.yml
@@ -17,7 +17,7 @@
################################################################################
# This workflow is meant for checking broken links in the documentation.
-name: Check Documentation
+name: Documentation Check
permissions:
contents: read
on:
@@ -31,7 +31,7 @@ on:
- 'website/**'
jobs:
- test-deploy:
+ check-documentation:
runs-on: ubuntu-latest
defaults:
run:
diff --git a/.github/workflows/check_license_and_formatting.yml b/.github/workflows/check_license_and_formatting.yml
new file mode 100644
index 0000000..1b83b74
--- /dev/null
+++ b/.github/workflows/check_license_and_formatting.yml
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: License and Formatting Check
+
+on:
+ push:
+ branches:
+ - main
+ paths-ignore:
+ - 'website/**'
+ - '**/*.md'
+ pull_request:
+ branches:
+ - main
+ workflow_dispatch:
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
+ cancel-in-progress: true
+
+jobs:
+ check-license-and-formatting:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Check License Header
+ uses: apache/skywalking-eyes/[email protected]
+
+ - name: Install cargo-deny
+ uses: taiki-e/install-action@v2
+ with:
+ tool: [email protected]
+
+ - name: Check dependency licenses (Apache-compatible)
+ run: cargo deny check licenses
+
+ - name: Install protoc
+ run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
+
+ - name: Format
+ run: cargo fmt --all -- --check
+
+ - name: Clippy
+ run: cargo clippy --all-targets --workspace -- -D warnings
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
deleted file mode 100644
index d51e3c0..0000000
--- a/.github/workflows/ci.yml
+++ /dev/null
@@ -1,187 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-name: CI
-
-on:
- push:
- branches:
- - main
- paths-ignore:
- - 'website/**'
- - '**/*.md'
- pull_request:
- branches:
- - main
- paths-ignore:
- - 'website/**'
- - '**/*.md'
-
-concurrency:
- group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
- cancel-in-progress: true
-
-jobs:
- check:
- runs-on: ubuntu-latest
- steps:
- - uses: actions/checkout@v4
-
- - name: Check License Header
- uses: apache/skywalking-eyes/[email protected]
-
- - name: Install cargo-deny
- uses: taiki-e/install-action@v2
- with:
- tool: [email protected]
-
- - name: Check dependency licenses (Apache-compatible)
- run: cargo deny check licenses
-
- - name: Install protoc
- run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
-
- - name: Format
- run: cargo fmt --all -- --check
-
- - name: Clippy
- run: cargo clippy --all-targets --workspace -- -D warnings
-
- build:
- runs-on: ${{ matrix.os }}
- strategy:
- matrix:
- os:
- - ubuntu-latest
- - macos-latest
- python: ["3.11", "3.12", "3.13"]
- steps:
- - uses: actions/checkout@v4
-
- - name: Set up Python
- uses: actions/setup-python@v5
- with:
- python-version: ${{ matrix.python }}
-
- - name: Install protoc
- run: |
- if [ "$RUNNER_OS" = "Linux" ]; then
- sudo apt-get update && sudo apt-get install -y protobuf-compiler
- elif [ "$RUNNER_OS" = "macOS" ]; then
- brew install protobuf
- fi
-
- - name: Rust Cache
- uses: actions/cache@v4
- with:
- path: |
- ~/.cargo/registry
- ~/.cargo/git
- target
- key: build-${{ runner.os }}-${{ matrix.python }}-${{ hashFiles('**/Cargo.lock') }}
-
- - name: Build
- run: cargo build --workspace --all-targets
-
- test:
- runs-on: ${{ matrix.os }}
- strategy:
- matrix:
- os:
- - ubuntu-latest
- - macos-latest
- python: ["3.11", "3.12", "3.13"]
- steps:
- - uses: actions/checkout@v4
-
- - name: Set up Python
- uses: actions/setup-python@v5
- with:
- python-version: ${{ matrix.python }}
-
- - name: Install protoc
- run: |
- if [ "$RUNNER_OS" = "Linux" ]; then
- sudo apt-get update && sudo apt-get install -y protobuf-compiler
- elif [ "$RUNNER_OS" = "macOS" ]; then
- brew install protobuf
- fi
-
- - name: Rust Cache
- uses: actions/cache@v4
- with:
- path: |
- ~/.cargo/registry
- ~/.cargo/git
- target
- key: test-${{ runner.os }}-${{ matrix.python }}-${{ hashFiles('**/Cargo.lock') }}
-
- - name: Unit Test
- run: cargo test --all-targets --workspace
- env:
- RUST_LOG: DEBUG
- RUST_BACKTRACE: full
-
- - name: Integration Test (Linux only)
- if: runner.os == 'Linux'
- run: |
- RUST_TEST_THREADS=1 cargo test --features integration_tests --all-targets --workspace -- --nocapture
- env:
- RUST_LOG: DEBUG
- RUST_BACKTRACE: full
-
- python-integration-test:
- timeout-minutes: 60
- runs-on: ubuntu-latest
- strategy:
- matrix:
- python: ["3.9", "3.10", "3.11", "3.12"]
- steps:
- - uses: actions/checkout@v4
-
- - name: Set up Python
- uses: actions/setup-python@v5
- with:
- python-version: ${{ matrix.python }}
-
- - name: Install uv
- uses: astral-sh/setup-uv@v4
-
- - name: Install protoc
- run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
-
- - name: Rust Cache
- uses: actions/cache@v4
- with:
- path: |
- ~/.cargo/registry
- ~/.cargo/git
- target
- key: python-test-${{ runner.os }}-${{ matrix.python }}-${{ hashFiles('**/Cargo.lock') }}
-
- - name: Build Python bindings
- working-directory: bindings/python
- run: |
- uv sync --extra dev
- uv run maturin develop
-
- - name: Run Python integration tests
- working-directory: bindings/python
- run: uv run pytest test/ -v
- env:
- RUST_LOG: DEBUG
- RUST_BACKTRACE: full
diff --git a/bindings/cpp/CMakeLists.txt b/bindings/cpp/CMakeLists.txt
index a8f527e..6bd9fc7 100644
--- a/bindings/cpp/CMakeLists.txt
+++ b/bindings/cpp/CMakeLists.txt
@@ -123,3 +123,26 @@ if (FLUSS_ENABLE_ADDRESS_SANITIZER)
target_compile_options(fluss_cpp PRIVATE -fsanitize=leak,address,undefined -fno-omit-frame-pointer -fno-common -O1)
target_link_options(fluss_cpp PRIVATE -fsanitize=leak,address,undefined)
endif()
+
+if (FLUSS_ENABLE_TESTING)
+ FetchContent_Declare(
+ googletest
+ URL https://github.com/google/googletest/archive/refs/tags/v${FLUSS_GOOGLETEST_VERSION}.tar.gz
+ )
+ set(gtest_force_shared_crt ON CACHE BOOL "" FORCE)
+ FetchContent_MakeAvailable(googletest)
+
+ enable_testing()
+
+ file(GLOB TEST_SOURCE_FILES "test/*.cpp")
+ add_executable(fluss_cpp_test ${TEST_SOURCE_FILES})
+ target_link_libraries(fluss_cpp_test PRIVATE fluss_cpp GTest::gtest)
+ target_link_libraries(fluss_cpp_test PRIVATE Arrow::arrow_shared)
+ target_compile_definitions(fluss_cpp_test PRIVATE ARROW_FOUND)
+ target_include_directories(fluss_cpp_test PRIVATE
+ ${CPP_INCLUDE_DIR}
+ ${PROJECT_SOURCE_DIR}/test
+ )
+
+ add_test(NAME fluss_cpp_integration_tests COMMAND fluss_cpp_test)
+endif()
diff --git a/bindings/cpp/test/test_admin.cpp b/bindings/cpp/test/test_admin.cpp
new file mode 100644
index 0000000..b6bb25b
--- /dev/null
+++ b/bindings/cpp/test/test_admin.cpp
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "test_utils.h"
+
+class AdminTest : public ::testing::Test {
+ protected:
+ fluss::Admin& admin() { return fluss_test::FlussTestEnvironment::Instance()->GetAdmin(); }
+};
+
+TEST_F(AdminTest, CreateDatabase) {
+ auto& adm = admin();
+
+ std::string db_name = "test_create_database_cpp";
+
+ // Database should not exist initially
+ bool exists = true;
+ ASSERT_OK(adm.DatabaseExists(db_name, exists));
+ ASSERT_FALSE(exists);
+
+ // Create database with descriptor
+ fluss::DatabaseDescriptor descriptor;
+ descriptor.comment = "test_db";
+ descriptor.properties = {{"k1", "v1"}, {"k2", "v2"}};
+ ASSERT_OK(adm.CreateDatabase(db_name, descriptor, false));
+
+ // Database should exist now
+ ASSERT_OK(adm.DatabaseExists(db_name, exists));
+ ASSERT_TRUE(exists);
+
+ // Get database info
+ fluss::DatabaseInfo db_info;
+ ASSERT_OK(adm.GetDatabaseInfo(db_name, db_info));
+ EXPECT_EQ(db_info.database_name, db_name);
+ EXPECT_EQ(db_info.comment, "test_db");
+ EXPECT_EQ(db_info.properties.at("k1"), "v1");
+ EXPECT_EQ(db_info.properties.at("k2"), "v2");
+
+ // Drop database
+ ASSERT_OK(adm.DropDatabase(db_name, false, true));
+
+ // Database should not exist now
+ ASSERT_OK(adm.DatabaseExists(db_name, exists));
+ ASSERT_FALSE(exists);
+}
+
+TEST_F(AdminTest, CreateTable) {
+ auto& adm = admin();
+
+ std::string db_name = "test_create_table_cpp_db";
+ fluss::DatabaseDescriptor db_desc;
+ db_desc.comment = "Database for test_create_table";
+
+ bool exists = false;
+ ASSERT_OK(adm.DatabaseExists(db_name, exists));
+ ASSERT_FALSE(exists);
+
+ ASSERT_OK(adm.CreateDatabase(db_name, db_desc, false));
+
+ std::string table_name = "test_user_table";
+ fluss::TablePath table_path(db_name, table_name);
+
+ // Build schema
+ auto schema = fluss::Schema::NewBuilder()
+ .AddColumn("id", fluss::DataType::Int())
+ .AddColumn("name", fluss::DataType::String())
+ .AddColumn("age", fluss::DataType::Int(), "User's age (optional)")
+ .AddColumn("email", fluss::DataType::String())
+ .SetPrimaryKeys({"id"})
+ .Build();
+
+ // Build table descriptor
+ auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+ .SetSchema(schema)
+ .SetComment("Test table for user data (id, name, age, email)")
+ .SetBucketCount(3)
+ .SetBucketKeys({"id"})
+ .SetProperty("table.replication.factor", "1")
+ .SetLogFormat("arrow")
+ .SetKvFormat("indexed")
+ .Build();
+
+ // Create table
+ ASSERT_OK(adm.CreateTable(table_path, table_descriptor, false));
+
+ // Table should exist
+ ASSERT_OK(adm.TableExists(table_path, exists));
+ ASSERT_TRUE(exists);
+
+ // List tables
+ std::vector<std::string> tables;
+ ASSERT_OK(adm.ListTables(db_name, tables));
+ ASSERT_EQ(tables.size(), 1u);
+ EXPECT_TRUE(std::find(tables.begin(), tables.end(), table_name) != tables.end());
+
+ // Get table info
+ fluss::TableInfo table_info;
+ ASSERT_OK(adm.GetTableInfo(table_path, table_info));
+
+ EXPECT_EQ(table_info.comment, "Test table for user data (id, name, age, email)");
+ EXPECT_EQ(table_info.primary_keys, std::vector<std::string>{"id"});
+ EXPECT_EQ(table_info.num_buckets, 3);
+ EXPECT_EQ(table_info.bucket_keys, std::vector<std::string>{"id"});
+
+ // Drop table
+ ASSERT_OK(adm.DropTable(table_path, false));
+ ASSERT_OK(adm.TableExists(table_path, exists));
+ ASSERT_FALSE(exists);
+
+ // Drop database
+ ASSERT_OK(adm.DropDatabase(db_name, false, true));
+ ASSERT_OK(adm.DatabaseExists(db_name, exists));
+ ASSERT_FALSE(exists);
+}
+
+TEST_F(AdminTest, PartitionApis) {
+ auto& adm = admin();
+
+ std::string db_name = "test_partition_apis_cpp_db";
+ fluss::DatabaseDescriptor db_desc;
+ db_desc.comment = "Database for test_partition_apis";
+ ASSERT_OK(adm.CreateDatabase(db_name, db_desc, true));
+
+ fluss::TablePath table_path(db_name, "partitioned_table");
+
+ auto schema = fluss::Schema::NewBuilder()
+ .AddColumn("id", fluss::DataType::Int())
+ .AddColumn("name", fluss::DataType::String())
+ .AddColumn("dt", fluss::DataType::String())
+ .AddColumn("region", fluss::DataType::String())
+ .SetPrimaryKeys({"id", "dt", "region"})
+ .Build();
+
+ auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+ .SetSchema(schema)
+ .SetBucketCount(3)
+ .SetBucketKeys({"id"})
+ .SetPartitionKeys({"dt", "region"})
+ .SetProperty("table.replication.factor", "1")
+ .SetLogFormat("arrow")
+ .SetKvFormat("compacted")
+ .Build();
+
+ ASSERT_OK(adm.CreateTable(table_path, table_descriptor, true));
+
+ // No partitions initially
+ std::vector<fluss::PartitionInfo> partitions;
+ ASSERT_OK(adm.ListPartitionInfos(table_path, partitions));
+ ASSERT_TRUE(partitions.empty());
+
+ // Create a partition
+ std::unordered_map<std::string, std::string> partition_spec = {
+ {"dt", "2024-01-15"}, {"region", "EMEA"}};
+ ASSERT_OK(adm.CreatePartition(table_path, partition_spec, false));
+
+ // Should have one partition
+ ASSERT_OK(adm.ListPartitionInfos(table_path, partitions));
+ ASSERT_EQ(partitions.size(), 1u);
+ EXPECT_EQ(partitions[0].partition_name, "2024-01-15$EMEA");
+
+ // List with partial spec filter - should find the partition
+ std::unordered_map<std::string, std::string> partial_spec = {{"dt", "2024-01-15"}};
+ std::vector<fluss::PartitionInfo> partitions_with_spec;
+ ASSERT_OK(adm.ListPartitionInfos(table_path, partial_spec, partitions_with_spec));
+ ASSERT_EQ(partitions_with_spec.size(), 1u);
+ EXPECT_EQ(partitions_with_spec[0].partition_name, "2024-01-15$EMEA");
+
+ // List with non-matching spec - should find no partitions
+ std::unordered_map<std::string, std::string> non_matching_spec = {{"dt", "2024-01-16"}};
+ std::vector<fluss::PartitionInfo> empty_partitions;
+ ASSERT_OK(adm.ListPartitionInfos(table_path, non_matching_spec, empty_partitions));
+ ASSERT_TRUE(empty_partitions.empty());
+
+ // Drop partition
+ ASSERT_OK(adm.DropPartition(table_path, partition_spec, false));
+
+ ASSERT_OK(adm.ListPartitionInfos(table_path, partitions));
+ ASSERT_TRUE(partitions.empty());
+
+ // Cleanup
+ ASSERT_OK(adm.DropTable(table_path, true));
+ ASSERT_OK(adm.DropDatabase(db_name, true, true));
+}
+
+TEST_F(AdminTest, FlussErrorResponse) {
+ auto& adm = admin();
+
+ fluss::TablePath table_path("fluss", "not_exist_cpp");
+
+ fluss::TableInfo info;
+ auto result = adm.GetTableInfo(table_path, info);
+ ASSERT_FALSE(result.Ok());
+ EXPECT_EQ(result.error_code, fluss::ErrorCode::TABLE_NOT_EXIST);
+}
+
+TEST_F(AdminTest, ErrorDatabaseNotExist) {
+ auto& adm = admin();
+
+ // get_database_info for non-existent database
+ fluss::DatabaseInfo info;
+ auto result = adm.GetDatabaseInfo("no_such_db_cpp", info);
+ ASSERT_FALSE(result.Ok());
+ EXPECT_EQ(result.error_code, fluss::ErrorCode::DATABASE_NOT_EXIST);
+
+ // drop_database without ignore flag
+ result = adm.DropDatabase("no_such_db_cpp", false, false);
+ ASSERT_FALSE(result.Ok());
+ EXPECT_EQ(result.error_code, fluss::ErrorCode::DATABASE_NOT_EXIST);
+
+ // list_tables for non-existent database
+ std::vector<std::string> tables;
+ result = adm.ListTables("no_such_db_cpp", tables);
+ ASSERT_FALSE(result.Ok());
+ EXPECT_EQ(result.error_code, fluss::ErrorCode::DATABASE_NOT_EXIST);
+}
+
+TEST_F(AdminTest, ErrorDatabaseAlreadyExist) {
+ auto& adm = admin();
+
+ std::string db_name = "test_error_db_already_exist_cpp";
+ fluss::DatabaseDescriptor descriptor;
+
+ ASSERT_OK(adm.CreateDatabase(db_name, descriptor, false));
+
+ // Create same database again without ignore flag
+ auto result = adm.CreateDatabase(db_name, descriptor, false);
+ ASSERT_FALSE(result.Ok());
+ EXPECT_EQ(result.error_code, fluss::ErrorCode::DATABASE_ALREADY_EXIST);
+
+ // With ignore flag should succeed
+ ASSERT_OK(adm.CreateDatabase(db_name, descriptor, true));
+
+ // Cleanup
+ ASSERT_OK(adm.DropDatabase(db_name, true, true));
+}
+
+TEST_F(AdminTest, ErrorTableAlreadyExist) {
+ auto& adm = admin();
+
+ std::string db_name = "test_error_tbl_already_exist_cpp_db";
+ fluss::DatabaseDescriptor db_desc;
+ ASSERT_OK(adm.CreateDatabase(db_name, db_desc, true));
+
+ fluss::TablePath table_path(db_name, "my_table");
+ auto schema = fluss::Schema::NewBuilder()
+ .AddColumn("id", fluss::DataType::Int())
+ .AddColumn("name", fluss::DataType::String())
+ .Build();
+ auto table_desc = fluss::TableDescriptor::NewBuilder()
+ .SetSchema(schema)
+ .SetBucketCount(1)
+ .SetProperty("table.replication.factor", "1")
+ .Build();
+
+ ASSERT_OK(adm.CreateTable(table_path, table_desc, false));
+
+ // Create same table again without ignore flag
+ auto result = adm.CreateTable(table_path, table_desc, false);
+ ASSERT_FALSE(result.Ok());
+ EXPECT_EQ(result.error_code, fluss::ErrorCode::TABLE_ALREADY_EXIST);
+
+ // With ignore flag should succeed
+ ASSERT_OK(adm.CreateTable(table_path, table_desc, true));
+
+ // Cleanup
+ ASSERT_OK(adm.DropTable(table_path, true));
+ ASSERT_OK(adm.DropDatabase(db_name, true, true));
+}
+
+TEST_F(AdminTest, ErrorTableNotExist) {
+ auto& adm = admin();
+
+ fluss::TablePath table_path("fluss", "no_such_table_cpp");
+
+ // Drop without ignore flag
+ auto result = adm.DropTable(table_path, false);
+ ASSERT_FALSE(result.Ok());
+ EXPECT_EQ(result.error_code, fluss::ErrorCode::TABLE_NOT_EXIST);
+
+ // Drop with ignore flag should succeed
+ ASSERT_OK(adm.DropTable(table_path, true));
+}
+
+TEST_F(AdminTest, ErrorTableNotPartitioned) {
+ auto& adm = admin();
+
+ std::string db_name = "test_error_not_partitioned_cpp_db";
+ fluss::DatabaseDescriptor db_desc;
+ ASSERT_OK(adm.CreateDatabase(db_name, db_desc, true));
+
+ fluss::TablePath table_path(db_name, "non_partitioned_table");
+ auto schema = fluss::Schema::NewBuilder()
+ .AddColumn("id", fluss::DataType::Int())
+ .AddColumn("name", fluss::DataType::String())
+ .Build();
+ auto table_desc = fluss::TableDescriptor::NewBuilder()
+ .SetSchema(schema)
+ .SetBucketCount(1)
+ .SetProperty("table.replication.factor", "1")
+ .Build();
+
+ ASSERT_OK(adm.CreateTable(table_path, table_desc, false));
+
+ // list_partition_infos on non-partitioned table
+ std::vector<fluss::PartitionInfo> partitions;
+ auto result = adm.ListPartitionInfos(table_path, partitions);
+ ASSERT_FALSE(result.Ok());
+ EXPECT_EQ(result.error_code, fluss::ErrorCode::TABLE_NOT_PARTITIONED_EXCEPTION);
+
+ // Cleanup
+ ASSERT_OK(adm.DropTable(table_path, true));
+ ASSERT_OK(adm.DropDatabase(db_name, true, true));
+}
diff --git a/bindings/cpp/test/test_kv_table.cpp b/bindings/cpp/test/test_kv_table.cpp
new file mode 100644
index 0000000..9c4f7a0
--- /dev/null
+++ b/bindings/cpp/test/test_kv_table.cpp
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "test_utils.h"
+
+class KvTableTest : public ::testing::Test {
+ protected:
+ fluss::Admin& admin() { return
fluss_test::FlussTestEnvironment::Instance()->GetAdmin(); }
+
+ fluss::Connection& connection() {
+ return fluss_test::FlussTestEnvironment::Instance()->GetConnection();
+ }
+};
+
+TEST_F(KvTableTest, UpsertDeleteAndLookup) {
+ auto& adm = admin();
+ auto& conn = connection();
+
+ fluss::TablePath table_path("fluss", "test_upsert_and_lookup_cpp");
+
+ auto schema = fluss::Schema::NewBuilder()
+ .AddColumn("id", fluss::DataType::Int())
+ .AddColumn("name", fluss::DataType::String())
+ .AddColumn("age", fluss::DataType::BigInt())
+ .SetPrimaryKeys({"id"})
+ .Build();
+
+ auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+ .SetSchema(schema)
+ .SetProperty("table.replication.factor", "1")
+ .Build();
+
+ fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+ fluss::Table table;
+ ASSERT_OK(conn.GetTable(table_path, table));
+
+ // Create upsert writer
+ auto table_upsert = table.NewUpsert();
+ fluss::UpsertWriter upsert_writer;
+ ASSERT_OK(table_upsert.CreateWriter(upsert_writer));
+
+ // Upsert 3 rows (fire-and-forget, then flush)
+ struct TestData {
+ int32_t id;
+ std::string name;
+ int64_t age;
+ };
+ std::vector<TestData> test_data = {{1, "Verso", 32}, {2, "Noco", 25}, {3,
"Esquie", 35}};
+
+ for (const auto& d : test_data) {
+ fluss::GenericRow row(3);
+ row.SetInt32(0, d.id);
+ row.SetString(1, d.name);
+ row.SetInt64(2, d.age);
+ ASSERT_OK(upsert_writer.Upsert(row));
+ }
+ ASSERT_OK(upsert_writer.Flush());
+
+ // Create lookuper
+ fluss::Lookuper lookuper;
+ ASSERT_OK(table.NewLookup().CreateLookuper(lookuper));
+
+ // Verify lookup results
+ for (const auto& d : test_data) {
+ fluss::GenericRow key(3);
+ key.SetInt32(0, d.id);
+
+ fluss::LookupResult result;
+ ASSERT_OK(lookuper.Lookup(key, result));
+ ASSERT_TRUE(result.Found()) << "Row with id=" << d.id << " should
exist";
+
+ EXPECT_EQ(result.GetInt32(0), d.id) << "id mismatch";
+ EXPECT_EQ(result.GetString(1), d.name) << "name mismatch";
+ EXPECT_EQ(result.GetInt64(2), d.age) << "age mismatch";
+ }
+
+ // Update record with id=1 (await acknowledgment)
+ {
+ fluss::GenericRow updated_row(3);
+ updated_row.SetInt32(0, 1);
+ updated_row.SetString(1, "Verso");
+ updated_row.SetInt64(2, 33);
+ fluss::WriteResult wr;
+ ASSERT_OK(upsert_writer.Upsert(updated_row, wr));
+ ASSERT_OK(wr.Wait());
+ }
+
+ // Verify the update
+ {
+ fluss::GenericRow key(3);
+ key.SetInt32(0, 1);
+ fluss::LookupResult result;
+ ASSERT_OK(lookuper.Lookup(key, result));
+ ASSERT_TRUE(result.Found());
+ EXPECT_EQ(result.GetInt64(2), 33) << "Age should be updated";
+ EXPECT_EQ(result.GetString(1), "Verso") << "Name should remain
unchanged";
+ }
+
+ // Delete record with id=1 (await acknowledgment)
+ {
+ fluss::GenericRow delete_row(3);
+ delete_row.SetInt32(0, 1);
+ fluss::WriteResult wr;
+ ASSERT_OK(upsert_writer.Delete(delete_row, wr));
+ ASSERT_OK(wr.Wait());
+ }
+
+ // Verify deletion
+ {
+ fluss::GenericRow key(3);
+ key.SetInt32(0, 1);
+ fluss::LookupResult result;
+ ASSERT_OK(lookuper.Lookup(key, result));
+ ASSERT_FALSE(result.Found()) << "Record 1 should not exist after
delete";
+ }
+
+ // Verify other records still exist
+ for (int id : {2, 3}) {
+ fluss::GenericRow key(3);
+ key.SetInt32(0, id);
+ fluss::LookupResult result;
+ ASSERT_OK(lookuper.Lookup(key, result));
+ ASSERT_TRUE(result.Found()) << "Record " << id
+ << " should still exist after deleting
record 1";
+ }
+
+ // Lookup non-existent key
+ {
+ fluss::GenericRow key(3);
+ key.SetInt32(0, 999);
+ fluss::LookupResult result;
+ ASSERT_OK(lookuper.Lookup(key, result));
+ ASSERT_FALSE(result.Found()) << "Non-existent key should return not
found";
+ }
+
+ ASSERT_OK(adm.DropTable(table_path, false));
+}
+
+TEST_F(KvTableTest, CompositePrimaryKeys) {
+ auto& adm = admin();
+ auto& conn = connection();
+
+ fluss::TablePath table_path("fluss", "test_composite_pk_cpp");
+
+ auto schema = fluss::Schema::NewBuilder()
+ .AddColumn("region", fluss::DataType::String())
+ .AddColumn("score", fluss::DataType::BigInt())
+ .AddColumn("user_id", fluss::DataType::Int())
+ .SetPrimaryKeys({"region", "user_id"})
+ .Build();
+
+ auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+ .SetSchema(schema)
+ .SetProperty("table.replication.factor", "1")
+ .Build();
+
+ fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+ fluss::Table table;
+ ASSERT_OK(conn.GetTable(table_path, table));
+
+ auto table_upsert = table.NewUpsert();
+ fluss::UpsertWriter upsert_writer;
+ ASSERT_OK(table_upsert.CreateWriter(upsert_writer));
+
+ // Insert records with composite keys
+ struct TestData {
+ std::string region;
+ int32_t user_id;
+ int64_t score;
+ };
+ std::vector<TestData> test_data = {
+ {"US", 1, 100}, {"US", 2, 200}, {"EU", 1, 150}, {"EU", 2, 250}};
+
+ for (const auto& d : test_data) {
+ auto row = table.NewRow();
+ row.Set("region", d.region);
+ row.Set("score", d.score);
+ row.Set("user_id", d.user_id);
+ ASSERT_OK(upsert_writer.Upsert(row));
+ }
+ ASSERT_OK(upsert_writer.Flush());
+
+ // Create lookuper
+ fluss::Lookuper lookuper;
+ ASSERT_OK(table.NewLookup().CreateLookuper(lookuper));
+
+ // Lookup (US, 1) - should return score 100
+ {
+ auto key = table.NewRow();
+ key.Set("region", "US");
+ key.Set("user_id", 1);
+ fluss::LookupResult result;
+ ASSERT_OK(lookuper.Lookup(key, result));
+ ASSERT_TRUE(result.Found());
+ EXPECT_EQ(result.GetInt64("score"), 100) << "Score for (US, 1) should
be 100";
+ }
+
+ // Lookup (EU, 2) - should return score 250
+ {
+ auto key = table.NewRow();
+ key.Set("region", "EU");
+ key.Set("user_id", 2);
+ fluss::LookupResult result;
+ ASSERT_OK(lookuper.Lookup(key, result));
+ ASSERT_TRUE(result.Found());
+ EXPECT_EQ(result.GetInt64("score"), 250) << "Score for (EU, 2) should
be 250";
+ }
+
+ // Update (US, 1) score (await acknowledgment)
+ {
+ auto update_row = table.NewRow();
+ update_row.Set("region", "US");
+ update_row.Set("user_id", 1);
+ update_row.Set("score", static_cast<int64_t>(500));
+ fluss::WriteResult wr;
+ ASSERT_OK(upsert_writer.Upsert(update_row, wr));
+ ASSERT_OK(wr.Wait());
+ }
+
+ // Verify update
+ {
+ auto key = table.NewRow();
+ key.Set("region", "US");
+ key.Set("user_id", 1);
+ fluss::LookupResult result;
+ ASSERT_OK(lookuper.Lookup(key, result));
+ ASSERT_TRUE(result.Found());
+ EXPECT_EQ(result.GetInt64("score"), 500) << "Row score should be
updated";
+ }
+
+ ASSERT_OK(adm.DropTable(table_path, false));
+}
+
+TEST_F(KvTableTest, PartialUpdate) {
+ auto& adm = admin();
+ auto& conn = connection();
+
+ fluss::TablePath table_path("fluss", "test_partial_update_cpp");
+
+ auto schema = fluss::Schema::NewBuilder()
+ .AddColumn("id", fluss::DataType::Int())
+ .AddColumn("name", fluss::DataType::String())
+ .AddColumn("age", fluss::DataType::BigInt())
+ .AddColumn("score", fluss::DataType::BigInt())
+ .SetPrimaryKeys({"id"})
+ .Build();
+
+ auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+ .SetSchema(schema)
+ .SetProperty("table.replication.factor", "1")
+ .Build();
+
+ fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+ fluss::Table table;
+ ASSERT_OK(conn.GetTable(table_path, table));
+
+ // Insert initial record with all columns
+ auto table_upsert = table.NewUpsert();
+ fluss::UpsertWriter upsert_writer;
+ ASSERT_OK(table_upsert.CreateWriter(upsert_writer));
+
+ {
+ fluss::GenericRow row(4);
+ row.SetInt32(0, 1);
+ row.SetString(1, "Verso");
+ row.SetInt64(2, 32);
+ row.SetInt64(3, 6942);
+ fluss::WriteResult wr;
+ ASSERT_OK(upsert_writer.Upsert(row, wr));
+ ASSERT_OK(wr.Wait());
+ }
+
+ // Verify initial record
+ fluss::Lookuper lookuper;
+ ASSERT_OK(table.NewLookup().CreateLookuper(lookuper));
+
+ {
+ fluss::GenericRow key(4);
+ key.SetInt32(0, 1);
+ fluss::LookupResult result;
+ ASSERT_OK(lookuper.Lookup(key, result));
+ ASSERT_TRUE(result.Found());
+ EXPECT_EQ(result.GetInt32(0), 1);
+ EXPECT_EQ(result.GetString(1), "Verso");
+ EXPECT_EQ(result.GetInt64(2), 32);
+ EXPECT_EQ(result.GetInt64(3), 6942);
+ }
+
+ // Create partial update writer to update only score column
+ auto partial_upsert = table.NewUpsert();
+ partial_upsert.PartialUpdateByName({"id", "score"});
+ fluss::UpsertWriter partial_writer;
+ ASSERT_OK(partial_upsert.CreateWriter(partial_writer));
+
+ // Update only the score column (await acknowledgment)
+ {
+ fluss::GenericRow partial_row(4);
+ partial_row.SetInt32(0, 1);
+ partial_row.SetNull(1); // not in partial update
+ partial_row.SetNull(2); // not in partial update
+ partial_row.SetInt64(3, 420);
+ fluss::WriteResult wr;
+ ASSERT_OK(partial_writer.Upsert(partial_row, wr));
+ ASSERT_OK(wr.Wait());
+ }
+
+ // Verify partial update - name and age should remain unchanged
+ {
+ fluss::GenericRow key(4);
+ key.SetInt32(0, 1);
+ fluss::LookupResult result;
+ ASSERT_OK(lookuper.Lookup(key, result));
+ ASSERT_TRUE(result.Found());
+ EXPECT_EQ(result.GetInt32(0), 1) << "id should remain 1";
+ EXPECT_EQ(result.GetString(1), "Verso") << "name should remain
unchanged";
+ EXPECT_EQ(result.GetInt64(2), 32) << "age should remain unchanged";
+ EXPECT_EQ(result.GetInt64(3), 420) << "score should be updated to 420";
+ }
+
+ ASSERT_OK(adm.DropTable(table_path, false));
+}
+
+TEST_F(KvTableTest, PartialUpdateByIndex) {
+ auto& adm = admin();
+ auto& conn = connection();
+
+ fluss::TablePath table_path("fluss", "test_partial_update_by_index_cpp");
+
+ auto schema = fluss::Schema::NewBuilder()
+ .AddColumn("id", fluss::DataType::Int())
+ .AddColumn("name", fluss::DataType::String())
+ .AddColumn("age", fluss::DataType::BigInt())
+ .AddColumn("score", fluss::DataType::BigInt())
+ .SetPrimaryKeys({"id"})
+ .Build();
+
+ auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+ .SetSchema(schema)
+ .SetProperty("table.replication.factor", "1")
+ .Build();
+
+ fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+ fluss::Table table;
+ ASSERT_OK(conn.GetTable(table_path, table));
+
+ // Insert initial record with all columns
+ auto table_upsert = table.NewUpsert();
+ fluss::UpsertWriter upsert_writer;
+ ASSERT_OK(table_upsert.CreateWriter(upsert_writer));
+
+ {
+ fluss::GenericRow row(4);
+ row.SetInt32(0, 1);
+ row.SetString(1, "Verso");
+ row.SetInt64(2, 32);
+ row.SetInt64(3, 6942);
+ fluss::WriteResult wr;
+ ASSERT_OK(upsert_writer.Upsert(row, wr));
+ ASSERT_OK(wr.Wait());
+ }
+
+ // Verify initial record
+ fluss::Lookuper lookuper;
+ ASSERT_OK(table.NewLookup().CreateLookuper(lookuper));
+
+ {
+ fluss::GenericRow key(4);
+ key.SetInt32(0, 1);
+ fluss::LookupResult result;
+ ASSERT_OK(lookuper.Lookup(key, result));
+ ASSERT_TRUE(result.Found());
+ EXPECT_EQ(result.GetInt32(0), 1);
+ EXPECT_EQ(result.GetString(1), "Verso");
+ EXPECT_EQ(result.GetInt64(2), 32);
+ EXPECT_EQ(result.GetInt64(3), 6942);
+ }
+
+ // Create partial update writer using column indices: 0 (id) and 3 (score)
+ auto partial_upsert = table.NewUpsert();
+ partial_upsert.PartialUpdateByIndex({0, 3});
+ fluss::UpsertWriter partial_writer;
+ ASSERT_OK(partial_upsert.CreateWriter(partial_writer));
+
+ // Update only the score column (await acknowledgment)
+ {
+ fluss::GenericRow partial_row(4);
+ partial_row.SetInt32(0, 1);
+ partial_row.SetNull(1); // not in partial update
+ partial_row.SetNull(2); // not in partial update
+ partial_row.SetInt64(3, 420);
+ fluss::WriteResult wr;
+ ASSERT_OK(partial_writer.Upsert(partial_row, wr));
+ ASSERT_OK(wr.Wait());
+ }
+
+ // Verify partial update - name and age should remain unchanged
+ {
+ fluss::GenericRow key(4);
+ key.SetInt32(0, 1);
+ fluss::LookupResult result;
+ ASSERT_OK(lookuper.Lookup(key, result));
+ ASSERT_TRUE(result.Found());
+ EXPECT_EQ(result.GetInt32(0), 1) << "id should remain 1";
+ EXPECT_EQ(result.GetString(1), "Verso") << "name should remain
unchanged";
+ EXPECT_EQ(result.GetInt64(2), 32) << "age should remain unchanged";
+ EXPECT_EQ(result.GetInt64(3), 420) << "score should be updated to 420";
+ }
+
+ ASSERT_OK(adm.DropTable(table_path, false));
+}
+
+TEST_F(KvTableTest, PartitionedTableUpsertAndLookup) {
+ auto& adm = admin();
+ auto& conn = connection();
+
+ fluss::TablePath table_path("fluss", "test_partitioned_kv_table_cpp");
+
+ // Create a partitioned KV table with region as partition key
+ auto schema = fluss::Schema::NewBuilder()
+ .AddColumn("region", fluss::DataType::String())
+ .AddColumn("user_id", fluss::DataType::Int())
+ .AddColumn("name", fluss::DataType::String())
+ .AddColumn("score", fluss::DataType::BigInt())
+ .SetPrimaryKeys({"region", "user_id"})
+ .Build();
+
+ auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+ .SetSchema(schema)
+ .SetPartitionKeys({"region"})
+ .SetProperty("table.replication.factor", "1")
+ .Build();
+
+ fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+ // Create partitions
+ fluss_test::CreatePartitions(adm, table_path, "region", {"US", "EU",
"APAC"});
+
+ fluss::Table table;
+ ASSERT_OK(conn.GetTable(table_path, table));
+
+ auto table_upsert = table.NewUpsert();
+ fluss::UpsertWriter upsert_writer;
+ ASSERT_OK(table_upsert.CreateWriter(upsert_writer));
+
+ // Insert records with different partitions
+ struct TestData {
+ std::string region;
+ int32_t user_id;
+ std::string name;
+ int64_t score;
+ };
+ std::vector<TestData> test_data = {{"US", 1, "Gustave", 100}, {"US", 2,
"Lune", 200},
+ {"EU", 1, "Sciel", 150}, {"EU", 2,
"Maelle", 250},
+ {"APAC", 1, "Noco", 300}};
+
+ for (const auto& d : test_data) {
+ fluss::GenericRow row(4);
+ row.SetString(0, d.region);
+ row.SetInt32(1, d.user_id);
+ row.SetString(2, d.name);
+ row.SetInt64(3, d.score);
+ ASSERT_OK(upsert_writer.Upsert(row));
+ }
+ ASSERT_OK(upsert_writer.Flush());
+
+ // Create lookuper
+ fluss::Lookuper lookuper;
+ ASSERT_OK(table.NewLookup().CreateLookuper(lookuper));
+
+ // Lookup records
+ for (const auto& d : test_data) {
+ fluss::GenericRow key(4);
+ key.SetString(0, d.region);
+ key.SetInt32(1, d.user_id);
+
+ fluss::LookupResult result;
+ ASSERT_OK(lookuper.Lookup(key, result));
+ ASSERT_TRUE(result.Found());
+
+ EXPECT_EQ(std::string(result.GetString(0)), d.region) << "region
mismatch";
+ EXPECT_EQ(result.GetInt32(1), d.user_id) << "user_id mismatch";
+ EXPECT_EQ(std::string(result.GetString(2)), d.name) << "name mismatch";
+ EXPECT_EQ(result.GetInt64(3), d.score) << "score mismatch";
+ }
+
+ // Update within a partition (await acknowledgment)
+ {
+ fluss::GenericRow updated_row(4);
+ updated_row.SetString(0, "US");
+ updated_row.SetInt32(1, 1);
+ updated_row.SetString(2, "Gustave Updated");
+ updated_row.SetInt64(3, 999);
+ fluss::WriteResult wr;
+ ASSERT_OK(upsert_writer.Upsert(updated_row, wr));
+ ASSERT_OK(wr.Wait());
+ }
+
+ // Verify the update
+ {
+ fluss::GenericRow key(4);
+ key.SetString(0, "US");
+ key.SetInt32(1, 1);
+ fluss::LookupResult result;
+ ASSERT_OK(lookuper.Lookup(key, result));
+ ASSERT_TRUE(result.Found());
+ EXPECT_EQ(std::string(result.GetString(2)), "Gustave Updated");
+ EXPECT_EQ(result.GetInt64(3), 999);
+ }
+
+ // Lookup in non-existent partition should return not found
+ {
+ fluss::GenericRow key(4);
+ key.SetString(0, "UNKNOWN_REGION");
+ key.SetInt32(1, 1);
+ fluss::LookupResult result;
+ ASSERT_OK(lookuper.Lookup(key, result));
+ ASSERT_FALSE(result.Found()) << "Lookup in non-existent partition
should return not found";
+ }
+
+ // Delete a record within a partition (await acknowledgment)
+ {
+ fluss::GenericRow delete_key(4);
+ delete_key.SetString(0, "EU");
+ delete_key.SetInt32(1, 1);
+ fluss::WriteResult wr;
+ ASSERT_OK(upsert_writer.Delete(delete_key, wr));
+ ASSERT_OK(wr.Wait());
+ }
+
+ // Verify deletion
+ {
+ fluss::GenericRow key(4);
+ key.SetString(0, "EU");
+ key.SetInt32(1, 1);
+ fluss::LookupResult result;
+ ASSERT_OK(lookuper.Lookup(key, result));
+ ASSERT_FALSE(result.Found()) << "Deleted record should not exist";
+ }
+
+ // Verify other records in same partition still exist
+ {
+ fluss::GenericRow key(4);
+ key.SetString(0, "EU");
+ key.SetInt32(1, 2);
+ fluss::LookupResult result;
+ ASSERT_OK(lookuper.Lookup(key, result));
+ ASSERT_TRUE(result.Found());
+ EXPECT_EQ(std::string(result.GetString(2)), "Maelle");
+ }
+
+ ASSERT_OK(adm.DropTable(table_path, false));
+}
+
+TEST_F(KvTableTest, AllSupportedDatatypes) {
+ auto& adm = admin();
+ auto& conn = connection();
+
+ fluss::TablePath table_path("fluss", "test_all_datatypes_cpp");
+
+ // Create a table with all supported datatypes
+ auto schema = fluss::Schema::NewBuilder()
+ .AddColumn("pk_int", fluss::DataType::Int())
+ .AddColumn("col_boolean", fluss::DataType::Boolean())
+ .AddColumn("col_tinyint", fluss::DataType::TinyInt())
+ .AddColumn("col_smallint", fluss::DataType::SmallInt())
+ .AddColumn("col_int", fluss::DataType::Int())
+ .AddColumn("col_bigint", fluss::DataType::BigInt())
+ .AddColumn("col_float", fluss::DataType::Float())
+ .AddColumn("col_double", fluss::DataType::Double())
+ .AddColumn("col_char", fluss::DataType::Char(10))
+ .AddColumn("col_string", fluss::DataType::String())
+ .AddColumn("col_decimal", fluss::DataType::Decimal(10,
2))
+ .AddColumn("col_date", fluss::DataType::Date())
+ .AddColumn("col_time", fluss::DataType::Time())
+ .AddColumn("col_timestamp", fluss::DataType::Timestamp())
+ .AddColumn("col_timestamp_ltz",
fluss::DataType::TimestampLtz())
+ .AddColumn("col_bytes", fluss::DataType::Bytes())
+ .AddColumn("col_binary", fluss::DataType::Binary(20))
+ .SetPrimaryKeys({"pk_int"})
+ .Build();
+
+ auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+ .SetSchema(schema)
+ .SetProperty("table.replication.factor", "1")
+ .Build();
+
+ fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+ fluss::Table table;
+ ASSERT_OK(conn.GetTable(table_path, table));
+
+ auto table_upsert = table.NewUpsert();
+ fluss::UpsertWriter upsert_writer;
+ ASSERT_OK(table_upsert.CreateWriter(upsert_writer));
+
+ // Test data
+ int32_t pk_int = 1;
+ bool col_boolean = true;
+ int32_t col_tinyint = 127;
+ int32_t col_smallint = 32767;
+ int32_t col_int = 2147483647;
+ int64_t col_bigint = 9223372036854775807LL;
+ float col_float = 3.14f;
+ double col_double = 2.718281828459045;
+ std::string col_char = "hello";
+ std::string col_string = "world of fluss rust client";
+ std::string col_decimal = "123.45";
+ auto col_date = fluss::Date::FromDays(20476); // 2026-01-23
+ auto col_time = fluss::Time::FromMillis(36827000); // 10:13:47
+ auto col_timestamp = fluss::Timestamp::FromMillis(1769163227123); //
2026-01-23 10:13:47.123
+ auto col_timestamp_ltz = fluss::Timestamp::FromMillis(1769163227123);
+ std::vector<uint8_t> col_bytes = {'b', 'i', 'n', 'a', 'r', 'y', ' ', 'd',
'a', 't', 'a'};
+ std::vector<uint8_t> col_binary = {'f', 'i', 'x', 'e', 'd', ' ', 'b', 'i',
'n', 'a',
+ 'r', 'y', ' ', 'd', 'a', 't', 'a', '!',
'!', '!'};
+
+ // Upsert a row with all datatypes
+ {
+ fluss::GenericRow row(17);
+ row.SetInt32(0, pk_int);
+ row.SetBool(1, col_boolean);
+ row.SetInt32(2, col_tinyint);
+ row.SetInt32(3, col_smallint);
+ row.SetInt32(4, col_int);
+ row.SetInt64(5, col_bigint);
+ row.SetFloat32(6, col_float);
+ row.SetFloat64(7, col_double);
+ row.SetString(8, col_char);
+ row.SetString(9, col_string);
+ row.SetDecimal(10, col_decimal);
+ row.SetDate(11, col_date);
+ row.SetTime(12, col_time);
+ row.SetTimestampNtz(13, col_timestamp);
+ row.SetTimestampLtz(14, col_timestamp_ltz);
+ row.SetBytes(15, col_bytes);
+ row.SetBytes(16, col_binary);
+ fluss::WriteResult wr;
+ ASSERT_OK(upsert_writer.Upsert(row, wr));
+ ASSERT_OK(wr.Wait());
+ }
+
+ // Lookup the record
+ fluss::Lookuper lookuper;
+ ASSERT_OK(table.NewLookup().CreateLookuper(lookuper));
+
+ {
+ fluss::GenericRow key(17);
+ key.SetInt32(0, pk_int);
+
+ fluss::LookupResult result;
+ ASSERT_OK(lookuper.Lookup(key, result));
+ ASSERT_TRUE(result.Found());
+
+ // Verify all datatypes
+ EXPECT_EQ(result.GetInt32(0), pk_int) << "pk_int mismatch";
+ EXPECT_EQ(result.GetBool(1), col_boolean) << "col_boolean mismatch";
+ EXPECT_EQ(result.GetInt32(2), col_tinyint) << "col_tinyint mismatch";
+ EXPECT_EQ(result.GetInt32(3), col_smallint) << "col_smallint mismatch";
+ EXPECT_EQ(result.GetInt32(4), col_int) << "col_int mismatch";
+ EXPECT_EQ(result.GetInt64(5), col_bigint) << "col_bigint mismatch";
+ EXPECT_NEAR(result.GetFloat32(6), col_float, 1e-6f) << "col_float
mismatch";
+ EXPECT_NEAR(result.GetFloat64(7), col_double, 1e-15) << "col_double
mismatch";
+ EXPECT_EQ(result.GetString(8), col_char) << "col_char mismatch";
+ EXPECT_EQ(result.GetString(9), col_string) << "col_string mismatch";
+ EXPECT_EQ(result.GetDecimalString(10), col_decimal) << "col_decimal
mismatch";
+ EXPECT_EQ(result.GetDate(11).days_since_epoch,
col_date.days_since_epoch) << "col_date mismatch";
+ EXPECT_EQ(result.GetTime(12).millis_since_midnight,
col_time.millis_since_midnight) << "col_time mismatch";
+ EXPECT_EQ(result.GetTimestamp(13).epoch_millis,
col_timestamp.epoch_millis)
+ << "col_timestamp mismatch";
+ EXPECT_EQ(result.GetTimestamp(14).epoch_millis,
col_timestamp_ltz.epoch_millis)
+ << "col_timestamp_ltz mismatch";
+
+ auto [bytes_ptr, bytes_len] = result.GetBytes(15);
+ EXPECT_EQ(bytes_len, col_bytes.size()) << "col_bytes length mismatch";
+ EXPECT_TRUE(std::memcmp(bytes_ptr, col_bytes.data(), bytes_len) == 0)
+ << "col_bytes mismatch";
+
+ auto [binary_ptr, binary_len] = result.GetBytes(16);
+ EXPECT_EQ(binary_len, col_binary.size()) << "col_binary length
mismatch";
+ EXPECT_TRUE(std::memcmp(binary_ptr, col_binary.data(), binary_len) ==
0)
+ << "col_binary mismatch";
+ }
+
+ // Test with null values for nullable columns
+ {
+ fluss::GenericRow row_with_nulls(17);
+ row_with_nulls.SetInt32(0, 2); // pk_int = 2
+ for (size_t i = 1; i < 17; ++i) {
+ row_with_nulls.SetNull(i);
+ }
+ fluss::WriteResult wr;
+ ASSERT_OK(upsert_writer.Upsert(row_with_nulls, wr));
+ ASSERT_OK(wr.Wait());
+ }
+
+ // Lookup row with nulls
+ {
+ fluss::GenericRow key(17);
+ key.SetInt32(0, 2);
+
+ fluss::LookupResult result;
+ ASSERT_OK(lookuper.Lookup(key, result));
+ ASSERT_TRUE(result.Found());
+
+ EXPECT_EQ(result.GetInt32(0), 2) << "pk_int mismatch";
+ for (size_t i = 1; i < 17; ++i) {
+ EXPECT_TRUE(result.IsNull(i)) << "column " << i << " should be
null";
+ }
+ }
+
+ ASSERT_OK(adm.DropTable(table_path, false));
+}
diff --git a/bindings/cpp/test/test_log_table.cpp
b/bindings/cpp/test/test_log_table.cpp
new file mode 100644
index 0000000..47ab6f2
--- /dev/null
+++ b/bindings/cpp/test/test_log_table.cpp
@@ -0,0 +1,831 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/api.h>
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <chrono>
+#include <thread>
+#include <tuple>
+
+#include "test_utils.h"
+
+class LogTableTest : public ::testing::Test {
+ protected:
+ fluss::Admin& admin() { return
fluss_test::FlussTestEnvironment::Instance()->GetAdmin(); }
+
+ fluss::Connection& connection() {
+ return fluss_test::FlussTestEnvironment::Instance()->GetConnection();
+ }
+};
+
+TEST_F(LogTableTest, AppendRecordBatchAndScan) {
+ auto& adm = admin();
+ auto& conn = connection();
+
+ fluss::TablePath table_path("fluss",
"test_append_record_batch_and_scan_cpp");
+
+ auto schema = fluss::Schema::NewBuilder()
+ .AddColumn("c1", fluss::DataType::Int())
+ .AddColumn("c2", fluss::DataType::String())
+ .Build();
+
+ auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+ .SetSchema(schema)
+ .SetBucketCount(3)
+ .SetBucketKeys({"c1"})
+ .SetProperty("table.replication.factor", "1")
+ .Build();
+
+ fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+ fluss::Table table;
+ ASSERT_OK(conn.GetTable(table_path, table));
+
+ // Create append writer
+ auto table_append = table.NewAppend();
+ fluss::AppendWriter append_writer;
+ ASSERT_OK(table_append.CreateWriter(append_writer));
+
+ // Append Arrow record batches
+ {
+ auto c1 = arrow::Int32Builder();
+ c1.AppendValues({1, 2, 3}).ok();
+ auto c2 = arrow::StringBuilder();
+ c2.AppendValues({"a1", "a2", "a3"}).ok();
+
+ auto batch = arrow::RecordBatch::Make(
+ arrow::schema({arrow::field("c1", arrow::int32()),
arrow::field("c2", arrow::utf8())}),
+ 3, {c1.Finish().ValueOrDie(), c2.Finish().ValueOrDie()});
+
+ ASSERT_OK(append_writer.AppendArrowBatch(batch));
+ }
+
+ {
+ auto c1 = arrow::Int32Builder();
+ c1.AppendValues({4, 5, 6}).ok();
+ auto c2 = arrow::StringBuilder();
+ c2.AppendValues({"a4", "a5", "a6"}).ok();
+
+ auto batch = arrow::RecordBatch::Make(
+ arrow::schema({arrow::field("c1", arrow::int32()),
arrow::field("c2", arrow::utf8())}),
+ 3, {c1.Finish().ValueOrDie(), c2.Finish().ValueOrDie()});
+
+ ASSERT_OK(append_writer.AppendArrowBatch(batch));
+ }
+
+ ASSERT_OK(append_writer.Flush());
+
+ // Create scanner and subscribe to all 3 buckets
+ fluss::Table scan_table;
+ ASSERT_OK(conn.GetTable(table_path, scan_table));
+ int32_t num_buckets = scan_table.GetTableInfo().num_buckets;
+ ASSERT_EQ(num_buckets, 3) << "Table should have 3 buckets";
+
+ auto table_scan = scan_table.NewScan();
+ fluss::LogScanner log_scanner;
+ ASSERT_OK(table_scan.CreateLogScanner(log_scanner));
+
+ for (int32_t bucket_id = 0; bucket_id < num_buckets; ++bucket_id) {
+ ASSERT_OK(log_scanner.Subscribe(bucket_id, fluss::EARLIEST_OFFSET));
+ }
+
+ // Poll for records across all buckets
+ std::vector<std::pair<int32_t, std::string>> records;
+ fluss_test::PollRecords(log_scanner, 6, [](const fluss::ScanRecord& rec) {
+ return std::make_pair(rec.row.GetInt32(0),
std::string(rec.row.GetString(1)));
+ }, records);
+ ASSERT_EQ(records.size(), 6u) << "Expected 6 records";
+ std::sort(records.begin(), records.end());
+
+ std::vector<std::pair<int32_t, std::string>> expected = {
+ {1, "a1"}, {2, "a2"}, {3, "a3"}, {4, "a4"}, {5, "a5"}, {6, "a6"}};
+ EXPECT_EQ(records, expected);
+
+ // Verify per-bucket iteration via BucketView
+ {
+ fluss::Table bucket_table;
+ ASSERT_OK(conn.GetTable(table_path, bucket_table));
+ auto bucket_scan = bucket_table.NewScan();
+ fluss::LogScanner bucket_scanner;
+ ASSERT_OK(bucket_scan.CreateLogScanner(bucket_scanner));
+
+ for (int32_t bid = 0; bid < num_buckets; ++bid) {
+ ASSERT_OK(bucket_scanner.Subscribe(bid, fluss::EARLIEST_OFFSET));
+ }
+
+ std::vector<std::pair<int32_t, std::string>> bucket_records;
+ auto bucket_deadline = std::chrono::steady_clock::now() +
std::chrono::seconds(10);
+ size_t buckets_with_data = 0;
+ while (bucket_records.size() < 6 && std::chrono::steady_clock::now() <
bucket_deadline) {
+ fluss::ScanRecords scan_records;
+ ASSERT_OK(bucket_scanner.Poll(500, scan_records));
+
+ // Iterate by bucket
+ for (size_t b = 0; b < scan_records.BucketCount(); ++b) {
+ auto bucket_view = scan_records.BucketAt(b);
+ if (!bucket_view.Empty()) {
+ buckets_with_data++;
+ }
+ for (auto rec : bucket_view) {
+ bucket_records.emplace_back(rec.row.GetInt32(0),
+
std::string(rec.row.GetString(1)));
+ }
+ }
+ }
+
+ ASSERT_EQ(bucket_records.size(), 6u) << "Expected 6 records via
per-bucket iteration";
+ EXPECT_GT(buckets_with_data, 1u) << "Records should be distributed
across multiple buckets";
+
+ std::sort(bucket_records.begin(), bucket_records.end());
+ EXPECT_EQ(bucket_records, expected);
+ }
+
+ // Test unsubscribe
+ ASSERT_OK(log_scanner.Unsubscribe(0));
+
+ // Verify unsubscribe_partition fails on a non-partitioned table
+ auto unsub_result = log_scanner.UnsubscribePartition(0, 0);
+ ASSERT_FALSE(unsub_result.Ok())
+ << "unsubscribe_partition should fail on a non-partitioned table";
+
+ ASSERT_OK(adm.DropTable(table_path, false));
+}
+
+TEST_F(LogTableTest, ListOffsets) {
+ auto& adm = admin();
+ auto& conn = connection();
+
+ fluss::TablePath table_path("fluss", "test_list_offsets_cpp");
+
+ auto schema = fluss::Schema::NewBuilder()
+ .AddColumn("id", fluss::DataType::Int())
+ .AddColumn("name", fluss::DataType::String())
+ .Build();
+
+ auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+ .SetSchema(schema)
+ .SetProperty("table.replication.factor", "1")
+ .Build();
+
+ fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+ // Wait for table initialization
+ std::this_thread::sleep_for(std::chrono::seconds(2));
+
+ // Earliest offset should be 0 for empty table
+ std::unordered_map<int32_t, int64_t> earliest_offsets;
+ ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Earliest(),
earliest_offsets));
+ EXPECT_EQ(earliest_offsets[0], 0) << "Earliest offset should be 0 for
bucket 0";
+
+ // Latest offset should be 0 for empty table
+ std::unordered_map<int32_t, int64_t> latest_offsets;
+ ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Latest(),
latest_offsets));
+ EXPECT_EQ(latest_offsets[0], 0) << "Latest offset should be 0 for empty
table";
+
+ auto before_append_ms =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
+
+ // Append records
+ fluss::Table table;
+ ASSERT_OK(conn.GetTable(table_path, table));
+ auto table_append = table.NewAppend();
+ fluss::AppendWriter append_writer;
+ ASSERT_OK(table_append.CreateWriter(append_writer));
+
+ {
+ auto id_builder = arrow::Int32Builder();
+ id_builder.AppendValues({1, 2, 3}).ok();
+ auto name_builder = arrow::StringBuilder();
+ name_builder.AppendValues({"alice", "bob", "charlie"}).ok();
+
+ auto batch = arrow::RecordBatch::Make(
+ arrow::schema(
+ {arrow::field("id", arrow::int32()), arrow::field("name",
arrow::utf8())}),
+ 3, {id_builder.Finish().ValueOrDie(),
name_builder.Finish().ValueOrDie()});
+
+ ASSERT_OK(append_writer.AppendArrowBatch(batch));
+ }
+ ASSERT_OK(append_writer.Flush());
+
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ auto after_append_ms =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
+
+ // Latest offset after appending should be 3
+ std::unordered_map<int32_t, int64_t> latest_after;
+ ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Latest(),
latest_after));
+ EXPECT_EQ(latest_after[0], 3) << "Latest offset should be 3 after
appending 3 records";
+
+ // Earliest offset should still be 0
+ std::unordered_map<int32_t, int64_t> earliest_after;
+ ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Earliest(),
earliest_after));
+ EXPECT_EQ(earliest_after[0], 0) << "Earliest offset should still be 0";
+
+ // Timestamp before append should resolve to offset 0
+ std::unordered_map<int32_t, int64_t> ts_offsets;
+ ASSERT_OK(adm.ListOffsets(table_path, {0},
fluss::OffsetSpec::Timestamp(before_append_ms),
+ ts_offsets));
+ EXPECT_EQ(ts_offsets[0], 0)
+ << "Timestamp before append should resolve to offset 0";
+
+ // Timestamp after append should resolve to offset 3
+ std::unordered_map<int32_t, int64_t> ts_after_offsets;
+ ASSERT_OK(adm.ListOffsets(table_path, {0},
fluss::OffsetSpec::Timestamp(after_append_ms),
+ ts_after_offsets));
+ EXPECT_EQ(ts_after_offsets[0], 3)
+ << "Timestamp after append should resolve to offset 3";
+
+ ASSERT_OK(adm.DropTable(table_path, false));
+}
+
+TEST_F(LogTableTest, TestProject) {
+ auto& adm = admin();
+ auto& conn = connection();
+
+ fluss::TablePath table_path("fluss", "test_project_cpp");
+
+ auto schema = fluss::Schema::NewBuilder()
+ .AddColumn("col_a", fluss::DataType::Int())
+ .AddColumn("col_b", fluss::DataType::String())
+ .AddColumn("col_c", fluss::DataType::Int())
+ .Build();
+
+ auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+ .SetSchema(schema)
+ .SetProperty("table.replication.factor", "1")
+ .Build();
+
+ fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+ fluss::Table table;
+ ASSERT_OK(conn.GetTable(table_path, table));
+
+ // Append 3 records
+ auto table_append = table.NewAppend();
+ fluss::AppendWriter append_writer;
+ ASSERT_OK(table_append.CreateWriter(append_writer));
+
+ {
+ auto col_a_builder = arrow::Int32Builder();
+ col_a_builder.AppendValues({1, 2, 3}).ok();
+ auto col_b_builder = arrow::StringBuilder();
+ col_b_builder.AppendValues({"x", "y", "z"}).ok();
+ auto col_c_builder = arrow::Int32Builder();
+ col_c_builder.AppendValues({10, 20, 30}).ok();
+
+ auto batch = arrow::RecordBatch::Make(
+ arrow::schema({arrow::field("col_a", arrow::int32()),
+ arrow::field("col_b", arrow::utf8()),
+ arrow::field("col_c", arrow::int32())}),
+ 3,
+ {col_a_builder.Finish().ValueOrDie(),
col_b_builder.Finish().ValueOrDie(),
+ col_c_builder.Finish().ValueOrDie()});
+
+ ASSERT_OK(append_writer.AppendArrowBatch(batch));
+ }
+ ASSERT_OK(append_writer.Flush());
+
+ // Test project_by_name: select col_b and col_c only
+ {
+ fluss::Table proj_table;
+ ASSERT_OK(conn.GetTable(table_path, proj_table));
+ auto scan = proj_table.NewScan();
+ scan.ProjectByName({"col_b", "col_c"});
+ fluss::LogScanner scanner;
+ ASSERT_OK(scan.CreateLogScanner(scanner));
+
+ ASSERT_OK(scanner.Subscribe(0, 0));
+
+ fluss::ScanRecords records;
+ ASSERT_OK(scanner.Poll(10000, records));
+
+ ASSERT_EQ(records.Count(), 3u) << "Should have 3 records with
project_by_name";
+
+ std::vector<std::string> expected_col_b = {"x", "y", "z"};
+ std::vector<int32_t> expected_col_c = {10, 20, 30};
+
+ // Collect and sort by col_c to get deterministic order
+ std::vector<std::pair<std::string, int32_t>> collected;
+ for (auto rec : records) {
+ collected.emplace_back(std::string(rec.row.GetString(0)),
rec.row.GetInt32(1));
+ }
+ std::sort(collected.begin(), collected.end(),
+ [](const auto& a, const auto& b) { return a.second <
b.second; });
+
+ for (size_t i = 0; i < 3; ++i) {
+ EXPECT_EQ(collected[i].first, expected_col_b[i]) << "col_b
mismatch at index " << i;
+ EXPECT_EQ(collected[i].second, expected_col_c[i]) << "col_c
mismatch at index " << i;
+ }
+ }
+
+ // Test project by column indices: select col_b (1) and col_a (0) in that
order
+ {
+ fluss::Table proj_table;
+ ASSERT_OK(conn.GetTable(table_path, proj_table));
+ auto scan = proj_table.NewScan();
+ scan.ProjectByIndex({1, 0});
+ fluss::LogScanner scanner;
+ ASSERT_OK(scan.CreateLogScanner(scanner));
+
+ ASSERT_OK(scanner.Subscribe(0, 0));
+
+ fluss::ScanRecords records;
+ ASSERT_OK(scanner.Poll(10000, records));
+
+ ASSERT_EQ(records.Count(), 3u);
+
+ std::vector<std::string> expected_col_b = {"x", "y", "z"};
+ std::vector<int32_t> expected_col_a = {1, 2, 3};
+
+ std::vector<std::pair<std::string, int32_t>> collected;
+ for (auto rec : records) {
+ collected.emplace_back(std::string(rec.row.GetString(0)),
rec.row.GetInt32(1));
+ }
+ std::sort(collected.begin(), collected.end(),
+ [](const auto& a, const auto& b) { return a.second <
b.second; });
+
+ for (size_t i = 0; i < 3; ++i) {
+ EXPECT_EQ(collected[i].first, expected_col_b[i]) << "col_b
mismatch at index " << i;
+ EXPECT_EQ(collected[i].second, expected_col_a[i]) << "col_a
mismatch at index " << i;
+ }
+ }
+
+ ASSERT_OK(adm.DropTable(table_path, false));
+}
+
+TEST_F(LogTableTest, TestPollBatches) {
+ auto& adm = admin();
+ auto& conn = connection();
+
+ fluss::TablePath table_path("fluss", "test_poll_batches_cpp");
+
+ auto schema = fluss::Schema::NewBuilder()
+ .AddColumn("id", fluss::DataType::Int())
+ .AddColumn("name", fluss::DataType::String())
+ .Build();
+
+ auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+ .SetSchema(schema)
+ .SetProperty("table.replication.factor", "1")
+ .Build();
+
+ fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ fluss::Table table;
+ ASSERT_OK(conn.GetTable(table_path, table));
+
+ auto scan = table.NewScan();
+ fluss::LogScanner scanner;
+ ASSERT_OK(scan.CreateRecordBatchLogScanner(scanner));
+ ASSERT_OK(scanner.Subscribe(0, 0));
+
+ // Test 1: Empty table should return empty result
+ {
+ fluss::ArrowRecordBatches batches;
+ ASSERT_OK(scanner.PollRecordBatch(500, batches));
+ ASSERT_TRUE(batches.Empty());
+ }
+
+ // Append data
+ auto table_append = table.NewAppend();
+ fluss::AppendWriter writer;
+ ASSERT_OK(table_append.CreateWriter(writer));
+
+ auto make_batch = [](std::vector<int32_t> ids, std::vector<std::string>
names) {
+ auto id_builder = arrow::Int32Builder();
+ id_builder.AppendValues(ids).ok();
+ auto name_builder = arrow::StringBuilder();
+ name_builder.AppendValues(names).ok();
+ return arrow::RecordBatch::Make(
+ arrow::schema(
+ {arrow::field("id", arrow::int32()), arrow::field("name",
arrow::utf8())}),
+ static_cast<int64_t>(ids.size()),
+ {id_builder.Finish().ValueOrDie(),
name_builder.Finish().ValueOrDie()});
+ };
+
+ ASSERT_OK(writer.AppendArrowBatch(make_batch({1, 2}, {"a", "b"})));
+ ASSERT_OK(writer.AppendArrowBatch(make_batch({3, 4}, {"c", "d"})));
+ ASSERT_OK(writer.AppendArrowBatch(make_batch({5, 6}, {"e", "f"})));
+ ASSERT_OK(writer.Flush());
+
+ // Extract ids from Arrow batches
+ auto extract_ids = [](const fluss::ArrowRecordBatches& batches) {
+ std::vector<int32_t> ids;
+ for (const auto& batch : batches) {
+ auto arr =
+
std::static_pointer_cast<arrow::Int32Array>(batch->GetArrowRecordBatch()->column(0));
+ for (int64_t i = 0; i < arr->length(); ++i) {
+ ids.push_back(arr->Value(i));
+ }
+ }
+ return ids;
+ };
+
+ // Test 2: Poll until we get all 6 records
+ std::vector<int32_t> all_ids;
+ fluss_test::PollRecordBatches(scanner, 6, extract_ids, all_ids);
+ ASSERT_EQ(all_ids, (std::vector<int32_t>{1, 2, 3, 4, 5, 6}));
+
+ // Test 3: Append more and verify offset continuation (no duplicates)
+ ASSERT_OK(writer.AppendArrowBatch(make_batch({7, 8}, {"g", "h"})));
+ ASSERT_OK(writer.Flush());
+
+ std::vector<int32_t> new_ids;
+ fluss_test::PollRecordBatches(scanner, 2, extract_ids, new_ids);
+ ASSERT_EQ(new_ids, (std::vector<int32_t>{7, 8}));
+
+ // Test 4: Subscribing from mid-offset should truncate batch
+ {
+ fluss::Table trunc_table;
+ ASSERT_OK(conn.GetTable(table_path, trunc_table));
+ auto trunc_scan = trunc_table.NewScan();
+ fluss::LogScanner trunc_scanner;
+ ASSERT_OK(trunc_scan.CreateRecordBatchLogScanner(trunc_scanner));
+ ASSERT_OK(trunc_scanner.Subscribe(0, 3));
+
+ std::vector<int32_t> trunc_ids;
+ fluss_test::PollRecordBatches(trunc_scanner, 5, extract_ids,
trunc_ids);
+ ASSERT_EQ(trunc_ids, (std::vector<int32_t>{4, 5, 6, 7, 8}));
+ }
+
+ // Test 5: Projection should only return requested columns
+ {
+ fluss::Table proj_table;
+ ASSERT_OK(conn.GetTable(table_path, proj_table));
+ auto proj_scan = proj_table.NewScan();
+ proj_scan.ProjectByName({"id"});
+ fluss::LogScanner proj_scanner;
+ ASSERT_OK(proj_scan.CreateRecordBatchLogScanner(proj_scanner));
+ ASSERT_OK(proj_scanner.Subscribe(0, 0));
+
+ fluss::ArrowRecordBatches proj_batches;
+ ASSERT_OK(proj_scanner.PollRecordBatch(10000, proj_batches));
+
+ ASSERT_FALSE(proj_batches.Empty());
+ EXPECT_EQ(proj_batches[0]->GetArrowRecordBatch()->num_columns(), 1)
+ << "Projected batch should have 1 column (id), not 2";
+ }
+
+ ASSERT_OK(adm.DropTable(table_path, false));
+}
+
+TEST_F(LogTableTest, AllSupportedDatatypes) {
+ auto& adm = admin();
+ auto& conn = connection();
+
+ fluss::TablePath table_path("fluss", "test_log_all_datatypes_cpp");
+
+ // Create a log table with all supported datatypes
+ auto schema =
+ fluss::Schema::NewBuilder()
+ .AddColumn("col_tinyint", fluss::DataType::TinyInt())
+ .AddColumn("col_smallint", fluss::DataType::SmallInt())
+ .AddColumn("col_int", fluss::DataType::Int())
+ .AddColumn("col_bigint", fluss::DataType::BigInt())
+ .AddColumn("col_float", fluss::DataType::Float())
+ .AddColumn("col_double", fluss::DataType::Double())
+ .AddColumn("col_boolean", fluss::DataType::Boolean())
+ .AddColumn("col_char", fluss::DataType::Char(10))
+ .AddColumn("col_string", fluss::DataType::String())
+ .AddColumn("col_decimal", fluss::DataType::Decimal(10, 2))
+ .AddColumn("col_date", fluss::DataType::Date())
+ .AddColumn("col_time", fluss::DataType::Time())
+ .AddColumn("col_timestamp", fluss::DataType::Timestamp())
+ .AddColumn("col_timestamp_ltz", fluss::DataType::TimestampLtz())
+ .AddColumn("col_bytes", fluss::DataType::Bytes())
+ .AddColumn("col_binary", fluss::DataType::Binary(4))
+ .Build();
+
+ auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+ .SetSchema(schema)
+ .SetProperty("table.replication.factor", "1")
+ .Build();
+
+ fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+ fluss::Table table;
+ ASSERT_OK(conn.GetTable(table_path, table));
+
+ size_t field_count = table.GetTableInfo().schema.columns.size();
+
+ auto table_append = table.NewAppend();
+ fluss::AppendWriter append_writer;
+ ASSERT_OK(table_append.CreateWriter(append_writer));
+
+ // Test data
+ int32_t col_tinyint = 127;
+ int32_t col_smallint = 32767;
+ int32_t col_int = 2147483647;
+ int64_t col_bigint = 9223372036854775807LL;
+ float col_float = 3.14f;
+ double col_double = 2.718281828459045;
+ bool col_boolean = true;
+ std::string col_char = "hello";
+ std::string col_string = "world of fluss rust client";
+ std::string col_decimal = "123.45";
+ auto col_date = fluss::Date::FromDays(20476); // 2026-01-23
+ auto col_time = fluss::Time::FromMillis(36827000); // 10:13:47
+ auto col_timestamp = fluss::Timestamp::FromMillisNanos(1769163227123,
456000);
+ auto col_timestamp_ltz = fluss::Timestamp::FromMillisNanos(1769163227123,
456000);
+ std::vector<uint8_t> col_bytes = {'b', 'i', 'n', 'a', 'r', 'y', ' ', 'd',
'a', 't', 'a'};
+ std::vector<uint8_t> col_binary = {0xDE, 0xAD, 0xBE, 0xEF};
+
+ // Append a row with all datatypes
+ {
+ fluss::GenericRow row(field_count);
+ row.SetInt32(0, col_tinyint);
+ row.SetInt32(1, col_smallint);
+ row.SetInt32(2, col_int);
+ row.SetInt64(3, col_bigint);
+ row.SetFloat32(4, col_float);
+ row.SetFloat64(5, col_double);
+ row.SetBool(6, col_boolean);
+ row.SetString(7, col_char);
+ row.SetString(8, col_string);
+ row.SetDecimal(9, col_decimal);
+ row.SetDate(10, col_date);
+ row.SetTime(11, col_time);
+ row.SetTimestampNtz(12, col_timestamp);
+ row.SetTimestampLtz(13, col_timestamp_ltz);
+ row.SetBytes(14, col_bytes);
+ row.SetBytes(15, col_binary);
+ ASSERT_OK(append_writer.Append(row));
+ }
+
+ // Append a row with null values
+ {
+ fluss::GenericRow row_with_nulls(field_count);
+ for (size_t i = 0; i < field_count; ++i) {
+ row_with_nulls.SetNull(i);
+ }
+ ASSERT_OK(append_writer.Append(row_with_nulls));
+ }
+
+ ASSERT_OK(append_writer.Flush());
+
+ // Scan the records
+ fluss::Table scan_table;
+ ASSERT_OK(conn.GetTable(table_path, scan_table));
+ auto table_scan = scan_table.NewScan();
+ fluss::LogScanner log_scanner;
+ ASSERT_OK(table_scan.CreateLogScanner(log_scanner));
+ ASSERT_OK(log_scanner.Subscribe(0, 0));
+
+ // Poll until we get 2 records
+ std::vector<fluss::ScanRecord> all_records;
+ fluss_test::PollRecords(log_scanner, 2,
+ [](const fluss::ScanRecord& rec) { return rec; }, all_records);
+ ASSERT_EQ(all_records.size(), 2u) << "Expected 2 records";
+
+ // Verify first record (all values)
+ auto& row = all_records[0].row;
+
+ EXPECT_EQ(row.GetInt32(0), col_tinyint) << "col_tinyint mismatch";
+ EXPECT_EQ(row.GetInt32(1), col_smallint) << "col_smallint mismatch";
+ EXPECT_EQ(row.GetInt32(2), col_int) << "col_int mismatch";
+ EXPECT_EQ(row.GetInt64(3), col_bigint) << "col_bigint mismatch";
+ EXPECT_NEAR(row.GetFloat32(4), col_float, 1e-6f) << "col_float mismatch";
+ EXPECT_NEAR(row.GetFloat64(5), col_double, 1e-15) << "col_double mismatch";
+ EXPECT_EQ(row.GetBool(6), col_boolean) << "col_boolean mismatch";
+ EXPECT_EQ(row.GetString(7), col_char) << "col_char mismatch";
+ EXPECT_EQ(row.GetString(8), col_string) << "col_string mismatch";
+ EXPECT_EQ(row.GetDecimalString(9), col_decimal) << "col_decimal mismatch";
+ EXPECT_EQ(row.GetDate(10).days_since_epoch, col_date.days_since_epoch) <<
"col_date mismatch";
+ EXPECT_EQ(row.GetTime(11).millis_since_midnight,
col_time.millis_since_midnight)
+ << "col_time mismatch";
+ EXPECT_EQ(row.GetTimestamp(12).epoch_millis, col_timestamp.epoch_millis)
+ << "col_timestamp millis mismatch";
+ EXPECT_EQ(row.GetTimestamp(12).nano_of_millisecond,
col_timestamp.nano_of_millisecond)
+ << "col_timestamp nanos mismatch";
+ EXPECT_EQ(row.GetTimestamp(13).epoch_millis,
col_timestamp_ltz.epoch_millis)
+ << "col_timestamp_ltz millis mismatch";
+ EXPECT_EQ(row.GetTimestamp(13).nano_of_millisecond,
col_timestamp_ltz.nano_of_millisecond)
+ << "col_timestamp_ltz nanos mismatch";
+
+ auto [bytes_ptr, bytes_len] = row.GetBytes(14);
+ EXPECT_EQ(bytes_len, col_bytes.size()) << "col_bytes length mismatch";
+ EXPECT_TRUE(std::memcmp(bytes_ptr, col_bytes.data(), bytes_len) == 0)
+ << "col_bytes mismatch";
+
+ auto [binary_ptr, binary_len] = row.GetBytes(15);
+ EXPECT_EQ(binary_len, col_binary.size()) << "col_binary length mismatch";
+ EXPECT_TRUE(std::memcmp(binary_ptr, col_binary.data(), binary_len) == 0)
+ << "col_binary mismatch";
+
+ // Verify second record (all nulls)
+ auto& null_row = all_records[1].row;
+ for (size_t i = 0; i < field_count; ++i) {
+ EXPECT_TRUE(null_row.IsNull(i)) << "column " << i << " should be null";
+ }
+
+ ASSERT_OK(adm.DropTable(table_path, false));
+}
+
+TEST_F(LogTableTest, PartitionedTableAppendScan) {
+ auto& adm = admin();
+ auto& conn = connection();
+
+ fluss::TablePath table_path("fluss", "test_partitioned_log_append_cpp");
+
+ // Create a partitioned log table
+ auto schema = fluss::Schema::NewBuilder()
+ .AddColumn("id", fluss::DataType::Int())
+ .AddColumn("region", fluss::DataType::String())
+ .AddColumn("value", fluss::DataType::BigInt())
+ .Build();
+
+ auto table_descriptor = fluss::TableDescriptor::NewBuilder()
+ .SetSchema(schema)
+ .SetPartitionKeys({"region"})
+ .SetProperty("table.replication.factor", "1")
+ .Build();
+
+ fluss_test::CreateTable(adm, table_path, table_descriptor);
+
+ // Create partitions
+ fluss_test::CreatePartitions(adm, table_path, "region", {"US", "EU"});
+
+ // Wait for partitions
+ std::this_thread::sleep_for(std::chrono::seconds(2));
+
+ fluss::Table table;
+ ASSERT_OK(conn.GetTable(table_path, table));
+
+ auto table_append = table.NewAppend();
+ fluss::AppendWriter append_writer;
+ ASSERT_OK(table_append.CreateWriter(append_writer));
+
+ // Append rows
+ struct TestData {
+ int32_t id;
+ std::string region;
+ int64_t value;
+ };
+ std::vector<TestData> test_data = {{1, "US", 100}, {2, "US", 200}, {3,
"EU", 300}, {4, "EU", 400}};
+
+ for (const auto& d : test_data) {
+ fluss::GenericRow row(3);
+ row.SetInt32(0, d.id);
+ row.SetString(1, d.region);
+ row.SetInt64(2, d.value);
+ ASSERT_OK(append_writer.Append(row));
+ }
+ ASSERT_OK(append_writer.Flush());
+
+ // Append arrow batches per partition
+ {
+ auto id_builder = arrow::Int32Builder();
+ id_builder.AppendValues({5, 6}).ok();
+ auto region_builder = arrow::StringBuilder();
+ region_builder.AppendValues({"US", "US"}).ok();
+ auto value_builder = arrow::Int64Builder();
+ value_builder.AppendValues({500, 600}).ok();
+
+ auto batch = arrow::RecordBatch::Make(
+ arrow::schema({arrow::field("id", arrow::int32()),
+ arrow::field("region", arrow::utf8()),
+ arrow::field("value", arrow::int64())}),
+ 2,
+ {id_builder.Finish().ValueOrDie(),
region_builder.Finish().ValueOrDie(),
+ value_builder.Finish().ValueOrDie()});
+
+ ASSERT_OK(append_writer.AppendArrowBatch(batch));
+ }
+
+ {
+ auto id_builder = arrow::Int32Builder();
+ id_builder.AppendValues({7, 8}).ok();
+ auto region_builder = arrow::StringBuilder();
+ region_builder.AppendValues({"EU", "EU"}).ok();
+ auto value_builder = arrow::Int64Builder();
+ value_builder.AppendValues({700, 800}).ok();
+
+ auto batch = arrow::RecordBatch::Make(
+ arrow::schema({arrow::field("id", arrow::int32()),
+ arrow::field("region", arrow::utf8()),
+ arrow::field("value", arrow::int64())}),
+ 2,
+ {id_builder.Finish().ValueOrDie(),
region_builder.Finish().ValueOrDie(),
+ value_builder.Finish().ValueOrDie()});
+
+ ASSERT_OK(append_writer.AppendArrowBatch(batch));
+ }
+ ASSERT_OK(append_writer.Flush());
+
+ // Test list partition offsets
+ std::unordered_map<int32_t, int64_t> us_offsets;
+ ASSERT_OK(adm.ListPartitionOffsets(table_path, "US", {0},
fluss::OffsetSpec::Latest(),
+ us_offsets));
+ EXPECT_EQ(us_offsets[0], 4) << "US partition should have 4 records";
+
+ std::unordered_map<int32_t, int64_t> eu_offsets;
+ ASSERT_OK(adm.ListPartitionOffsets(table_path, "EU", {0},
fluss::OffsetSpec::Latest(),
+ eu_offsets));
+ EXPECT_EQ(eu_offsets[0], 4) << "EU partition should have 4 records";
+
+ // Subscribe to all partitions and scan
+ fluss::Table scan_table;
+ ASSERT_OK(conn.GetTable(table_path, scan_table));
+ auto table_scan = scan_table.NewScan();
+ fluss::LogScanner log_scanner;
+ ASSERT_OK(table_scan.CreateLogScanner(log_scanner));
+
+ std::vector<fluss::PartitionInfo> partition_infos;
+ ASSERT_OK(adm.ListPartitionInfos(table_path, partition_infos));
+
+ for (const auto& pi : partition_infos) {
+ ASSERT_OK(log_scanner.SubscribePartitionBuckets(pi.partition_id, 0,
0));
+ }
+
+ // Collect all records
+ using Record = std::tuple<int32_t, std::string, int64_t>;
+ auto extract_record = [](const fluss::ScanRecord& rec) -> Record {
+ return {rec.row.GetInt32(0), std::string(rec.row.GetString(1)),
rec.row.GetInt64(2)};
+ };
+ std::vector<Record> collected;
+ fluss_test::PollRecords(log_scanner, 8, extract_record, collected);
+
+ ASSERT_EQ(collected.size(), 8u) << "Expected 8 records total";
+ std::sort(collected.begin(), collected.end());
+
+ std::vector<Record> expected = {{1, "US", 100}, {2, "US", 200}, {3,
"EU", 300},
+ {4, "EU", 400}, {5, "US", 500}, {6,
"US", 600},
+ {7, "EU", 700}, {8, "EU", 800}};
+ EXPECT_EQ(collected, expected);
+
+ // Test unsubscribe_partition: unsubscribe EU, should only get US data
+ {
+ fluss::Table unsub_table;
+ ASSERT_OK(conn.GetTable(table_path, unsub_table));
+ auto unsub_scan = unsub_table.NewScan();
+ fluss::LogScanner unsub_scanner;
+ ASSERT_OK(unsub_scan.CreateLogScanner(unsub_scanner));
+
+ int64_t eu_partition_id = -1;
+ for (const auto& pi : partition_infos) {
+ ASSERT_OK(unsub_scanner.SubscribePartitionBuckets(pi.partition_id,
0, 0));
+ if (pi.partition_name == "EU") {
+ eu_partition_id = pi.partition_id;
+ }
+ }
+ ASSERT_GE(eu_partition_id, 0) << "EU partition should exist";
+
+ ASSERT_OK(unsub_scanner.UnsubscribePartition(eu_partition_id, 0));
+
+ std::vector<Record> us_only;
+ fluss_test::PollRecords(unsub_scanner, 4, extract_record, us_only);
+
+ ASSERT_EQ(us_only.size(), 4u) << "Should receive exactly 4 US records";
+ for (const auto& [id, region, val] : us_only) {
+ EXPECT_EQ(region, "US") << "After unsubscribe EU, only US data
should be read";
+ }
+ }
+
+ // Test subscribe_partition_buckets (batch subscribe)
+ {
+ fluss::Table batch_table;
+ ASSERT_OK(conn.GetTable(table_path, batch_table));
+ auto batch_scan = batch_table.NewScan();
+ fluss::LogScanner batch_scanner;
+ ASSERT_OK(batch_scan.CreateLogScanner(batch_scanner));
+
+ std::vector<fluss::PartitionBucketSubscription> subs;
+ for (const auto& pi : partition_infos) {
+ subs.push_back({pi.partition_id, 0, 0});
+ }
+ ASSERT_OK(batch_scanner.SubscribePartitionBuckets(subs));
+
+ std::vector<Record> batch_collected;
+ fluss_test::PollRecords(batch_scanner, 8, extract_record,
batch_collected);
+ ASSERT_EQ(batch_collected.size(), 8u);
+ std::sort(batch_collected.begin(), batch_collected.end());
+ EXPECT_EQ(batch_collected, expected);
+ }
+
+ ASSERT_OK(adm.DropTable(table_path, false));
+}
diff --git a/bindings/cpp/test/test_main.cpp b/bindings/cpp/test/test_main.cpp
new file mode 100644
index 0000000..8c2e2d9
--- /dev/null
+++ b/bindings/cpp/test/test_main.cpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "test_utils.h"
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+
+ // Register the global test environment (manages the Fluss cluster
lifecycle).
+
::testing::AddGlobalTestEnvironment(fluss_test::FlussTestEnvironment::Instance());
+
+ return RUN_ALL_TESTS();
+}
diff --git a/bindings/cpp/test/test_utils.h b/bindings/cpp/test/test_utils.h
new file mode 100644
index 0000000..bae5237
--- /dev/null
+++ b/bindings/cpp/test/test_utils.h
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <chrono>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <string>
+#include <thread>
+#include <vector>
+
+#ifdef _WIN32
+#include <winsock2.h>
+#include <ws2tcpip.h>
+#pragma comment(lib, "ws2_32.lib")
+#else
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <unistd.h>
+#endif
+
+#include "fluss.hpp"
+
+// Macro to assert Result is OK and print error message on failure
+#define ASSERT_OK(result) ASSERT_TRUE((result).Ok()) << (result).error_message
+#define EXPECT_OK(result) EXPECT_TRUE((result).Ok()) << (result).error_message
+
+namespace fluss_test {
+
+static constexpr const char* kFlussVersion = "0.7.0";
+static constexpr const char* kNetworkName = "fluss-cpp-test-network";
+static constexpr const char* kZookeeperName = "zookeeper-cpp-test";
+static constexpr const char* kCoordinatorName = "coordinator-server-cpp-test";
+static constexpr const char* kTabletServerName = "tablet-server-cpp-test";
+static constexpr int kCoordinatorPort = 9123;
+static constexpr int kTabletServerPort = 9124;
+
+/// Execute a shell command and return its exit code.
+inline int RunCommand(const std::string& cmd) {
+ return system(cmd.c_str());
+}
+
+/// Wait until a TCP port is accepting connections, or timeout.
+inline bool WaitForPort(const std::string& host, int port, int timeout_seconds
= 60) {
+ auto deadline =
+ std::chrono::steady_clock::now() +
std::chrono::seconds(timeout_seconds);
+
+ while (std::chrono::steady_clock::now() < deadline) {
+ int sock = socket(AF_INET, SOCK_STREAM, 0);
+ if (sock < 0) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ continue;
+ }
+
+ struct sockaddr_in addr {};
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(static_cast<uint16_t>(port));
+ inet_pton(AF_INET, host.c_str(), &addr.sin_addr);
+
+ int result = connect(sock, reinterpret_cast<struct sockaddr*>(&addr),
sizeof(addr));
+#ifdef _WIN32
+ closesocket(sock);
+#else
+ close(sock);
+#endif
+ if (result == 0) {
+ return true;
+ }
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ }
+ return false;
+}
+
+/// Manages a Docker-based Fluss cluster for integration testing.
+class FlussTestCluster {
+ public:
+ FlussTestCluster() = default;
+
+ bool Start() {
+ const char* env_servers = std::getenv("FLUSS_BOOTSTRAP_SERVERS");
+ if (env_servers && std::strlen(env_servers) > 0) {
+ bootstrap_servers_ = env_servers;
+ external_cluster_ = true;
+ std::cout << "Using external cluster: " << bootstrap_servers_ <<
std::endl;
+ return true;
+ }
+
+ std::cout << "Starting Fluss cluster via Docker..." << std::endl;
+
+ // Create network
+ RunCommand(std::string("docker network create ") + kNetworkName + "
2>/dev/null || true");
+
+ // Start ZooKeeper
+ std::string zk_cmd = std::string("docker run -d --rm") +
+ " --name " + kZookeeperName +
+ " --network " + kNetworkName +
+ " zookeeper:3.9.2";
+ if (RunCommand(zk_cmd) != 0) {
+ std::cerr << "Failed to start ZooKeeper" << std::endl;
+ return false;
+ }
+
+ // Wait for ZooKeeper to be ready before starting Fluss servers
+ std::this_thread::sleep_for(std::chrono::seconds(5));
+
+ // Start Coordinator Server
+ std::string coord_props =
+ "zookeeper.address: " + std::string(kZookeeperName) + ":2181\\n"
+ "bind.listeners: INTERNAL://" + std::string(kCoordinatorName) +
":0, CLIENT://" +
+ std::string(kCoordinatorName) + ":9123\\n"
+ "advertised.listeners: CLIENT://localhost:9123\\n"
+ "internal.listener.name: INTERNAL\\n"
+ "netty.server.num-network-threads: 1\\n"
+ "netty.server.num-worker-threads: 3";
+
+ std::string coord_cmd = std::string("docker run -d --rm") +
+ " --name " + kCoordinatorName +
+ " --network " + kNetworkName +
+ " -p 9123:9123" +
+ " -e FLUSS_PROPERTIES=\"$(printf '" +
coord_props + "')\"" +
+ " fluss/fluss:" + kFlussVersion +
+ " coordinatorServer";
+ if (RunCommand(coord_cmd) != 0) {
+ std::cerr << "Failed to start Coordinator Server" << std::endl;
+ Stop();
+ return false;
+ }
+
+ // Wait for coordinator to be ready
+ if (!WaitForPort("127.0.0.1", kCoordinatorPort)) {
+ std::cerr << "Coordinator Server did not become ready" <<
std::endl;
+ Stop();
+ return false;
+ }
+
+ // Start Tablet Server
+ std::string ts_props =
+ "zookeeper.address: " + std::string(kZookeeperName) + ":2181\\n"
+ "bind.listeners: INTERNAL://" + std::string(kTabletServerName) +
":0, CLIENT://" +
+ std::string(kTabletServerName) + ":9123\\n"
+ "advertised.listeners: CLIENT://localhost:" +
std::to_string(kTabletServerPort) + "\\n"
+ "internal.listener.name: INTERNAL\\n"
+ "tablet-server.id: 0\\n"
+ "netty.server.num-network-threads: 1\\n"
+ "netty.server.num-worker-threads: 3";
+
+ std::string ts_cmd = std::string("docker run -d --rm") +
+ " --name " + kTabletServerName +
+ " --network " + kNetworkName +
+ " -p " + std::to_string(kTabletServerPort) +
":9123" +
+ " -e FLUSS_PROPERTIES=\"$(printf '" + ts_props +
"')\"" +
+ " fluss/fluss:" + kFlussVersion +
+ " tabletServer";
+ if (RunCommand(ts_cmd) != 0) {
+ std::cerr << "Failed to start Tablet Server" << std::endl;
+ Stop();
+ return false;
+ }
+
+ // Wait for tablet server to be ready
+ if (!WaitForPort("127.0.0.1", kTabletServerPort)) {
+ std::cerr << "Tablet Server did not become ready" << std::endl;
+ Stop();
+ return false;
+ }
+
+ bootstrap_servers_ = "127.0.0.1:9123";
+ std::cout << "Fluss cluster started successfully." << std::endl;
+ return true;
+ }
+
+ void Stop() {
+ if (external_cluster_) return;
+
+ std::cout << "Stopping Fluss cluster..." << std::endl;
+ RunCommand(std::string("docker stop ") + kTabletServerName + "
2>/dev/null || true");
+ RunCommand(std::string("docker stop ") + kCoordinatorName + "
2>/dev/null || true");
+ RunCommand(std::string("docker stop ") + kZookeeperName + "
2>/dev/null || true");
+ RunCommand(std::string("docker network rm ") + kNetworkName + "
2>/dev/null || true");
+ std::cout << "Fluss cluster stopped." << std::endl;
+ }
+
+ const std::string& GetBootstrapServers() const { return
bootstrap_servers_; }
+
+ private:
+ std::string bootstrap_servers_;
+ bool external_cluster_{false};
+};
+
+/// GoogleTest Environment that manages the Fluss cluster lifecycle.
+class FlussTestEnvironment : public ::testing::Environment {
+ public:
+ static FlussTestEnvironment* Instance() {
+ static FlussTestEnvironment* instance = nullptr;
+ if (!instance) {
+ instance = new FlussTestEnvironment();
+ }
+ return instance;
+ }
+
+ void SetUp() override {
+ if (!cluster_.Start()) {
+ GTEST_SKIP() << "Failed to start Fluss cluster. Skipping
integration tests.";
+ }
+
+ // Retry connection creation until the coordinator is fully
initialized.
+ fluss::Configuration config;
+ config.bootstrap_servers = cluster_.GetBootstrapServers();
+
+ auto deadline =
+ std::chrono::steady_clock::now() + std::chrono::seconds(60);
+ while (std::chrono::steady_clock::now() < deadline) {
+ auto result = fluss::Connection::Create(config, connection_);
+ if (result.Ok()) {
+ auto admin_result = connection_.GetAdmin(admin_);
+ if (admin_result.Ok()) {
+ std::cout << "Connected to Fluss cluster." << std::endl;
+ return;
+ }
+ }
+ std::cout << "Waiting for Fluss cluster to be ready..." <<
std::endl;
+ std::this_thread::sleep_for(std::chrono::seconds(2));
+ }
+ GTEST_SKIP() << "Fluss cluster did not become ready within timeout.";
+ }
+
+ void TearDown() override {
+ cluster_.Stop();
+ }
+
+ fluss::Connection& GetConnection() { return connection_; }
+ fluss::Admin& GetAdmin() { return admin_; }
+ const std::string& GetBootstrapServers() { return
cluster_.GetBootstrapServers(); }
+
+ private:
+ FlussTestEnvironment() = default;
+
+ FlussTestCluster cluster_;
+ fluss::Connection connection_;
+ fluss::Admin admin_;
+};
+
+/// Helper: create a table (assert success). Drops existing table first if it
exists.
+inline void CreateTable(fluss::Admin& admin, const fluss::TablePath& path,
+ const fluss::TableDescriptor& descriptor) {
+ admin.DropTable(path, true); // ignore if not exists
+ auto result = admin.CreateTable(path, descriptor, false);
+ ASSERT_OK(result);
+}
+
+/// Helper: create partitions for a partitioned table.
+inline void CreatePartitions(fluss::Admin& admin, const fluss::TablePath& path,
+ const std::string& partition_column,
+ const std::vector<std::string>& values) {
+ for (const auto& value : values) {
+ std::unordered_map<std::string, std::string> spec;
+ spec[partition_column] = value;
+ auto result = admin.CreatePartition(path, spec, true);
+ ASSERT_OK(result);
+ }
+}
+
+/// Poll a LogScanner for ScanRecords until `expected_count` items are collected or timeout.
+/// `extract_fn` is called for each ScanRecord and should return a value of type T.
+template <typename T, typename ExtractFn>
+void PollRecords(fluss::LogScanner& scanner, size_t expected_count,
+ ExtractFn extract_fn, std::vector<T>& out) {
+ auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10);
+ while (out.size() < expected_count && std::chrono::steady_clock::now() < deadline) {
+ fluss::ScanRecords records;
+ ASSERT_OK(scanner.Poll(1000, records));
+ for (auto rec : records) {
+ out.push_back(extract_fn(rec));
+ }
+ }
+}
+
+/// Poll a LogScanner for ArrowRecordBatches until `expected_count` items are collected or timeout.
+/// `extract_fn` is called with the full ArrowRecordBatches and should return a std::vector<T>.
+template <typename T, typename ExtractFn>
+void PollRecordBatches(fluss::LogScanner& scanner, size_t expected_count,
+ ExtractFn extract_fn, std::vector<T>& out) {
+ auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10);
+ while (out.size() < expected_count && std::chrono::steady_clock::now() < deadline) {
+ fluss::ArrowRecordBatches batches;
+ ASSERT_OK(scanner.PollRecordBatch(1000, batches));
+ auto items = extract_fn(batches);
+ out.insert(out.end(), items.begin(), items.end());
+ }
+}
+
+} // namespace fluss_test
diff --git a/bindings/python/test/test_log_table.py b/bindings/python/test/test_log_table.py
index bfa9789..dd1a4d4 100644
--- a/bindings/python/test/test_log_table.py
+++ b/bindings/python/test/test_log_table.py
@@ -36,7 +36,9 @@ async def test_append_and_scan(connection, admin):
schema = fluss.Schema(
pa.schema([pa.field("c1", pa.int32()), pa.field("c2", pa.string())])
)
- table_descriptor = fluss.TableDescriptor(schema)
+ table_descriptor = fluss.TableDescriptor(
+ schema, bucket_count=3, bucket_keys=["c1"]
+ )
 await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
table = await connection.get_table(table_path)
diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs
index 7642067..eac72e5 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -39,7 +39,7 @@ mod table_test {
};
use arrow::array::record_batch;
use fluss::client::{EARLIEST_OFFSET, FlussTable, TableScan};
- use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, TablePath};
+ use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
use fluss::record::ScanRecord;
use fluss::row::InternalRow;
use fluss::rpc::message::OffsetSpec;
@@ -79,6 +79,7 @@ mod table_test {
.build()
.expect("Failed to build schema"),
)
+ .distributed_by(Some(3), vec!["c1".to_string()])
.build()
.expect("Failed to build table");
@@ -127,38 +128,34 @@ mod table_test {
.expect("Failed to subscribe with EARLIEST_OFFSET");
}
- // Poll for records
- let scan_records = log_scanner
- .poll(tokio::time::Duration::from_secs(10))
- .await
- .expect("Failed to poll records");
-
- // Verify the scanned records
- let table_bucket = TableBucket::new(table.get_table_info().table_id, 0);
- let records = scan_records.records(&table_bucket);
-
- assert_eq!(records.len(), 6, "Expected 6 records");
-
- // Verify record contents match what was appended
- let expected_c1_values = vec![1, 2, 3, 4, 5, 6];
- let expected_c2_values = vec!["a1", "a2", "a3", "a4", "a5", "a6"];
-
- for (i, record) in records.iter().enumerate() {
- let row = record.row();
- assert_eq!(
- row.get_int(0),
- expected_c1_values[i],
- "c1 value mismatch at row {}",
- i
- );
- assert_eq!(
- row.get_string(1),
- expected_c2_values[i],
- "c2 value mismatch at row {}",
- i
- );
+ // Poll for records across all buckets
+ let mut collected: Vec<(i32, String)> = Vec::new();
+ let start_time = std::time::Instant::now();
+ while collected.len() < 6 && start_time.elapsed() < Duration::from_secs(10) {
+ let scan_records = log_scanner
+ .poll(Duration::from_millis(500))
+ .await
+ .expect("Failed to poll records");
+ for rec in scan_records {
+ let row = rec.row();
+ collected.push((row.get_int(0), row.get_string(1).to_string()));
+ }
}
+ assert_eq!(collected.len(), 6, "Expected 6 records");
+
+ // Sort and verify record contents
+ collected.sort();
+ let expected: Vec<(i32, String)> = vec![
+ (1, "a1".to_string()),
+ (2, "a2".to_string()),
+ (3, "a3".to_string()),
+ (4, "a4".to_string()),
+ (5, "a5".to_string()),
+ (6, "a6".to_string()),
+ ];
+ assert_eq!(collected, expected);
+
// Test unsubscribe: unsubscribe from bucket 0, verify no error
log_scanner
.unsubscribe(0)