This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 2815d6b  feat: support decimals in cpp binding (#265)
2815d6b is described below

commit 2815d6bac13e8125b55f29b2adb2d5db9434f64f
Author: Anton Borisov <[email protected]>
AuthorDate: Sun Feb 8 03:24:59 2026 +0000

    feat: support decimals in cpp binding (#265)
---
 Cargo.toml                         |   1 +
 bindings/cpp/Cargo.toml            |   1 +
 bindings/cpp/examples/example.cpp  |  95 +++++++++++++++++++++++---
 bindings/cpp/include/fluss.hpp     | 121 +++++++++++++++++++++++++++++++--
 bindings/cpp/src/ffi_converter.hpp |  14 +++-
 bindings/cpp/src/lib.rs            |  15 ++++-
 bindings/cpp/src/types.rs          | 133 +++++++++++++++++++++++++++++++++----
 crates/fluss/Cargo.toml            |   2 +-
 8 files changed, 352 insertions(+), 30 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index dfddd8d..77d7140 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -37,6 +37,7 @@ tokio = { version = "1.44.2", features = ["full"] }
 clap = { version = "4.5.37", features = ["derive"] }
 arrow = { version = "57.0.0", features = ["ipc_compression"] }
 
+bigdecimal = "0.4"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 opendal = "0.53"
diff --git a/bindings/cpp/Cargo.toml b/bindings/cpp/Cargo.toml
index 8606a22..2681652 100644
--- a/bindings/cpp/Cargo.toml
+++ b/bindings/cpp/Cargo.toml
@@ -29,6 +29,7 @@ crate-type = ["staticlib"]
 [dependencies]
 anyhow = "1.0"
 arrow = { workspace = true, features = ["ffi"] }
+bigdecimal = { workspace = true }
 cxx = "1.0"
 fluss = { workspace = true, features = ["storage-all"] }
 tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp
index 92ebe9c..efdf2e8 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -54,14 +54,14 @@ int main() {
 
     // 3) Schema with scalar and temporal columns
     auto schema = fluss::Schema::NewBuilder()
-                        .AddColumn("id", fluss::DataType::Int)
-                        .AddColumn("name", fluss::DataType::String)
-                        .AddColumn("score", fluss::DataType::Float)
-                        .AddColumn("age", fluss::DataType::Int)
-                        .AddColumn("event_date", fluss::DataType::Date)
-                        .AddColumn("event_time", fluss::DataType::Time)
-                        .AddColumn("created_at", fluss::DataType::Timestamp)
-                        .AddColumn("updated_at", fluss::DataType::TimestampLtz)
+                        .AddColumn("id", fluss::DataType::Int())
+                        .AddColumn("name", fluss::DataType::String())
+                        .AddColumn("score", fluss::DataType::Float())
+                        .AddColumn("age", fluss::DataType::Int())
+                        .AddColumn("event_date", fluss::DataType::Date())
+                        .AddColumn("event_time", fluss::DataType::Time())
+                        .AddColumn("created_at", fluss::DataType::Timestamp())
+                        .AddColumn("updated_at", fluss::DataType::TimestampLtz())
                         .Build();
 
     auto descriptor = fluss::TableDescriptor::NewBuilder()
@@ -131,6 +131,10 @@ int main() {
         row.SetString(1, "AckTest");
         row.SetFloat32(2, 99.9f);
         row.SetInt32(3, 42);
+        row.SetDate(4, fluss::Date::FromYMD(2025, 3, 1));
+        row.SetTime(5, fluss::Time::FromHMS(12, 0, 0));
+        row.SetTimestampNtz(6, fluss::Timestamp::FromMillis(1740787200000));
+        row.SetTimestampLtz(7, fluss::Timestamp::FromMillis(1740787200000));
         fluss::WriteResult wr;
         check("append", writer.Append(row, wr));
         check("wait", wr.Wait());
@@ -365,5 +369,80 @@ int main() {
         }
     }
 
+    // 12) Decimal support example
+    std::cout << "\n=== Decimal Support Example ===" << std::endl;
+
+    fluss::TablePath decimal_table_path("fluss", "decimal_table_cpp_v1");
+
+    // Drop table if exists
+    admin.DropTable(decimal_table_path, true);
+
+    // Create schema with decimal columns
+    auto decimal_schema = fluss::Schema::NewBuilder()
+                              .AddColumn("id", fluss::DataType::Int())
+                              .AddColumn("price", fluss::DataType::Decimal(10, 2))    // compact
+                              .AddColumn("amount", fluss::DataType::Decimal(28, 8))   // i128
+                              .Build();
+
+    auto decimal_descriptor = fluss::TableDescriptor::NewBuilder()
+                                  .SetSchema(decimal_schema)
+                                  .SetBucketCount(1)
+                                  .SetComment("cpp decimal example table")
+                                  .Build();
+
+    check("create_decimal_table", admin.CreateTable(decimal_table_path, decimal_descriptor, false));
+
+    // Get table and writer
+    fluss::Table decimal_table;
+    check("get_decimal_table", conn.GetTable(decimal_table_path, decimal_table));
+
+    fluss::AppendWriter decimal_writer;
+    check("new_decimal_writer", decimal_table.NewAppendWriter(decimal_writer));
+
+    // Just provide the value — Rust resolves (p,s) from schema
+    {
+        fluss::GenericRow row;
+        row.SetInt32(0, 1);
+        row.SetDecimal(1, "123.45");       // Rust knows DECIMAL(10,2)
+        row.SetDecimal(2, "1.00000000");   // Rust knows DECIMAL(28,8)
+        check("append_decimal", decimal_writer.Append(row));
+    }
+    {
+        fluss::GenericRow row;
+        row.SetInt32(0, 2);
+        row.SetDecimal(1, "-999.99");
+        row.SetDecimal(2, "3.14159265");
+        check("append_decimal", decimal_writer.Append(row));
+    }
+    {
+        fluss::GenericRow row;
+        row.SetInt32(0, 3);
+        row.SetDecimal(1, "500.00");
+        row.SetDecimal(2, "2.71828182");
+        check("append_decimal", decimal_writer.Append(row));
+    }
+    check("flush_decimal", decimal_writer.Flush());
+    std::cout << "Wrote 3 decimal rows" << std::endl;
+
+    // Scan and read back
+    fluss::LogScanner decimal_scanner;
+    check("new_decimal_scanner", decimal_table.NewScan().CreateLogScanner(decimal_scanner));
+    check("subscribe_decimal", decimal_scanner.Subscribe(0, 0));
+
+    fluss::ScanRecords decimal_records;
+    check("poll_decimal", decimal_scanner.Poll(5000, decimal_records));
+
+    std::cout << "Scanned decimal records: " << decimal_records.Size() << std::endl;
+    for (const auto& rec : decimal_records) {
+        auto& price = rec.row.fields[1];
+        auto& amount = rec.row.fields[2];
+        std::cout << "  id=" << rec.row.fields[0].i32_val
+                  << " price=" << price.DecimalToString()
+                  << " (raw=" << price.i64_val << ")"
+                  << " amount=" << amount.DecimalToString()
+                  << " is_decimal=" << price.IsDecimal()
+                  << std::endl;
+    }
+
     return 0;
 }
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 239d9a4..8125c49 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -20,8 +20,8 @@
 #pragma once
 
 #include <chrono>
+#include <cstdint>
 #include <memory>
-#include <optional>
 #include <string>
 #include <unordered_map>
 #include <vector>
@@ -100,7 +100,7 @@ struct Timestamp {
     }
 };
 
-enum class DataType {
+enum class TypeId {
     Boolean = 1,
     TinyInt = 2,
     SmallInt = 3,
@@ -114,6 +114,43 @@ enum class DataType {
     Time = 11,
     Timestamp = 12,
     TimestampLtz = 13,
+    Decimal = 14,
+};
+
+class DataType {
+public:
+    explicit DataType(TypeId id, int32_t p = 0, int32_t s = 0)
+        : id_(id), precision_(p), scale_(s) {}
+
+    static DataType Boolean() { return DataType(TypeId::Boolean); }
+    static DataType TinyInt() { return DataType(TypeId::TinyInt); }
+    static DataType SmallInt() { return DataType(TypeId::SmallInt); }
+    static DataType Int() { return DataType(TypeId::Int); }
+    static DataType BigInt() { return DataType(TypeId::BigInt); }
+    static DataType Float() { return DataType(TypeId::Float); }
+    static DataType Double() { return DataType(TypeId::Double); }
+    static DataType String() { return DataType(TypeId::String); }
+    static DataType Bytes() { return DataType(TypeId::Bytes); }
+    static DataType Date() { return DataType(TypeId::Date); }
+    static DataType Time() { return DataType(TypeId::Time); }
+    static DataType Timestamp(int32_t precision = 6) {
+        return DataType(TypeId::Timestamp, precision, 0);
+    }
+    static DataType TimestampLtz(int32_t precision = 6) {
+        return DataType(TypeId::TimestampLtz, precision, 0);
+    }
+    static DataType Decimal(int32_t precision, int32_t scale) {
+        return DataType(TypeId::Decimal, precision, scale);
+    }
+
+    TypeId id() const { return id_; }
+    int32_t precision() const { return precision_; }
+    int32_t scale() const { return scale_; }
+
+private:
+    TypeId id_;
+    int32_t precision_{0};
+    int32_t scale_{0};
 };
 
 enum class DatumType {
@@ -125,7 +162,9 @@ enum class DatumType {
     Float64 = 5,
     String = 6,
     Bytes = 7,
-    // 8-10 reserved for decimal types
+    DecimalI64 = 8,
+    DecimalI128 = 9,
+    DecimalString = 10,
     Date = 11,
     Time = 12,
     TimestampNtz = 13,
@@ -182,7 +221,7 @@ struct Schema {
     public:
         Builder& AddColumn(std::string name, DataType type,
                            std::string comment = "") {
-            columns_.push_back({std::move(name), type, std::move(comment)});
+            columns_.push_back({std::move(name), std::move(type), std::move(comment)});
             return *this;
         }
 
@@ -290,6 +329,10 @@ struct Datum {
     double f64_val{0.0};
     std::string string_val;
     std::vector<uint8_t> bytes_val;
+    int32_t decimal_precision{0};  // Decimal: precision (total digits)
+    int32_t decimal_scale{0};      // Decimal: scale (digits after decimal point)
+    int64_t i128_hi{0};            // Decimal (i128): high 64 bits of unscaled value
+    int64_t i128_lo{0};            // Decimal (i128): low 64 bits of unscaled value
 
     static Datum Null() { return {}; }
     static Datum Bool(bool v) {
@@ -360,10 +403,75 @@ struct Datum {
         dat.i32_val = ts.nano_of_millisecond;
         return dat;
     }
+    // Stores the decimal string as-is. Rust side will parse via BigDecimal,
+    // look up (p,s) from the schema, validate, and create the Decimal.
+    static Datum DecimalString(std::string str) {
+        Datum d;
+        d.type = DatumType::DecimalString;
+        d.string_val = std::move(str);
+        return d;
+    }
 
     fluss::Date GetDate() const { return {i32_val}; }
     fluss::Time GetTime() const { return {i32_val}; }
     fluss::Timestamp GetTimestamp() const { return {i64_val, i32_val}; }
+
+    bool IsDecimal() const {
+        return type == DatumType::DecimalI64 || type == DatumType::DecimalI128
+            || type == DatumType::DecimalString;
+    }
+
+    std::string DecimalToString() const {
+        if (type == DatumType::DecimalI64) {
+            return FormatUnscaled64(i64_val, decimal_scale);
+        } else if (type == DatumType::DecimalI128) {
+            unsigned __int128 uval = (static_cast<unsigned __int128>(static_cast<uint64_t>(i128_hi)) << 64) |
+                                     static_cast<unsigned __int128>(static_cast<uint64_t>(i128_lo));
+            __int128 val = static_cast<__int128>(uval);
+            return FormatUnscaled128(val, decimal_scale);
+        } else if (type == DatumType::DecimalString) {
+            return string_val;
+        }
+        return "";
+    }
+
+private:
+    static std::string FormatUnscaled64(int64_t unscaled, int32_t scale) {
+        bool negative = unscaled < 0;
+        uint64_t abs_val = negative ? -static_cast<uint64_t>(unscaled) : static_cast<uint64_t>(unscaled);
+        std::string digits = std::to_string(abs_val);
+        if (scale <= 0) {
+            return (negative ? "-" : "") + digits;
+        }
+        while (static_cast<int32_t>(digits.size()) <= scale) {
+            digits = "0" + digits;
+        }
+        auto pos = digits.size() - static_cast<size_t>(scale);
+        return (negative ? "-" : "") + digits.substr(0, pos) + "." + digits.substr(pos);
+    }
+
+    static std::string FormatUnscaled128(__int128 val, int32_t scale) {
+        bool negative = val < 0;
+        unsigned __int128 abs_val = negative ? -static_cast<unsigned __int128>(val)
+                                             : static_cast<unsigned __int128>(val);
+        std::string digits;
+        if (abs_val == 0) {
+            digits = "0";
+        } else {
+            while (abs_val > 0) {
+                digits = static_cast<char>('0' + static_cast<int>(abs_val % 10)) + digits;
+                abs_val /= 10;
+            }
+        }
+        if (scale <= 0) {
+            return (negative ? "-" : "") + digits;
+        }
+        while (static_cast<int32_t>(digits.size()) <= scale) {
+            digits = "0" + digits;
+        }
+        auto pos = digits.size() - static_cast<size_t>(scale);
+        return (negative ? "-" : "") + digits.substr(0, pos) + "." + digits.substr(pos);
+    }
 };
 
 struct GenericRow {
@@ -429,6 +537,11 @@ struct GenericRow {
         fields[idx] = Datum::TimestampLtz(ts);
     }
 
+    void SetDecimal(size_t idx, const std::string& value) {
+        EnsureSize(idx);
+        fields[idx] = Datum::DecimalString(value);
+    }
+
 private:
     void EnsureSize(size_t idx) {
         if (fields.size() <= idx) {
diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp
index 63a2e91..e3e63a8 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -47,8 +47,10 @@ inline ffi::FfiTablePath to_ffi_table_path(const TablePath& path) {
 inline ffi::FfiColumn to_ffi_column(const Column& col) {
     ffi::FfiColumn ffi_col;
     ffi_col.name = rust::String(col.name);
-    ffi_col.data_type = static_cast<int32_t>(col.data_type);
+    ffi_col.data_type = static_cast<int32_t>(col.data_type.id());
     ffi_col.comment = rust::String(col.comment);
+    ffi_col.precision = col.data_type.precision();
+    ffi_col.scale = col.data_type.scale();
     return ffi_col;
 }
 
@@ -112,6 +114,10 @@ inline ffi::FfiDatum to_ffi_datum(const Datum& datum) {
     ffi_datum.f32_val = datum.f32_val;
     ffi_datum.f64_val = datum.f64_val;
     ffi_datum.string_val = rust::String(datum.string_val);
+    ffi_datum.decimal_precision = datum.decimal_precision;
+    ffi_datum.decimal_scale = datum.decimal_scale;
+    ffi_datum.i128_hi = datum.i128_hi;
+    ffi_datum.i128_lo = datum.i128_lo;
 
     rust::Vec<uint8_t> bytes;
     for (auto b : datum.bytes_val) {
@@ -137,7 +143,7 @@ inline ffi::FfiGenericRow to_ffi_generic_row(const GenericRow& row) {
 inline Column from_ffi_column(const ffi::FfiColumn& ffi_col) {
     return Column{
         std::string(ffi_col.name),
-        static_cast<DataType>(ffi_col.data_type),
+        DataType(static_cast<TypeId>(ffi_col.data_type), ffi_col.precision, ffi_col.scale),
         std::string(ffi_col.comment)};
 }
 
@@ -202,6 +208,10 @@ inline Datum from_ffi_datum(const ffi::FfiDatum& ffi_datum) {
     datum.f64_val = ffi_datum.f64_val;
     // todo: avoid copy string
     datum.string_val = std::string(ffi_datum.string_val);
+    datum.decimal_precision = ffi_datum.decimal_precision;
+    datum.decimal_scale = ffi_datum.decimal_scale;
+    datum.i128_hi = ffi_datum.i128_hi;
+    datum.i128_lo = ffi_datum.i128_lo;
 
     for (auto b : ffi_datum.bytes_val) {
         datum.bytes_val.push_back(b);
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 5a26613..4aeb13d 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -51,6 +51,8 @@ mod ffi {
         name: String,
         data_type: i32,
         comment: String,
+        precision: i32,
+        scale: i32,
     }
 
     struct FfiSchema {
@@ -98,6 +100,10 @@ mod ffi {
         f64_val: f64,
         string_val: String,
         bytes_val: Vec<u8>,
+        decimal_precision: i32,
+        decimal_scale: i32,
+        i128_hi: i64,
+        i128_lo: i64,
     }
 
     struct FfiGenericRow {
@@ -301,6 +307,7 @@ pub struct Table {
 
 pub struct AppendWriter {
     inner: fcore::client::AppendWriter,
+    table_info: fcore::metadata::TableInfo,
 }
 
 pub struct WriteResult {
@@ -636,7 +643,10 @@ impl Table {
             Ok(w) => w,
             Err(e) => return Err(format!("Failed to create writer: {e}")),
         };
-        let writer = Box::into_raw(Box::new(AppendWriter { inner: writer }));
+        let writer = Box::into_raw(Box::new(AppendWriter {
+            inner: writer,
+            table_info: self.table_info.clone(),
+        }));
         Ok(writer)
     }
 
@@ -792,7 +802,8 @@ unsafe fn delete_append_writer(writer: *mut AppendWriter) {
 
 impl AppendWriter {
     fn append(&mut self, row: &ffi::FfiGenericRow) -> Result<Box<WriteResult>, String> {
-        let generic_row = types::ffi_row_to_core(row);
+        let schema = self.table_info.get_schema();
+        let generic_row = types::ffi_row_to_core(row, Some(schema)).map_err(|e| e.to_string())?;
 
         let result_future = self
             .inner
diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
index f546b68..7837032 100644
--- a/bindings/cpp/src/types.rs
+++ b/bindings/cpp/src/types.rs
@@ -18,8 +18,8 @@
 use crate::ffi;
 use anyhow::{Result, anyhow};
 use arrow::array::{
-    Date32Array, LargeBinaryArray, LargeStringArray, Time32MillisecondArray, Time32SecondArray,
-    Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
+    Date32Array, Decimal128Array, LargeBinaryArray, LargeStringArray, Time32MillisecondArray,
+    Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
     TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
 };
 use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
@@ -27,6 +27,7 @@ use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
 use fcore::row::InternalRow;
 use fluss as fcore;
 use std::borrow::Cow;
+use std::str::FromStr;
 
 use arrow::array::Array;
 
@@ -43,6 +44,7 @@ pub const DATA_TYPE_DATE: i32 = 10;
 pub const DATA_TYPE_TIME: i32 = 11;
 pub const DATA_TYPE_TIMESTAMP: i32 = 12;
 pub const DATA_TYPE_TIMESTAMP_LTZ: i32 = 13;
+pub const DATA_TYPE_DECIMAL: i32 = 14;
 
 pub const DATUM_TYPE_NULL: i32 = 0;
 pub const DATUM_TYPE_BOOL: i32 = 1;
@@ -52,6 +54,9 @@ pub const DATUM_TYPE_FLOAT32: i32 = 4;
 pub const DATUM_TYPE_FLOAT64: i32 = 5;
 pub const DATUM_TYPE_STRING: i32 = 6;
 pub const DATUM_TYPE_BYTES: i32 = 7;
+pub const DATUM_TYPE_DECIMAL_I64: i32 = 8;
+pub const DATUM_TYPE_DECIMAL_I128: i32 = 9;
+pub const DATUM_TYPE_DECIMAL_STRING: i32 = 10;
 pub const DATUM_TYPE_DATE: i32 = 11;
 pub const DATUM_TYPE_TIME: i32 = 12;
 pub const DATUM_TYPE_TIMESTAMP_NTZ: i32 = 13;
@@ -62,7 +67,7 @@ const MICROS_PER_MILLI: i64 = 1_000;
 const NANOS_PER_MICRO: i64 = 1_000;
 const NANOS_PER_MILLI: i64 = 1_000_000;
 
-fn ffi_data_type_to_core(dt: i32) -> Result<fcore::metadata::DataType> {
+fn ffi_data_type_to_core(dt: i32, precision: u32, scale: u32) -> Result<fcore::metadata::DataType> {
     match dt {
         DATA_TYPE_BOOLEAN => Ok(fcore::metadata::DataTypes::boolean()),
         DATA_TYPE_TINYINT => Ok(fcore::metadata::DataTypes::tinyint()),
@@ -75,8 +80,16 @@ fn ffi_data_type_to_core(dt: i32) -> Result<fcore::metadata::DataType> {
         DATA_TYPE_BYTES => Ok(fcore::metadata::DataTypes::bytes()),
         DATA_TYPE_DATE => Ok(fcore::metadata::DataTypes::date()),
         DATA_TYPE_TIME => Ok(fcore::metadata::DataTypes::time()),
-        DATA_TYPE_TIMESTAMP => Ok(fcore::metadata::DataTypes::timestamp()),
-        DATA_TYPE_TIMESTAMP_LTZ => Ok(fcore::metadata::DataTypes::timestamp_ltz()),
+        DATA_TYPE_TIMESTAMP => Ok(fcore::metadata::DataTypes::timestamp_with_precision(
+            precision,
+        )),
+        DATA_TYPE_TIMESTAMP_LTZ => Ok(fcore::metadata::DataTypes::timestamp_ltz_with_precision(
+            precision,
+        )),
+        DATA_TYPE_DECIMAL => {
+            let dt = fcore::metadata::DecimalType::new(precision, scale)?;
+            Ok(fcore::metadata::DataType::Decimal(dt))
+        }
         _ => Err(anyhow!("Unknown data type: {dt}")),
     }
 }
@@ -96,6 +109,7 @@ fn core_data_type_to_ffi(dt: &fcore::metadata::DataType) -> i32 {
         fcore::metadata::DataType::Time(_) => DATA_TYPE_TIME,
         fcore::metadata::DataType::Timestamp(_) => DATA_TYPE_TIMESTAMP,
         fcore::metadata::DataType::TimestampLTz(_) => DATA_TYPE_TIMESTAMP_LTZ,
+        fcore::metadata::DataType::Decimal(_) => DATA_TYPE_DECIMAL,
         _ => 0,
     }
 }
@@ -106,7 +120,13 @@ pub fn ffi_descriptor_to_core(
     let mut schema_builder = fcore::metadata::Schema::builder();
 
     for col in &descriptor.schema.columns {
-        let dt = ffi_data_type_to_core(col.data_type)?;
+        if col.precision < 0 || col.scale < 0 {
+            return Err(anyhow!(
+                "Column '{}': precision and scale must be non-negative",
+                col.name
+            ));
+        }
+        let dt = ffi_data_type_to_core(col.data_type, col.precision as u32, col.scale as u32)?;
         schema_builder = schema_builder.column(&col.name, dt);
         if !col.comment.is_empty() {
             schema_builder = schema_builder.with_comment(&col.comment);
@@ -148,10 +168,22 @@ pub fn core_table_info_to_ffi(info: &fcore::metadata::TableInfo) -> ffi::FfiTabl
     let columns: Vec<ffi::FfiColumn> = schema
         .columns()
         .iter()
-        .map(|col| ffi::FfiColumn {
-            name: col.name().to_string(),
-            data_type: core_data_type_to_ffi(col.data_type()),
-            comment: col.comment().unwrap_or("").to_string(),
+        .map(|col| {
+            let (precision, scale) = match col.data_type() {
+                fcore::metadata::DataType::Decimal(dt) => {
+                    (dt.precision() as i32, dt.scale() as i32)
+                }
+                fcore::metadata::DataType::Timestamp(dt) => (dt.precision() as i32, 0),
+                fcore::metadata::DataType::TimestampLTz(dt) => (dt.precision() as i32, 0),
+                _ => (0, 0),
+            };
+            ffi::FfiColumn {
+                name: col.name().to_string(),
+                data_type: core_data_type_to_ffi(col.data_type()),
+                comment: col.comment().unwrap_or("").to_string(),
+                precision,
+                scale,
+            }
         })
         .collect();
 
@@ -218,7 +250,21 @@ pub fn empty_table_info() -> ffi::FfiTableInfo {
     }
 }
 
-pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> fcore::row::GenericRow<'_> {
+/// Look up decimal (precision, scale) from schema for column `idx`.
+fn get_decimal_type(idx: usize, schema: Option<&fcore::metadata::Schema>) -> Result<(u32, u32)> {
+    let col = schema
+        .and_then(|s| s.columns().get(idx))
+        .ok_or_else(|| anyhow!("Schema not available for decimal column {idx}"))?;
+    match col.data_type() {
+        fcore::metadata::DataType::Decimal(dt) => Ok((dt.precision(), dt.scale())),
+        other => Err(anyhow!("Column {idx} is {:?}, not Decimal", other)),
+    }
+}
+
+pub fn ffi_row_to_core<'a>(
+    row: &'a ffi::FfiGenericRow,
+    schema: Option<&fcore::metadata::Schema>,
+) -> Result<fcore::row::GenericRow<'a>> {
     use fcore::row::Datum;
 
     let mut generic_row = fcore::row::GenericRow::new(row.fields.len());
@@ -233,6 +279,40 @@ pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> fcore::row::GenericRow<'_> {
             DATUM_TYPE_FLOAT64 => Datum::Float64(field.f64_val.into()),
             DATUM_TYPE_STRING => Datum::String(Cow::Borrowed(field.string_val.as_str())),
             DATUM_TYPE_BYTES => Datum::Blob(Cow::Borrowed(field.bytes_val.as_slice())),
+            DATUM_TYPE_DECIMAL_STRING => {
+                let (precision, scale) = get_decimal_type(idx, schema)?;
+                let bd =
+                    bigdecimal::BigDecimal::from_str(field.string_val.as_str()).map_err(|e| {
+                        anyhow!(
+                            "Column {idx}: invalid decimal string '{}': {e}",
+                            field.string_val
+                        )
+                    })?;
+                let decimal = fcore::row::Decimal::from_big_decimal(bd, precision, scale)
+                    .map_err(|e| anyhow!("Column {idx}: {e}"))?;
+                Datum::Decimal(decimal)
+            }
+            DATUM_TYPE_DECIMAL_I64 => {
+                let precision = field.decimal_precision as u32;
+                let scale = field.decimal_scale as u32;
+                let decimal =
+                    fcore::row::Decimal::from_unscaled_long(field.i64_val, precision, scale)
+                        .map_err(|e| anyhow!("Column {idx}: {e}"))?;
+                Datum::Decimal(decimal)
+            }
+            DATUM_TYPE_DECIMAL_I128 => {
+                let precision = field.decimal_precision as u32;
+                let scale = field.decimal_scale as u32;
+                let i128_val = ((field.i128_hi as i128) << 64) | (field.i128_lo as u64 as i128);
+                let decimal = fcore::row::Decimal::from_arrow_decimal128(
+                    i128_val,
+                    scale as i64,
+                    precision,
+                    scale,
+                )
+                .map_err(|e| anyhow!("Column {idx}: {e}"))?;
+                Datum::Decimal(decimal)
+            }
             DATUM_TYPE_DATE => Datum::Date(fcore::row::Date::new(field.i32_val)),
             DATUM_TYPE_TIME => Datum::Time(fcore::row::Time::new(field.i32_val)),
             DATUM_TYPE_TIMESTAMP_NTZ => Datum::TimestampNtz(
@@ -243,12 +323,12 @@ pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> fcore::row::GenericRow<'_> {
                 fcore::row::TimestampLtz::from_millis_nanos(field.i64_val, field.i32_val)
                     .unwrap_or_else(|_| fcore::row::TimestampLtz::new(field.i64_val)),
             ),
-            _ => Datum::Null,
+            other => return Err(anyhow!("Column {idx}: unknown datum type {other}")),
         };
         generic_row.set_field(idx, datum);
     }
 
-    generic_row
+    Ok(generic_row)
 }
 
 pub fn core_scan_records_to_ffi(
@@ -292,6 +372,10 @@ fn core_row_to_ffi_fields(
             f64_val: 0.0,
             string_val: String::new(),
             bytes_val: vec![],
+            decimal_precision: 0,
+            decimal_scale: 0,
+            i128_hi: 0,
+            i128_lo: 0,
         }
     }
 
@@ -485,6 +569,29 @@ fn core_row_to_ffi_fields(
                 }
                 _ => panic!("Will never come here. Unsupported Time64 unit for column {i}"),
             },
+            ArrowDataType::Decimal128(precision, scale) => {
+                let array = record_batch
+                    .column(i)
+                    .as_any()
+                    .downcast_ref::<Decimal128Array>()
+                    .expect("Decimal128 column expected");
+                let i128_val = array.value(row_id);
+
+                if fcore::row::Decimal::is_compact_precision(*precision as u32) {
+                    let mut datum = new_datum(DATUM_TYPE_DECIMAL_I64);
+                    datum.i64_val = i128_val as i64;
+                    datum.decimal_precision = *precision as i32;
+                    datum.decimal_scale = *scale as i32;
+                    datum
+                } else {
+                    let mut datum = new_datum(DATUM_TYPE_DECIMAL_I128);
+                    datum.i128_hi = (i128_val >> 64) as i64;
+                    datum.i128_lo = i128_val as i64;
+                    datum.decimal_precision = *precision as i32;
+                    datum.decimal_scale = *scale as i32;
+                    datum
+                }
+            }
             other => panic!(
                 "Will never come here. Unsupported Arrow data type for column {i}: {other:?}"
             ),
diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index 4d9be02..db1348a 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -59,7 +59,7 @@ tokio = { workspace = true }
 parking_lot = "0.12"
 bytes = "1.10.1"
 dashmap = "6.1.0"
-bigdecimal = { version = "0.4", features = ["serde"] }
+bigdecimal = { workspace = true, features = ["serde"] }
 ordered-float = { version = "5", features = ["serde"] }
 parse-display = "0.10"
 jiff = { workspace = true }

Reply via email to