This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git
The following commit(s) were added to refs/heads/main by this push:
new eba2108 Add ffi and cpp (#4)
eba2108 is described below
commit eba2108971f3e91749c36c441b8b7deb49963488
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue May 19 10:25:41 2026 +0800
Add ffi and cpp (#4)
- ffi crate: C API via Arrow C Data Interface for writer and reader,
with cbindgen-generated header
- include/mosaic.hpp: C++ RAII wrapper (Writer, Reader, statistics)
- cpp/: CMake project with integration tests covering roundtrip,
nulls, all types, projection, statistics, compression, schema
export, and multiple row groups
---
.github/workflows/ci.yml | 39 ++
Cargo.toml | 2 +-
Cargo.toml => cpp/CMakeLists.txt | 25 +-
cpp/test_mosaic.cpp | 481 +++++++++++++++++++++++
Cargo.toml => ffi/Cargo.toml | 20 +-
ffi/build.rs | 29 ++
Cargo.toml => ffi/cbindgen.toml | 26 +-
ffi/src/lib.rs | 819 +++++++++++++++++++++++++++++++++++++++
include/arrow_c_data.h | 70 ++++
include/mosaic.hpp | 301 ++++++++++++++
10 files changed, 1799 insertions(+), 13 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 9850579..70b4fec 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -63,3 +63,42 @@ jobs:
- name: Test
run: cargo test
+
+  # Builds the Rust FFI cdylib and runs the C++ integration tests against it.
+  cpp-test:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v6
+
+      - name: Rust Cache
+        uses: actions/cache@v5
+        with:
+          path: |
+            ~/.cargo/registry
+            ~/.cargo/git
+            target
+          key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
+          restore-keys: |
+            ${{ runner.os }}-cargo-
+
+      # Arrow C++ comes from the official Apache Arrow APT repository.
+      - name: Install Arrow C++
+        run: |
+          sudo apt-get update
+          sudo apt-get install -y apt-transport-https lsb-release ca-certificates curl
+          curl -fsSL https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb -o /tmp/arrow-apt.deb
+          sudo apt-get install -y /tmp/arrow-apt.deb
+          sudo apt-get update
+          sudo apt-get install -y libarrow-dev
+
+      - name: Build FFI library
+        run: cargo build --release -p mosaic-ffi
+
+      - name: Build C++ test
+        run: |
+          cmake -S cpp -B cpp/build \
+            -DMOSAIC_FFI_LIB=${{ github.workspace }}/target/release/libmosaic_ffi.so
+          cmake --build cpp/build
+
+      - name: Run C++ test
+        run: cpp/build/test_mosaic
+        env:
+          LD_LIBRARY_PATH: ${{ github.workspace }}/target/release
diff --git a/Cargo.toml b/Cargo.toml
index ff31345..39c103c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,7 +16,7 @@
# under the License.
[workspace]
-members = ["core"]
+members = ["core", "ffi"]
resolver = "2"
[profile.release]
diff --git a/Cargo.toml b/cpp/CMakeLists.txt
similarity index 50%
copy from Cargo.toml
copy to cpp/CMakeLists.txt
index ff31345..6e69fcb 100644
--- a/Cargo.toml
+++ b/cpp/CMakeLists.txt
@@ -15,8 +15,25 @@
# specific language governing permissions and limitations
# under the License.
-[workspace]
-members = ["core"]
-resolver = "2"
+cmake_minimum_required(VERSION 3.16)
+project(mosaic-cpp-test CXX)
-[profile.release]
+set(CMAKE_CXX_STANDARD 17)
+set(CMAKE_CXX_STANDARD_REQUIRED ON)
+
+find_package(Arrow REQUIRED)
+
+# Resolve paths relative to this file rather than CMAKE_SOURCE_DIR so the
+# project still works when pulled in through add_subdirectory().
+get_filename_component(MOSAIC_ROOT "${CMAKE_CURRENT_SOURCE_DIR}/.." ABSOLUTE)
+
+set(MOSAIC_FFI_LIB "" CACHE FILEPATH "Path to libmosaic_ffi shared library")
+if(NOT MOSAIC_FFI_LIB)
+  if(APPLE)
+    set(MOSAIC_FFI_LIB "${MOSAIC_ROOT}/target/release/libmosaic_ffi.dylib")
+  elseif(WIN32)
+    # The MSVC linker needs the import library that rustc emits next to the
+    # cdylib; it cannot link against the .dll itself.
+    set(MOSAIC_FFI_LIB "${MOSAIC_ROOT}/target/release/mosaic_ffi.dll.lib")
+  else()
+    set(MOSAIC_FFI_LIB "${MOSAIC_ROOT}/target/release/libmosaic_ffi.so")
+  endif()
+endif()
+
+# Fail at configure time with an actionable message instead of a link error.
+if(NOT EXISTS "${MOSAIC_FFI_LIB}")
+  message(FATAL_ERROR
+    "mosaic FFI library not found at ${MOSAIC_FFI_LIB}. "
+    "Run 'cargo build --release -p mosaic-ffi' first or pass -DMOSAIC_FFI_LIB=<path>.")
+endif()
+
+add_executable(test_mosaic test_mosaic.cpp)
+target_include_directories(test_mosaic PRIVATE "${MOSAIC_ROOT}/include")
+target_link_libraries(test_mosaic PRIVATE Arrow::arrow_shared "${MOSAIC_FFI_LIB}")
diff --git a/cpp/test_mosaic.cpp b/cpp/test_mosaic.cpp
new file mode 100644
index 0000000..4de8cad
--- /dev/null
+++ b/cpp/test_mosaic.cpp
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "mosaic.hpp"
+
+#include <arrow/api.h>
+#include <arrow/c/bridge.h>
+
+#include <algorithm>
+#include <cassert>
+#include <cmath>
+#include <cstdint>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <functional>
+#include <memory>
+#include <string>
+#include <vector>
+
+// The tests call side-effecting functions (builder Append, Arrow Export/Import)
+// inside assert(). If the build defines NDEBUG (e.g. a Release configuration),
+// those calls are compiled out and the tests silently run on empty data.
+// Force assertions on for this file. <assert.h> is designed to be re-included:
+// each inclusion redefines assert() according to the current NDEBUG state.
+#ifdef NDEBUG
+#undef NDEBUG
+#include <assert.h>
+#endif
+
+// Abort with file/line and the compared expressions when `a != b`.
+#define ASSERT_EQ(a, b) do { if ((a) != (b)) { \
+    fprintf(stderr, "FAIL %s:%d: %s != %s\n", __FILE__, __LINE__, #a, #b); \
+    abort(); } } while (0)
+// Abort with file/line and the expression text when `x` is false.
+#define ASSERT_TRUE(x) do { if (!(x)) { \
+    fprintf(stderr, "FAIL %s:%d: %s\n", __FILE__, __LINE__, #x); \
+    abort(); } } while (0)
+
+// Growable in-memory byte store used as the writer's sink and the reader's
+// source in these tests.
+struct MemBuffer {
+    std::vector<uint8_t> data;  // bytes appended so far
+    size_t pos{0};              // running count of bytes appended
+};
+
+// Build a mosaic::OutputFile whose callbacks append into `buf`.
+// The callbacks hold a reference to `buf`, so `buf` must outlive every writer
+// constructed from the returned OutputFile.
+static mosaic::OutputFile make_output(MemBuffer& buf) {
+    mosaic::OutputFile out;
+    out.write_fn = [&buf](const uint8_t* data, size_t len) -> int {
+        buf.data.insert(buf.data.end(), data, data + len);
+        buf.pos += len;
+        return 0;
+    };
+    // Nothing to flush for an in-memory buffer. No capture is needed, and an
+    // unused one trips clang's -Wunused-lambda-capture.
+    out.flush_fn = []() -> int { return 0; };
+    out.get_pos_fn = [&buf]() -> int64_t { return static_cast<int64_t>(buf.pos); };
+    return out;
+}
+
+// Build a mosaic::InputFile serving positional reads from `buf`.
+// The callback holds a reference to `buf`, so `buf` must outlive the reader.
+// read_at_fn returns -1 unless [offset, offset + len) lies entirely inside the
+// buffer. The check is phrased so that `offset + len` can never wrap around:
+// a wrapped sum would pass a naive check and lead to an out-of-bounds memcpy.
+static mosaic::InputFile make_input(const MemBuffer& buf) {
+    mosaic::InputFile in;
+    in.read_at_fn = [&buf](uint64_t offset, uint8_t* dst, size_t len) -> int {
+        const uint64_t size = buf.data.size();
+        if (offset > size || len > size - offset) return -1;
+        if (len > 0) memcpy(dst, buf.data.data() + offset, len);
+        return 0;
+    };
+    in.file_length = buf.data.size();
+    return in;
+}
+
+// Serialize `batch` (described by `schema`) into an in-memory Mosaic file
+// using `opts`, and return the file bytes.
+// Both exported C structs are handed to the writer, which takes ownership.
+static std::vector<uint8_t> write_and_get(
+    const std::shared_ptr<arrow::Schema>& schema,
+    const std::shared_ptr<arrow::RecordBatch>& batch,
+    mosaic::WriterOptions opts = {})
+{
+    // C Data Interface structs: the writer schema, plus the (array, schema)
+    // pair that carries the batch itself.
+    struct ArrowSchema c_schema;
+    struct ArrowArray c_array;
+    struct ArrowSchema c_batch_schema;
+
+    MemBuffer buf;
+
+    auto st = arrow::ExportSchema(*schema, &c_schema);
+    assert(st.ok());
+    mosaic::Writer writer(make_output(buf), &c_schema, opts);
+
+    st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
+    assert(st.ok());
+    writer.write(&c_array, &c_batch_schema);
+
+    writer.close();
+    return buf.data;
+}
+
+// Read row group `rg` and import it as an arrow::RecordBatch.
+// With a non-null `cols` and `num_cols > 0`, only those column indices are
+// read (projection); otherwise every column is returned.
+static std::shared_ptr<arrow::RecordBatch> read_row_group(
+    mosaic::Reader& reader, uint32_t rg,
+    const uint32_t* cols = nullptr, uint32_t num_cols = 0)
+{
+    struct ArrowArray c_array;
+    struct ArrowSchema c_schema;
+
+    const bool projected = cols != nullptr && num_cols > 0;
+    if (!projected) {
+        reader.read_row_group(rg, &c_array, &c_schema);
+    } else {
+        reader.read_row_group(rg, cols, num_cols, &c_array, &c_schema);
+    }
+
+    auto result = arrow::ImportRecordBatch(&c_array, &c_schema);
+    assert(result.ok());
+    return result.ValueUnsafe();
+}
+
+// ======================== Tests ========================
+
+// Round-trips 50 rows of (required int32, utf8, float64) through a
+// two-bucket writer and verifies every value in row group 0.
+static void test_basic_roundtrip() {
+    const auto schema = arrow::schema({
+        arrow::field("id", arrow::int32(), false),
+        arrow::field("name", arrow::utf8()),
+        arrow::field("score", arrow::float64()),
+    });
+
+    // Row i holds (i, "user_<i>", i * 1.5).
+    arrow::Int32Builder id_b;
+    arrow::StringBuilder name_b;
+    arrow::DoubleBuilder score_b;
+    for (int i = 0; i != 50; ++i) {
+        assert(id_b.Append(i).ok());
+        assert(name_b.Append("user_" + std::to_string(i)).ok());
+        assert(score_b.Append(i * 1.5).ok());
+    }
+    std::vector<std::shared_ptr<arrow::Array>> columns;
+    columns.reserve(3);
+    columns.push_back(id_b.Finish().ValueUnsafe());
+    columns.push_back(name_b.Finish().ValueUnsafe());
+    columns.push_back(score_b.Finish().ValueUnsafe());
+    const auto batch = arrow::RecordBatch::Make(schema, 50, std::move(columns));
+
+    mosaic::WriterOptions opts;
+    opts.num_buckets = 2;
+    const auto data_vec = write_and_get(schema, batch, opts);
+    ASSERT_TRUE(data_vec.size() > 32);
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+    ASSERT_TRUE(reader.num_row_groups() >= 1);
+
+    const auto rb = read_row_group(reader, 0);
+    ASSERT_EQ(rb->num_rows(), 50);
+    ASSERT_EQ(rb->num_columns(), 3);
+
+    const auto ids = std::static_pointer_cast<arrow::Int32Array>(rb->column(0));
+    const auto names = std::static_pointer_cast<arrow::StringArray>(rb->column(1));
+    const auto scores = std::static_pointer_cast<arrow::DoubleArray>(rb->column(2));
+
+    for (int i = 0; i != 50; ++i) {
+        ASSERT_EQ(ids->Value(i), i);
+        ASSERT_EQ(names->GetString(i), "user_" + std::to_string(i));
+        ASSERT_TRUE(std::abs(scores->Value(i) - i * 1.5) < 1e-9);
+    }
+    printf(" PASS test_basic_roundtrip\n");
+}
+
+// Checks that a null in a nullable utf8 column survives the round trip and
+// that the neighbouring non-null values are unaffected.
+static void test_null_values() {
+    auto schema = arrow::schema({
+        arrow::field("id", arrow::int32()),
+        arrow::field("name", arrow::utf8()),
+    });
+
+    // Built row by row: (1, "hello"), (2, NULL), (3, "world").
+    arrow::Int32Builder id_b;
+    arrow::StringBuilder name_b;
+    assert(id_b.Append(1).ok());
+    assert(name_b.Append("hello").ok());
+    assert(id_b.Append(2).ok());
+    assert(name_b.AppendNull().ok());
+    assert(id_b.Append(3).ok());
+    assert(name_b.Append("world").ok());
+
+    std::vector<std::shared_ptr<arrow::Array>> columns;
+    columns.push_back(id_b.Finish().ValueUnsafe());
+    columns.push_back(name_b.Finish().ValueUnsafe());
+    auto batch = arrow::RecordBatch::Make(schema, 3, std::move(columns));
+
+    MemBuffer buf;
+    buf.data = write_and_get(schema, batch);
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+    auto rb = read_row_group(reader, 0);
+    ASSERT_EQ(rb->num_rows(), 3);
+
+    auto names = std::static_pointer_cast<arrow::StringArray>(rb->column(1));
+    ASSERT_TRUE(!names->IsNull(0));
+    ASSERT_EQ(names->GetString(0), "hello");
+    ASSERT_TRUE(names->IsNull(1));
+    ASSERT_TRUE(!names->IsNull(2));
+    ASSERT_EQ(names->GetString(2), "world");
+    printf(" PASS test_null_values\n");
+}
+
+// Writes one row that covers every supported column type and checks each
+// value after the round trip, including the binary column.
+static void test_all_types() {
+    auto schema = arrow::schema({
+        arrow::field("f_bool", arrow::boolean()),
+        arrow::field("f_int8", arrow::int8()),
+        arrow::field("f_int16", arrow::int16()),
+        arrow::field("f_int32", arrow::int32()),
+        arrow::field("f_int64", arrow::int64()),
+        arrow::field("f_float32", arrow::float32()),
+        arrow::field("f_float64", arrow::float64()),
+        arrow::field("f_utf8", arrow::utf8()),
+        arrow::field("f_binary", arrow::binary()),
+    });
+
+    arrow::BooleanBuilder bool_b;
+    assert(bool_b.Append(true).ok());
+    arrow::Int8Builder i8_b;
+    assert(i8_b.Append(42).ok());
+    arrow::Int16Builder i16_b;
+    assert(i16_b.Append(1234).ok());
+    arrow::Int32Builder i32_b;
+    assert(i32_b.Append(100000).ok());
+    arrow::Int64Builder i64_b;
+    assert(i64_b.Append(9999999999LL).ok());
+    arrow::FloatBuilder f32_b;
+    assert(f32_b.Append(3.14f).ok());
+    arrow::DoubleBuilder f64_b;
+    assert(f64_b.Append(2.718281828).ok());
+    arrow::StringBuilder utf8_b;
+    assert(utf8_b.Append("hello").ok());
+    arrow::BinaryBuilder bin_b;
+    uint8_t bin_data[] = {0x01, 0x02};
+    assert(bin_b.Append(bin_data, 2).ok());
+
+    auto batch = arrow::RecordBatch::Make(schema, 1, {
+        bool_b.Finish().ValueUnsafe(),
+        i8_b.Finish().ValueUnsafe(),
+        i16_b.Finish().ValueUnsafe(),
+        i32_b.Finish().ValueUnsafe(),
+        i64_b.Finish().ValueUnsafe(),
+        f32_b.Finish().ValueUnsafe(),
+        f64_b.Finish().ValueUnsafe(),
+        utf8_b.Finish().ValueUnsafe(),
+        bin_b.Finish().ValueUnsafe(),
+    });
+
+    auto data_vec = write_and_get(schema, batch);
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+    auto rb = read_row_group(reader, 0);
+    ASSERT_EQ(rb->num_rows(), 1);
+    ASSERT_EQ(rb->num_columns(), 9);
+
+    ASSERT_TRUE(std::static_pointer_cast<arrow::BooleanArray>(rb->column(0))->Value(0));
+    ASSERT_EQ(std::static_pointer_cast<arrow::Int8Array>(rb->column(1))->Value(0), 42);
+    ASSERT_EQ(std::static_pointer_cast<arrow::Int16Array>(rb->column(2))->Value(0), 1234);
+    ASSERT_EQ(std::static_pointer_cast<arrow::Int32Array>(rb->column(3))->Value(0), 100000);
+    ASSERT_EQ(std::static_pointer_cast<arrow::Int64Array>(rb->column(4))->Value(0), 9999999999LL);
+    ASSERT_TRUE(std::abs(std::static_pointer_cast<arrow::FloatArray>(rb->column(5))->Value(0) - 3.14f) < 1e-5f);
+    ASSERT_TRUE(std::abs(std::static_pointer_cast<arrow::DoubleArray>(rb->column(6))->Value(0) - 2.718281828) < 1e-9);
+    ASSERT_EQ(std::static_pointer_cast<arrow::StringArray>(rb->column(7))->GetString(0), "hello");
+    // The binary column was written above but previously never verified.
+    ASSERT_EQ(std::static_pointer_cast<arrow::BinaryArray>(rb->column(8))->GetString(0),
+              std::string("\x01\x02", 2));
+    printf(" PASS test_all_types\n");
+}
+
+// Writes four columns and reads back only columns {0, 1}. Checks the
+// projected shape and that the projected columns still carry the right values.
+static void test_projection() {
+    auto schema = arrow::schema({
+        arrow::field("a", arrow::int32()),
+        arrow::field("b", arrow::utf8()),
+        arrow::field("c", arrow::float64()),
+        arrow::field("d", arrow::utf8()),
+    });
+
+    arrow::Int32Builder ab;
+    arrow::StringBuilder bb, db;
+    arrow::DoubleBuilder cb;
+    for (int i = 0; i < 20; i++) {
+        assert(ab.Append(i).ok());
+        assert(bb.Append("val_" + std::to_string(i)).ok());
+        assert(cb.Append(static_cast<double>(i)).ok());
+        assert(db.Append("extra_" + std::to_string(i)).ok());
+    }
+    auto batch = arrow::RecordBatch::Make(schema, 20, {
+        ab.Finish().ValueUnsafe(), bb.Finish().ValueUnsafe(),
+        cb.Finish().ValueUnsafe(), db.Finish().ValueUnsafe(),
+    });
+
+    mosaic::WriterOptions opts;
+    opts.num_buckets = 2;
+    auto data_vec = write_and_get(schema, batch, opts);
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+
+    uint32_t cols[] = {0, 1};
+    auto rb = read_row_group(reader, 0, cols, 2);
+    ASSERT_EQ(rb->num_columns(), 2);
+    ASSERT_EQ(rb->num_rows(), 20);
+
+    // Shape alone would not catch a projection that returns the wrong columns.
+    auto a_vals = std::static_pointer_cast<arrow::Int32Array>(rb->column(0));
+    auto b_vals = std::static_pointer_cast<arrow::StringArray>(rb->column(1));
+    for (int i = 0; i < 20; i++) {
+        ASSERT_EQ(a_vals->Value(i), i);
+        ASSERT_EQ(b_vals->GetString(i), "val_" + std::to_string(i));
+    }
+    printf(" PASS test_projection\n");
+}
+
+// Requests statistics for columns 0 and 2 only. Checks that each returned
+// entry belongs to one of those columns, has no nulls and has min/max, and
+// that both requested columns are actually reported.
+static void test_statistics() {
+    auto schema = arrow::schema({
+        arrow::field("id", arrow::int32()),
+        arrow::field("name", arrow::utf8()),
+        arrow::field("score", arrow::float64()),
+    });
+
+    arrow::Int32Builder id_b;
+    arrow::StringBuilder name_b;
+    arrow::DoubleBuilder score_b;
+    for (int i = 0; i < 10; i++) {
+        assert(id_b.Append(i * 10).ok());
+        assert(name_b.Append("item_" + std::to_string(i)).ok());
+        assert(score_b.Append(i * 1.1).ok());
+    }
+    auto batch = arrow::RecordBatch::Make(schema, 10, {
+        id_b.Finish().ValueUnsafe(), name_b.Finish().ValueUnsafe(),
+        score_b.Finish().ValueUnsafe(),
+    });
+
+    mosaic::WriterOptions opts;
+    uint32_t stats_cols[] = {0, 2};
+    opts.stats_columns = stats_cols;
+    opts.num_stats_columns = 2;
+    auto data_vec = write_and_get(schema, batch, opts);
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+
+    auto stats = reader.get_row_group_statistics(0);
+    ASSERT_TRUE(stats.size() > 0);
+
+    bool saw_id = false;
+    bool saw_score = false;
+    for (auto& s : stats) {
+        ASSERT_TRUE(s.column_index == 0 || s.column_index == 2);
+        ASSERT_EQ(s.null_count, 0u);
+        ASSERT_TRUE(s.has_min_max());
+        saw_id = saw_id || s.column_index == 0;
+        saw_score = saw_score || s.column_index == 2;
+    }
+    // A non-empty list could otherwise hide a missing stats column.
+    ASSERT_TRUE(saw_id && saw_score);
+    printf(" PASS test_statistics\n");
+}
+
+// Round-trips 100 rows with compression enabled at level 3 and verifies both
+// columns.
+static void test_compression_zstd() {
+    auto schema = arrow::schema({
+        arrow::field("x", arrow::int32()),
+        arrow::field("y", arrow::utf8()),
+    });
+
+    arrow::Int32Builder xb;
+    arrow::StringBuilder yb;
+    for (int i = 0; i < 100; i++) {
+        assert(xb.Append(i).ok());
+        assert(yb.Append("v_" + std::to_string(i)).ok());
+    }
+    auto batch = arrow::RecordBatch::Make(schema, 100, {
+        xb.Finish().ValueUnsafe(), yb.Finish().ValueUnsafe(),
+    });
+
+    mosaic::WriterOptions opts;
+    opts.compression = 1;  // NOTE(review): assumed to equal COMPRESSION_ZSTD — confirm
+    opts.zstd_level = 3;
+    auto data_vec = write_and_get(schema, batch, opts);
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+    auto rb = read_row_group(reader, 0);
+    ASSERT_EQ(rb->num_rows(), 100);
+
+    // Previously only x was verified; check the string column y as well.
+    auto xs = std::static_pointer_cast<arrow::Int32Array>(rb->column(0));
+    auto ys = std::static_pointer_cast<arrow::StringArray>(rb->column(1));
+    for (int i = 0; i < 100; i++) {
+        ASSERT_EQ(xs->Value(i), i);
+        ASSERT_EQ(ys->GetString(i), "v_" + std::to_string(i));
+    }
+    printf(" PASS test_compression_zstd\n");
+}
+
+// Checks that the reader's exported schema preserves field names, nullability
+// and the primitive types of the written schema.
+static void test_schema_roundtrip() {
+    auto schema = arrow::schema({
+        arrow::field("id", arrow::int32(), false),
+        arrow::field("name", arrow::utf8(), true),
+        arrow::field("score", arrow::float64(), true),
+    });
+
+    arrow::Int32Builder sr_id_b;
+    assert(sr_id_b.Append(1).ok());
+    arrow::StringBuilder sr_name_b;
+    assert(sr_name_b.Append("x").ok());
+    arrow::DoubleBuilder sr_score_b;
+    assert(sr_score_b.Append(1.0).ok());
+
+    auto batch = arrow::RecordBatch::Make(schema, 1, {
+        sr_id_b.Finish().ValueUnsafe(),
+        sr_name_b.Finish().ValueUnsafe(),
+        sr_score_b.Finish().ValueUnsafe(),
+    });
+
+    auto data_vec = write_and_get(schema, batch);
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+
+    struct ArrowSchema c_schema;
+    reader.export_schema(&c_schema);
+    auto imported = arrow::ImportSchema(&c_schema);
+    assert(imported.ok());
+    auto read_schema = imported.ValueUnsafe();
+
+    ASSERT_EQ(read_schema->num_fields(), 3);
+    ASSERT_EQ(read_schema->field(0)->name(), "id");
+    ASSERT_EQ(read_schema->field(1)->name(), "name");
+    ASSERT_EQ(read_schema->field(2)->name(), "score");
+    ASSERT_TRUE(!read_schema->field(0)->nullable());
+    ASSERT_TRUE(read_schema->field(1)->nullable());
+    ASSERT_TRUE(read_schema->field(2)->nullable());
+    ASSERT_TRUE(read_schema->field(0)->type()->Equals(arrow::int32()));
+    ASSERT_TRUE(read_schema->field(2)->type()->Equals(arrow::float64()));
+    printf(" PASS test_schema_roundtrip\n");
+}
+
+// Streams total_rows rows in batch_size-row batches with a tiny
+// row_group_max_size, so the writer has to cut several row groups. Then reads
+// every row group in order and checks each row appears exactly once, in order.
+static void test_multiple_row_groups() {
+    auto schema = arrow::schema({
+        arrow::field("id", arrow::int32()),
+        arrow::field("data", arrow::int64()),
+    });
+
+    mosaic::WriterOptions opts;
+    opts.compression = 0;
+    opts.num_buckets = 1;
+    opts.row_group_max_size = 200;
+
+    MemBuffer write_buf;
+    struct ArrowSchema c_schema;
+    auto st = arrow::ExportSchema(*schema, &c_schema);
+    assert(st.ok());
+    mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
+
+    const int total_rows = 500;
+    const int batch_size = 50;
+    for (int start = 0; start < total_rows; start += batch_size) {
+        int end = std::min(start + batch_size, total_rows);
+        int n = end - start;
+        arrow::Int32Builder id_b;
+        arrow::Int64Builder data_b;
+        for (int i = start; i < end; i++) {
+            assert(id_b.Append(i).ok());
+            assert(data_b.Append(static_cast<int64_t>(i) * 3).ok());
+        }
+        auto batch = arrow::RecordBatch::Make(schema, n, {
+            id_b.Finish().ValueUnsafe(), data_b.Finish().ValueUnsafe(),
+        });
+        struct ArrowArray c_array;
+        struct ArrowSchema c_batch_schema;
+        st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
+        assert(st.ok());
+        writer.write(&c_array, &c_batch_schema);
+    }
+    writer.close();
+
+    // Read straight from the write buffer; copying the bytes twice is unnecessary.
+    auto reader = mosaic::make_reader(make_input(write_buf), write_buf.data.size());
+    ASSERT_TRUE(reader.num_row_groups() > 1);
+
+    int offset = 0;
+    for (uint32_t rg = 0; rg < reader.num_row_groups(); rg++) {
+        auto rb = read_row_group(reader, rg);
+        ASSERT_EQ(rb->num_columns(), 2);
+        auto ids = std::static_pointer_cast<arrow::Int32Array>(rb->column(0));
+        auto datas = std::static_pointer_cast<arrow::Int64Array>(rb->column(1));
+        for (int64_t i = 0; i < rb->num_rows(); i++) {
+            ASSERT_EQ(ids->Value(i), offset + static_cast<int>(i));
+            ASSERT_EQ(datas->Value(i), static_cast<int64_t>(offset + i) * 3);
+        }
+        offset += static_cast<int>(rb->num_rows());
+    }
+    // Tie the final count to the written total instead of repeating the literal.
+    ASSERT_EQ(offset, total_rows);
+    printf(" PASS test_multiple_row_groups\n");
+}
+
+int main() {
+    // The table is the single source of truth, so the reported count cannot
+    // drift from the tests that actually ran. Every test aborts on failure.
+    using TestFn = void (*)();
+    const TestFn tests[] = {
+        test_basic_roundtrip,
+        test_null_values,
+        test_all_types,
+        test_projection,
+        test_statistics,
+        test_compression_zstd,
+        test_schema_roundtrip,
+        test_multiple_row_groups,
+    };
+
+    printf("Running Mosaic C++ tests...\n");
+    for (TestFn test : tests) {
+        test();
+    }
+    printf("All %d tests passed.\n", static_cast<int>(std::size(tests)));
+    return 0;
+}
diff --git a/Cargo.toml b/ffi/Cargo.toml
similarity index 69%
copy from Cargo.toml
copy to ffi/Cargo.toml
index ff31345..e690775 100644
--- a/Cargo.toml
+++ b/ffi/Cargo.toml
@@ -15,8 +15,20 @@
# specific language governing permissions and limitations
# under the License.
-[workspace]
-members = ["core"]
-resolver = "2"
+[package]
+name = "mosaic-ffi"
+version = "0.1.0"
+edition = "2021"
+description = "Mosaic file format — C/C++ FFI bindings"
+license = "Apache-2.0"
-[profile.release]
+[lib]
+crate-type = ["cdylib"]
+
+[dependencies]
+mosaic-core = { path = "../core" }
+arrow-schema = "58"
+arrow-array = { version = "58", features = ["ffi"] }
+
+[build-dependencies]
+cbindgen = "0.27"
diff --git a/ffi/build.rs b/ffi/build.rs
new file mode 100644
index 0000000..cc64f1c
--- /dev/null
+++ b/ffi/build.rs
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Generates `include/mosaic.h` (at the workspace root) from this crate's
+/// `extern "C"` API using cbindgen and `cbindgen.toml`.
+fn main() {
+    let crate_dir = std::path::PathBuf::from(
+        std::env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR not set"),
+    );
+    let workspace_dir = crate_dir
+        .parent()
+        .expect("ffi crate must live inside the workspace directory");
+    // Resolve the config next to Cargo.toml rather than relying on the cwd.
+    let config_path = crate_dir.join("cbindgen.toml");
+    let output_path = workspace_dir.join("include").join("mosaic.h");
+
+    // Only re-run when the exported API or the cbindgen config changes.
+    println!("cargo:rerun-if-changed=src");
+    println!("cargo:rerun-if-changed={}", config_path.display());
+
+    let config = cbindgen::Config::from_file(&config_path)
+        .unwrap_or_else(|e| panic!("failed to read {}: {}", config_path.display(), e));
+
+    cbindgen::Builder::new()
+        .with_crate(&crate_dir)
+        .with_config(config)
+        .generate()
+        .expect("Unable to generate C bindings")
+        .write_to_file(output_path);
+}
diff --git a/Cargo.toml b/ffi/cbindgen.toml
similarity index 61%
copy from Cargo.toml
copy to ffi/cbindgen.toml
index ff31345..8b55127 100644
--- a/Cargo.toml
+++ b/ffi/cbindgen.toml
@@ -15,8 +15,26 @@
# specific language governing permissions and limitations
# under the License.
-[workspace]
-members = ["core"]
-resolver = "2"
+language = "C"
+header = "/* Generated by cbindgen — do not edit manually */"
+include_guard = "MOSAIC_H"
+# Basic TOML strings cannot span lines; keep this value on a single line.
+autogen_warning = "/* Warning: this file is autogenerated by cbindgen. Don't modify this manually. */"
+tab_width = 4
+style = "both"
+# ArrowSchema/ArrowArray (renamed from FFI_* below) come from this header.
+after_includes = "#include \"arrow_c_data.h\""
-[profile.release]
+[defines]
+
+[export]
+prefix = ""
+
+[export.rename]
+"FFI_ArrowSchema" = "ArrowSchema"
+"FFI_ArrowArray" = "ArrowArray"
+
+[enum]
+rename_variants = "ScreamingSnakeCase"
+prefix_with_name = true
+
+[fn]
+rename_args = "None"
diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs
new file mode 100644
index 0000000..7e1cd8f
--- /dev/null
+++ b/ffi/src/lib.rs
@@ -0,0 +1,819 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![allow(clippy::missing_safety_doc)]
+
+use std::cell::RefCell;
+use std::ffi::CString;
+use std::io;
+use std::os::raw::{c_char, c_int};
+use std::panic::{self, AssertUnwindSafe};
+use std::ptr;
+
+use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
+use arrow_array::{RecordBatch, StructArray};
+use arrow_schema::{Field, Schema};
+
+use mosaic_core::reader::{InputFile, MosaicReader, ReaderAccess};
+use mosaic_core::spec::*;
+use mosaic_core::writer::{MosaicWriter, OutputFile, WriterOptions};
+
+thread_local! {
+    /// Last error message recorded on this thread by `set_error`.
+    static LAST_ERROR: RefCell<Option<CString>> = const { RefCell::new(None) };
+}
+
+/// Record `msg` as this thread's last error.
+///
+/// `CString::new` rejects interior NUL bytes. Previously that silently cleared
+/// the slot (`.ok()` produced `None`), so the caller saw no message at all.
+/// NUL bytes are now replaced with `?`, so a message is always stored.
+fn set_error(msg: String) {
+    let c_msg = CString::new(msg).unwrap_or_else(|e| {
+        let sanitized: Vec<u8> = e
+            .into_vec()
+            .into_iter()
+            .map(|b| if b == 0 { b'?' } else { b })
+            .collect();
+        CString::new(sanitized).expect("all NUL bytes were replaced")
+    });
+    LAST_ERROR.with(|e| {
+        *e.borrow_mut() = Some(c_msg);
+    });
+}
+
+/// Format a `catch_unwind` payload as `"native panic: <message>"`.
+///
+/// The parameter is deliberately `&Box<..>`. `Box<dyn Any + Send>` itself
+/// implements `Any`, so coercing it to `&dyn Any` would wrap the box rather
+/// than the payload, and every downcast would fail.
+fn panic_message(e: &Box<dyn std::any::Any + Send>) -> String {
+    let detail = e
+        .downcast_ref::<String>()
+        .map(String::as_str)
+        .or_else(|| e.downcast_ref::<&str>().copied())
+        .unwrap_or("unknown");
+    format!("native panic: {}", detail)
+}
+
+// ======================== OutputFile ========================
+
+/// Caller-provided output sink, passed by value to `mosaic_writer_open`.
+///
+/// Each callback receives `ctx` as its first argument.
+#[repr(C)]
+pub struct MosaicOutputFile {
+    /// Opaque pointer handed back to every callback.
+    pub ctx: *mut std::ffi::c_void,
+    /// Write `len` bytes starting at the given pointer; return 0 on success.
+    /// Required: writes fail with "write_fn is null" when it is absent.
+    pub write_fn: Option<unsafe extern "C" fn(*mut std::ffi::c_void, *const u8, usize) -> i32>,
+    /// Optional flush; return 0 on success. Treated as a no-op when null.
+    pub flush_fn: Option<unsafe extern "C" fn(*mut std::ffi::c_void) -> i32>,
+    /// Optional current byte position. When null, or when it returns a negative
+    /// value, the writer falls back to its own count of bytes written.
+    pub get_pos_fn: Option<unsafe extern "C" fn(*mut std::ffi::c_void) -> i64>,
+}
+
+/// Adapts a caller-supplied `MosaicOutputFile` callback table to the core
+/// `OutputFile` trait. `pos` counts bytes successfully written and is used
+/// whenever the caller cannot report a position.
+struct FfiOutputFile {
+    raw: MosaicOutputFile,
+    pos: u64,
+}
+
+impl OutputFile for FfiOutputFile {
+    fn write(&mut self, data: &[u8]) -> io::Result<()> {
+        let Some(write_fn) = self.raw.write_fn else {
+            return Err(io::Error::other("write_fn is null"));
+        };
+        match unsafe { write_fn(self.raw.ctx, data.as_ptr(), data.len()) } {
+            0 => {
+                self.pos += data.len() as u64;
+                Ok(())
+            }
+            _ => Err(io::Error::other("write callback failed")),
+        }
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        match self.raw.flush_fn {
+            Some(flush_fn) if unsafe { flush_fn(self.raw.ctx) } != 0 => {
+                Err(io::Error::other("flush callback failed"))
+            }
+            _ => Ok(()),
+        }
+    }
+
+    fn pos(&self) -> u64 {
+        // A missing callback or a negative report both fall back to our count.
+        self.raw
+            .get_pos_fn
+            .map(|get_pos_fn| unsafe { get_pos_fn(self.raw.ctx) })
+            .and_then(|p| u64::try_from(p).ok())
+            .unwrap_or(self.pos)
+    }
+}
+
+// ======================== Writer Options ========================
+
+/// Writer configuration passed by value to `mosaic_writer_open`.
+/// Obtain a fully populated value from `mosaic_writer_options_default`.
+#[repr(C)]
+pub struct MosaicWriterOptions {
+    /// Compression codec id; the default is `COMPRESSION_ZSTD` from the core spec.
+    pub compression: u8,
+    /// ZSTD compression level, forwarded unchanged to the core writer.
+    pub zstd_level: c_int,
+    /// Number of buckets; 0 selects `DEFAULT_NUM_BUCKETS`.
+    pub num_buckets: u32,
+    /// Row-group size limit, forwarded to the core writer
+    /// (NOTE(review): unit presumed to be bytes — confirm in mosaic_core).
+    pub row_group_max_size: u64,
+    /// Dictionary total-size limit, forwarded to the core writer.
+    pub max_dict_total_bytes: u32,
+    /// Dictionary entry-count limit, forwarded to the core writer.
+    pub max_dict_entries: u32,
+    /// Optional array of column indices to collect statistics for. It is
+    /// copied during `mosaic_writer_open`; null or a zero count means none.
+    pub stats_columns: *const u32,
+    /// Number of entries in `stats_columns`.
+    pub num_stats_columns: u32,
+    /// Page size threshold, forwarded to the core writer.
+    pub page_size_threshold: u32,
+}
+
+/// Returns default writer options: ZSTD compression, the `mosaic_core::spec`
+/// defaults for level, bucket count and size limits, and no statistics columns.
+#[no_mangle]
+pub extern "C" fn mosaic_writer_options_default() -> MosaicWriterOptions {
+    let zstd_level = DEFAULT_ZSTD_LEVEL as c_int;
+    let num_buckets = DEFAULT_NUM_BUCKETS as u32;
+    let max_dict_total_bytes = DEFAULT_DICT_MAX_TOTAL_BYTES as u32;
+    let max_dict_entries = DEFAULT_DICT_MAX_ENTRIES as u32;
+    let page_size_threshold = DEFAULT_PAGE_SIZE_THRESHOLD as u32;
+    MosaicWriterOptions {
+        compression: COMPRESSION_ZSTD,
+        zstd_level,
+        num_buckets,
+        row_group_max_size: DEFAULT_ROW_GROUP_MAX_SIZE,
+        max_dict_total_bytes,
+        max_dict_entries,
+        stats_columns: ptr::null(),
+        num_stats_columns: 0,
+        page_size_threshold,
+    }
+}
+
+// ======================== Writer ========================
+
+/// Opaque writer handle returned by `mosaic_writer_open`; release it with
+/// `mosaic_writer_free`.
+pub struct MosaicWriterHandle {
+    inner: MosaicWriter<FfiOutputFile>,
+}
+
+/// Open a writer that emits a Mosaic file through `stream`.
+///
+/// The `ffi_schema` is consumed: ownership transfers to the callee and the
+/// caller's struct is zeroed to prevent double-release. This holds on every
+/// return path once `ffi_schema` is non-null, including errors.
+///
+/// `options.stats_columns` is copied before returning, so the caller may free
+/// it afterwards. `options.num_buckets == 0` selects `DEFAULT_NUM_BUCKETS`.
+///
+/// Returns a handle to release with `mosaic_writer_free`, or null on error
+/// (the message is stored in the thread-local last error).
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_writer_open(
+    stream: MosaicOutputFile,
+    ffi_schema: *mut FFI_ArrowSchema,
+    options: MosaicWriterOptions,
+) -> *mut MosaicWriterHandle {
+    let result = panic::catch_unwind(AssertUnwindSafe(|| {
+        if ffi_schema.is_null() {
+            set_error("null schema".into());
+            return ptr::null_mut();
+        }
+        // Take ownership first so the schema is released on every later path.
+        let ffi_owned = ptr::read(ffi_schema);
+        ptr::write_bytes(ffi_schema, 0, 1);
+        // Fail fast: without write_fn every write would fail later with a
+        // less specific error.
+        if stream.write_fn.is_none() {
+            set_error("write_fn is null".into());
+            return ptr::null_mut();
+        }
+        let arrow_schema = match Schema::try_from(&ffi_owned) {
+            Ok(s) => s,
+            Err(e) => {
+                set_error(format!("Arrow schema import failed: {}", e));
+                return ptr::null_mut();
+            }
+        };
+        let ffi_stream = FfiOutputFile {
+            raw: stream,
+            pos: 0,
+        };
+        let stats_cols = if options.stats_columns.is_null() || options.num_stats_columns == 0 {
+            Vec::new()
+        } else {
+            std::slice::from_raw_parts(options.stats_columns, options.num_stats_columns as usize)
+                .iter()
+                .map(|&c| c as usize)
+                .collect()
+        };
+        let num_buckets = if options.num_buckets == 0 {
+            DEFAULT_NUM_BUCKETS
+        } else {
+            options.num_buckets as usize
+        };
+        let opts = WriterOptions {
+            compression: options.compression,
+            zstd_level: options.zstd_level,
+            num_buckets,
+            row_group_max_size: options.row_group_max_size,
+            max_dict_total_bytes: options.max_dict_total_bytes as usize,
+            max_dict_entries: options.max_dict_entries as usize,
+            stats_columns: stats_cols,
+            page_size_threshold: options.page_size_threshold as usize,
+        };
+        match MosaicWriter::new(ffi_stream, &arrow_schema, opts) {
+            Ok(writer) => Box::into_raw(Box::new(MosaicWriterHandle { inner: writer })),
+            Err(e) => {
+                set_error(format!("writer open failed: {}", e));
+                ptr::null_mut()
+            }
+        }
+    }));
+    match result {
+        Ok(val) => val,
+        Err(e) => {
+            set_error(panic_message(&e));
+            ptr::null_mut()
+        }
+    }
+}
+
+/// Close the writer (flush all data and write footer).
+///
+/// Does not release the handle; call `mosaic_writer_free` afterwards.
+/// Returns 0 on success, -1 on error (message in the thread-local last error).
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_writer_close(handle: *mut MosaicWriterHandle) -> c_int {
+    let result = panic::catch_unwind(AssertUnwindSafe(|| {
+        if handle.is_null() {
+            // Record a message like the other entry points do, so callers that
+            // read the last error do not see a stale one.
+            set_error("null pointer".into());
+            return -1;
+        }
+        let h = &mut *handle;
+        match h.inner.close() {
+            Ok(()) => 0,
+            Err(e) => {
+                set_error(format!("close failed: {}", e));
+                -1
+            }
+        }
+    }));
+    match result {
+        Ok(val) => val,
+        Err(e) => {
+            set_error(panic_message(&e));
+            -1
+        }
+    }
+}
+
+/// Free a writer handle returned by `mosaic_writer_open`.
+///
+/// Passing null is a no-op. The handle must not be used or freed again
+/// afterwards.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_writer_free(handle: *mut MosaicWriterHandle) {
+    if handle.is_null() {
+        return;
+    }
+    // Reclaim the Box leaked by `mosaic_writer_open` and drop it.
+    drop(Box::from_raw(handle));
+}
+
+/// Get estimated file size for rolling decisions.
+/// Returns 0 on success, -1 on error. Writes result to `out`.
+/// Sizes beyond `i64::MAX` are reported as `i64::MAX` instead of wrapping.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_writer_estimated_file_size(
+    handle: *const MosaicWriterHandle,
+    out: *mut i64,
+) -> c_int {
+    // Like the other entry points, never let a panic unwind into C.
+    let result = panic::catch_unwind(AssertUnwindSafe(|| {
+        if handle.is_null() || out.is_null() {
+            set_error("null pointer".into());
+            return -1;
+        }
+        let size = (*handle).inner.estimated_file_size();
+        *out = i64::try_from(size).unwrap_or(i64::MAX);
+        0
+    }));
+    result.unwrap_or_else(|e| {
+        set_error(panic_message(&e));
+        -1
+    })
+}
+
+/// Write an Arrow RecordBatch to the writer via the Arrow C Data Interface.
+///
+/// The caller provides ArrowArray and ArrowSchema pointers describing the
+/// batch as a non-null struct array, one child per column (the shape produced
+/// by Arrow's `ExportRecordBatch`). Ownership of both structs transfers to the
+/// callee, and the caller's structs are zeroed.
+/// Returns 0 on success, -1 on error (message in the thread-local last error).
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_writer_write_batch(
+    handle: *mut MosaicWriterHandle,
+    ffi_array: *mut FFI_ArrowArray,
+    ffi_schema: *mut FFI_ArrowSchema,
+) -> c_int {
+    let result = panic::catch_unwind(AssertUnwindSafe(|| {
+        if handle.is_null() || ffi_array.is_null() || ffi_schema.is_null() {
+            set_error("null pointer".into());
+            return -1;
+        }
+        let h = &mut *handle;
+        let arr_owned = ptr::read(ffi_array);
+        let schema_owned = ptr::read(ffi_schema);
+        ptr::write_bytes(ffi_array, 0, 1);
+        ptr::write_bytes(ffi_schema, 0, 1);
+        let arr_data = match arrow_array::ffi::from_ffi(arr_owned, &schema_owned) {
+            Ok(d) => d,
+            Err(e) => {
+                set_error(format!("Arrow import failed: {}", e));
+                return -1;
+            }
+        };
+        // `StructArray::from` panics on non-struct data, and `RecordBatch::from`
+        // panics on a struct with top-level nulls. Reject both here with a
+        // specific message rather than a generic "native panic".
+        if !matches!(arr_data.data_type(), arrow_schema::DataType::Struct(_)) {
+            set_error(format!(
+                "expected a struct array for the record batch, got {}",
+                arr_data.data_type()
+            ));
+            return -1;
+        }
+        if arr_data.null_count() != 0 {
+            set_error("record batch struct array must not contain top-level nulls".into());
+            return -1;
+        }
+        let struct_array = StructArray::from(arr_data);
+        let batch = RecordBatch::from(struct_array);
+        match h.inner.write_batch(&batch) {
+            Ok(()) => 0,
+            Err(e) => {
+                set_error(format!("write_batch failed: {}", e));
+                -1
+            }
+        }
+    }));
+    result.unwrap_or_else(|e| {
+        set_error(panic_message(&e));
+        -1
+    })
+}
+
+// ======================== Reader ========================
+
+/// Input file for reading Mosaic files, passed by value to `mosaic_reader_open`.
+///
+/// `read_at_fn` must be thread-safe: the reader may invoke it concurrently
+/// from multiple threads to perform parallel IO.
+#[repr(C)]
+pub struct MosaicInputFile {
+    /// Opaque pointer handed back to every callback. The reader keeps it and
+    /// passes it to `read_at_fn`, so it must stay valid while the reader
+    /// handle is in use.
+    pub ctx: *mut std::ffi::c_void,
+    /// Fill exactly `len` bytes of the destination buffer from file `offset`;
+    /// return 0 on success. Any non-zero value is reported as
+    /// "read_at callback failed".
+    pub read_at_fn: Option<unsafe extern "C" fn(*mut std::ffi::c_void, u64, *mut u8, usize) -> i32>,
+    /// Total file length in bytes, queried once by `mosaic_reader_open`.
+    pub length_fn: Option<unsafe extern "C" fn(*mut std::ffi::c_void) -> u64>,
+}
+
+/// Adapts a caller-supplied `MosaicInputFile` to the core `InputFile` trait.
+struct FfiInputFile {
+    raw: MosaicInputFile,
+}
+
+// SAFETY: the reader may share this value across threads for parallel IO.
+// Within this type the raw `ctx` is only passed to `read_at_fn`, and the
+// `MosaicInputFile` contract requires that callback to be thread-safe.
+unsafe impl Send for FfiInputFile {}
+unsafe impl Sync for FfiInputFile {}
+
+impl InputFile for FfiInputFile {
+    fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
+        let read_at_fn = self
+            .raw
+            .read_at_fn
+            .ok_or_else(|| io::Error::other("read_at_fn is null"))?;
+        let status = unsafe { read_at_fn(self.raw.ctx, offset, buf.as_mut_ptr(), buf.len()) };
+        if status == 0 {
+            Ok(())
+        } else {
+            Err(io::Error::other("read_at callback failed"))
+        }
+    }
+}
+
+/// Opaque reader handle returned by `mosaic_reader_open`; release it with
+/// `mosaic_reader_free`.
+pub struct MosaicReaderHandle {
+    reader: MosaicReader<FfiInputFile>,
+}
+
+/// Opaque handle wrapping a core `RowGroupReader`.
+pub struct MosaicRowGroupReaderHandle {
+    inner: mosaic_core::reader::RowGroupReader,
+}
+
+/// Open a reader from an InputFile callback.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_open(
+ input_file: MosaicInputFile,
+) -> *mut MosaicReaderHandle {
+ let result = panic::catch_unwind(AssertUnwindSafe(|| {
+ let file_len = if let Some(length_fn) = input_file.length_fn {
+ unsafe { length_fn(input_file.ctx) }
+ } else {
+ 0
+ };
+ let ffi_input = FfiInputFile { raw: input_file };
+ match MosaicReader::new(ffi_input, file_len) {
+ Ok(reader) => Box::into_raw(Box::new(MosaicReaderHandle { reader
})),
+ Err(e) => {
+ set_error(format!("open failed: {}", e));
+ ptr::null_mut()
+ }
+ }
+ }));
+ match result {
+ Ok(val) => val,
+ Err(e) => {
+ set_error(panic_message(&e));
+ ptr::null_mut()
+ }
+ }
+}
+
+/// Free a reader.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_free(handle: *mut MosaicReaderHandle) {
+ if !handle.is_null() {
+ drop(Box::from_raw(handle));
+ }
+}
+
+/// Get number of row groups.
+/// Returns 0 on success, -1 on error. Writes result to `out`.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_num_row_groups(
+ handle: *const MosaicReaderHandle,
+ out: *mut u32,
+) -> c_int {
+ if handle.is_null() || out.is_null() {
+ set_error("null pointer".into());
+ return -1;
+ }
+ *out = (*handle).reader.num_row_groups() as u32;
+ 0
+}
+
+/// Export the reader's schema via the Arrow C Data Interface.
+/// Writes into caller-provided ArrowSchema struct.
+/// Returns 0 on success, -1 on error.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_export_schema(
+ handle: *const MosaicReaderHandle,
+ out_schema: *mut FFI_ArrowSchema,
+) -> c_int {
+ let result = panic::catch_unwind(AssertUnwindSafe(|| {
+ if handle.is_null() || out_schema.is_null() {
+ set_error("null pointer".into());
+ return -1;
+ }
+ let h = &*handle;
+ let schema = h.reader.schema();
+ let fields: Vec<Field> = schema
+ .columns
+ .iter()
+ .map(|c| Field::new(&c.name, c.data_type.clone(), c.nullable))
+ .collect();
+ let arrow_schema = Schema::new(fields);
+ match FFI_ArrowSchema::try_from(&arrow_schema) {
+ Ok(ffi_schema) => {
+ ptr::write(out_schema, ffi_schema);
+ 0
+ }
+ Err(e) => {
+ set_error(format!("Arrow schema export failed: {}", e));
+ -1
+ }
+ }
+ }));
+ match result {
+ Ok(val) => val,
+ Err(e) => {
+ set_error(panic_message(&e));
+ -1
+ }
+ }
+}
+
+/// Open a row group reader.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_open_row_group(
+ handle: *mut MosaicReaderHandle,
+ rg_index: u32,
+) -> *mut MosaicRowGroupReaderHandle {
+ let result = panic::catch_unwind(AssertUnwindSafe(|| {
+ if handle.is_null() {
+ set_error("null reader handle".into());
+ return ptr::null_mut();
+ }
+ let h = &*handle;
+ match h.reader.row_group_reader(rg_index as usize) {
+ Ok(rg) => Box::into_raw(Box::new(MosaicRowGroupReaderHandle {
inner: rg })),
+ Err(e) => {
+ set_error(format!("open row group failed: {}", e));
+ ptr::null_mut()
+ }
+ }
+ }));
+ match result {
+ Ok(val) => val,
+ Err(e) => {
+ set_error(panic_message(&e));
+ ptr::null_mut()
+ }
+ }
+}
+
+/// Open a row group reader with projection pushdown.
+/// Only decompresses buckets containing the specified columns.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_open_row_group_projected(
+ handle: *mut MosaicReaderHandle,
+ rg_index: u32,
+ columns: *const u32,
+ num_columns: u32,
+) -> *mut MosaicRowGroupReaderHandle {
+ let result = panic::catch_unwind(AssertUnwindSafe(|| {
+ if handle.is_null() {
+ set_error("null reader handle".into());
+ return ptr::null_mut();
+ }
+ if columns.is_null() && num_columns > 0 {
+ set_error("null columns pointer".into());
+ return ptr::null_mut();
+ }
+ let h = &*handle;
+ let col_indices: Vec<usize> = if num_columns > 0 {
+ std::slice::from_raw_parts(columns, num_columns as usize)
+ .iter()
+ .map(|&c| c as usize)
+ .collect()
+ } else {
+ Vec::new()
+ };
+ match h
+ .reader
+ .row_group_reader_projected(rg_index as usize, &col_indices)
+ {
+ Ok(rg) => Box::into_raw(Box::new(MosaicRowGroupReaderHandle {
inner: rg })),
+ Err(e) => {
+ set_error(format!("open row group projected failed: {}", e));
+ ptr::null_mut()
+ }
+ }
+ }));
+ match result {
+ Ok(val) => val,
+ Err(e) => {
+ set_error(panic_message(&e));
+ ptr::null_mut()
+ }
+ }
+}
+
+/// Free a row group reader.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_row_group_reader_free(handle: *mut
MosaicRowGroupReaderHandle) {
+ if !handle.is_null() {
+ drop(Box::from_raw(handle));
+ }
+}
+
+/// Get number of rows in the row group.
+/// Returns 0 on success, -1 on error. Writes result to `out`.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_row_group_reader_num_rows(
+ handle: *const MosaicRowGroupReaderHandle,
+ out: *mut u32,
+) -> c_int {
+ if handle.is_null() || out.is_null() {
+ set_error("null pointer".into());
+ return -1;
+ }
+ *out = (*handle).inner.num_rows() as u32;
+ 0
+}
+
+// ======================== Row Group Stats ========================
+
+/// Get number of stats entries for a row group.
+/// Returns 0 on success, -1 on error. Writes result to `out`.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_row_group_num_stats(
+ handle: *const MosaicReaderHandle,
+ rg_index: u32,
+ out: *mut u32,
+) -> c_int {
+ if handle.is_null() || out.is_null() {
+ set_error("null pointer".into());
+ return -1;
+ }
+ let h = &*handle;
+ let stats = match h.reader.row_group_stats(rg_index as usize) {
+ Ok(s) => s,
+ Err(e) => {
+ set_error(e.to_string());
+ return -1;
+ }
+ };
+ *out = stats.len() as u32;
+ 0
+}
+
+/// Get the column index for a stats entry.
+/// Returns 0 on success, -1 on error. Writes result to `out`.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_row_group_stat_column_index(
+ handle: *const MosaicReaderHandle,
+ rg_index: u32,
+ stat_index: u32,
+ out: *mut u32,
+) -> c_int {
+ if handle.is_null() || out.is_null() {
+ set_error("null pointer".into());
+ return -1;
+ }
+ let h = &*handle;
+ let stats = match h.reader.row_group_stats(rg_index as usize) {
+ Ok(s) => s,
+ Err(e) => {
+ set_error(e.to_string());
+ return -1;
+ }
+ };
+ let idx = stat_index as usize;
+ if idx >= stats.len() {
+ set_error("stat_index out of range".into());
+ return -1;
+ }
+ *out = stats[idx].column_index as u32;
+ 0
+}
+
+/// Get the null count for a stats entry.
+/// Returns 0 on success, -1 on error. Writes result to `out`.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_row_group_stat_null_count(
+ handle: *const MosaicReaderHandle,
+ rg_index: u32,
+ stat_index: u32,
+ out: *mut u64,
+) -> c_int {
+ if handle.is_null() || out.is_null() {
+ set_error("null pointer".into());
+ return -1;
+ }
+ let h = &*handle;
+ let stats = match h.reader.row_group_stats(rg_index as usize) {
+ Ok(s) => s,
+ Err(e) => {
+ set_error(e.to_string());
+ return -1;
+ }
+ };
+ let idx = stat_index as usize;
+ if idx >= stats.len() {
+ set_error("stat_index out of range".into());
+ return -1;
+ }
+ *out = stats[idx].null_count as u64;
+ 0
+}
+
+/// Get the min value for a stats entry as raw bytes.
+/// Returns null if the column is all-null (no min). Sets out_len to the byte
length.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_row_group_stat_min(
+ handle: *const MosaicReaderHandle,
+ rg_index: u32,
+ stat_index: u32,
+ out_len: *mut usize,
+) -> *const u8 {
+ stat_value_ptr(handle, rg_index, stat_index, out_len, true)
+}
+
+/// Get the max value for a stats entry as raw bytes.
+/// Returns null if the column is all-null (no max). Sets out_len to the byte
length.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_row_group_stat_max(
+ handle: *const MosaicReaderHandle,
+ rg_index: u32,
+ stat_index: u32,
+ out_len: *mut usize,
+) -> *const u8 {
+ stat_value_ptr(handle, rg_index, stat_index, out_len, false)
+}
+
// Per-thread scratch buffers backing the pointers returned by
// `mosaic_reader_row_group_stat_min` / `_max`. Keeping min and max in
// separate buffers lets a caller hold both pointers at once on one thread.
thread_local! {
    static STAT_MIN_BUF: RefCell<Vec<u8>> = const { RefCell::new(Vec::new()) };
}
thread_local! {
    static STAT_MAX_BUF: RefCell<Vec<u8>> = const { RefCell::new(Vec::new()) };
}
+
+unsafe fn stat_value_ptr(
+ handle: *const MosaicReaderHandle,
+ rg_index: u32,
+ stat_index: u32,
+ out_len: *mut usize,
+ is_min: bool,
+) -> *const u8 {
+ if handle.is_null() || out_len.is_null() {
+ return ptr::null();
+ }
+ let h = &*handle;
+ let stats = match h.reader.row_group_stats(rg_index as usize) {
+ Ok(s) => s,
+ Err(_) => {
+ *out_len = 0;
+ return ptr::null();
+ }
+ };
+ let idx = stat_index as usize;
+ if idx >= stats.len() {
+ *out_len = 0;
+ return ptr::null();
+ }
+ let value = if is_min {
+ &stats[idx].min
+ } else {
+ &stats[idx].max
+ };
+ match value {
+ Some(v) => {
+ let bytes = v.to_be_bytes();
+ let buf_ref = if is_min { &STAT_MIN_BUF } else { &STAT_MAX_BUF };
+ buf_ref.with(|buf| {
+ let mut b = buf.borrow_mut();
+ *b = bytes;
+ *out_len = b.len();
+ b.as_ptr()
+ })
+ }
+ None => {
+ *out_len = 0;
+ ptr::null()
+ }
+ }
+}
+
+// ======================== Record Batch (Arrow C Data Interface)
========================
+
+pub struct MosaicRecordBatchHandle {
+ batch: RecordBatch,
+}
+
+/// Read the entire row group as an Arrow RecordBatch.
+/// Returns a record batch handle, or null on error.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_row_group_reader_read_columns(
+ handle: *mut MosaicRowGroupReaderHandle,
+) -> *mut MosaicRecordBatchHandle {
+ let result = panic::catch_unwind(AssertUnwindSafe(|| {
+ if handle.is_null() {
+ set_error("null handle".into());
+ return ptr::null_mut();
+ }
+ let h = &mut *handle;
+ match h.inner.read_columns() {
+ Ok(batch) => Box::into_raw(Box::new(MosaicRecordBatchHandle {
batch })),
+ Err(e) => {
+ set_error(format!("read_columns failed: {}", e));
+ ptr::null_mut()
+ }
+ }
+ }));
+ match result {
+ Ok(val) => val,
+ Err(e) => {
+ set_error(panic_message(&e));
+ ptr::null_mut()
+ }
+ }
+}
+
+/// Get number of rows in the record batch.
+/// Returns 0 on success, -1 on error. Writes result to `out`.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_record_batch_num_rows(
+ handle: *const MosaicRecordBatchHandle,
+ out: *mut u32,
+) -> c_int {
+ if handle.is_null() || out.is_null() {
+ set_error("null pointer".into());
+ return -1;
+ }
+ *out = (*handle).batch.num_rows() as u32;
+ 0
+}
+
+/// Get number of columns in the record batch.
+/// Returns 0 on success, -1 on error. Writes result to `out`.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_record_batch_num_columns(
+ handle: *const MosaicRecordBatchHandle,
+ out: *mut u32,
+) -> c_int {
+ if handle.is_null() || out.is_null() {
+ set_error("null pointer".into());
+ return -1;
+ }
+ *out = (*handle).batch.num_columns() as u32;
+ 0
+}
+
+/// Export the record batch via the Arrow C Data Interface.
+/// Writes into caller-provided ArrowArray and ArrowSchema structs.
+/// Returns 0 on success, -1 on error.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_record_batch_export(
+ handle: *const MosaicRecordBatchHandle,
+ out_array: *mut FFI_ArrowArray,
+ out_schema: *mut FFI_ArrowSchema,
+) -> c_int {
+ let result = panic::catch_unwind(AssertUnwindSafe(|| {
+ if handle.is_null() || out_array.is_null() || out_schema.is_null() {
+ set_error("null pointer".into());
+ return -1;
+ }
+ let h = &*handle;
+ let struct_array = StructArray::from(h.batch.clone());
+ match arrow_array::ffi::to_ffi(&struct_array.into()) {
+ Ok((ffi_array, ffi_schema)) => {
+ ptr::write(out_array, ffi_array);
+ ptr::write(out_schema, ffi_schema);
+ 0
+ }
+ Err(e) => {
+ set_error(format!("Arrow export failed: {}", e));
+ -1
+ }
+ }
+ }));
+ match result {
+ Ok(val) => val,
+ Err(e) => {
+ set_error(panic_message(&e));
+ -1
+ }
+ }
+}
+
+/// Free a record batch handle.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_record_batch_free(handle: *mut
MosaicRecordBatchHandle) {
+ if !handle.is_null() {
+ drop(Box::from_raw(handle));
+ }
+}
+
+// ======================== Error ========================
+
+/// Get the last error message. Returns a NUL-terminated pointer to a
thread-local string.
+/// The pointer is valid until the next FFI call on the same thread.
+#[no_mangle]
+pub extern "C" fn mosaic_last_error() -> *const c_char {
+ LAST_ERROR.with(|e| {
+ let borrow = e.borrow();
+ match borrow.as_ref() {
+ Some(cs) => cs.as_ptr(),
+ None => ptr::null(),
+ }
+ })
+}
diff --git a/include/arrow_c_data.h b/include/arrow_c_data.h
new file mode 100644
index 0000000..95810ea
--- /dev/null
+++ b/include/arrow_c_data.h
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
#ifndef ARROW_C_DATA_H
#define ARROW_C_DATA_H

