fresh-borzoni commented on code in PR #330:
URL: https://github.com/apache/fluss-rust/pull/330#discussion_r2810105899
##########
bindings/cpp/src/table.cpp:
##########
@@ -79,6 +79,321 @@ int Date::Day() const {
return tm.tm_mday;
}
+// ============================================================================
+// GenericRow — write-only row backed by opaque Rust GenericRowInner
+// ============================================================================
+
+GenericRow::GenericRow() {
+ auto box = ffi::new_generic_row(0);
+ inner_ = box.into_raw();
+}
+
+GenericRow::GenericRow(size_t field_count) {
+ auto box = ffi::new_generic_row(field_count);
+ inner_ = box.into_raw();
+}
+
+GenericRow::~GenericRow() noexcept { Destroy(); }
+
+void GenericRow::Destroy() noexcept {
+ if (inner_) {
+ rust::Box<ffi::GenericRowInner>::from_raw(inner_);
+ inner_ = nullptr;
+ }
+}
+
+GenericRow::GenericRow(GenericRow&& other) noexcept
+ : inner_(other.inner_), column_map_(std::move(other.column_map_)) {
+ other.inner_ = nullptr;
+}
+
+GenericRow& GenericRow::operator=(GenericRow&& other) noexcept {
+ if (this != &other) {
+ Destroy();
+ inner_ = other.inner_;
+ column_map_ = std::move(other.column_map_);
+ other.inner_ = nullptr;
+ }
+ return *this;
+}
+
+bool GenericRow::Available() const { return inner_ != nullptr; }
+
+void GenericRow::Reset() {
+ if (inner_) inner_->gr_reset();
+}
+
+void GenericRow::SetNull(size_t idx) {
+ if (inner_) inner_->gr_set_null(idx);
+}
+void GenericRow::SetBool(size_t idx, bool v) {
+ if (inner_) inner_->gr_set_bool(idx, v);
+}
+void GenericRow::SetInt32(size_t idx, int32_t v) {
+ if (inner_) inner_->gr_set_i32(idx, v);
+}
+void GenericRow::SetInt64(size_t idx, int64_t v) {
+ if (inner_) inner_->gr_set_i64(idx, v);
+}
+void GenericRow::SetFloat32(size_t idx, float v) {
+ if (inner_) inner_->gr_set_f32(idx, v);
+}
+void GenericRow::SetFloat64(size_t idx, double v) {
+ if (inner_) inner_->gr_set_f64(idx, v);
+}
+
+void GenericRow::SetString(size_t idx, std::string v) {
+ if (inner_) inner_->gr_set_str(idx, v);
+}
+
+void GenericRow::SetBytes(size_t idx, std::vector<uint8_t> v) {
+ if (inner_) inner_->gr_set_bytes(idx, rust::Slice<const uint8_t>(v.data(),
v.size()));
+}
+
+void GenericRow::SetDate(size_t idx, fluss::Date d) {
+ if (inner_) inner_->gr_set_date(idx, d.days_since_epoch);
+}
+
+void GenericRow::SetTime(size_t idx, fluss::Time t) {
+ if (inner_) inner_->gr_set_time(idx, t.millis_since_midnight);
+}
+
+void GenericRow::SetTimestampNtz(size_t idx, fluss::Timestamp ts) {
+ if (inner_) inner_->gr_set_ts_ntz(idx, ts.epoch_millis,
ts.nano_of_millisecond);
+}
+
+void GenericRow::SetTimestampLtz(size_t idx, fluss::Timestamp ts) {
+ if (inner_) inner_->gr_set_ts_ltz(idx, ts.epoch_millis,
ts.nano_of_millisecond);
+}
+
+void GenericRow::SetDecimal(size_t idx, const std::string& value) {
+ if (inner_) inner_->gr_set_decimal_str(idx, value);
+}
+
+// ============================================================================
+// RowView — zero-copy read-only row view for scan results
+// ============================================================================
+
+size_t RowView::FieldCount() const { return inner_ ?
inner_->sv_field_count(record_idx_) : 0; }
+
+TypeId RowView::GetType(size_t idx) const {
+ return inner_ ? static_cast<TypeId>(inner_->sv_column_type(idx)) :
TypeId::Int;
+}
+
+bool RowView::IsNull(size_t idx) const {
+ return inner_ ? inner_->sv_is_null(record_idx_, idx) : true;
+}
+bool RowView::GetBool(size_t idx) const {
+ return inner_ ? inner_->sv_get_bool(record_idx_, idx) : false;
+}
+int32_t RowView::GetInt32(size_t idx) const {
+ return inner_ ? inner_->sv_get_i32(record_idx_, idx) : 0;
+}
+int64_t RowView::GetInt64(size_t idx) const {
+ return inner_ ? inner_->sv_get_i64(record_idx_, idx) : 0;
+}
+float RowView::GetFloat32(size_t idx) const {
+ return inner_ ? inner_->sv_get_f32(record_idx_, idx) : 0.0f;
+}
+double RowView::GetFloat64(size_t idx) const {
+ return inner_ ? inner_->sv_get_f64(record_idx_, idx) : 0.0;
+}
+
+std::string_view RowView::GetString(size_t idx) const {
+ if (!inner_) return {};
+ auto s = inner_->sv_get_str(record_idx_, idx);
+ return std::string_view(s.data(), s.size());
+}
+
+std::pair<const uint8_t*, size_t> RowView::GetBytes(size_t idx) const {
+ if (!inner_) return {nullptr, 0};
+ auto bytes = inner_->sv_get_bytes(record_idx_, idx);
+ return {bytes.data(), bytes.size()};
+}
+
+Date RowView::GetDate(size_t idx) const {
+ return inner_ ? Date{inner_->sv_get_date_days(record_idx_, idx)} : Date{};
+}
+
+Time RowView::GetTime(size_t idx) const {
+ return inner_ ? Time{inner_->sv_get_time_millis(record_idx_, idx)} :
Time{};
+}
+
+Timestamp RowView::GetTimestamp(size_t idx) const {
+ if (!inner_) return {};
+ return Timestamp{inner_->sv_get_ts_millis(record_idx_, idx),
+ inner_->sv_get_ts_nanos(record_idx_, idx)};
+}
+
+bool RowView::IsDecimal(size_t idx) const { return GetType(idx) ==
TypeId::Decimal; }
+
+std::string RowView::GetDecimalString(size_t idx) const {
+ if (!inner_) return {};
+ return std::string(inner_->sv_get_decimal_str(record_idx_, idx));
+}
+
+// ============================================================================
+// ScanRecords — backed by opaque Rust ScanResultInner
+// ============================================================================
+
+ScanRecords::ScanRecords() noexcept = default;
+
+ScanRecords::~ScanRecords() noexcept { Destroy(); }
+
+void ScanRecords::Destroy() noexcept {
+ if (inner_) {
+ rust::Box<ffi::ScanResultInner>::from_raw(inner_);
+ inner_ = nullptr;
+ }
+}
+
+ScanRecords::ScanRecords(ScanRecords&& other) noexcept
+ : inner_(other.inner_), column_map_(std::move(other.column_map_)) {
+ other.inner_ = nullptr;
+}
+
+ScanRecords& ScanRecords::operator=(ScanRecords&& other) noexcept {
+ if (this != &other) {
+ Destroy();
+ inner_ = other.inner_;
+ column_map_ = std::move(other.column_map_);
+ other.inner_ = nullptr;
+ }
+ return *this;
+}
+
+size_t ScanRecords::Size() const { return inner_ ? inner_->sv_record_count() :
0; }
+
+bool ScanRecords::Empty() const { return Size() == 0; }
+
+void ScanRecords::BuildColumnMap() const {
+ if (!inner_) return;
+ auto map = std::make_shared<detail::ColumnMap>();
+ auto count = inner_->sv_column_count();
+ for (size_t i = 0; i < count; ++i) {
+ auto name = inner_->sv_column_name(i);
+ (*map)[std::string(name.data(), name.size())] = {
+ i, static_cast<TypeId>(inner_->sv_column_type(i))};
+ }
+ column_map_ = std::move(map);
+}
+
+const std::shared_ptr<detail::ColumnMap>& ScanRecords::GetColumnMap() const {
+ if (!column_map_) {
+ BuildColumnMap();
+ }
+ return column_map_;
+}
+
+ScanRecord ScanRecords::operator[](size_t idx) const {
+ if (!inner_ || idx >= inner_->sv_record_count())
+ return ScanRecord{0, std::nullopt, 0,
+ 0, ChangeType::AppendOnly, RowView(nullptr, 0,
nullptr)};
+ return ScanRecord{inner_->sv_bucket_id(idx),
+ inner_->sv_has_partition_id(idx)
+ ?
std::optional<int64_t>(inner_->sv_partition_id(idx))
+ : std::nullopt,
+ inner_->sv_offset(idx),
+ inner_->sv_timestamp(idx),
+ static_cast<ChangeType>(inner_->sv_change_type(idx)),
+ RowView(inner_, idx, GetColumnMap().get())};
+}
+
+ScanRecord ScanRecords::Iterator::operator*() const { return
owner_->operator[](idx_); }
+
+// ============================================================================
+// LookupResult — backed by opaque Rust LookupResultInner
+// ============================================================================
+
+LookupResult::LookupResult() noexcept = default;
+
+LookupResult::~LookupResult() noexcept { Destroy(); }
+
+void LookupResult::Destroy() noexcept {
+ if (inner_) {
+ rust::Box<ffi::LookupResultInner>::from_raw(inner_);
+ inner_ = nullptr;
+ }
+}
+
+LookupResult::LookupResult(LookupResult&& other) noexcept
+ : inner_(other.inner_), column_map_(std::move(other.column_map_)) {
+ other.inner_ = nullptr;
+}
+
+LookupResult& LookupResult::operator=(LookupResult&& other) noexcept {
+ if (this != &other) {
+ Destroy();
+ inner_ = other.inner_;
+ column_map_ = std::move(other.column_map_);
+ other.inner_ = nullptr;
+ }
+ return *this;
+}
+
+void LookupResult::BuildColumnMap() const {
+ if (!inner_) return;
+ auto map = std::make_shared<detail::ColumnMap>();
+ auto count = inner_->lv_field_count();
+ for (size_t i = 0; i < count; ++i) {
+ auto name = inner_->lv_column_name(i);
+ (*map)[std::string(name.data(), name.size())] = {
+ i, static_cast<TypeId>(inner_->lv_column_type(i))};
+ }
+ column_map_ = std::move(map);
+}
+
+bool LookupResult::Found() const { return inner_ && inner_->lv_found(); }
+
+size_t LookupResult::FieldCount() const { return inner_ ?
inner_->lv_field_count() : 0; }
+
+TypeId LookupResult::GetType(size_t idx) const {
+ return inner_ ? static_cast<TypeId>(inner_->lv_column_type(idx)) :
TypeId::Int;
+}
Review Comment:
row_reader::column_type() now returns Result<i32, String> and uses
get_column() which bounds-checks. On the C++ side, CXX converts errors to
exceptions, so out-of-range access throws instead of returning invalid TypeId.
The TypeId::Unknown = 0 sentinel is a good idea for a follow-up.
##########
bindings/cpp/src/types.rs:
##########
@@ -110,6 +87,8 @@ fn core_data_type_to_ffi(dt: &fcore::metadata::DataType) ->
i32 {
fcore::metadata::DataType::Timestamp(_) => DATA_TYPE_TIMESTAMP,
fcore::metadata::DataType::TimestampLTz(_) => DATA_TYPE_TIMESTAMP_LTZ,
fcore::metadata::DataType::Decimal(_) => DATA_TYPE_DECIMAL,
+ fcore::metadata::DataType::Char(_) => DATA_TYPE_CHAR,
+ fcore::metadata::DataType::Binary(_) => DATA_TYPE_BINARY,
_ => 0,
Review Comment:
Agreed that a panic here is better than silently returning 0. Will address
in a follow-up together with adding TypeId::Unknown on the C++ side.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]