This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 57b2e4c chore: introduce getters in CPP Datum (#281)
57b2e4c is described below
commit 57b2e4c7841332308e976dbc98cc6319279e7090
Author: Anton Borisov <[email protected]>
AuthorDate: Sun Feb 8 06:56:25 2026 +0000
chore: introduce getters in CPP Datum (#281)
---
bindings/cpp/examples/example.cpp | 87 +++++++++----------
bindings/cpp/include/fluss.hpp | 106 ++++++++++++++++++++----
bindings/cpp/src/ffi_converter.hpp | 165 +++++++++++++++++++++----------------
3 files changed, 228 insertions(+), 130 deletions(-)
diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp
index e6f9619..f568422 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -153,40 +153,38 @@ int main() {
std::cout << "Scanned records: " << records.Size() << std::endl;
bool scan_ok = true;
for (const auto& rec : records.records) {
- const auto& f = rec.row.fields;
-
- if (f[4].type != fluss::DatumType::Date) {
- std::cerr << "ERROR: field 4 expected Date, got " <<
static_cast<int>(f[4].type)
- << std::endl;
+ if (rec.row.GetType(4) != fluss::DatumType::Date) {
+ std::cerr << "ERROR: field 4 expected Date, got "
+ << static_cast<int>(rec.row.GetType(4)) << std::endl;
scan_ok = false;
}
- if (f[5].type != fluss::DatumType::Time) {
- std::cerr << "ERROR: field 5 expected Time, got " <<
static_cast<int>(f[5].type)
- << std::endl;
+ if (rec.row.GetType(5) != fluss::DatumType::Time) {
+ std::cerr << "ERROR: field 5 expected Time, got "
+ << static_cast<int>(rec.row.GetType(5)) << std::endl;
scan_ok = false;
}
- if (f[6].type != fluss::DatumType::TimestampNtz) {
- std::cerr << "ERROR: field 6 expected TimestampNtz, got " <<
static_cast<int>(f[6].type)
- << std::endl;
+ if (rec.row.GetType(6) != fluss::DatumType::TimestampNtz) {
+ std::cerr << "ERROR: field 6 expected TimestampNtz, got "
+ << static_cast<int>(rec.row.GetType(6)) << std::endl;
scan_ok = false;
}
- if (f[7].type != fluss::DatumType::TimestampLtz) {
- std::cerr << "ERROR: field 7 expected TimestampLtz, got " <<
static_cast<int>(f[7].type)
- << std::endl;
+ if (rec.row.GetType(7) != fluss::DatumType::TimestampLtz) {
+ std::cerr << "ERROR: field 7 expected TimestampLtz, got "
+ << static_cast<int>(rec.row.GetType(7)) << std::endl;
scan_ok = false;
}
- auto date = f[4].GetDate();
- auto time = f[5].GetTime();
- auto ts_ntz = f[6].GetTimestamp();
- auto ts_ltz = f[7].GetTimestamp();
-
- std::cout << " id=" << f[0].i32_val << " name=" << f[1].string_val
- << " score=" << f[2].f32_val << " age=" << f[3].i32_val << "
date=" << date.Year()
- << "-" << date.Month() << "-" << date.Day() << " time=" <<
time.Hour() << ":"
- << time.Minute() << ":" << time.Second() << " ts_ntz=" <<
ts_ntz.epoch_millis
- << " ts_ltz=" << ts_ltz.epoch_millis << "+" <<
ts_ltz.nano_of_millisecond << "ns"
- << std::endl;
+ auto date = rec.row.GetDate(4);
+ auto time = rec.row.GetTime(5);
+ auto ts_ntz = rec.row.GetTimestamp(6);
+ auto ts_ltz = rec.row.GetTimestamp(7);
+
+ std::cout << " id=" << rec.row.GetInt32(0) << " name=" <<
rec.row.GetString(1)
+ << " score=" << rec.row.GetFloat32(2) << " age=" <<
rec.row.GetInt32(3)
+ << " date=" << date.Year() << "-" << date.Month() << "-" <<
date.Day()
+ << " time=" << time.Hour() << ":" << time.Minute() << ":" <<
time.Second()
+ << " ts_ntz=" << ts_ntz.epoch_millis << " ts_ltz=" <<
ts_ltz.epoch_millis << "+"
+ << ts_ltz.nano_of_millisecond << "ns" << std::endl;
}
if (!scan_ok) {
@@ -210,26 +208,24 @@ int main() {
std::cout << "Projected records: " << projected_records.Size() <<
std::endl;
for (const auto& rec : projected_records.records) {
- const auto& f = rec.row.fields;
-
- if (f.size() != 2) {
- std::cerr << "ERROR: expected 2 fields, got " << f.size() <<
std::endl;
+ if (rec.row.FieldCount() != 2) {
+ std::cerr << "ERROR: expected 2 fields, got " <<
rec.row.FieldCount() << std::endl;
scan_ok = false;
continue;
}
- if (f[0].type != fluss::DatumType::Int32) {
+ if (rec.row.GetType(0) != fluss::DatumType::Int32) {
std::cerr << "ERROR: projected field 0 expected Int32, got "
- << static_cast<int>(f[0].type) << std::endl;
+ << static_cast<int>(rec.row.GetType(0)) << std::endl;
scan_ok = false;
}
- if (f[1].type != fluss::DatumType::TimestampLtz) {
+ if (rec.row.GetType(1) != fluss::DatumType::TimestampLtz) {
std::cerr << "ERROR: projected field 1 expected TimestampLtz, got "
- << static_cast<int>(f[1].type) << std::endl;
+ << static_cast<int>(rec.row.GetType(1)) << std::endl;
scan_ok = false;
}
- auto ts = f[1].GetTimestamp();
- std::cout << " id=" << f[0].i32_val << " updated_at=" <<
ts.epoch_millis << "+"
+ auto ts = rec.row.GetTimestamp(1);
+ std::cout << " id=" << rec.row.GetInt32(0) << " updated_at=" <<
ts.epoch_millis << "+"
<< ts.nano_of_millisecond << "ns" << std::endl;
}
@@ -428,12 +424,9 @@ int main() {
std::cout << "Scanned decimal records: " << decimal_records.Size() <<
std::endl;
for (const auto& rec : decimal_records) {
- auto& price = rec.row.fields[1];
- auto& amount = rec.row.fields[2];
- std::cout << " id=" << rec.row.fields[0].i32_val << " price=" <<
price.DecimalToString()
- << " (raw=" << price.i64_val << ")"
- << " amount=" << amount.DecimalToString() << " is_decimal="
<< price.IsDecimal()
- << std::endl;
+ std::cout << " id=" << rec.row.GetInt32(0) << " price=" <<
rec.row.DecimalToString(1)
+ << " amount=" << rec.row.DecimalToString(2)
+ << " is_decimal=" << rec.row.IsDecimal(1) << std::endl;
}
// 13) Partitioned table example
@@ -525,9 +518,9 @@ int main() {
<< std::endl;
for (size_t i = 0; i < partition_records.Size(); ++i) {
const auto& rec = partition_records[i];
- std::cout << " Record " << i << ": id=" << rec.row.fields[0].i32_val
- << ", region=" << rec.row.fields[1].string_val
- << ", value=" << rec.row.fields[2].i64_val << std::endl;
+ std::cout << " Record " << i << ": id=" << rec.row.GetInt32(0)
+ << ", region=" << rec.row.GetString(1) << ", value=" <<
rec.row.GetInt64(2)
+ << std::endl;
}
// 13.2) subscribe_partition_buckets: batch subscribe to all partitions at
once
@@ -551,9 +544,9 @@ int main() {
<< " records from batch partition subscription" << std::endl;
for (size_t i = 0; i < partition_batch_records.Size(); ++i) {
const auto& rec = partition_batch_records[i];
- std::cout << " Record " << i << ": id=" << rec.row.fields[0].i32_val
- << ", region=" << rec.row.fields[1].string_val
- << ", value=" << rec.row.fields[2].i64_val << std::endl;
+ std::cout << " Record " << i << ": id=" << rec.row.GetInt32(0)
+ << ", region=" << rec.row.GetString(1) << ", value=" <<
rec.row.GetInt64(2)
+ << std::endl;
}
// Cleanup
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 3a10445..6b9d479 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -22,6 +22,7 @@
#include <chrono>
#include <cstdint>
#include <memory>
+#include <stdexcept>
#include <string>
#include <unordered_map>
#include <vector>
@@ -312,19 +313,13 @@ struct TableInfo {
Schema schema;
};
+namespace detail {
+struct FfiAccess;
+}
+
struct Datum {
- DatumType type{DatumType::Null};
- bool bool_val{false};
- int32_t i32_val{0};
- int64_t i64_val{0};
- float f32_val{0.0F};
- double f64_val{0.0};
- std::string string_val;
- std::vector<uint8_t> bytes_val;
- int32_t decimal_precision{0}; // Decimal: precision (total digits)
- int32_t decimal_scale{0}; // Decimal: scale (digits after decimal
point)
- int64_t i128_hi{0}; // Decimal (i128): high 64 bits of unscaled
value
- int64_t i128_lo{0}; // Decimal (i128): low 64 bits of unscaled
value
+ friend struct GenericRow;
+ friend struct detail::FfiAccess;
static Datum Null() { return {}; }
static Datum Bool(bool v) {
@@ -404,6 +399,29 @@ struct Datum {
return d;
}
+ private:
+ DatumType type{DatumType::Null};
+ bool bool_val{false};
+ int32_t i32_val{0};
+ int64_t i64_val{0};
+ float f32_val{0.0F};
+ double f64_val{0.0};
+ std::string string_val;
+ std::vector<uint8_t> bytes_val;
+ int32_t decimal_precision{0}; // Decimal: precision (total digits)
+ int32_t decimal_scale{0}; // Decimal: scale (digits after decimal
point)
+ int64_t i128_hi{0}; // Decimal (i128): high 64 bits of unscaled
value
+ int64_t i128_lo{0}; // Decimal (i128): low 64 bits of unscaled
value
+
+ DatumType GetType() const { return type; }
+ bool IsNull() const { return type == DatumType::Null; }
+ bool GetBool() const { return bool_val; }
+ int32_t GetInt32() const { return i32_val; }
+ int64_t GetInt64() const { return i64_val; }
+ float GetFloat32() const { return f32_val; }
+ double GetFloat64() const { return f64_val; }
+ const std::string& GetString() const { return string_val; }
+ const std::vector<uint8_t>& GetBytes() const { return bytes_val; }
fluss::Date GetDate() const { return {i32_val}; }
fluss::Time GetTime() const { return {i32_val}; }
fluss::Timestamp GetTimestamp() const { return {i64_val, i32_val}; }
@@ -428,7 +446,6 @@ struct Datum {
return "";
}
- private:
static std::string FormatUnscaled64(int64_t unscaled, int32_t scale) {
bool negative = unscaled < 0;
uint64_t abs_val =
@@ -469,7 +486,47 @@ struct Datum {
};
struct GenericRow {
- std::vector<Datum> fields;
+ friend struct detail::FfiAccess;
+
+ size_t FieldCount() const { return fields.size(); }
+
+ DatumType GetType(size_t idx) const { return GetField(idx).GetType(); }
+ bool IsNull(size_t idx) const { return GetField(idx).IsNull(); }
+ bool GetBool(size_t idx) const { return GetTypedField(idx,
DatumType::Bool).GetBool(); }
+ int32_t GetInt32(size_t idx) const { return GetTypedField(idx,
DatumType::Int32).GetInt32(); }
+ int64_t GetInt64(size_t idx) const { return GetTypedField(idx,
DatumType::Int64).GetInt64(); }
+ float GetFloat32(size_t idx) const {
+ return GetTypedField(idx, DatumType::Float32).GetFloat32();
+ }
+ double GetFloat64(size_t idx) const {
+ return GetTypedField(idx, DatumType::Float64).GetFloat64();
+ }
+ const std::string& GetString(size_t idx) const {
+ return GetTypedField(idx, DatumType::String).GetString();
+ }
+ const std::vector<uint8_t>& GetBytes(size_t idx) const {
+ return GetTypedField(idx, DatumType::Bytes).GetBytes();
+ }
+ fluss::Date GetDate(size_t idx) const { return GetTypedField(idx,
DatumType::Date).GetDate(); }
+ fluss::Time GetTime(size_t idx) const { return GetTypedField(idx,
DatumType::Time).GetTime(); }
+ fluss::Timestamp GetTimestamp(size_t idx) const {
+ const auto& d = GetField(idx);
+ auto t = d.GetType();
+ if (t != DatumType::TimestampNtz && t != DatumType::TimestampLtz) {
+ throw std::runtime_error("GenericRow: field " +
std::to_string(idx) +
+ " is not a Timestamp type");
+ }
+ return d.GetTimestamp();
+ }
+ bool IsDecimal(size_t idx) const { return GetField(idx).IsDecimal(); }
+ std::string DecimalToString(size_t idx) const {
+ const auto& d = GetField(idx);
+ if (!d.IsDecimal()) {
+ throw std::runtime_error("GenericRow: field " +
std::to_string(idx) +
+ " is not a Decimal type");
+ }
+ return d.DecimalToString();
+ }
void SetNull(size_t idx) {
EnsureSize(idx);
@@ -537,6 +594,27 @@ struct GenericRow {
}
private:
+ std::vector<Datum> fields;
+
+ const Datum& GetField(size_t idx) const {
+ if (idx >= fields.size()) {
+ throw std::runtime_error("GenericRow: index " +
std::to_string(idx) +
+ " out of bounds (size=" +
std::to_string(fields.size()) + ")");
+ }
+ return fields[idx];
+ }
+
+ const Datum& GetTypedField(size_t idx, DatumType expected) const {
+ const auto& d = GetField(idx);
+ if (d.GetType() != expected) {
+ throw std::runtime_error("GenericRow: field " +
std::to_string(idx) +
+ " type mismatch: expected " +
+
std::to_string(static_cast<int>(expected)) + ", got " +
+
std::to_string(static_cast<int>(d.GetType())));
+ }
+ return d;
+ }
+
void EnsureSize(size_t idx) {
if (fields.size() <= idx) {
fields.resize(idx + 1);
diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp
index e3e63a8..8adcd01 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -23,15 +23,94 @@
#include "lib.rs.h"
namespace fluss {
+
+namespace detail {
+struct FfiAccess {
+ static const std::vector<Datum>& fields(const GenericRow& row) { return
row.fields; }
+ static std::vector<Datum>& fields(GenericRow& row) { return row.fields; }
+
+ static ffi::FfiDatum to_ffi_datum(const Datum& datum) {
+ ffi::FfiDatum ffi_datum;
+ ffi_datum.datum_type = static_cast<int32_t>(datum.type);
+ ffi_datum.bool_val = datum.bool_val;
+ ffi_datum.i32_val = datum.i32_val;
+ ffi_datum.i64_val = datum.i64_val;
+ ffi_datum.f32_val = datum.f32_val;
+ ffi_datum.f64_val = datum.f64_val;
+ ffi_datum.string_val = rust::String(datum.string_val);
+ ffi_datum.decimal_precision = datum.decimal_precision;
+ ffi_datum.decimal_scale = datum.decimal_scale;
+ ffi_datum.i128_hi = datum.i128_hi;
+ ffi_datum.i128_lo = datum.i128_lo;
+
+ rust::Vec<uint8_t> bytes;
+ for (auto b : datum.bytes_val) {
+ bytes.push_back(b);
+ }
+ ffi_datum.bytes_val = std::move(bytes);
+
+ return ffi_datum;
+ }
+
+ static Datum from_ffi_datum(const ffi::FfiDatum& ffi_datum) {
+ auto dtype = static_cast<DatumType>(ffi_datum.datum_type);
+ switch (dtype) {
+ case DatumType::Null:
+ return Datum::Null();
+ case DatumType::Bool:
+ return Datum::Bool(ffi_datum.bool_val);
+ case DatumType::Int32:
+ return Datum::Int32(ffi_datum.i32_val);
+ case DatumType::Int64:
+ return Datum::Int64(ffi_datum.i64_val);
+ case DatumType::Float32:
+ return Datum::Float32(ffi_datum.f32_val);
+ case DatumType::Float64:
+ return Datum::Float64(ffi_datum.f64_val);
+ case DatumType::String:
+ return Datum::String(std::string(ffi_datum.string_val));
+ case DatumType::Bytes: {
+ std::vector<uint8_t> bytes;
+ for (auto b : ffi_datum.bytes_val) {
+ bytes.push_back(b);
+ }
+ return Datum::Bytes(std::move(bytes));
+ }
+ case DatumType::Date:
+ return Datum::Date(fluss::Date{ffi_datum.i32_val});
+ case DatumType::Time:
+ return Datum::Time(fluss::Time{ffi_datum.i32_val});
+ case DatumType::TimestampNtz:
+ return Datum::TimestampNtz(fluss::Timestamp{ffi_datum.i64_val,
ffi_datum.i32_val});
+ case DatumType::TimestampLtz:
+ return Datum::TimestampLtz(fluss::Timestamp{ffi_datum.i64_val,
ffi_datum.i32_val});
+ case DatumType::DecimalI64:
+ case DatumType::DecimalI128:
+ case DatumType::DecimalString: {
+ Datum d;
+ d.type = dtype;
+ d.i64_val = ffi_datum.i64_val;
+ d.decimal_precision = ffi_datum.decimal_precision;
+ d.decimal_scale = ffi_datum.decimal_scale;
+ d.i128_hi = ffi_datum.i128_hi;
+ d.i128_lo = ffi_datum.i128_lo;
+ if (dtype == DatumType::DecimalString) {
+ d.string_val = std::string(ffi_datum.string_val);
+ }
+ return d;
+ }
+ default:
+ return Datum::Null();
+ }
+ }
+};
+} // namespace detail
+
namespace utils {
-inline Result make_error(int32_t code, std::string msg) {
- return Result{code, std::move(msg)};
-}
+inline Result make_error(int32_t code, std::string msg) { return Result{code,
std::move(msg)}; }
-inline Result make_ok() {
- return Result{0, {}};
-}
+inline Result make_ok() { return Result{0, {}}; }
inline Result from_ffi_result(const ffi::FfiResult& ffi_result) {
return Result{ffi_result.error_code,
std::string(ffi_result.error_message)};
@@ -105,37 +184,14 @@ inline ffi::FfiTableDescriptor to_ffi_table_descriptor(const TableDescriptor& de
return ffi_desc;
}
-inline ffi::FfiDatum to_ffi_datum(const Datum& datum) {
- ffi::FfiDatum ffi_datum;
- ffi_datum.datum_type = static_cast<int32_t>(datum.type);
- ffi_datum.bool_val = datum.bool_val;
- ffi_datum.i32_val = datum.i32_val;
- ffi_datum.i64_val = datum.i64_val;
- ffi_datum.f32_val = datum.f32_val;
- ffi_datum.f64_val = datum.f64_val;
- ffi_datum.string_val = rust::String(datum.string_val);
- ffi_datum.decimal_precision = datum.decimal_precision;
- ffi_datum.decimal_scale = datum.decimal_scale;
- ffi_datum.i128_hi = datum.i128_hi;
- ffi_datum.i128_lo = datum.i128_lo;
-
- rust::Vec<uint8_t> bytes;
- for (auto b : datum.bytes_val) {
- bytes.push_back(b);
- }
- ffi_datum.bytes_val = std::move(bytes);
-
- return ffi_datum;
-}
-
inline ffi::FfiGenericRow to_ffi_generic_row(const GenericRow& row) {
ffi::FfiGenericRow ffi_row;
- rust::Vec<ffi::FfiDatum> fields;
- for (const auto& field : row.fields) {
- fields.push_back(to_ffi_datum(field));
+ rust::Vec<ffi::FfiDatum> ffi_fields;
+ for (const auto& field : detail::FfiAccess::fields(row)) {
+ ffi_fields.push_back(detail::FfiAccess::to_ffi_datum(field));
}
- ffi_row.fields = std::move(fields);
+ ffi_row.fields = std::move(ffi_fields);
return ffi_row;
}
@@ -166,9 +222,8 @@ inline TableInfo from_ffi_table_info(const ffi::FfiTableInfo& ffi_info) {
info.table_id = ffi_info.table_id;
info.schema_id = ffi_info.schema_id;
- info.table_path = TablePath{
- std::string(ffi_info.table_path.database_name),
- std::string(ffi_info.table_path.table_name)};
+ info.table_path = TablePath{std::string(ffi_info.table_path.database_name),
+ std::string(ffi_info.table_path.table_name)};
info.created_time = ffi_info.created_time;
info.modified_time = ffi_info.modified_time;
@@ -198,44 +253,19 @@ inline TableInfo from_ffi_table_info(const ffi::FfiTableInfo& ffi_info) {
return info;
}
-inline Datum from_ffi_datum(const ffi::FfiDatum& ffi_datum) {
- Datum datum;
- datum.type = static_cast<DatumType>(ffi_datum.datum_type);
- datum.bool_val = ffi_datum.bool_val;
- datum.i32_val = ffi_datum.i32_val;
- datum.i64_val = ffi_datum.i64_val;
- datum.f32_val = ffi_datum.f32_val;
- datum.f64_val = ffi_datum.f64_val;
- // todo: avoid copy string
- datum.string_val = std::string(ffi_datum.string_val);
- datum.decimal_precision = ffi_datum.decimal_precision;
- datum.decimal_scale = ffi_datum.decimal_scale;
- datum.i128_hi = ffi_datum.i128_hi;
- datum.i128_lo = ffi_datum.i128_lo;
-
- for (auto b : ffi_datum.bytes_val) {
- datum.bytes_val.push_back(b);
- }
-
- return datum;
-}
-
inline GenericRow from_ffi_generic_row(const ffi::FfiGenericRow& ffi_row) {
GenericRow row;
for (const auto& field : ffi_row.fields) {
- row.fields.push_back(from_ffi_datum(field));
+
detail::FfiAccess::fields(row).push_back(detail::FfiAccess::from_ffi_datum(field));
}
return row;
}
inline ScanRecord from_ffi_scan_record(const ffi::FfiScanRecord& ffi_record) {
- return ScanRecord{
- ffi_record.bucket_id,
- ffi_record.offset,
- ffi_record.timestamp,
- from_ffi_generic_row(ffi_record.row)};
+ return ScanRecord{ffi_record.bucket_id, ffi_record.offset,
ffi_record.timestamp,
+ from_ffi_generic_row(ffi_record.row)};
}
inline ScanRecords from_ffi_scan_records(const ffi::FfiScanRecords&
ffi_records) {
@@ -253,11 +283,8 @@ inline LakeSnapshot from_ffi_lake_snapshot(const ffi::FfiLakeSnapshot& ffi_snaps
snapshot.snapshot_id = ffi_snapshot.snapshot_id;
for (const auto& offset : ffi_snapshot.bucket_offsets) {
- snapshot.bucket_offsets.push_back(BucketOffset{
- offset.table_id,
- offset.partition_id,
- offset.bucket_id,
- offset.offset});
+ snapshot.bucket_offsets.push_back(
+ BucketOffset{offset.table_id, offset.partition_id,
offset.bucket_id, offset.offset});
}
return snapshot;