This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 2815d6b feat: support decimals in cpp binding (#265)
2815d6b is described below
commit 2815d6bac13e8125b55f29b2adb2d5db9434f64f
Author: Anton Borisov <[email protected]>
AuthorDate: Sun Feb 8 03:24:59 2026 +0000
feat: support decimals in cpp binding (#265)
---
Cargo.toml | 1 +
bindings/cpp/Cargo.toml | 1 +
bindings/cpp/examples/example.cpp | 95 +++++++++++++++++++++++---
bindings/cpp/include/fluss.hpp | 121 +++++++++++++++++++++++++++++++--
bindings/cpp/src/ffi_converter.hpp | 14 +++-
bindings/cpp/src/lib.rs | 15 ++++-
bindings/cpp/src/types.rs | 133 +++++++++++++++++++++++++++++++++----
crates/fluss/Cargo.toml | 2 +-
8 files changed, 352 insertions(+), 30 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index dfddd8d..77d7140 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -37,6 +37,7 @@ tokio = { version = "1.44.2", features = ["full"] }
clap = { version = "4.5.37", features = ["derive"] }
arrow = { version = "57.0.0", features = ["ipc_compression"] }
+bigdecimal = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
opendal = "0.53"
diff --git a/bindings/cpp/Cargo.toml b/bindings/cpp/Cargo.toml
index 8606a22..2681652 100644
--- a/bindings/cpp/Cargo.toml
+++ b/bindings/cpp/Cargo.toml
@@ -29,6 +29,7 @@ crate-type = ["staticlib"]
[dependencies]
anyhow = "1.0"
arrow = { workspace = true, features = ["ffi"] }
+bigdecimal = { workspace = true }
cxx = "1.0"
fluss = { workspace = true, features = ["storage-all"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp
index 92ebe9c..efdf2e8 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -54,14 +54,14 @@ int main() {
// 3) Schema with scalar and temporal columns
auto schema = fluss::Schema::NewBuilder()
- .AddColumn("id", fluss::DataType::Int)
- .AddColumn("name", fluss::DataType::String)
- .AddColumn("score", fluss::DataType::Float)
- .AddColumn("age", fluss::DataType::Int)
- .AddColumn("event_date", fluss::DataType::Date)
- .AddColumn("event_time", fluss::DataType::Time)
- .AddColumn("created_at", fluss::DataType::Timestamp)
- .AddColumn("updated_at", fluss::DataType::TimestampLtz)
+ .AddColumn("id", fluss::DataType::Int())
+ .AddColumn("name", fluss::DataType::String())
+ .AddColumn("score", fluss::DataType::Float())
+ .AddColumn("age", fluss::DataType::Int())
+ .AddColumn("event_date", fluss::DataType::Date())
+ .AddColumn("event_time", fluss::DataType::Time())
+ .AddColumn("created_at", fluss::DataType::Timestamp())
+ .AddColumn("updated_at",
fluss::DataType::TimestampLtz())
.Build();
auto descriptor = fluss::TableDescriptor::NewBuilder()
@@ -131,6 +131,10 @@ int main() {
row.SetString(1, "AckTest");
row.SetFloat32(2, 99.9f);
row.SetInt32(3, 42);
+ row.SetDate(4, fluss::Date::FromYMD(2025, 3, 1));
+ row.SetTime(5, fluss::Time::FromHMS(12, 0, 0));
+ row.SetTimestampNtz(6, fluss::Timestamp::FromMillis(1740787200000));
+ row.SetTimestampLtz(7, fluss::Timestamp::FromMillis(1740787200000));
fluss::WriteResult wr;
check("append", writer.Append(row, wr));
check("wait", wr.Wait());
@@ -365,5 +369,80 @@ int main() {
}
}
+ // 12) Decimal support example
+ std::cout << "\n=== Decimal Support Example ===" << std::endl;
+
+ fluss::TablePath decimal_table_path("fluss", "decimal_table_cpp_v1");
+
+ // Drop table if exists
+ admin.DropTable(decimal_table_path, true);
+
+ // Create schema with decimal columns
+ auto decimal_schema = fluss::Schema::NewBuilder()
+ .AddColumn("id", fluss::DataType::Int())
+ .AddColumn("price", fluss::DataType::Decimal(10,
2)) // compact
+ .AddColumn("amount",
fluss::DataType::Decimal(28, 8)) // i128
+ .Build();
+
+ auto decimal_descriptor = fluss::TableDescriptor::NewBuilder()
+ .SetSchema(decimal_schema)
+ .SetBucketCount(1)
+ .SetComment("cpp decimal example table")
+ .Build();
+
+ check("create_decimal_table", admin.CreateTable(decimal_table_path,
decimal_descriptor, false));
+
+ // Get table and writer
+ fluss::Table decimal_table;
+ check("get_decimal_table", conn.GetTable(decimal_table_path,
decimal_table));
+
+ fluss::AppendWriter decimal_writer;
+ check("new_decimal_writer", decimal_table.NewAppendWriter(decimal_writer));
+
+ // Just provide the value — Rust resolves (p,s) from schema
+ {
+ fluss::GenericRow row;
+ row.SetInt32(0, 1);
+ row.SetDecimal(1, "123.45"); // Rust knows DECIMAL(10,2)
+ row.SetDecimal(2, "1.00000000"); // Rust knows DECIMAL(28,8)
+ check("append_decimal", decimal_writer.Append(row));
+ }
+ {
+ fluss::GenericRow row;
+ row.SetInt32(0, 2);
+ row.SetDecimal(1, "-999.99");
+ row.SetDecimal(2, "3.14159265");
+ check("append_decimal", decimal_writer.Append(row));
+ }
+ {
+ fluss::GenericRow row;
+ row.SetInt32(0, 3);
+ row.SetDecimal(1, "500.00");
+ row.SetDecimal(2, "2.71828182");
+ check("append_decimal", decimal_writer.Append(row));
+ }
+ check("flush_decimal", decimal_writer.Flush());
+ std::cout << "Wrote 3 decimal rows" << std::endl;
+
+ // Scan and read back
+ fluss::LogScanner decimal_scanner;
+ check("new_decimal_scanner",
decimal_table.NewScan().CreateLogScanner(decimal_scanner));
+ check("subscribe_decimal", decimal_scanner.Subscribe(0, 0));
+
+ fluss::ScanRecords decimal_records;
+ check("poll_decimal", decimal_scanner.Poll(5000, decimal_records));
+
+ std::cout << "Scanned decimal records: " << decimal_records.Size() <<
std::endl;
+ for (const auto& rec : decimal_records) {
+ auto& price = rec.row.fields[1];
+ auto& amount = rec.row.fields[2];
+ std::cout << " id=" << rec.row.fields[0].i32_val
+ << " price=" << price.DecimalToString()
+ << " (raw=" << price.i64_val << ")"
+ << " amount=" << amount.DecimalToString()
+ << " is_decimal=" << price.IsDecimal()
+ << std::endl;
+ }
+
return 0;
}
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 239d9a4..8125c49 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -20,8 +20,8 @@
#pragma once
#include <chrono>
+#include <cstdint>
#include <memory>
-#include <optional>
#include <string>
#include <unordered_map>
#include <vector>
@@ -100,7 +100,7 @@ struct Timestamp {
}
};
-enum class DataType {
+enum class TypeId {
Boolean = 1,
TinyInt = 2,
SmallInt = 3,
@@ -114,6 +114,43 @@ enum class DataType {
Time = 11,
Timestamp = 12,
TimestampLtz = 13,
+ Decimal = 14,
+};
+
+class DataType {
+public:
+ explicit DataType(TypeId id, int32_t p = 0, int32_t s = 0)
+ : id_(id), precision_(p), scale_(s) {}
+
+ static DataType Boolean() { return DataType(TypeId::Boolean); }
+ static DataType TinyInt() { return DataType(TypeId::TinyInt); }
+ static DataType SmallInt() { return DataType(TypeId::SmallInt); }
+ static DataType Int() { return DataType(TypeId::Int); }
+ static DataType BigInt() { return DataType(TypeId::BigInt); }
+ static DataType Float() { return DataType(TypeId::Float); }
+ static DataType Double() { return DataType(TypeId::Double); }
+ static DataType String() { return DataType(TypeId::String); }
+ static DataType Bytes() { return DataType(TypeId::Bytes); }
+ static DataType Date() { return DataType(TypeId::Date); }
+ static DataType Time() { return DataType(TypeId::Time); }
+ static DataType Timestamp(int32_t precision = 6) {
+ return DataType(TypeId::Timestamp, precision, 0);
+ }
+ static DataType TimestampLtz(int32_t precision = 6) {
+ return DataType(TypeId::TimestampLtz, precision, 0);
+ }
+ static DataType Decimal(int32_t precision, int32_t scale) {
+ return DataType(TypeId::Decimal, precision, scale);
+ }
+
+ TypeId id() const { return id_; }
+ int32_t precision() const { return precision_; }
+ int32_t scale() const { return scale_; }
+
+private:
+ TypeId id_;
+ int32_t precision_{0};
+ int32_t scale_{0};
};
enum class DatumType {
@@ -125,7 +162,9 @@ enum class DatumType {
Float64 = 5,
String = 6,
Bytes = 7,
- // 8-10 reserved for decimal types
+ DecimalI64 = 8,
+ DecimalI128 = 9,
+ DecimalString = 10,
Date = 11,
Time = 12,
TimestampNtz = 13,
@@ -182,7 +221,7 @@ struct Schema {
public:
Builder& AddColumn(std::string name, DataType type,
std::string comment = "") {
- columns_.push_back({std::move(name), type, std::move(comment)});
+ columns_.push_back({std::move(name), std::move(type),
std::move(comment)});
return *this;
}
@@ -290,6 +329,10 @@ struct Datum {
double f64_val{0.0};
std::string string_val;
std::vector<uint8_t> bytes_val;
+ int32_t decimal_precision{0}; // Decimal: precision (total digits)
+ int32_t decimal_scale{0}; // Decimal: scale (digits after decimal
point)
+ int64_t i128_hi{0}; // Decimal (i128): high 64 bits of unscaled
value
+ int64_t i128_lo{0}; // Decimal (i128): low 64 bits of unscaled
value
static Datum Null() { return {}; }
static Datum Bool(bool v) {
@@ -360,10 +403,75 @@ struct Datum {
dat.i32_val = ts.nano_of_millisecond;
return dat;
}
+ // Stores the decimal string as-is. Rust side will parse via BigDecimal,
+ // look up (p,s) from the schema, validate, and create the Decimal.
+ static Datum DecimalString(std::string str) {
+ Datum d;
+ d.type = DatumType::DecimalString;
+ d.string_val = std::move(str);
+ return d;
+ }
fluss::Date GetDate() const { return {i32_val}; }
fluss::Time GetTime() const { return {i32_val}; }
fluss::Timestamp GetTimestamp() const { return {i64_val, i32_val}; }
+
+ bool IsDecimal() const {
+ return type == DatumType::DecimalI64 || type == DatumType::DecimalI128
+ || type == DatumType::DecimalString;
+ }
+
+ std::string DecimalToString() const {
+ if (type == DatumType::DecimalI64) {
+ return FormatUnscaled64(i64_val, decimal_scale);
+ } else if (type == DatumType::DecimalI128) {
+ unsigned __int128 uval = (static_cast<unsigned
__int128>(static_cast<uint64_t>(i128_hi)) << 64) |
+ static_cast<unsigned
__int128>(static_cast<uint64_t>(i128_lo));
+ __int128 val = static_cast<__int128>(uval);
+ return FormatUnscaled128(val, decimal_scale);
+ } else if (type == DatumType::DecimalString) {
+ return string_val;
+ }
+ return "";
+ }
+
+private:
+ static std::string FormatUnscaled64(int64_t unscaled, int32_t scale) {
+ bool negative = unscaled < 0;
+ uint64_t abs_val = negative ? -static_cast<uint64_t>(unscaled) :
static_cast<uint64_t>(unscaled);
+ std::string digits = std::to_string(abs_val);
+ if (scale <= 0) {
+ return (negative ? "-" : "") + digits;
+ }
+ while (static_cast<int32_t>(digits.size()) <= scale) {
+ digits = "0" + digits;
+ }
+ auto pos = digits.size() - static_cast<size_t>(scale);
+ return (negative ? "-" : "") + digits.substr(0, pos) + "." +
digits.substr(pos);
+ }
+
+ static std::string FormatUnscaled128(__int128 val, int32_t scale) {
+ bool negative = val < 0;
+ unsigned __int128 abs_val = negative ? -static_cast<unsigned
__int128>(val)
+ : static_cast<unsigned
__int128>(val);
+ std::string digits;
+ if (abs_val == 0) {
+ digits = "0";
+ } else {
+ while (abs_val > 0) {
+ digits = static_cast<char>('0' + static_cast<int>(abs_val %
10)) + digits;
+ abs_val /= 10;
+ }
+ }
+ if (scale <= 0) {
+ return (negative ? "-" : "") + digits;
+ }
+ while (static_cast<int32_t>(digits.size()) <= scale) {
+ digits = "0" + digits;
+ }
+ auto pos = digits.size() - static_cast<size_t>(scale);
+ return (negative ? "-" : "") + digits.substr(0, pos) + "." +
digits.substr(pos);
+ }
};
struct GenericRow {
@@ -429,6 +537,11 @@ struct GenericRow {
fields[idx] = Datum::TimestampLtz(ts);
}
+ void SetDecimal(size_t idx, const std::string& value) {
+ EnsureSize(idx);
+ fields[idx] = Datum::DecimalString(value);
+ }
+
private:
void EnsureSize(size_t idx) {
if (fields.size() <= idx) {
diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp
index 63a2e91..e3e63a8 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -47,8 +47,10 @@ inline ffi::FfiTablePath to_ffi_table_path(const TablePath& path) {
inline ffi::FfiColumn to_ffi_column(const Column& col) {
ffi::FfiColumn ffi_col;
ffi_col.name = rust::String(col.name);
- ffi_col.data_type = static_cast<int32_t>(col.data_type);
+ ffi_col.data_type = static_cast<int32_t>(col.data_type.id());
ffi_col.comment = rust::String(col.comment);
+ ffi_col.precision = col.data_type.precision();
+ ffi_col.scale = col.data_type.scale();
return ffi_col;
}
@@ -112,6 +114,10 @@ inline ffi::FfiDatum to_ffi_datum(const Datum& datum) {
ffi_datum.f32_val = datum.f32_val;
ffi_datum.f64_val = datum.f64_val;
ffi_datum.string_val = rust::String(datum.string_val);
+ ffi_datum.decimal_precision = datum.decimal_precision;
+ ffi_datum.decimal_scale = datum.decimal_scale;
+ ffi_datum.i128_hi = datum.i128_hi;
+ ffi_datum.i128_lo = datum.i128_lo;
rust::Vec<uint8_t> bytes;
for (auto b : datum.bytes_val) {
@@ -137,7 +143,7 @@ inline ffi::FfiGenericRow to_ffi_generic_row(const GenericRow& row) {
inline Column from_ffi_column(const ffi::FfiColumn& ffi_col) {
return Column{
std::string(ffi_col.name),
- static_cast<DataType>(ffi_col.data_type),
+ DataType(static_cast<TypeId>(ffi_col.data_type), ffi_col.precision,
ffi_col.scale),
std::string(ffi_col.comment)};
}
@@ -202,6 +208,10 @@ inline Datum from_ffi_datum(const ffi::FfiDatum& ffi_datum) {
datum.f64_val = ffi_datum.f64_val;
// todo: avoid copy string
datum.string_val = std::string(ffi_datum.string_val);
+ datum.decimal_precision = ffi_datum.decimal_precision;
+ datum.decimal_scale = ffi_datum.decimal_scale;
+ datum.i128_hi = ffi_datum.i128_hi;
+ datum.i128_lo = ffi_datum.i128_lo;
for (auto b : ffi_datum.bytes_val) {
datum.bytes_val.push_back(b);
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 5a26613..4aeb13d 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -51,6 +51,8 @@ mod ffi {
name: String,
data_type: i32,
comment: String,
+ precision: i32,
+ scale: i32,
}
struct FfiSchema {
@@ -98,6 +100,10 @@ mod ffi {
f64_val: f64,
string_val: String,
bytes_val: Vec<u8>,
+ decimal_precision: i32,
+ decimal_scale: i32,
+ i128_hi: i64,
+ i128_lo: i64,
}
struct FfiGenericRow {
@@ -301,6 +307,7 @@ pub struct Table {
pub struct AppendWriter {
inner: fcore::client::AppendWriter,
+ table_info: fcore::metadata::TableInfo,
}
pub struct WriteResult {
@@ -636,7 +643,10 @@ impl Table {
Ok(w) => w,
Err(e) => return Err(format!("Failed to create writer: {e}")),
};
- let writer = Box::into_raw(Box::new(AppendWriter { inner: writer }));
+ let writer = Box::into_raw(Box::new(AppendWriter {
+ inner: writer,
+ table_info: self.table_info.clone(),
+ }));
Ok(writer)
}
@@ -792,7 +802,8 @@ unsafe fn delete_append_writer(writer: *mut AppendWriter) {
impl AppendWriter {
fn append(&mut self, row: &ffi::FfiGenericRow) -> Result<Box<WriteResult>,
String> {
- let generic_row = types::ffi_row_to_core(row);
+ let schema = self.table_info.get_schema();
+ let generic_row = types::ffi_row_to_core(row,
Some(schema)).map_err(|e| e.to_string())?;
let result_future = self
.inner
diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
index f546b68..7837032 100644
--- a/bindings/cpp/src/types.rs
+++ b/bindings/cpp/src/types.rs
@@ -18,8 +18,8 @@
use crate::ffi;
use anyhow::{Result, anyhow};
use arrow::array::{
- Date32Array, LargeBinaryArray, LargeStringArray, Time32MillisecondArray,
Time32SecondArray,
- Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
+ Date32Array, Decimal128Array, LargeBinaryArray, LargeStringArray,
Time32MillisecondArray,
+ Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
};
use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
@@ -27,6 +27,7 @@ use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
use fcore::row::InternalRow;
use fluss as fcore;
use std::borrow::Cow;
+use std::str::FromStr;
use arrow::array::Array;
@@ -43,6 +44,7 @@ pub const DATA_TYPE_DATE: i32 = 10;
pub const DATA_TYPE_TIME: i32 = 11;
pub const DATA_TYPE_TIMESTAMP: i32 = 12;
pub const DATA_TYPE_TIMESTAMP_LTZ: i32 = 13;
+pub const DATA_TYPE_DECIMAL: i32 = 14;
pub const DATUM_TYPE_NULL: i32 = 0;
pub const DATUM_TYPE_BOOL: i32 = 1;
@@ -52,6 +54,9 @@ pub const DATUM_TYPE_FLOAT32: i32 = 4;
pub const DATUM_TYPE_FLOAT64: i32 = 5;
pub const DATUM_TYPE_STRING: i32 = 6;
pub const DATUM_TYPE_BYTES: i32 = 7;
+pub const DATUM_TYPE_DECIMAL_I64: i32 = 8;
+pub const DATUM_TYPE_DECIMAL_I128: i32 = 9;
+pub const DATUM_TYPE_DECIMAL_STRING: i32 = 10;
pub const DATUM_TYPE_DATE: i32 = 11;
pub const DATUM_TYPE_TIME: i32 = 12;
pub const DATUM_TYPE_TIMESTAMP_NTZ: i32 = 13;
@@ -62,7 +67,7 @@ const MICROS_PER_MILLI: i64 = 1_000;
const NANOS_PER_MICRO: i64 = 1_000;
const NANOS_PER_MILLI: i64 = 1_000_000;
-fn ffi_data_type_to_core(dt: i32) -> Result<fcore::metadata::DataType> {
+fn ffi_data_type_to_core(dt: i32, precision: u32, scale: u32) ->
Result<fcore::metadata::DataType> {
match dt {
DATA_TYPE_BOOLEAN => Ok(fcore::metadata::DataTypes::boolean()),
DATA_TYPE_TINYINT => Ok(fcore::metadata::DataTypes::tinyint()),
@@ -75,8 +80,16 @@ fn ffi_data_type_to_core(dt: i32) -> Result<fcore::metadata::DataType> {
DATA_TYPE_BYTES => Ok(fcore::metadata::DataTypes::bytes()),
DATA_TYPE_DATE => Ok(fcore::metadata::DataTypes::date()),
DATA_TYPE_TIME => Ok(fcore::metadata::DataTypes::time()),
- DATA_TYPE_TIMESTAMP => Ok(fcore::metadata::DataTypes::timestamp()),
- DATA_TYPE_TIMESTAMP_LTZ =>
Ok(fcore::metadata::DataTypes::timestamp_ltz()),
+ DATA_TYPE_TIMESTAMP =>
Ok(fcore::metadata::DataTypes::timestamp_with_precision(
+ precision,
+ )),
+ DATA_TYPE_TIMESTAMP_LTZ =>
Ok(fcore::metadata::DataTypes::timestamp_ltz_with_precision(
+ precision,
+ )),
+ DATA_TYPE_DECIMAL => {
+ let dt = fcore::metadata::DecimalType::new(precision, scale)?;
+ Ok(fcore::metadata::DataType::Decimal(dt))
+ }
_ => Err(anyhow!("Unknown data type: {dt}")),
}
}
@@ -96,6 +109,7 @@ fn core_data_type_to_ffi(dt: &fcore::metadata::DataType) -> i32 {
fcore::metadata::DataType::Time(_) => DATA_TYPE_TIME,
fcore::metadata::DataType::Timestamp(_) => DATA_TYPE_TIMESTAMP,
fcore::metadata::DataType::TimestampLTz(_) => DATA_TYPE_TIMESTAMP_LTZ,
+ fcore::metadata::DataType::Decimal(_) => DATA_TYPE_DECIMAL,
_ => 0,
}
}
@@ -106,7 +120,13 @@ pub fn ffi_descriptor_to_core(
let mut schema_builder = fcore::metadata::Schema::builder();
for col in &descriptor.schema.columns {
- let dt = ffi_data_type_to_core(col.data_type)?;
+ if col.precision < 0 || col.scale < 0 {
+ return Err(anyhow!(
+ "Column '{}': precision and scale must be non-negative",
+ col.name
+ ));
+ }
+ let dt = ffi_data_type_to_core(col.data_type, col.precision as u32,
col.scale as u32)?;
schema_builder = schema_builder.column(&col.name, dt);
if !col.comment.is_empty() {
schema_builder = schema_builder.with_comment(&col.comment);
@@ -148,10 +168,22 @@ pub fn core_table_info_to_ffi(info: &fcore::metadata::TableInfo) -> ffi::FfiTabl
let columns: Vec<ffi::FfiColumn> = schema
.columns()
.iter()
- .map(|col| ffi::FfiColumn {
- name: col.name().to_string(),
- data_type: core_data_type_to_ffi(col.data_type()),
- comment: col.comment().unwrap_or("").to_string(),
+ .map(|col| {
+ let (precision, scale) = match col.data_type() {
+ fcore::metadata::DataType::Decimal(dt) => {
+ (dt.precision() as i32, dt.scale() as i32)
+ }
+ fcore::metadata::DataType::Timestamp(dt) => (dt.precision() as
i32, 0),
+ fcore::metadata::DataType::TimestampLTz(dt) => (dt.precision()
as i32, 0),
+ _ => (0, 0),
+ };
+ ffi::FfiColumn {
+ name: col.name().to_string(),
+ data_type: core_data_type_to_ffi(col.data_type()),
+ comment: col.comment().unwrap_or("").to_string(),
+ precision,
+ scale,
+ }
})
.collect();
@@ -218,7 +250,21 @@ pub fn empty_table_info() -> ffi::FfiTableInfo {
}
}
-pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> fcore::row::GenericRow<'_>
{
+/// Look up decimal (precision, scale) from schema for column `idx`.
+fn get_decimal_type(idx: usize, schema: Option<&fcore::metadata::Schema>) ->
Result<(u32, u32)> {
+ let col = schema
+ .and_then(|s| s.columns().get(idx))
+ .ok_or_else(|| anyhow!("Schema not available for decimal column
{idx}"))?;
+ match col.data_type() {
+ fcore::metadata::DataType::Decimal(dt) => Ok((dt.precision(),
dt.scale())),
+ other => Err(anyhow!("Column {idx} is {:?}, not Decimal", other)),
+ }
+}
+
+pub fn ffi_row_to_core<'a>(
+ row: &'a ffi::FfiGenericRow,
+ schema: Option<&fcore::metadata::Schema>,
+) -> Result<fcore::row::GenericRow<'a>> {
use fcore::row::Datum;
let mut generic_row = fcore::row::GenericRow::new(row.fields.len());
@@ -233,6 +279,40 @@ pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> fcore::row::GenericRow<'_> {
DATUM_TYPE_FLOAT64 => Datum::Float64(field.f64_val.into()),
DATUM_TYPE_STRING =>
Datum::String(Cow::Borrowed(field.string_val.as_str())),
DATUM_TYPE_BYTES =>
Datum::Blob(Cow::Borrowed(field.bytes_val.as_slice())),
+ DATUM_TYPE_DECIMAL_STRING => {
+ let (precision, scale) = get_decimal_type(idx, schema)?;
+ let bd =
+
bigdecimal::BigDecimal::from_str(field.string_val.as_str()).map_err(|e| {
+ anyhow!(
+ "Column {idx}: invalid decimal string '{}': {e}",
+ field.string_val
+ )
+ })?;
+ let decimal = fcore::row::Decimal::from_big_decimal(bd,
precision, scale)
+ .map_err(|e| anyhow!("Column {idx}: {e}"))?;
+ Datum::Decimal(decimal)
+ }
+ DATUM_TYPE_DECIMAL_I64 => {
+ let precision = field.decimal_precision as u32;
+ let scale = field.decimal_scale as u32;
+ let decimal =
+ fcore::row::Decimal::from_unscaled_long(field.i64_val,
precision, scale)
+ .map_err(|e| anyhow!("Column {idx}: {e}"))?;
+ Datum::Decimal(decimal)
+ }
+ DATUM_TYPE_DECIMAL_I128 => {
+ let precision = field.decimal_precision as u32;
+ let scale = field.decimal_scale as u32;
+ let i128_val = ((field.i128_hi as i128) << 64) |
(field.i128_lo as u64 as i128);
+ let decimal = fcore::row::Decimal::from_arrow_decimal128(
+ i128_val,
+ scale as i64,
+ precision,
+ scale,
+ )
+ .map_err(|e| anyhow!("Column {idx}: {e}"))?;
+ Datum::Decimal(decimal)
+ }
DATUM_TYPE_DATE =>
Datum::Date(fcore::row::Date::new(field.i32_val)),
DATUM_TYPE_TIME =>
Datum::Time(fcore::row::Time::new(field.i32_val)),
DATUM_TYPE_TIMESTAMP_NTZ => Datum::TimestampNtz(
@@ -243,12 +323,12 @@ pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> fcore::row::GenericRow<'_> {
fcore::row::TimestampLtz::from_millis_nanos(field.i64_val,
field.i32_val)
.unwrap_or_else(|_|
fcore::row::TimestampLtz::new(field.i64_val)),
),
- _ => Datum::Null,
+ other => return Err(anyhow!("Column {idx}: unknown datum type
{other}")),
};
generic_row.set_field(idx, datum);
}
- generic_row
+ Ok(generic_row)
}
pub fn core_scan_records_to_ffi(
@@ -292,6 +372,10 @@ fn core_row_to_ffi_fields(
f64_val: 0.0,
string_val: String::new(),
bytes_val: vec![],
+ decimal_precision: 0,
+ decimal_scale: 0,
+ i128_hi: 0,
+ i128_lo: 0,
}
}
@@ -485,6 +569,29 @@ fn core_row_to_ffi_fields(
}
_ => panic!("Will never come here. Unsupported Time64 unit for
column {i}"),
},
+ ArrowDataType::Decimal128(precision, scale) => {
+ let array = record_batch
+ .column(i)
+ .as_any()
+ .downcast_ref::<Decimal128Array>()
+ .expect("Decimal128 column expected");
+ let i128_val = array.value(row_id);
+
+ if fcore::row::Decimal::is_compact_precision(*precision as
u32) {
+ let mut datum = new_datum(DATUM_TYPE_DECIMAL_I64);
+ datum.i64_val = i128_val as i64;
+ datum.decimal_precision = *precision as i32;
+ datum.decimal_scale = *scale as i32;
+ datum
+ } else {
+ let mut datum = new_datum(DATUM_TYPE_DECIMAL_I128);
+ datum.i128_hi = (i128_val >> 64) as i64;
+ datum.i128_lo = i128_val as i64;
+ datum.decimal_precision = *precision as i32;
+ datum.decimal_scale = *scale as i32;
+ datum
+ }
+ }
other => panic!(
"Will never come here. Unsupported Arrow data type for column
{i}: {other:?}"
),
diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index 4d9be02..db1348a 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -59,7 +59,7 @@ tokio = { workspace = true }
parking_lot = "0.12"
bytes = "1.10.1"
dashmap = "6.1.0"
-bigdecimal = { version = "0.4", features = ["serde"] }
+bigdecimal = { workspace = true, features = ["serde"] }
ordered-float = { version = "5", features = ["serde"] }
parse-display = "0.10"
jiff = { workspace = true }