This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git


The following commit(s) were added to refs/heads/main by this push:
     new eba2108  Add ffi and cpp (#4)
eba2108 is described below

commit eba2108971f3e91749c36c441b8b7deb49963488
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue May 19 10:25:41 2026 +0800

    Add ffi and cpp (#4)
    
    - ffi crate: C API via Arrow C Data Interface for writer and reader,
      with cbindgen-generated header
    - include/mosaic.hpp: C++ RAII wrapper (Writer, Reader, statistics)
    - cpp/: CMake project with integration tests covering roundtrip,
      nulls, all types, projection, statistics, compression, schema
      export, and multiple row groups
---
 .github/workflows/ci.yml         |  39 ++
 Cargo.toml                       |   2 +-
 Cargo.toml => cpp/CMakeLists.txt |  25 +-
 cpp/test_mosaic.cpp              | 481 +++++++++++++++++++++++
 Cargo.toml => ffi/Cargo.toml     |  20 +-
 ffi/build.rs                     |  29 ++
 Cargo.toml => ffi/cbindgen.toml  |  26 +-
 ffi/src/lib.rs                   | 819 +++++++++++++++++++++++++++++++++++++++
 include/arrow_c_data.h           |  70 ++++
 include/mosaic.hpp               | 301 ++++++++++++++
 10 files changed, 1799 insertions(+), 13 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 9850579..70b4fec 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -63,3 +63,42 @@ jobs:
 
       - name: Test
         run: cargo test
+
+  cpp-test:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v6
+
+      - name: Rust Cache
+        uses: actions/cache@v5
+        with:
+          path: |
+            ~/.cargo/registry
+            ~/.cargo/git
+            target
+          key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
+          restore-keys: |
+            ${{ runner.os }}-cargo-
+
+      - name: Install Arrow C++
+        run: |
+          sudo apt-get update
+          sudo apt-get install -y apt-transport-https lsb-release ca-certificates curl
+          curl -fsSL https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb -o /tmp/arrow-apt.deb
+          sudo apt-get install -y /tmp/arrow-apt.deb
+          sudo apt-get update
+          sudo apt-get install -y libarrow-dev
+
+      - name: Build FFI library
+        run: cargo build --release -p mosaic-ffi
+
+      - name: Build C++ test
+        run: |
+          cmake -S cpp -B cpp/build \
+            -DMOSAIC_FFI_LIB=${{ github.workspace }}/target/release/libmosaic_ffi.so
+          cmake --build cpp/build
+
+      - name: Run C++ test
+        run: cpp/build/test_mosaic
+        env:
+          LD_LIBRARY_PATH: ${{ github.workspace }}/target/release
diff --git a/Cargo.toml b/Cargo.toml
index ff31345..39c103c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,7 +16,7 @@
 # under the License.
 
 [workspace]
-members = ["core"]
+members = ["core", "ffi"]
 resolver = "2"
 
 [profile.release]
diff --git a/Cargo.toml b/cpp/CMakeLists.txt
similarity index 50%
copy from Cargo.toml
copy to cpp/CMakeLists.txt
index ff31345..6e69fcb 100644
--- a/Cargo.toml
+++ b/cpp/CMakeLists.txt
@@ -15,8 +15,25 @@
 # specific language governing permissions and limitations
 # under the License.
 
-[workspace]
-members = ["core"]
-resolver = "2"
+cmake_minimum_required(VERSION 3.16)
+project(mosaic-cpp-test CXX)
 
-[profile.release]
+set(CMAKE_CXX_STANDARD 17)
+set(CMAKE_CXX_STANDARD_REQUIRED ON)
+
+find_package(Arrow REQUIRED)
+
+set(MOSAIC_FFI_LIB "" CACHE FILEPATH "Path to libmosaic_ffi shared library")
+if(NOT MOSAIC_FFI_LIB)
+    if(APPLE)
+        set(MOSAIC_FFI_LIB "${CMAKE_SOURCE_DIR}/../target/release/libmosaic_ffi.dylib")
+    elseif(WIN32)
+        set(MOSAIC_FFI_LIB "${CMAKE_SOURCE_DIR}/../target/release/mosaic_ffi.dll")
+    else()
+        set(MOSAIC_FFI_LIB "${CMAKE_SOURCE_DIR}/../target/release/libmosaic_ffi.so")
+    endif()
+endif()
+
+add_executable(test_mosaic test_mosaic.cpp)
+target_include_directories(test_mosaic PRIVATE ${CMAKE_SOURCE_DIR}/../include)
+target_link_libraries(test_mosaic Arrow::arrow_shared ${MOSAIC_FFI_LIB})
diff --git a/cpp/test_mosaic.cpp b/cpp/test_mosaic.cpp
new file mode 100644
index 0000000..4de8cad
--- /dev/null
+++ b/cpp/test_mosaic.cpp
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "mosaic.hpp"
+
+#include <arrow/api.h>
+#include <arrow/c/bridge.h>
+
+#include <algorithm>
+#include <cassert>
+#include <cmath>
+#include <cstdio>
+#include <cstring>
+#include <functional>
+#include <vector>
+
+// Test assertions that print the failing expression and abort. They are plain
+// macros, so they stay active even when NDEBUG disables assert().
+#define ASSERT_EQ(a, b) do { if ((a) != (b)) { \
+    fprintf(stderr, "FAIL %s:%d: %s != %s\n", __FILE__, __LINE__, #a, #b); abort(); } } while(0)
+#define ASSERT_TRUE(x) do { if (!(x)) { \
+    fprintf(stderr, "FAIL %s:%d: %s\n", __FILE__, __LINE__, #x); abort(); } } while(0)
+
+// In-memory stand-in for a file, shared by the writer and reader callbacks below.
+struct MemBuffer {
+    // Bytes written so far (writer side) or the file image to read (reader side).
+    std::vector<uint8_t> data;
+    // Running count of bytes appended through the write callback.
+    size_t pos = 0;
+};
+
+// Builds an OutputFile whose callbacks append to `buf`.
+// The callbacks capture `buf` by reference, so `buf` must outlive the returned
+// object and anything constructed from it.
+static mosaic::OutputFile make_output(MemBuffer& buf) {
+    mosaic::OutputFile out;
+    out.write_fn = [&buf](const uint8_t* data, size_t len) -> int {
+        buf.data.insert(buf.data.end(), data, data + len);
+        buf.pos += len;
+        return 0;
+    };
+    // Nothing to flush for an in-memory buffer; no capture needed.
+    out.flush_fn = []() -> int { return 0; };
+    out.get_pos_fn = [&buf]() -> int64_t { return static_cast<int64_t>(buf.pos); };
+    return out;
+}
+
+// Builds an InputFile that serves positional reads out of `buf`.
+// The callback captures `buf` by reference, so `buf` must outlive the returned
+// object and any reader constructed from it.
+// read_at_fn returns 0 on success and -1 when the requested range is out of bounds.
+static mosaic::InputFile make_input(const MemBuffer& buf) {
+    mosaic::InputFile in;
+    in.read_at_fn = [&buf](uint64_t offset, uint8_t* dst, size_t len) -> int {
+        const uint64_t size = buf.data.size();
+        // Compare against the remaining bytes instead of computing offset + len,
+        // which can wrap around for a huge offset and slip past the check.
+        if (offset > size || len > size - offset) return -1;
+        // Zero-length reads succeed without touching dst (which may be null).
+        if (len == 0) return 0;
+        memcpy(dst, buf.data.data() + offset, len);
+        return 0;
+    };
+    in.file_length = buf.data.size();
+    return in;
+}
+
+// Writes a single record batch through a mosaic::Writer into an in-memory
+// buffer and returns the resulting file bytes.
+// Export failures abort via ASSERT_TRUE so they are caught even when NDEBUG
+// compiles assert() away.
+static std::vector<uint8_t> write_and_get(
+    const std::shared_ptr<arrow::Schema>& schema,
+    const std::shared_ptr<arrow::RecordBatch>& batch,
+    mosaic::WriterOptions opts = {})
+{
+    MemBuffer buf;
+
+    struct ArrowSchema c_schema;
+    auto st = arrow::ExportSchema(*schema, &c_schema);
+    ASSERT_TRUE(st.ok());
+
+    mosaic::Writer writer(make_output(buf), &c_schema, opts);
+
+    struct ArrowArray c_array;
+    struct ArrowSchema c_batch_schema;
+    st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
+    ASSERT_TRUE(st.ok());
+
+    writer.write(&c_array, &c_batch_schema);
+    writer.close();
+    return buf.data;
+}
+
+// Reads row group `rg` and imports it as an arrow::RecordBatch.
+// When `cols` is non-null and `num_cols` > 0 only those column indices are
+// read; otherwise the whole row group is read.
+static std::shared_ptr<arrow::RecordBatch> read_row_group(
+    mosaic::Reader& reader, uint32_t rg,
+    const uint32_t* cols = nullptr, uint32_t num_cols = 0)
+{
+    struct ArrowArray c_array;
+    struct ArrowSchema c_schema;
+
+    if (cols && num_cols > 0) {
+        reader.read_row_group(rg, cols, num_cols, &c_array, &c_schema);
+    } else {
+        reader.read_row_group(rg, &c_array, &c_schema);
+    }
+
+    auto result = arrow::ImportRecordBatch(&c_array, &c_schema);
+    // Must be checked in every build type: ValueUnsafe() below does not
+    // validate the result, and assert() disappears under NDEBUG.
+    ASSERT_TRUE(result.ok());
+    return result.ValueUnsafe();
+}
+
+// ======================== Tests ========================
+
+// Round-trips 50 rows over int32 / utf8 / float64 columns with two buckets
+// and verifies every value read back from row group 0.
+static void test_basic_roundtrip() {
+    auto schema = arrow::schema({
+        arrow::field("id", arrow::int32(), false),
+        arrow::field("name", arrow::utf8()),
+        arrow::field("score", arrow::float64()),
+    });
+
+    arrow::Int32Builder id_b;
+    arrow::StringBuilder name_b;
+    arrow::DoubleBuilder score_b;
+    for (int i = 0; i < 50; i++) {
+        // ASSERT_TRUE, not assert(): the Append calls are side effects that
+        // must still execute when NDEBUG strips assert() expressions.
+        ASSERT_TRUE(id_b.Append(i).ok());
+        ASSERT_TRUE(name_b.Append("user_" + std::to_string(i)).ok());
+        ASSERT_TRUE(score_b.Append(i * 1.5).ok());
+    }
+    auto batch = arrow::RecordBatch::Make(schema, 50, {
+        id_b.Finish().ValueUnsafe(),
+        name_b.Finish().ValueUnsafe(),
+        score_b.Finish().ValueUnsafe(),
+    });
+
+    mosaic::WriterOptions opts;
+    opts.num_buckets = 2;
+    auto data_vec = write_and_get(schema, batch, opts);
+    ASSERT_TRUE(data_vec.size() > 32);
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+    ASSERT_TRUE(reader.num_row_groups() >= 1);
+
+    auto rb = read_row_group(reader, 0);
+    ASSERT_EQ(rb->num_rows(), 50);
+    ASSERT_EQ(rb->num_columns(), 3);
+
+    auto ids = std::static_pointer_cast<arrow::Int32Array>(rb->column(0));
+    auto names = std::static_pointer_cast<arrow::StringArray>(rb->column(1));
+    auto scores = std::static_pointer_cast<arrow::DoubleArray>(rb->column(2));
+
+    for (int i = 0; i < 50; i++) {
+        ASSERT_EQ(ids->Value(i), i);
+        ASSERT_EQ(names->GetString(i), "user_" + std::to_string(i));
+        ASSERT_TRUE(std::abs(scores->Value(i) - i * 1.5) < 1e-9);
+    }
+    printf("  PASS test_basic_roundtrip\n");
+}
+
+// Writes a utf8 column containing one null and checks that nullness and the
+// surrounding values survive the round trip.
+static void test_null_values() {
+    auto schema = arrow::schema({
+        arrow::field("id", arrow::int32()),
+        arrow::field("name", arrow::utf8()),
+    });
+
+    // ASSERT_TRUE, not assert(): the Append calls must run even under NDEBUG.
+    arrow::Int32Builder id_b;
+    ASSERT_TRUE(id_b.Append(1).ok());
+    ASSERT_TRUE(id_b.Append(2).ok());
+    ASSERT_TRUE(id_b.Append(3).ok());
+
+    arrow::StringBuilder name_b;
+    ASSERT_TRUE(name_b.Append("hello").ok());
+    ASSERT_TRUE(name_b.AppendNull().ok());
+    ASSERT_TRUE(name_b.Append("world").ok());
+
+    auto batch = arrow::RecordBatch::Make(schema, 3, {
+        id_b.Finish().ValueUnsafe(), name_b.Finish().ValueUnsafe()});
+    auto data_vec = write_and_get(schema, batch);
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+    auto rb = read_row_group(reader, 0);
+    ASSERT_EQ(rb->num_rows(), 3);
+
+    auto names = std::static_pointer_cast<arrow::StringArray>(rb->column(1));
+    ASSERT_TRUE(!names->IsNull(0));
+    ASSERT_EQ(names->GetString(0), "hello");
+    ASSERT_TRUE(names->IsNull(1));
+    ASSERT_TRUE(!names->IsNull(2));
+    ASSERT_EQ(names->GetString(2), "world");
+    printf("  PASS test_null_values\n");
+}
+
+// Writes one row covering bool, int8/16/32/64, float32/64, utf8 and binary
+// columns and checks the values read back.
+static void test_all_types() {
+    auto schema = arrow::schema({
+        arrow::field("f_bool", arrow::boolean()),
+        arrow::field("f_int8", arrow::int8()),
+        arrow::field("f_int16", arrow::int16()),
+        arrow::field("f_int32", arrow::int32()),
+        arrow::field("f_int64", arrow::int64()),
+        arrow::field("f_float32", arrow::float32()),
+        arrow::field("f_float64", arrow::float64()),
+        arrow::field("f_utf8", arrow::utf8()),
+        arrow::field("f_binary", arrow::binary()),
+    });
+
+    // ASSERT_TRUE, not assert(): the Append calls must run even under NDEBUG.
+    arrow::BooleanBuilder bool_b;
+    ASSERT_TRUE(bool_b.Append(true).ok());
+    arrow::Int8Builder i8_b;
+    ASSERT_TRUE(i8_b.Append(42).ok());
+    arrow::Int16Builder i16_b;
+    ASSERT_TRUE(i16_b.Append(1234).ok());
+    arrow::Int32Builder i32_b;
+    ASSERT_TRUE(i32_b.Append(100000).ok());
+    arrow::Int64Builder i64_b;
+    ASSERT_TRUE(i64_b.Append(9999999999LL).ok());
+    arrow::FloatBuilder f32_b;
+    ASSERT_TRUE(f32_b.Append(3.14f).ok());
+    arrow::DoubleBuilder f64_b;
+    ASSERT_TRUE(f64_b.Append(2.718281828).ok());
+    arrow::StringBuilder utf8_b;
+    ASSERT_TRUE(utf8_b.Append("hello").ok());
+    arrow::BinaryBuilder bin_b;
+    uint8_t bin_data[] = {0x01, 0x02};
+    ASSERT_TRUE(bin_b.Append(bin_data, 2).ok());
+
+    auto batch = arrow::RecordBatch::Make(schema, 1, {
+        bool_b.Finish().ValueUnsafe(),
+        i8_b.Finish().ValueUnsafe(),
+        i16_b.Finish().ValueUnsafe(),
+        i32_b.Finish().ValueUnsafe(),
+        i64_b.Finish().ValueUnsafe(),
+        f32_b.Finish().ValueUnsafe(),
+        f64_b.Finish().ValueUnsafe(),
+        utf8_b.Finish().ValueUnsafe(),
+        bin_b.Finish().ValueUnsafe(),
+    });
+
+    auto data_vec = write_and_get(schema, batch);
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+    auto rb = read_row_group(reader, 0);
+    ASSERT_EQ(rb->num_rows(), 1);
+    ASSERT_EQ(rb->num_columns(), 9);
+
+    ASSERT_TRUE(std::static_pointer_cast<arrow::BooleanArray>(rb->column(0))->Value(0));
+    ASSERT_EQ(std::static_pointer_cast<arrow::Int8Array>(rb->column(1))->Value(0), 42);
+    ASSERT_EQ(std::static_pointer_cast<arrow::Int16Array>(rb->column(2))->Value(0), 1234);
+    ASSERT_EQ(std::static_pointer_cast<arrow::Int32Array>(rb->column(3))->Value(0), 100000);
+    ASSERT_EQ(std::static_pointer_cast<arrow::Int64Array>(rb->column(4))->Value(0), 9999999999LL);
+    ASSERT_TRUE(std::abs(std::static_pointer_cast<arrow::FloatArray>(rb->column(5))->Value(0) - 3.14f) < 1e-5f);
+    ASSERT_TRUE(std::abs(std::static_pointer_cast<arrow::DoubleArray>(rb->column(6))->Value(0) - 2.718281828) < 1e-9);
+    ASSERT_EQ(std::static_pointer_cast<arrow::StringArray>(rb->column(7))->GetString(0), "hello");
+    // NOTE(review): f_binary (column 8) is written but its value is never
+    // checked here; only the column count covers it.
+    printf("  PASS test_all_types\n");
+}
+
+// Writes four columns and reads back only columns {0, 1}, checking the
+// projected batch has two columns and all 20 rows.
+static void test_projection() {
+    auto schema = arrow::schema({
+        arrow::field("a", arrow::int32()),
+        arrow::field("b", arrow::utf8()),
+        arrow::field("c", arrow::float64()),
+        arrow::field("d", arrow::utf8()),
+    });
+
+    arrow::Int32Builder ab;
+    arrow::StringBuilder bb, db;
+    arrow::DoubleBuilder cb;
+    for (int i = 0; i < 20; i++) {
+        // ASSERT_TRUE, not assert(): the Append calls must run even under NDEBUG.
+        ASSERT_TRUE(ab.Append(i).ok());
+        ASSERT_TRUE(bb.Append("val_" + std::to_string(i)).ok());
+        ASSERT_TRUE(cb.Append(static_cast<double>(i)).ok());
+        ASSERT_TRUE(db.Append("extra_" + std::to_string(i)).ok());
+    }
+    auto batch = arrow::RecordBatch::Make(schema, 20, {
+        ab.Finish().ValueUnsafe(), bb.Finish().ValueUnsafe(),
+        cb.Finish().ValueUnsafe(), db.Finish().ValueUnsafe(),
+    });
+
+    mosaic::WriterOptions opts;
+    opts.num_buckets = 2;
+    auto data_vec = write_and_get(schema, batch, opts);
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+
+    uint32_t cols[] = {0, 1};
+    auto rb = read_row_group(reader, 0, cols, 2);
+    ASSERT_EQ(rb->num_columns(), 2);
+    ASSERT_EQ(rb->num_rows(), 20);
+    printf("  PASS test_projection\n");
+}
+
+// Requests statistics for columns 0 and 2 and checks that every returned
+// entry belongs to one of them, reports zero nulls, and carries min/max.
+static void test_statistics() {
+    auto schema = arrow::schema({
+        arrow::field("id", arrow::int32()),
+        arrow::field("name", arrow::utf8()),
+        arrow::field("score", arrow::float64()),
+    });
+
+    arrow::Int32Builder id_b;
+    arrow::StringBuilder name_b;
+    arrow::DoubleBuilder score_b;
+    for (int i = 0; i < 10; i++) {
+        // ASSERT_TRUE, not assert(): the Append calls must run even under NDEBUG.
+        ASSERT_TRUE(id_b.Append(i * 10).ok());
+        ASSERT_TRUE(name_b.Append("item_" + std::to_string(i)).ok());
+        ASSERT_TRUE(score_b.Append(i * 1.1).ok());
+    }
+    auto batch = arrow::RecordBatch::Make(schema, 10, {
+        id_b.Finish().ValueUnsafe(), name_b.Finish().ValueUnsafe(),
+        score_b.Finish().ValueUnsafe(),
+    });
+
+    mosaic::WriterOptions opts;
+    uint32_t stats_cols[] = {0, 2};
+    opts.stats_columns = stats_cols;
+    opts.num_stats_columns = 2;
+    auto data_vec = write_and_get(schema, batch, opts);
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+
+    auto stats = reader.get_row_group_statistics(0);
+    ASSERT_TRUE(stats.size() > 0);
+
+    for (auto& s : stats) {
+        ASSERT_TRUE(s.column_index == 0 || s.column_index == 2);
+        ASSERT_EQ(s.null_count, 0u);
+        ASSERT_TRUE(s.has_min_max());
+    }
+    printf("  PASS test_statistics\n");
+}
+
+// Writes 100 rows with compression = 1 and zstd_level = 3 and verifies the
+// int32 column after reading it back.
+static void test_compression_zstd() {
+    auto schema = arrow::schema({
+        arrow::field("x", arrow::int32()),
+        arrow::field("y", arrow::utf8()),
+    });
+
+    arrow::Int32Builder xb;
+    arrow::StringBuilder yb;
+    for (int i = 0; i < 100; i++) {
+        // ASSERT_TRUE, not assert(): the Append calls must run even under NDEBUG.
+        ASSERT_TRUE(xb.Append(i).ok());
+        ASSERT_TRUE(yb.Append("v_" + std::to_string(i)).ok());
+    }
+    auto batch = arrow::RecordBatch::Make(schema, 100, {
+        xb.Finish().ValueUnsafe(), yb.Finish().ValueUnsafe(),
+    });
+
+    mosaic::WriterOptions opts;
+    opts.compression = 1;
+    opts.zstd_level = 3;
+    auto data_vec = write_and_get(schema, batch, opts);
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+    auto rb = read_row_group(reader, 0);
+    ASSERT_EQ(rb->num_rows(), 100);
+
+    auto xs = std::static_pointer_cast<arrow::Int32Array>(rb->column(0));
+    for (int i = 0; i < 100; i++) {
+        ASSERT_EQ(xs->Value(i), i);
+    }
+    printf("  PASS test_compression_zstd\n");
+}
+
+// Writes a one-row file and checks that the schema exported by the reader
+// keeps field names and the nullability of the first two fields.
+static void test_schema_roundtrip() {
+    auto schema = arrow::schema({
+        arrow::field("id", arrow::int32(), false),
+        arrow::field("name", arrow::utf8(), true),
+        arrow::field("score", arrow::float64(), true),
+    });
+
+    // ASSERT_TRUE, not assert(): the Append calls must run even under NDEBUG.
+    arrow::Int32Builder sr_id_b;
+    ASSERT_TRUE(sr_id_b.Append(1).ok());
+    arrow::StringBuilder sr_name_b;
+    ASSERT_TRUE(sr_name_b.Append("x").ok());
+    arrow::DoubleBuilder sr_score_b;
+    ASSERT_TRUE(sr_score_b.Append(1.0).ok());
+
+    auto batch = arrow::RecordBatch::Make(schema, 1, {
+        sr_id_b.Finish().ValueUnsafe(),
+        sr_name_b.Finish().ValueUnsafe(),
+        sr_score_b.Finish().ValueUnsafe(),
+    });
+
+    auto data_vec = write_and_get(schema, batch);
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+
+    struct ArrowSchema c_schema;
+    reader.export_schema(&c_schema);
+    auto imported = arrow::ImportSchema(&c_schema);
+    // Checked in every build type because ValueUnsafe() does not validate.
+    ASSERT_TRUE(imported.ok());
+    auto read_schema = imported.ValueUnsafe();
+
+    ASSERT_EQ(read_schema->num_fields(), 3);
+    ASSERT_EQ(read_schema->field(0)->name(), "id");
+    ASSERT_EQ(read_schema->field(1)->name(), "name");
+    ASSERT_EQ(read_schema->field(2)->name(), "score");
+    ASSERT_TRUE(!read_schema->field(0)->nullable());
+    ASSERT_TRUE(read_schema->field(1)->nullable());
+    printf("  PASS test_schema_roundtrip\n");
+}
+
+// Writes 500 rows in batches of 50 with a small row_group_max_size, expects
+// more than one row group, and checks that reading the groups in order
+// yields every row exactly once.
+static void test_multiple_row_groups() {
+    auto schema = arrow::schema({
+        arrow::field("id", arrow::int32()),
+        arrow::field("data", arrow::int64()),
+    });
+
+    mosaic::WriterOptions opts;
+    opts.compression = 0;
+    opts.num_buckets = 1;
+    opts.row_group_max_size = 200;
+
+    MemBuffer write_buf;
+    struct ArrowSchema c_schema;
+    auto st = arrow::ExportSchema(*schema, &c_schema);
+    ASSERT_TRUE(st.ok());
+    mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
+
+    const int total_rows = 500;
+    const int batch_size = 50;
+    for (int start = 0; start < total_rows; start += batch_size) {
+        int end = std::min(start + batch_size, total_rows);
+        int n = end - start;
+        arrow::Int32Builder id_b;
+        arrow::Int64Builder data_b;
+        for (int i = start; i < end; i++) {
+            // ASSERT_TRUE, not assert(): the Append calls must run even under NDEBUG.
+            ASSERT_TRUE(id_b.Append(i).ok());
+            ASSERT_TRUE(data_b.Append(static_cast<int64_t>(i) * 3).ok());
+        }
+        auto batch = arrow::RecordBatch::Make(schema, n, {
+            id_b.Finish().ValueUnsafe(), data_b.Finish().ValueUnsafe(),
+        });
+        struct ArrowArray c_array;
+        struct ArrowSchema c_batch_schema;
+        st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
+        ASSERT_TRUE(st.ok());
+        writer.write(&c_array, &c_batch_schema);
+    }
+    writer.close();
+    auto data_vec = write_buf.data;
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+    ASSERT_TRUE(reader.num_row_groups() > 1);
+
+    // `offset` is the number of rows consumed from earlier row groups.
+    int offset = 0;
+    for (uint32_t rg = 0; rg < reader.num_row_groups(); rg++) {
+        auto rb = read_row_group(reader, rg);
+        auto ids = std::static_pointer_cast<arrow::Int32Array>(rb->column(0));
+        auto datas = std::static_pointer_cast<arrow::Int64Array>(rb->column(1));
+        for (int64_t i = 0; i < rb->num_rows(); i++) {
+            ASSERT_EQ(ids->Value(i), offset + static_cast<int>(i));
+            ASSERT_EQ(datas->Value(i), static_cast<int64_t>(offset + i) * 3);
+        }
+        offset += static_cast<int>(rb->num_rows());
+    }
+    ASSERT_EQ(offset, total_rows);
+    printf("  PASS test_multiple_row_groups\n");
+}
+
+// Runs every test in order. A failing assertion aborts the process, so
+// reaching the final line means all listed tests passed.
+int main() {
+    // Table-driven so the reported count cannot drift from the tests that ran.
+    using TestFn = void (*)();
+    const TestFn tests[] = {
+        test_basic_roundtrip,
+        test_null_values,
+        test_all_types,
+        test_projection,
+        test_statistics,
+        test_compression_zstd,
+        test_schema_roundtrip,
+        test_multiple_row_groups,
+    };
+
+    printf("Running Mosaic C++ tests...\n");
+    int passed = 0;
+    for (TestFn test : tests) {
+        test();
+        ++passed;
+    }
+    printf("All %d tests passed.\n", passed);
+    return 0;
+}
diff --git a/Cargo.toml b/ffi/Cargo.toml
similarity index 69%
copy from Cargo.toml
copy to ffi/Cargo.toml
index ff31345..e690775 100644
--- a/Cargo.toml
+++ b/ffi/Cargo.toml
@@ -15,8 +15,20 @@
 # specific language governing permissions and limitations
 # under the License.
 
-[workspace]
-members = ["core"]
-resolver = "2"
+[package]
+name = "mosaic-ffi"
+version = "0.1.0"
+edition = "2021"
+description = "Mosaic file format — C/C++ FFI bindings"
+license = "Apache-2.0"
 
-[profile.release]
+[lib]
+crate-type = ["cdylib"]
+
+[dependencies]
+mosaic-core = { path = "../core" }
+arrow-schema = "58"
+arrow-array = { version = "58", features = ["ffi"] }
+
+[build-dependencies]
+cbindgen = "0.27"
diff --git a/ffi/build.rs b/ffi/build.rs
new file mode 100644
index 0000000..cc64f1c
--- /dev/null
+++ b/ffi/build.rs
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Build script: runs cbindgen over this crate and writes the generated
+/// header to `<parent of crate dir>/include/mosaic.h`.
+fn main() {
+    let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap();
+    let header_path = std::path::Path::new(&manifest_dir)
+        .parent()
+        .unwrap()
+        .join("include")
+        .join("mosaic.h");
+
+    let builder = cbindgen::Builder::new()
+        .with_crate(&manifest_dir)
+        .with_config(cbindgen::Config::from_file("cbindgen.toml").unwrap());
+    let bindings = builder.generate().expect("Unable to generate C bindings");
+    bindings.write_to_file(header_path);
+}
diff --git a/Cargo.toml b/ffi/cbindgen.toml
similarity index 61%
copy from Cargo.toml
copy to ffi/cbindgen.toml
index ff31345..8b55127 100644
--- a/Cargo.toml
+++ b/ffi/cbindgen.toml
@@ -15,8 +15,26 @@
 # specific language governing permissions and limitations
 # under the License.
 
-[workspace]
-members = ["core"]
-resolver = "2"
+language = "C"
+header = "/* Generated by cbindgen — do not edit manually */"
+include_guard = "MOSAIC_H"
+autogen_warning = "/* Warning: this file is autogenerated by cbindgen. Don't modify this manually. */"
+tab_width = 4
+style = "both"
+after_includes = "#include \"arrow_c_data.h\""
 
-[profile.release]
+[defines]
+
+[export]
+prefix = ""
+
+[export.rename]
+"FFI_ArrowSchema" = "ArrowSchema"
+"FFI_ArrowArray" = "ArrowArray"
+
+[enum]
+rename_variants = "ScreamingSnakeCase"
+prefix_with_name = true
+
+[fn]
+rename_args = "None"
diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs
new file mode 100644
index 0000000..7e1cd8f
--- /dev/null
+++ b/ffi/src/lib.rs
@@ -0,0 +1,819 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![allow(clippy::missing_safety_doc)]
+
+use std::cell::RefCell;
+use std::ffi::CString;
+use std::io;
+use std::os::raw::{c_char, c_int};
+use std::panic::{self, AssertUnwindSafe};
+use std::ptr;
+
+use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
+use arrow_array::{RecordBatch, StructArray};
+use arrow_schema::{Field, Schema};
+
+use mosaic_core::reader::{InputFile, MosaicReader, ReaderAccess};
+use mosaic_core::spec::*;
+use mosaic_core::writer::{MosaicWriter, OutputFile, WriterOptions};
+
+thread_local! {
+    static LAST_ERROR: RefCell<Option<CString>> = const { RefCell::new(None) };
+}
+
+/// Stores `msg` as the current thread's last error.
+///
+/// `CString::new` rejects interior NUL bytes; previously such a message made
+/// the conversion fail and the stored error was cleared instead of set. NUL
+/// bytes are stripped first so a message is always recorded.
+fn set_error(msg: String) {
+    let sanitized: Vec<u8> = msg.into_bytes().into_iter().filter(|&b| b != 0).collect();
+    let c_msg = CString::new(sanitized).ok();
+    LAST_ERROR.with(|e| {
+        *e.borrow_mut() = c_msg;
+    });
+}
+
+/// Renders a caught panic payload as `"native panic: <text>"`.
+/// Payloads that are neither `String` nor `&str` are reported as `unknown`.
+fn panic_message(e: &Box<dyn std::any::Any + Send>) -> String {
+    let detail: &str = match e.downcast_ref::<String>() {
+        Some(owned) => owned.as_str(),
+        None => match e.downcast_ref::<&str>() {
+            Some(literal) => *literal,
+            None => "unknown",
+        },
+    };
+    format!("native panic: {}", detail)
+}
+
+// ======================== OutputFile ========================
+
+/// C-layout table of caller-supplied output callbacks.
+/// `ctx` is passed back unchanged as the first argument of every callback.
+#[repr(C)]
+pub struct MosaicOutputFile {
+    /// Opaque caller context handed to each callback.
+    pub ctx: *mut std::ffi::c_void,
+    /// Writes `len` bytes from the given pointer; any non-zero return is
+    /// treated as failure. A null callback makes every write fail.
+    pub write_fn: Option<unsafe extern "C" fn(*mut std::ffi::c_void, *const 
u8, usize) -> i32>,
+    /// Optional flush; non-zero return is treated as failure, null is a no-op.
+    pub flush_fn: Option<unsafe extern "C" fn(*mut std::ffi::c_void) -> i32>,
+    /// Optional position query; when null or when it returns a negative
+    /// value, the wrapper's own byte counter is used instead.
+    pub get_pos_fn: Option<unsafe extern "C" fn(*mut std::ffi::c_void) -> i64>,
+}
+
+/// Adapter that exposes a `MosaicOutputFile` callback table as an `OutputFile`.
+struct FfiOutputFile {
+    raw: MosaicOutputFile,
+    /// Bytes successfully written through `write`; used as the position
+    /// whenever the caller's `get_pos_fn` is missing or reports a negative value.
+    pos: u64,
+}
+
+impl OutputFile for FfiOutputFile {
+    /// Forwards `data` to `write_fn`; a null callback or non-zero status is an error.
+    fn write(&mut self, data: &[u8]) -> io::Result<()> {
+        let Some(write_fn) = self.raw.write_fn else {
+            return Err(io::Error::other("write_fn is null"));
+        };
+        let status = unsafe { write_fn(self.raw.ctx, data.as_ptr(), data.len()) };
+        if status != 0 {
+            return Err(io::Error::other("write callback failed"));
+        }
+        self.pos += data.len() as u64;
+        Ok(())
+    }
+
+    /// Calls `flush_fn` when present; a missing callback counts as success.
+    fn flush(&mut self) -> io::Result<()> {
+        match self.raw.flush_fn {
+            Some(flush_fn) if unsafe { flush_fn(self.raw.ctx) } != 0 => {
+                Err(io::Error::other("flush callback failed"))
+            }
+            _ => Ok(()),
+        }
+    }
+
+    /// Returns the position reported by `get_pos_fn`, falling back to the
+    /// internal counter when the callback is absent or returns a negative value.
+    fn pos(&self) -> u64 {
+        let reported = match self.raw.get_pos_fn {
+            Some(get_pos_fn) => unsafe { get_pos_fn(self.raw.ctx) },
+            None => return self.pos,
+        };
+        u64::try_from(reported).unwrap_or(self.pos)
+    }
+}
+
+// ======================== Writer Options ========================
+
+/// C-layout writer configuration passed by value to `mosaic_writer_open`.
+/// `mosaic_writer_options_default` returns the library defaults.
+#[repr(C)]
+pub struct MosaicWriterOptions {
+    /// Compression codec id; the default is `COMPRESSION_ZSTD`.
+    pub compression: u8,
+    pub zstd_level: c_int,
+    /// Bucket count; 0 is replaced by `DEFAULT_NUM_BUCKETS` when the writer is opened.
+    pub num_buckets: u32,
+    pub row_group_max_size: u64,
+    pub max_dict_total_bytes: u32,
+    pub max_dict_entries: u32,
+    /// Pointer to `num_stats_columns` column indices; a null pointer or a
+    /// zero count means no statistics columns.
+    pub stats_columns: *const u32,
+    pub num_stats_columns: u32,
+    pub page_size_threshold: u32,
+}
+
+/// Builds a `MosaicWriterOptions` populated from the `mosaic_core::spec`
+/// defaults: zstd compression and no statistics columns.
+#[no_mangle]
+pub extern "C" fn mosaic_writer_options_default() -> MosaicWriterOptions {
+    MosaicWriterOptions {
+        // No statistics columns are selected by default.
+        stats_columns: ptr::null(),
+        num_stats_columns: 0,
+        compression: COMPRESSION_ZSTD,
+        zstd_level: DEFAULT_ZSTD_LEVEL as c_int,
+        num_buckets: DEFAULT_NUM_BUCKETS as u32,
+        row_group_max_size: DEFAULT_ROW_GROUP_MAX_SIZE,
+        page_size_threshold: DEFAULT_PAGE_SIZE_THRESHOLD as u32,
+        max_dict_total_bytes: DEFAULT_DICT_MAX_TOTAL_BYTES as u32,
+        max_dict_entries: DEFAULT_DICT_MAX_ENTRIES as u32,
+    }
+}
+
+// ======================== Writer ========================
+
+/// Opaque writer handle returned by `mosaic_writer_open` as a heap pointer
+/// and released with `mosaic_writer_free`.
+pub struct MosaicWriterHandle {
+    inner: MosaicWriter<FfiOutputFile>,
+}
+
+/// Open a writer over the given output callbacks.
+///
+/// `ffi_schema` is consumed: its contents are moved out and the caller's
+/// struct is zeroed to prevent a double release. Returns a heap-allocated
+/// handle, or null on failure with the thread-local error message set.
+/// Panics are caught and reported as errors rather than unwinding into C.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_writer_open(
+    stream: MosaicOutputFile,
+    ffi_schema: *mut FFI_ArrowSchema,
+    options: MosaicWriterOptions,
+) -> *mut MosaicWriterHandle {
+    let outcome = panic::catch_unwind(AssertUnwindSafe(|| {
+        if ffi_schema.is_null() {
+            set_error("null schema".into());
+            return ptr::null_mut();
+        }
+
+        // Move the schema out first, then blank the caller's copy so its
+        // release callback cannot run a second time.
+        let owned_schema = ptr::read(ffi_schema);
+        ptr::write_bytes(ffi_schema, 0, 1);
+
+        let arrow_schema = match Schema::try_from(&owned_schema) {
+            Ok(schema) => schema,
+            Err(e) => {
+                set_error(format!("Arrow schema import failed: {}", e));
+                return ptr::null_mut();
+            }
+        };
+
+        let output = FfiOutputFile {
+            raw: stream,
+            pos: 0,
+        };
+
+        // A null pointer or a zero count both mean "no statistics columns".
+        let has_stats = !options.stats_columns.is_null() && options.num_stats_columns != 0;
+        let stats_columns: Vec<usize> = if has_stats {
+            let indices = std::slice::from_raw_parts(
+                options.stats_columns,
+                options.num_stats_columns as usize,
+            );
+            indices.iter().map(|&c| c as usize).collect()
+        } else {
+            Vec::new()
+        };
+
+        // Zero buckets falls back to the library default.
+        let num_buckets = match options.num_buckets {
+            0 => DEFAULT_NUM_BUCKETS,
+            n => n as usize,
+        };
+
+        let writer_options = WriterOptions {
+            compression: options.compression,
+            zstd_level: options.zstd_level,
+            num_buckets,
+            row_group_max_size: options.row_group_max_size,
+            max_dict_total_bytes: options.max_dict_total_bytes as usize,
+            max_dict_entries: options.max_dict_entries as usize,
+            stats_columns,
+            page_size_threshold: options.page_size_threshold as usize,
+        };
+
+        match MosaicWriter::new(output, &arrow_schema, writer_options) {
+            Ok(writer) => Box::into_raw(Box::new(MosaicWriterHandle { inner: writer })),
+            Err(e) => {
+                set_error(format!("writer open failed: {}", e));
+                ptr::null_mut()
+            }
+        }
+    }));
+
+    outcome.unwrap_or_else(|payload| {
+        set_error(panic_message(&payload));
+        ptr::null_mut()
+    })
+}
+
+/// Close the writer (flush all data and write footer).
+///
+/// Returns 0 on success and -1 on failure. Every failure path, including a
+/// null `handle`, records a message in the thread-local last error.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_writer_close(handle: *mut MosaicWriterHandle) -> c_int {
+    let result = panic::catch_unwind(AssertUnwindSafe(|| {
+        if handle.is_null() {
+            // Record a message so the caller does not read a stale error left
+            // by an earlier call; matches the other entry points.
+            set_error("null pointer".into());
+            return -1;
+        }
+        let h = &mut *handle;
+        match h.inner.close() {
+            Ok(()) => 0,
+            Err(e) => {
+                set_error(format!("close failed: {}", e));
+                -1
+            }
+        }
+    }));
+    match result {
+        Ok(val) => val,
+        Err(e) => {
+            set_error(panic_message(&e));
+            -1
+        }
+    }
+}
+
+/// Destroy a writer handle. A null `handle` is ignored.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_writer_free(handle: *mut MosaicWriterHandle) {
+    if handle.is_null() {
+        return;
+    }
+    // Rebuild the Box from the raw pointer; it is dropped when it leaves scope.
+    let _owned = Box::from_raw(handle);
+}
+
+/// Report the writer's estimated file size (used for rolling decisions).
+/// Stores the value in `out` and returns 0; returns -1 and records
+/// "null pointer" when `handle` or `out` is null.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_writer_estimated_file_size(
+    handle: *const MosaicWriterHandle,
+    out: *mut i64,
+) -> c_int {
+    let missing_arg = handle.is_null() || out.is_null();
+    if missing_arg {
+        set_error("null pointer".into());
+        return -1;
+    }
+    let writer = &*handle;
+    let estimate = writer.inner.estimated_file_size() as i64;
+    *out = estimate;
+    0
+}
+
+/// Write an Arrow RecordBatch to the writer via the Arrow C Data Interface.
+/// The caller provides ArrowArray and ArrowSchema pointers that represent the batch.
+/// Ownership of both structs transfers to the callee; the caller's structs are zeroed.
+/// Returns 0 on success, -1 on error (null argument, Arrow import failure,
+/// write failure, or a caught panic); the message is recorded via `set_error`.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_writer_write_batch(
+    handle: *mut MosaicWriterHandle,
+    ffi_array: *mut FFI_ArrowArray,
+    ffi_schema: *mut FFI_ArrowSchema,
+) -> c_int {
+    let result = panic::catch_unwind(AssertUnwindSafe(|| {
+        if handle.is_null() || ffi_array.is_null() || ffi_schema.is_null() {
+            set_error("null pointer".into());
+            return -1;
+        }
+        let h = &mut *handle;
+        // Take bitwise copies of the caller's structs, then zero the originals
+        // so the caller's copies no longer carry any pointers or callbacks.
+        let arr_owned = ptr::read(ffi_array);
+        let schema_owned = ptr::read(ffi_schema);
+        ptr::write_bytes(ffi_array, 0, 1);
+        ptr::write_bytes(ffi_schema, 0, 1);
+        let arr_data = match arrow_array::ffi::from_ffi(arr_owned, 
&schema_owned) {
+            Ok(d) => d,
+            Err(e) => {
+                set_error(format!("Arrow import failed: {}", e));
+                return -1;
+            }
+        };
+        // The imported data is viewed as a struct array whose children become
+        // the columns of the RecordBatch handed to the writer.
+        let struct_array = StructArray::from(arr_data);
+        let batch = RecordBatch::from(struct_array);
+        match h.inner.write_batch(&batch) {
+            Ok(()) => 0,
+            Err(e) => {
+                set_error(format!("write_batch failed: {}", e));
+                -1
+            }
+        }
+    }));
+    // A caught panic is reported like any other error.
+    result.unwrap_or_else(|e| {
+        set_error(panic_message(&e));
+        -1
+    })
+}
+
+// ======================== Reader ========================
+
+/// Input file for reading Mosaic files.
+///
+/// `read_at_fn` must be thread-safe: the reader may invoke it concurrently
+/// from multiple threads to perform parallel IO.
+///
+/// Fields:
+/// - `ctx`: opaque pointer passed back unchanged as the first argument of
+///   both callbacks.
+/// - `read_at_fn(ctx, offset, buf, len)`: fills `buf` with `len` bytes
+///   starting at `offset`; any non-zero return value is treated as failure.
+/// - `length_fn(ctx)`: queried once when the reader is opened to obtain the
+///   file length.
+#[repr(C)]
+pub struct MosaicInputFile {
+    pub ctx: *mut std::ffi::c_void,
+    pub read_at_fn: Option<unsafe extern "C" fn(*mut std::ffi::c_void, u64, 
*mut u8, usize) -> i32>,
+    pub length_fn: Option<unsafe extern "C" fn(*mut std::ffi::c_void) -> u64>,
+}
+
+/// Adapter that lets the C callback table in `MosaicInputFile` be used as the
+/// reader's `InputFile`.
+struct FfiInputFile {
+    raw: MosaicInputFile,
+}
+
+// The struct holds a raw `ctx` pointer, so these impls are asserted manually.
+// They rely on the documented requirement that `read_at_fn` is thread-safe.
+unsafe impl Send for FfiInputFile {}
+unsafe impl Sync for FfiInputFile {}
+
+impl InputFile for FfiInputFile {
+    /// Fill `buf` with the bytes starting at `offset` by delegating to the C
+    /// `read_at_fn` callback. A missing callback, or a non-zero status from
+    /// it, is reported as an `io::Error`.
+    fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
+        let callback = match self.raw.read_at_fn {
+            Some(f) => f,
+            None => return Err(io::Error::other("read_at_fn is null")),
+        };
+        let status = unsafe { callback(self.raw.ctx, offset, buf.as_mut_ptr(), buf.len()) };
+        match status {
+            0 => Ok(()),
+            _ => Err(io::Error::other("read_at callback failed")),
+        }
+    }
+}
+
+/// Opaque handle returned by `mosaic_reader_open`; owns the reader built over
+/// the caller's input-file callbacks. Released with `mosaic_reader_free`.
+pub struct MosaicReaderHandle {
+    reader: MosaicReader<FfiInputFile>,
+}
+
+/// Opaque handle for a single row group, returned by
+/// `mosaic_reader_open_row_group` and `mosaic_reader_open_row_group_projected`.
+/// Released with `mosaic_row_group_reader_free`.
+pub struct MosaicRowGroupReaderHandle {
+    inner: mosaic_core::reader::RowGroupReader,
+}
+
+/// Open a reader on top of the callbacks in `input_file`.
+///
+/// The file length is taken from `length_fn`; 0 is used when that callback is
+/// absent. Returns a new reader handle, or null on failure (reader error or
+/// caught panic) with the message recorded via `set_error`.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_open(
+    input_file: MosaicInputFile,
+) -> *mut MosaicReaderHandle {
+    let outcome = panic::catch_unwind(AssertUnwindSafe(|| {
+        let file_len = match input_file.length_fn {
+            Some(length_fn) => unsafe { length_fn(input_file.ctx) },
+            None => 0,
+        };
+        let source = FfiInputFile { raw: input_file };
+        let reader = match MosaicReader::new(source, file_len) {
+            Ok(reader) => reader,
+            Err(e) => {
+                set_error(format!("open failed: {}", e));
+                return ptr::null_mut();
+            }
+        };
+        Box::into_raw(Box::new(MosaicReaderHandle { reader }))
+    }));
+    outcome.unwrap_or_else(|e| {
+        set_error(panic_message(&e));
+        ptr::null_mut()
+    })
+}
+
+/// Destroy a reader handle. A null `handle` is ignored.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_free(handle: *mut MosaicReaderHandle) {
+    if handle.is_null() {
+        return;
+    }
+    // Rebuild the Box from the raw pointer; it is dropped when it leaves scope.
+    let _owned = Box::from_raw(handle);
+}
+
+/// Report how many row groups the file contains.
+/// Stores the count in `out` and returns 0; returns -1 and records
+/// "null pointer" when `handle` or `out` is null.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_num_row_groups(
+    handle: *const MosaicReaderHandle,
+    out: *mut u32,
+) -> c_int {
+    let missing_arg = handle.is_null() || out.is_null();
+    if missing_arg {
+        set_error("null pointer".into());
+        return -1;
+    }
+    let count = (*handle).reader.num_row_groups() as u32;
+    *out = count;
+    0
+}
+
+/// Export the reader's schema through the Arrow C Data Interface.
+/// The schema is written into the caller-provided `out_schema` struct.
+/// Returns 0 on success, -1 on error (message recorded via `set_error`).
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_export_schema(
+    handle: *const MosaicReaderHandle,
+    out_schema: *mut FFI_ArrowSchema,
+) -> c_int {
+    let outcome = panic::catch_unwind(AssertUnwindSafe(|| {
+        if handle.is_null() || out_schema.is_null() {
+            set_error("null pointer".into());
+            return -1;
+        }
+        let h = &*handle;
+        let schema = h.reader.schema();
+        // Rebuild an Arrow schema from the file's column descriptors.
+        let mut fields: Vec<Field> = Vec::new();
+        for col in schema.columns.iter() {
+            fields.push(Field::new(&col.name, col.data_type.clone(), col.nullable));
+        }
+        let arrow_schema = Schema::new(fields);
+        let exported = match FFI_ArrowSchema::try_from(&arrow_schema) {
+            Ok(exported) => exported,
+            Err(e) => {
+                set_error(format!("Arrow schema export failed: {}", e));
+                return -1;
+            }
+        };
+        ptr::write(out_schema, exported);
+        0
+    }));
+    outcome.unwrap_or_else(|e| {
+        set_error(panic_message(&e));
+        -1
+    })
+}
+
+/// Open a reader for row group `rg_index`.
+/// Returns a new row group handle, or null on failure (message recorded via
+/// `set_error`).
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_open_row_group(
+    handle: *mut MosaicReaderHandle,
+    rg_index: u32,
+) -> *mut MosaicRowGroupReaderHandle {
+    let outcome = panic::catch_unwind(AssertUnwindSafe(|| {
+        if handle.is_null() {
+            set_error("null reader handle".into());
+            return ptr::null_mut();
+        }
+        let h = &*handle;
+        let row_group = match h.reader.row_group_reader(rg_index as usize) {
+            Ok(rg) => rg,
+            Err(e) => {
+                set_error(format!("open row group failed: {}", e));
+                return ptr::null_mut();
+            }
+        };
+        Box::into_raw(Box::new(MosaicRowGroupReaderHandle { inner: row_group }))
+    }));
+    outcome.unwrap_or_else(|e| {
+        set_error(panic_message(&e));
+        ptr::null_mut()
+    })
+}
+
+/// Open a row group reader with projection pushdown.
+/// Only decompresses buckets containing the specified columns.
+///
+/// `columns` points to `num_columns` column indices; it may be null only when
+/// `num_columns` is 0. Returns null on failure (message recorded via
+/// `set_error`).
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_open_row_group_projected(
+    handle: *mut MosaicReaderHandle,
+    rg_index: u32,
+    columns: *const u32,
+    num_columns: u32,
+) -> *mut MosaicRowGroupReaderHandle {
+    let outcome = panic::catch_unwind(AssertUnwindSafe(|| {
+        if handle.is_null() {
+            set_error("null reader handle".into());
+            return ptr::null_mut();
+        }
+        if num_columns > 0 && columns.is_null() {
+            set_error("null columns pointer".into());
+            return ptr::null_mut();
+        }
+        // The slice is only formed for a non-empty projection.
+        let col_indices: Vec<usize> = match num_columns {
+            0 => Vec::new(),
+            n => std::slice::from_raw_parts(columns, n as usize)
+                .iter()
+                .map(|&c| c as usize)
+                .collect(),
+        };
+        let h = &*handle;
+        let opened = h
+            .reader
+            .row_group_reader_projected(rg_index as usize, &col_indices);
+        let row_group = match opened {
+            Ok(rg) => rg,
+            Err(e) => {
+                set_error(format!("open row group projected failed: {}", e));
+                return ptr::null_mut();
+            }
+        };
+        Box::into_raw(Box::new(MosaicRowGroupReaderHandle { inner: row_group }))
+    }));
+    outcome.unwrap_or_else(|e| {
+        set_error(panic_message(&e));
+        ptr::null_mut()
+    })
+}
+
+/// Destroy a row group reader handle. A null `handle` is ignored.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_row_group_reader_free(handle: *mut MosaicRowGroupReaderHandle) {
+    if handle.is_null() {
+        return;
+    }
+    // Rebuild the Box from the raw pointer; it is dropped when it leaves scope.
+    let _owned = Box::from_raw(handle);
+}
+
+/// Report the number of rows in the row group.
+/// Stores the count in `out` and returns 0; returns -1 and records
+/// "null pointer" when `handle` or `out` is null.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_row_group_reader_num_rows(
+    handle: *const MosaicRowGroupReaderHandle,
+    out: *mut u32,
+) -> c_int {
+    let missing_arg = handle.is_null() || out.is_null();
+    if missing_arg {
+        set_error("null pointer".into());
+        return -1;
+    }
+    let rows = (*handle).inner.num_rows() as u32;
+    *out = rows;
+    0
+}
+
+// ======================== Row Group Stats ========================
+
+/// Report how many stats entries row group `rg_index` has.
+/// Stores the count in `out` and returns 0; returns -1 on a null argument or
+/// when the stats lookup fails (message recorded via `set_error`).
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_row_group_num_stats(
+    handle: *const MosaicReaderHandle,
+    rg_index: u32,
+    out: *mut u32,
+) -> c_int {
+    if handle.is_null() || out.is_null() {
+        set_error("null pointer".into());
+        return -1;
+    }
+    let reader = &(*handle).reader;
+    match reader.row_group_stats(rg_index as usize) {
+        Ok(stats) => {
+            *out = stats.len() as u32;
+            0
+        }
+        Err(e) => {
+            set_error(e.to_string());
+            -1
+        }
+    }
+}
+
+/// Report which column stats entry `stat_index` of row group `rg_index`
+/// describes. Stores the column index in `out` and returns 0; returns -1 on a
+/// null argument, a failed stats lookup, or an out-of-range `stat_index`
+/// (message recorded via `set_error`).
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_row_group_stat_column_index(
+    handle: *const MosaicReaderHandle,
+    rg_index: u32,
+    stat_index: u32,
+    out: *mut u32,
+) -> c_int {
+    if handle.is_null() || out.is_null() {
+        set_error("null pointer".into());
+        return -1;
+    }
+    let reader = &(*handle).reader;
+    let stats = match reader.row_group_stats(rg_index as usize) {
+        Ok(found) => found,
+        Err(e) => {
+            set_error(e.to_string());
+            return -1;
+        }
+    };
+    let position = stat_index as usize;
+    if position < stats.len() {
+        *out = stats[position].column_index as u32;
+        return 0;
+    }
+    set_error("stat_index out of range".into());
+    -1
+}
+
+/// Report the null count recorded in stats entry `stat_index` of row group
+/// `rg_index`. Stores the count in `out` and returns 0; returns -1 on a null
+/// argument, a failed stats lookup, or an out-of-range `stat_index`
+/// (message recorded via `set_error`).
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_row_group_stat_null_count(
+    handle: *const MosaicReaderHandle,
+    rg_index: u32,
+    stat_index: u32,
+    out: *mut u64,
+) -> c_int {
+    if handle.is_null() || out.is_null() {
+        set_error("null pointer".into());
+        return -1;
+    }
+    let reader = &(*handle).reader;
+    let stats = match reader.row_group_stats(rg_index as usize) {
+        Ok(found) => found,
+        Err(e) => {
+            set_error(e.to_string());
+            return -1;
+        }
+    };
+    let position = stat_index as usize;
+    if position < stats.len() {
+        *out = stats[position].null_count as u64;
+        return 0;
+    }
+    set_error("stat_index out of range".into());
+    -1
+}
+
+/// Get the min value for a stats entry as raw bytes.
+/// Returns null if the column is all-null (no min) or if the lookup fails.
+/// On success sets out_len to the byte length.
+///
+/// The returned pointer refers to a thread-local buffer that is overwritten
+/// by the next call to this function on the same thread, so copy the bytes
+/// out before calling it again.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_row_group_stat_min(
+    handle: *const MosaicReaderHandle,
+    rg_index: u32,
+    stat_index: u32,
+    out_len: *mut usize,
+) -> *const u8 {
+    stat_value_ptr(handle, rg_index, stat_index, out_len, true)
+}
+
+/// Get the max value for a stats entry as raw bytes.
+/// Returns null if the column is all-null (no max) or if the lookup fails.
+/// On success sets out_len to the byte length.
+///
+/// The returned pointer refers to a thread-local buffer that is overwritten
+/// by the next call to this function on the same thread, so copy the bytes
+/// out before calling it again.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_row_group_stat_max(
+    handle: *const MosaicReaderHandle,
+    rg_index: u32,
+    stat_index: u32,
+    out_len: *mut usize,
+) -> *const u8 {
+    stat_value_ptr(handle, rg_index, stat_index, out_len, false)
+}
+
+// Per-thread scratch buffers that back the pointers returned by the stat
+// min/max getters. Min and max use separate buffers, so one min pointer and
+// one max pointer can be held at the same time on a thread.
+thread_local! {
+    static STAT_MIN_BUF: RefCell<Vec<u8>> = const { RefCell::new(Vec::new()) };
+    static STAT_MAX_BUF: RefCell<Vec<u8>> = const { RefCell::new(Vec::new()) };
+}
+
+/// Shared implementation of the stat min/max getters.
+///
+/// Looks up stats entry `stat_index` of row group `rg_index`, stores the bytes
+/// produced by `to_be_bytes()` for its min (`is_min == true`) or max value in
+/// the matching thread-local buffer, writes the byte length to `out_len`, and
+/// returns a pointer into that buffer. The pointer is only valid until the
+/// next call for the same bound on the same thread replaces the buffer.
+///
+/// Returns null with `*out_len == 0` when the entry has no value for the
+/// requested bound. Failures (null `handle`, failed stats lookup,
+/// out-of-range `stat_index`) also return null with `*out_len == 0` and
+/// additionally record a message via `set_error`, so callers can tell the two
+/// cases apart. A null `out_len` records an error and returns null.
+unsafe fn stat_value_ptr(
+    handle: *const MosaicReaderHandle,
+    rg_index: u32,
+    stat_index: u32,
+    out_len: *mut usize,
+    is_min: bool,
+) -> *const u8 {
+    if out_len.is_null() {
+        set_error("null pointer".into());
+        return ptr::null();
+    }
+    // Default to "no value" so every early return leaves `out_len` defined.
+    *out_len = 0;
+    if handle.is_null() {
+        set_error("null pointer".into());
+        return ptr::null();
+    }
+    let h = &*handle;
+    let stats = match h.reader.row_group_stats(rg_index as usize) {
+        Ok(s) => s,
+        Err(e) => {
+            set_error(e.to_string());
+            return ptr::null();
+        }
+    };
+    let idx = stat_index as usize;
+    if idx >= stats.len() {
+        set_error("stat_index out of range".into());
+        return ptr::null();
+    }
+    let value = if is_min {
+        &stats[idx].min
+    } else {
+        &stats[idx].max
+    };
+    match value {
+        Some(v) => {
+            let bytes = v.to_be_bytes();
+            let buf_ref = if is_min { &STAT_MIN_BUF } else { &STAT_MAX_BUF };
+            buf_ref.with(|buf| {
+                // Replacing the buffer invalidates the pointer handed out by
+                // the previous call for this bound on this thread.
+                let mut b = buf.borrow_mut();
+                *b = bytes;
+                *out_len = b.len();
+                b.as_ptr()
+            })
+        }
+        // Not an error: the entry simply has no value for this bound.
+        None => ptr::null(),
+    }
+}
+
+// ======================== Record Batch (Arrow C Data Interface) ========================
+
+/// Opaque handle owning one Arrow `RecordBatch`, returned by
+/// `mosaic_row_group_reader_read_columns` and released with
+/// `mosaic_record_batch_free`.
+pub struct MosaicRecordBatchHandle {
+    batch: RecordBatch,
+}
+
+/// Read the whole row group into an Arrow RecordBatch.
+/// Returns a record batch handle, or null on error (message recorded via
+/// `set_error`).
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_row_group_reader_read_columns(
+    handle: *mut MosaicRowGroupReaderHandle,
+) -> *mut MosaicRecordBatchHandle {
+    let outcome = panic::catch_unwind(AssertUnwindSafe(|| {
+        if handle.is_null() {
+            set_error("null handle".into());
+            return ptr::null_mut();
+        }
+        let row_group = &mut *handle;
+        let batch = match row_group.inner.read_columns() {
+            Ok(batch) => batch,
+            Err(e) => {
+                set_error(format!("read_columns failed: {}", e));
+                return ptr::null_mut();
+            }
+        };
+        Box::into_raw(Box::new(MosaicRecordBatchHandle { batch }))
+    }));
+    outcome.unwrap_or_else(|e| {
+        set_error(panic_message(&e));
+        ptr::null_mut()
+    })
+}
+
+/// Report the number of rows in the record batch.
+/// Stores the count in `out` and returns 0; returns -1 and records
+/// "null pointer" when `handle` or `out` is null.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_record_batch_num_rows(
+    handle: *const MosaicRecordBatchHandle,
+    out: *mut u32,
+) -> c_int {
+    let missing_arg = handle.is_null() || out.is_null();
+    if missing_arg {
+        set_error("null pointer".into());
+        return -1;
+    }
+    let rows = (*handle).batch.num_rows() as u32;
+    *out = rows;
+    0
+}
+
+/// Report the number of columns in the record batch.
+/// Stores the count in `out` and returns 0; returns -1 and records
+/// "null pointer" when `handle` or `out` is null.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_record_batch_num_columns(
+    handle: *const MosaicRecordBatchHandle,
+    out: *mut u32,
+) -> c_int {
+    let missing_arg = handle.is_null() || out.is_null();
+    if missing_arg {
+        set_error("null pointer".into());
+        return -1;
+    }
+    let columns = (*handle).batch.num_columns() as u32;
+    *out = columns;
+    0
+}
+
+/// Export the record batch through the Arrow C Data Interface.
+/// The array and schema are written into the caller-provided `out_array` and
+/// `out_schema` structs. Returns 0 on success, -1 on error (message recorded
+/// via `set_error`).
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_record_batch_export(
+    handle: *const MosaicRecordBatchHandle,
+    out_array: *mut FFI_ArrowArray,
+    out_schema: *mut FFI_ArrowSchema,
+) -> c_int {
+    let outcome = panic::catch_unwind(AssertUnwindSafe(|| {
+        let any_null = handle.is_null() || out_array.is_null() || out_schema.is_null();
+        if any_null {
+            set_error("null pointer".into());
+            return -1;
+        }
+        let h = &*handle;
+        // The batch is exported as a single struct array, one child per column.
+        let as_struct = StructArray::from(h.batch.clone());
+        let (array, schema) = match arrow_array::ffi::to_ffi(&as_struct.into()) {
+            Ok(pair) => pair,
+            Err(e) => {
+                set_error(format!("Arrow export failed: {}", e));
+                return -1;
+            }
+        };
+        ptr::write(out_array, array);
+        ptr::write(out_schema, schema);
+        0
+    }));
+    outcome.unwrap_or_else(|e| {
+        set_error(panic_message(&e));
+        -1
+    })
+}
+
+/// Destroy a record batch handle. A null `handle` is ignored.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_record_batch_free(handle: *mut MosaicRecordBatchHandle) {
+    if handle.is_null() {
+        return;
+    }
+    // Rebuild the Box from the raw pointer; it is dropped when it leaves scope.
+    let _owned = Box::from_raw(handle);
+}
+
+// ======================== Error ========================
+
+/// Get the last error message. Returns a NUL-terminated pointer to a 
thread-local string.
+/// The pointer is valid until the next FFI call on the same thread.
+#[no_mangle]
+pub extern "C" fn mosaic_last_error() -> *const c_char {
+    LAST_ERROR.with(|e| {
+        let borrow = e.borrow();
+        match borrow.as_ref() {
+            Some(cs) => cs.as_ptr(),
+            None => ptr::null(),
+        }
+    })
+}
diff --git a/include/arrow_c_data.h b/include/arrow_c_data.h
new file mode 100644
index 0000000..95810ea
--- /dev/null
+++ b/include/arrow_c_data.h
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef ARROW_C_DATA_H
+#define ARROW_C_DATA_H
+
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#ifndef ARROW_C_DATA_INTERFACE
+#define ARROW_C_DATA_INTERFACE
+
+#define ARROW_FLAG_DICTIONARY_ORDERED 1
+#define ARROW_FLAG_NULLABLE 2
+#define ARROW_FLAG_MAP_KEYS_SORTED 4
+
+/* Type description half of the Arrow C Data Interface. The field order and
+ * types form an ABI shared with other Arrow implementations and must not be
+ * changed. See the Arrow C Data Interface specification for the meaning of
+ * each member. */
+struct ArrowSchema {
+    const char* format;
+    const char* name;
+    const char* metadata;
+    int64_t flags; /* presumably a bitmask of the ARROW_FLAG_* values above */
+    int64_t n_children;
+    struct ArrowSchema** children;
+    struct ArrowSchema* dictionary;
+    void (*release)(struct ArrowSchema*);
+    void* private_data;
+};
+
+/* Data half of the Arrow C Data Interface. The field order and types form an
+ * ABI shared with other Arrow implementations and must not be changed. See
+ * the Arrow C Data Interface specification for the meaning of each member. */
+struct ArrowArray {
+    int64_t length;
+    int64_t null_count;
+    int64_t offset;
+    int64_t n_buffers;
+    int64_t n_children;
+    const void** buffers;
+    struct ArrowArray** children;
+    struct ArrowArray* dictionary;
+    void (*release)(struct ArrowArray*);
+    void* private_data;
+};
+
+#endif /* ARROW_C_DATA_INTERFACE */
+
+typedef struct ArrowSchema ArrowSchema;
+typedef struct ArrowArray ArrowArray;
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ARROW_C_DATA_H */
diff --git a/include/mosaic.hpp b/include/mosaic.hpp
new file mode 100644
index 0000000..45c8fd1
--- /dev/null
+++ b/include/mosaic.hpp
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+extern "C" {
+#include "mosaic.h"
+}
+#include <cstdio>
+#include <functional>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <utility>
+#include <vector>
+
+namespace mosaic {
+
+/// Exception type thrown by this wrapper when a Mosaic C API call fails.
+class Error : public std::runtime_error {
+public:
+    /// @param msg human-readable description of the failure.
+    explicit Error(const std::string& msg) : std::runtime_error(msg) {}
+};
+
+/// Turn a C API status code into an exception.
+/// @param result status returned by a Mosaic C function; 0 means success.
+/// @throws Error carrying mosaic_last_error(), or "unknown error" when no
+///         message is stored.
+inline void check(int result) {
+    if (result == 0) {
+        return;
+    }
+    const char* detail = mosaic_last_error();
+    if (detail == nullptr) {
+        throw Error("unknown error");
+    }
+    throw Error(detail);
+}
+
+// OutputFile adapter: wraps a C++ object into C callbacks.
+// Each member is invoked through a noexcept trampoline in namespace detail,
+// which forwards the return value to the C API and reports -1 if the
+// callable throws (or is empty).
+struct OutputFile {
+    // Writes `len` bytes starting at `data`; returns a status code
+    // (presumably 0 on success -- confirm against the ffi crate).
+    std::function<int(const uint8_t*, size_t)> write_fn;
+    // Flushes buffered output; returns a status code (same convention).
+    std::function<int()> flush_fn;
+    // Returns the current output position.
+    std::function<int64_t()> get_pos_fn;
+};
+
+namespace detail {
+
+// C-compatible trampolines for the OutputFile callbacks. `ctx` is the
+// OutputFile whose std::function members do the real work. The functions are
+// noexcept, so any exception raised by a callback is mapped to -1 instead of
+// escaping.
+
+/// Forward a write request to OutputFile::write_fn.
+inline int32_t stream_write(void* ctx, const uint8_t* data, size_t len) noexcept {
+    try {
+        return static_cast<OutputFile*>(ctx)->write_fn(data, len);
+    } catch (...) {
+        return -1;
+    }
+}
+
+/// Forward a flush request to OutputFile::flush_fn.
+inline int32_t stream_flush(void* ctx) noexcept {
+    try {
+        return static_cast<OutputFile*>(ctx)->flush_fn();
+    } catch (...) {
+        return -1;
+    }
+}
+
+/// Forward a position query to OutputFile::get_pos_fn.
+inline int64_t stream_get_pos(void* ctx) noexcept {
+    try {
+        return static_cast<OutputFile*>(ctx)->get_pos_fn();
+    } catch (...) {
+        return -1;
+    }
+}
+
+} // namespace detail
+
+/// Options copied field by field into MosaicWriterOptions when a Writer is
+/// constructed.
+struct WriterOptions {
+    uint8_t compression = 1;  // ZSTD
+    int zstd_level = 1;
+    // 0 makes the ffi layer substitute its default bucket count.
+    uint32_t num_buckets = 0;
+    uint64_t row_group_max_size = 256ULL * 1024 * 1024;
+    uint32_t max_dict_total_bytes = 32 * 1024;
+    uint32_t max_dict_entries = 255;
+    // Column indices to pass as stats columns; a null pointer or a zero
+    // count passes an empty list. The array is not owned by this struct.
+    const uint32_t* stats_columns = nullptr;
+    uint32_t num_stats_columns = 0;
+    uint32_t page_size_threshold = 32 * 1024;
+};
+
+/// RAII wrapper around a MosaicWriterHandle.
+///
+/// The destructor closes the writer, unless close() was already called, and
+/// then frees the handle. Errors from that implicit close are ignored, so
+/// call close() explicitly to observe them. The underlying writer is closed
+/// at most once.
+class Writer {
+public:
+    /// Construct a writer. `arrow_schema` is a pointer to an ArrowSchema (Arrow C Data Interface).
+    /// @param callbacks output callbacks; kept alive for the writer's lifetime.
+    /// @param opts      writer options, copied into the C options struct.
+    /// @throws Error if the underlying writer cannot be opened; the message
+    ///         includes mosaic_last_error() when one is available.
+    Writer(OutputFile callbacks, void* arrow_schema, WriterOptions opts = {})
+        : callbacks_(std::make_shared<OutputFile>(std::move(callbacks))) {
+        MosaicOutputFile stream{};
+        stream.ctx = callbacks_.get();
+        stream.write_fn = detail::stream_write;
+        stream.flush_fn = detail::stream_flush;
+        stream.get_pos_fn = detail::stream_get_pos;
+
+        MosaicWriterOptions c_opts = mosaic_writer_options_default();
+        c_opts.compression = opts.compression;
+        c_opts.zstd_level = opts.zstd_level;
+        c_opts.num_buckets = opts.num_buckets;
+        c_opts.row_group_max_size = opts.row_group_max_size;
+        c_opts.max_dict_total_bytes = opts.max_dict_total_bytes;
+        c_opts.max_dict_entries = opts.max_dict_entries;
+        c_opts.stats_columns = opts.stats_columns;
+        c_opts.num_stats_columns = opts.num_stats_columns;
+        c_opts.page_size_threshold = opts.page_size_threshold;
+
+        handle_ = mosaic_writer_open(stream, static_cast<ArrowSchema*>(arrow_schema), c_opts);
+        if (!handle_) {
+            const char* detail = mosaic_last_error();
+            throw Error(detail ? std::string("failed to open writer: ") + detail
+                               : std::string("failed to open writer"));
+        }
+    }
+
+    Writer(const Writer&) = delete;
+    Writer& operator=(const Writer&) = delete;
+    Writer(Writer&& other) noexcept
+        : callbacks_(std::move(other.callbacks_)),
+          handle_(std::exchange(other.handle_, nullptr)),
+          closed_(other.closed_) {}
+    Writer& operator=(Writer&& other) noexcept {
+        if (this != &other) {
+            // Finish the current writer first: closing it may still invoke
+            // the callbacks that are about to be replaced.
+            release();
+            callbacks_ = std::move(other.callbacks_);
+            handle_ = std::exchange(other.handle_, nullptr);
+            closed_ = other.closed_;
+        }
+        return *this;
+    }
+
+    ~Writer() { release(); }
+
+    /// Write one batch given Arrow C Data Interface structs.
+    /// @throws Error if the C API reports a failure.
+    void write(void* ffi_array, void* ffi_schema) {
+        check(mosaic_writer_write_batch(
+            handle_,
+            static_cast<ArrowArray*>(ffi_array),
+            static_cast<ArrowSchema*>(ffi_schema)));
+    }
+
+    /// @return the writer's estimated file size.
+    /// @throws Error if the C API reports a failure.
+    int64_t estimated_file_size() const {
+        int64_t out = 0;
+        check(mosaic_writer_estimated_file_size(handle_, &out));
+        return out;
+    }
+
+    /// Close the writer. Calling close() again after it has run is a no-op.
+    /// @throws Error if the writer has no handle (moved-from) or the close
+    ///         fails. A failed close is not retried by the destructor.
+    void close() {
+        if (!handle_) throw Error("writer has no open handle");
+        if (closed_) return;
+        // Mark closed before checking the result so the destructor never
+        // issues a second close on the same handle.
+        closed_ = true;
+        check(mosaic_writer_close(handle_));
+    }
+
+private:
+    /// Close (if still open) and free the handle; errors are ignored.
+    void release() noexcept {
+        if (!handle_) return;
+        if (!closed_) mosaic_writer_close(handle_);
+        mosaic_writer_free(handle_);
+        handle_ = nullptr;
+    }
+
+    std::shared_ptr<OutputFile> callbacks_;
+    MosaicWriterHandle* handle_ = nullptr;
+    bool closed_ = false;
+};
+
+// ======================== Statistics ========================
+
+/// Statistics for one column of a row group.
+struct ColumnStatistics {
+    // Scalars are zero-initialized so a default-constructed value never
+    // exposes indeterminate data.
+    uint32_t column_index = 0;
+    uint64_t null_count = 0;
+    // Raw bytes of the min / max value; empty when no value is available.
+    std::vector<uint8_t> min_value;
+    std::vector<uint8_t> max_value;
+    /// @return true when a min value is present.
+    bool has_min_max() const { return !min_value.empty(); }
+};
+
+// ======================== Reader ========================
+
+/// Input file for reading Mosaic files.
+///
+/// `read_at_fn` must be thread-safe: the reader may call it concurrently
+/// from multiple threads to perform parallel IO.
+struct InputFile {
+    /// Reads `len` bytes starting at `offset` into `buf`. Return 0 on
+    /// success; any non-zero value is treated as a read failure.
+    std::function<int(uint64_t offset, uint8_t* buf, size_t len)> read_at_fn;
+    /// Total file length in bytes. make_reader() overwrites this with its
+    /// `len` argument.
+    uint64_t file_length = 0;
+};
+
+namespace detail {
+
+/// C-compatible trampoline for InputFile::read_at_fn. `ctx` is the InputFile.
+/// The function is noexcept, so an exception from the callback is mapped to -1.
+inline int32_t input_read_at(void* ctx, uint64_t offset, uint8_t* buf, size_t len) noexcept {
+    try {
+        return static_cast<InputFile*>(ctx)->read_at_fn(offset, buf, len);
+    } catch (...) {
+        return -1;
+    }
+}
+
+/// C-compatible trampoline returning InputFile::file_length. `ctx` is the InputFile.
+inline uint64_t input_length(void* ctx) noexcept {
+    return static_cast<InputFile*>(ctx)->file_length;
+}
+
+} // namespace detail
+
+/// RAII wrapper around a MosaicReaderHandle. Create instances with
+/// make_reader(); the destructor frees the handle.
+class Reader {
+public:
+    Reader(const Reader&) = delete;
+    Reader& operator=(const Reader&) = delete;
+    Reader(Reader&& other) noexcept
+        : callbacks_(std::move(other.callbacks_)),
+          handle_(std::exchange(other.handle_, nullptr)) {}
+    Reader& operator=(Reader&& other) noexcept {
+        if (this != &other) {
+            // Free the current handle before replacing the callbacks it uses.
+            if (handle_) mosaic_reader_free(handle_);
+            callbacks_ = std::move(other.callbacks_);
+            handle_ = std::exchange(other.handle_, nullptr);
+        }
+        return *this;
+    }
+
+    ~Reader() {
+        if (handle_) mosaic_reader_free(handle_);
+    }
+
+    /// @return the number of row groups in the file.
+    /// @throws Error if the C API reports a failure.
+    uint32_t num_row_groups() const {
+        uint32_t out = 0;
+        check(mosaic_reader_num_row_groups(handle_, &out));
+        return out;
+    }
+
+    /// Export the file schema into the caller-provided ArrowSchema.
+    /// @throws Error if the C API reports a failure.
+    void export_schema(void* out_schema) const {
+        check(mosaic_reader_export_schema(handle_, static_cast<ArrowSchema*>(out_schema)));
+    }
+
+    /// Read all columns of row group `rg_index` and export them into the
+    /// caller-provided ArrowArray / ArrowSchema structs.
+    /// @throws Error (including the C API's error detail) on failure.
+    void read_row_group(uint32_t rg_index, void* out_array, void* out_schema) {
+        export_row_group(mosaic_reader_open_row_group(handle_, rg_index),
+                         out_array, out_schema);
+    }
+
+    /// Read only the `num_cols` columns listed in `cols` from row group
+    /// `rg_index` and export them into the caller-provided structs.
+    /// @throws Error (including the C API's error detail) on failure.
+    void read_row_group(uint32_t rg_index, const uint32_t* cols, uint32_t num_cols,
+                        void* out_array, void* out_schema) {
+        export_row_group(
+            mosaic_reader_open_row_group_projected(handle_, rg_index, cols, num_cols),
+            out_array, out_schema);
+    }
+
+    /// Collect the statistics entries recorded for row group `rg_index`.
+    /// @throws Error if the entry count, column index or null count cannot be read.
+    std::vector<ColumnStatistics> get_row_group_statistics(uint32_t rg_index) const {
+        uint32_t n = 0;
+        check(mosaic_reader_row_group_num_stats(handle_, rg_index, &n));
+        std::vector<ColumnStatistics> result;
+        result.reserve(n);
+        for (uint32_t i = 0; i < n; i++) {
+            ColumnStatistics s;
+            check(mosaic_reader_row_group_stat_column_index(handle_, rg_index, i, &s.column_index));
+            check(mosaic_reader_row_group_stat_null_count(handle_, rg_index, i, &s.null_count));
+            size_t min_len = 0, max_len = 0;
+            // Min and max live in separate thread-local buffers on the C
+            // side, so both pointers stay valid until they are copied below.
+            const uint8_t* min_ptr = mosaic_reader_row_group_stat_min(handle_, rg_index, i, &min_len);
+            const uint8_t* max_ptr = mosaic_reader_row_group_stat_max(handle_, rg_index, i, &max_len);
+            if (min_ptr && min_len > 0)
+                s.min_value.assign(min_ptr, min_ptr + min_len);
+            if (max_ptr && max_len > 0)
+                s.max_value.assign(max_ptr, max_ptr + max_len);
+            result.push_back(std::move(s));
+        }
+        return result;
+    }
+
+private:
+    friend Reader make_reader(InputFile callbacks, uint64_t len);
+    Reader(std::shared_ptr<InputFile> cbs, MosaicReaderHandle* h)
+        : callbacks_(std::move(cbs)), handle_(h) {}
+
+    /// Build "<what>: <last error>" (or just "<what>" when no detail exists).
+    static std::string with_detail(const char* what) {
+        std::string message(what);
+        if (const char* detail = mosaic_last_error()) {
+            message += ": ";
+            message += detail;
+        }
+        return message;
+    }
+
+    /// Shared tail of both read_row_group overloads: read the row group's
+    /// columns and export the batch. Takes ownership of `rg`; both
+    /// intermediate handles are released on every path.
+    static void export_row_group(MosaicRowGroupReaderHandle* rg,
+                                 void* out_array, void* out_schema) {
+        if (!rg) throw Error(with_detail("failed to open row group"));
+        std::unique_ptr<MosaicRowGroupReaderHandle, decltype(&mosaic_row_group_reader_free)>
+            rg_guard(rg, &mosaic_row_group_reader_free);
+        auto* rb_raw = mosaic_row_group_reader_read_columns(rg);
+        // The message is built before unwinding runs the guard, i.e. before
+        // any further FFI call could invalidate the error string.
+        if (!rb_raw) throw Error(with_detail("read_columns failed"));
+        rg_guard.reset();
+        std::unique_ptr<MosaicRecordBatchHandle, decltype(&mosaic_record_batch_free)>
+            rb(rb_raw, &mosaic_record_batch_free);
+        const int rc = mosaic_record_batch_export(rb.get(),
+            static_cast<ArrowArray*>(out_array),
+            static_cast<ArrowSchema*>(out_schema));
+        if (rc != 0) throw Error(with_detail("record_batch_export failed"));
+    }
+
+    std::shared_ptr<InputFile> callbacks_;
+    MosaicReaderHandle* handle_ = nullptr;
+};
+
+/// Open a Reader over the given callbacks.
+/// @param callbacks input callbacks; kept alive for the reader's lifetime.
+/// @param len       total file length in bytes (stored in callbacks.file_length).
+/// @throws Error if the file cannot be opened; the message includes
+///         mosaic_last_error() when one is available.
+inline Reader make_reader(InputFile callbacks, uint64_t len) {
+    callbacks.file_length = len;
+    auto cbs = std::make_shared<InputFile>(std::move(callbacks));
+    MosaicInputFile input{};
+    input.ctx = cbs.get();
+    input.read_at_fn = detail::input_read_at;
+    input.length_fn = detail::input_length;
+    auto* handle = mosaic_reader_open(input);
+    if (!handle) {
+        const char* detail = mosaic_last_error();
+        throw Error(detail ? std::string("failed to open reader: ") + detail
+                           : std::string("failed to open reader"));
+    }
+    return Reader(std::move(cbs), handle);
+}
+
+} // namespace mosaic

Reply via email to