#include <stdint.h>

#ifdef __cplusplus
extern "C" {
#endif

/* The structs are wrapped in the ARROW_C_DATA_INTERFACE guard used by the
 * Arrow C Data Interface specification, so they are not redefined if another
 * header providing the same ABI was included first. */
#ifndef ARROW_C_DATA_INTERFACE
#define ARROW_C_DATA_INTERFACE

/* Bit flags stored in ArrowSchema.flags. */
#define ARROW_FLAG_DICTIONARY_ORDERED 1
#define ARROW_FLAG_NULLABLE 2
#define ARROW_FLAG_MAP_KEYS_SORTED 4

/* Type description of an array (field layout fixed by the specification). */
struct ArrowSchema {
  const char* format;                   /* type format string */
  const char* name;                     /* field name */
  const char* metadata;                 /* encoded key/value metadata */
  int64_t flags;                        /* ARROW_FLAG_* bits */
  int64_t n_children;                   /* length of `children` */
  struct ArrowSchema** children;        /* child schemas */
  struct ArrowSchema* dictionary;       /* dictionary value type, if any */
  void (*release)(struct ArrowSchema*); /* producer-supplied release callback */
  void* private_data;                   /* producer-private state */
};

/* Data of an array (field layout fixed by the specification). */
struct ArrowArray {
  int64_t length;                      /* logical number of elements */
  int64_t null_count;                  /* number of nulls */
  int64_t offset;                      /* logical start offset */
  int64_t n_buffers;                   /* length of `buffers` */
  int64_t n_children;                  /* length of `children` */
  const void** buffers;                /* physical buffers */
  struct ArrowArray** children;        /* child arrays */
  struct ArrowArray* dictionary;       /* dictionary values, if any */
  void (*release)(struct ArrowArray*); /* producer-supplied release callback */
  void* private_data;                  /* producer-private state */
};

#endif /* ARROW_C_DATA_INTERFACE */

/* Allow the struct names to be used without the `struct` keyword in C. */
typedef struct ArrowSchema ArrowSchema;
typedef struct ArrowArray ArrowArray;

#ifdef __cplusplus
}
#endif

#endif /* ARROW_C_DATA_H */
diff --git a/include/mosaic.hpp b/include/mosaic.hpp
new file mode 100644
index 0000000..45c8fd1
--- /dev/null
+++ b/include/mosaic.hpp
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+extern "C" {
+#include "mosaic.h"
+}
+#include <cstdio>
+#include <functional>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <utility>
+#include <vector>
+
+namespace mosaic {
+
/// Exception thrown by the C++ wrapper when a Mosaic C API call fails.
class Error : public std::runtime_error {
public:
    explicit Error(const std::string& message) : std::runtime_error{message} {}
};
+
+inline void check(int result) {
+ if (result != 0) {
+ const char* err = mosaic_last_error();
+ throw Error(err ? err : "unknown error");
+ }
+}
+
/// Callback-based output sink consumed by Writer.
///
/// Each callback's return value is forwarded unchanged to the C API by the
/// detail:: trampolines below. An exception escaping a callback, including a
/// call to an empty std::function, is reported as -1.
struct OutputFile {
    std::function<int(const uint8_t*, size_t)> write_fn;
    std::function<int()> flush_fn;
    std::function<int64_t()> get_pos_fn;
};

namespace detail {

// C-compatible trampolines; `ctx` is the OutputFile registered as
// MosaicOutputFile::ctx.

inline int32_t stream_write(void* ctx, const uint8_t* data, size_t len) noexcept {
    try {
        OutputFile& sink = *static_cast<OutputFile*>(ctx);
        return sink.write_fn(data, len);
    } catch (...) {
        return -1;
    }
}

inline int32_t stream_flush(void* ctx) noexcept {
    try {
        OutputFile& sink = *static_cast<OutputFile*>(ctx);
        return sink.flush_fn();
    } catch (...) {
        return -1;
    }
}

inline int64_t stream_get_pos(void* ctx) noexcept {
    try {
        OutputFile& sink = *static_cast<OutputFile*>(ctx);
        return sink.get_pos_fn();
    } catch (...) {
        return -1;
    }
}

}  // namespace detail
+
/// Writer tuning options; the Writer constructor copies each field into the
/// corresponding MosaicWriterOptions member.
struct WriterOptions {
    uint8_t compression{1};  // 1 selects ZSTD
    int zstd_level{1};
    uint32_t num_buckets{0};
    uint64_t row_group_max_size{256ULL * 1024 * 1024};  // 256 MiB
    uint32_t max_dict_total_bytes{32 * 1024};            // 32 KiB
    uint32_t max_dict_entries{255};
    const uint32_t* stats_columns{nullptr};  // non-owning; paired with num_stats_columns
    uint32_t num_stats_columns{0};
    uint32_t page_size_threshold{32 * 1024};  // 32 KiB
};
+
+class Writer {
+public:
+ /// Construct a writer. `arrow_schema` is a pointer to an ArrowSchema
(Arrow C Data Interface).
+ Writer(OutputFile callbacks, void* arrow_schema, WriterOptions opts = {})
+ : callbacks_(std::make_shared<OutputFile>(std::move(callbacks))) {
+ MosaicOutputFile stream;
+ stream.ctx = callbacks_.get();
+ stream.write_fn = detail::stream_write;
+ stream.flush_fn = detail::stream_flush;
+ stream.get_pos_fn = detail::stream_get_pos;
+
+ MosaicWriterOptions c_opts = mosaic_writer_options_default();
+ c_opts.compression = opts.compression;
+ c_opts.zstd_level = opts.zstd_level;
+ c_opts.num_buckets = opts.num_buckets;
+ c_opts.row_group_max_size = opts.row_group_max_size;
+ c_opts.max_dict_total_bytes = opts.max_dict_total_bytes;
+ c_opts.max_dict_entries = opts.max_dict_entries;
+ c_opts.stats_columns = opts.stats_columns;
+ c_opts.num_stats_columns = opts.num_stats_columns;
+ c_opts.page_size_threshold = opts.page_size_threshold;
+
+ handle_ = mosaic_writer_open(stream,
static_cast<ArrowSchema*>(arrow_schema), c_opts);
+ if (!handle_) throw Error("failed to open writer");
+ }
+
+ Writer(const Writer&) = delete;
+ Writer& operator=(const Writer&) = delete;
+ Writer(Writer&& other) noexcept
+ : callbacks_(std::move(other.callbacks_)), handle_(other.handle_) {
+ other.handle_ = nullptr;
+ }
+ Writer& operator=(Writer&& other) noexcept {
+ if (this != &other) {
+ if (handle_) {
+ mosaic_writer_close(handle_);
+ mosaic_writer_free(handle_);
+ }
+ callbacks_ = std::move(other.callbacks_);
+ handle_ = other.handle_;
+ other.handle_ = nullptr;
+ }
+ return *this;
+ }
+
+ ~Writer() {
+ if (handle_) {
+ mosaic_writer_close(handle_);
+ mosaic_writer_free(handle_);
+ }
+ }
+
+ void write(void* ffi_array, void* ffi_schema) {
+ check(mosaic_writer_write_batch(
+ handle_,
+ static_cast<ArrowArray*>(ffi_array),
+ static_cast<ArrowSchema*>(ffi_schema)));
+ }
+
+ int64_t estimated_file_size() const {
+ int64_t out = 0;
+ check(mosaic_writer_estimated_file_size(handle_, &out));
+ return out;
+ }
+
+ void close() {
+ check(mosaic_writer_close(handle_));
+ }
+
+private:
+ std::shared_ptr<OutputFile> callbacks_;
+ MosaicWriterHandle* handle_ = nullptr;
+};
+
+// ======================== Statistics ========================
+
/// Statistics for one column of a row group, as returned by
/// Reader::get_row_group_statistics().
///
/// min_value / max_value hold the raw bytes reported by the C API and are
/// empty when the library reports no bound. The scalar members are
/// zero-initialized so a default-constructed instance never holds
/// indeterminate values.
struct ColumnStatistics {
    uint32_t column_index = 0;
    uint64_t null_count = 0;
    std::vector<uint8_t> min_value;
    std::vector<uint8_t> max_value;
    /// True when a min value is present (only min_value is inspected).
    bool has_min_max() const { return !min_value.empty(); }
};
+
+// ======================== Reader ========================
+
/// Input file for reading Mosaic files.
///
/// `read_at_fn(offset, buf, len)` is asked to fill `buf` with `len` bytes
/// starting at `offset`. Its return value is forwarded to the C API, where
/// anything other than 0 is treated as a failed read. It must be thread-safe:
/// the reader may call it concurrently from multiple threads to perform
/// parallel IO. `file_length` is what the library is told the file size is.
struct InputFile {
    std::function<int(uint64_t offset, uint8_t* buf, size_t len)> read_at_fn;
    uint64_t file_length{0};
};

namespace detail {

/// C trampoline for MosaicInputFile::read_at_fn; `ctx` points to an
/// InputFile. Any exception (including an empty read_at_fn) yields -1.
inline int32_t input_read_at(void* ctx, uint64_t offset, uint8_t* buf, size_t len) noexcept {
    try {
        InputFile& source = *static_cast<InputFile*>(ctx);
        return source.read_at_fn(offset, buf, len);
    } catch (...) {
        return -1;
    }
}

/// C trampoline for MosaicInputFile::length_fn.
inline uint64_t input_length(void* ctx) noexcept {
    return static_cast<const InputFile*>(ctx)->file_length;
}

}  // namespace detail
+
+class Reader {
+public:
+ Reader(const Reader&) = delete;
+ Reader& operator=(const Reader&) = delete;
+ Reader(Reader&& other) noexcept
+ : callbacks_(std::move(other.callbacks_)), handle_(other.handle_) {
+ other.handle_ = nullptr;
+ }
+
+ ~Reader() {
+ if (handle_) mosaic_reader_free(handle_);
+ }
+
+ uint32_t num_row_groups() const {
+ uint32_t out = 0;
+ check(mosaic_reader_num_row_groups(handle_, &out));
+ return out;
+ }
+
+ void export_schema(void* out_schema) const {
+ check(mosaic_reader_export_schema(handle_,
static_cast<ArrowSchema*>(out_schema)));
+ }
+
+ void read_row_group(uint32_t rg_index, void* out_array, void* out_schema) {
+ auto* rg = mosaic_reader_open_row_group(handle_, rg_index);
+ if (!rg) throw Error("failed to open row group");
+ auto* rb = mosaic_row_group_reader_read_columns(rg);
+ mosaic_row_group_reader_free(rg);
+ if (!rb) throw Error("read_columns failed");
+ int rc = mosaic_record_batch_export(rb,
+ static_cast<ArrowArray*>(out_array),
+ static_cast<ArrowSchema*>(out_schema));
+ mosaic_record_batch_free(rb);
+ if (rc != 0) throw Error("record_batch_export failed");
+ }
+
+ void read_row_group(uint32_t rg_index, const uint32_t* cols, uint32_t
num_cols,
+ void* out_array, void* out_schema) {
+ auto* rg = mosaic_reader_open_row_group_projected(handle_, rg_index,
cols, num_cols);
+ if (!rg) throw Error("failed to open row group");
+ auto* rb = mosaic_row_group_reader_read_columns(rg);
+ mosaic_row_group_reader_free(rg);
+ if (!rb) throw Error("read_columns failed");
+ int rc = mosaic_record_batch_export(rb,
+ static_cast<ArrowArray*>(out_array),
+ static_cast<ArrowSchema*>(out_schema));
+ mosaic_record_batch_free(rb);
+ if (rc != 0) throw Error("record_batch_export failed");
+ }
+
+ std::vector<ColumnStatistics> get_row_group_statistics(uint32_t rg_index)
const {
+ uint32_t n = 0;
+ check(mosaic_reader_row_group_num_stats(handle_, rg_index, &n));
+ std::vector<ColumnStatistics> result;
+ result.reserve(n);
+ for (uint32_t i = 0; i < n; i++) {
+ ColumnStatistics s;
+ check(mosaic_reader_row_group_stat_column_index(handle_, rg_index,
i, &s.column_index));
+ check(mosaic_reader_row_group_stat_null_count(handle_, rg_index,
i, &s.null_count));
+ size_t min_len = 0, max_len = 0;
+ const uint8_t* min_ptr = mosaic_reader_row_group_stat_min(handle_,
rg_index, i, &min_len);
+ const uint8_t* max_ptr = mosaic_reader_row_group_stat_max(handle_,
rg_index, i, &max_len);
+ if (min_ptr && min_len > 0)
+ s.min_value.assign(min_ptr, min_ptr + min_len);
+ if (max_ptr && max_len > 0)
+ s.max_value.assign(max_ptr, max_ptr + max_len);
+ result.push_back(std::move(s));
+ }
+ return result;
+ }
+
+private:
+ friend Reader make_reader(InputFile callbacks, uint64_t len);
+ Reader(std::shared_ptr<InputFile> cbs, MosaicReaderHandle* h)
+ : callbacks_(std::move(cbs)), handle_(h) {}
+ std::shared_ptr<InputFile> callbacks_;
+ MosaicReaderHandle* handle_;
+};
+
+inline Reader make_reader(InputFile callbacks, uint64_t len) {
+ callbacks.file_length = len;
+ auto cbs = std::make_shared<InputFile>(std::move(callbacks));
+ MosaicInputFile input;
+ input.ctx = cbs.get();
+ input.read_at_fn = detail::input_read_at;
+ input.length_fn = detail::input_length;
+ auto* handle = mosaic_reader_open(input);
+ if (!handle) throw Error("failed to open reader");
+ return Reader(std::move(cbs), handle);
+}
+
+} // namespace mosaic