fresh-borzoni commented on code in PR #330:
URL: https://github.com/apache/fluss-rust/pull/330#discussion_r2810105778
##########
bindings/cpp/src/table.cpp:
##########
@@ -398,8 +731,7 @@ Result AppendWriter::Append(const GenericRow& row,
WriteResult& out) {
}
Review Comment:
Already handled — the `row.Available()` check is present in
AppendWriter::Append (see line 800). The same check exists in
UpsertWriter::Upsert (line 906), Delete (line 930), and Lookuper::Lookup
(line 990).
##########
bindings/cpp/src/table.cpp:
##########
@@ -79,6 +79,321 @@ int Date::Day() const {
return tm.tm_mday;
}
+// ============================================================================
+// GenericRow — write-only row backed by opaque Rust GenericRowInner
+// ============================================================================
+
+GenericRow::GenericRow() {
+ auto box = ffi::new_generic_row(0);
+ inner_ = box.into_raw();
+}
+
+GenericRow::GenericRow(size_t field_count) {
+ auto box = ffi::new_generic_row(field_count);
+ inner_ = box.into_raw();
+}
+
+GenericRow::~GenericRow() noexcept { Destroy(); }
+
+void GenericRow::Destroy() noexcept {
+ if (inner_) {
+ rust::Box<ffi::GenericRowInner>::from_raw(inner_);
+ inner_ = nullptr;
+ }
+}
+
+GenericRow::GenericRow(GenericRow&& other) noexcept
+ : inner_(other.inner_), column_map_(std::move(other.column_map_)) {
+ other.inner_ = nullptr;
+}
+
+GenericRow& GenericRow::operator=(GenericRow&& other) noexcept {
+ if (this != &other) {
+ Destroy();
+ inner_ = other.inner_;
+ column_map_ = std::move(other.column_map_);
+ other.inner_ = nullptr;
+ }
+ return *this;
+}
+
+bool GenericRow::Available() const { return inner_ != nullptr; }
+
+void GenericRow::Reset() {
+ if (inner_) inner_->gr_reset();
+}
+
+void GenericRow::SetNull(size_t idx) {
+ if (inner_) inner_->gr_set_null(idx);
+}
+void GenericRow::SetBool(size_t idx, bool v) {
+ if (inner_) inner_->gr_set_bool(idx, v);
+}
+void GenericRow::SetInt32(size_t idx, int32_t v) {
+ if (inner_) inner_->gr_set_i32(idx, v);
+}
+void GenericRow::SetInt64(size_t idx, int64_t v) {
+ if (inner_) inner_->gr_set_i64(idx, v);
+}
+void GenericRow::SetFloat32(size_t idx, float v) {
+ if (inner_) inner_->gr_set_f32(idx, v);
+}
+void GenericRow::SetFloat64(size_t idx, double v) {
+ if (inner_) inner_->gr_set_f64(idx, v);
+}
+
+void GenericRow::SetString(size_t idx, std::string v) {
+ if (inner_) inner_->gr_set_str(idx, v);
+}
+
+void GenericRow::SetBytes(size_t idx, std::vector<uint8_t> v) {
+ if (inner_) inner_->gr_set_bytes(idx, rust::Slice<const uint8_t>(v.data(),
v.size()));
+}
+
+void GenericRow::SetDate(size_t idx, fluss::Date d) {
+ if (inner_) inner_->gr_set_date(idx, d.days_since_epoch);
+}
+
+void GenericRow::SetTime(size_t idx, fluss::Time t) {
+ if (inner_) inner_->gr_set_time(idx, t.millis_since_midnight);
+}
+
+void GenericRow::SetTimestampNtz(size_t idx, fluss::Timestamp ts) {
+ if (inner_) inner_->gr_set_ts_ntz(idx, ts.epoch_millis,
ts.nano_of_millisecond);
+}
+
+void GenericRow::SetTimestampLtz(size_t idx, fluss::Timestamp ts) {
+ if (inner_) inner_->gr_set_ts_ltz(idx, ts.epoch_millis,
ts.nano_of_millisecond);
+}
+
+void GenericRow::SetDecimal(size_t idx, const std::string& value) {
+ if (inner_) inner_->gr_set_decimal_str(idx, value);
+}
+
+// ============================================================================
+// RowView — zero-copy read-only row view for scan results
+// ============================================================================
+
+size_t RowView::FieldCount() const { return inner_ ?
inner_->sv_field_count(record_idx_) : 0; }
+
+TypeId RowView::GetType(size_t idx) const {
+ return inner_ ? static_cast<TypeId>(inner_->sv_column_type(idx)) :
TypeId::Int;
+}
+
+bool RowView::IsNull(size_t idx) const {
+ return inner_ ? inner_->sv_is_null(record_idx_, idx) : true;
+}
+bool RowView::GetBool(size_t idx) const {
+ return inner_ ? inner_->sv_get_bool(record_idx_, idx) : false;
+}
+int32_t RowView::GetInt32(size_t idx) const {
+ return inner_ ? inner_->sv_get_i32(record_idx_, idx) : 0;
+}
+int64_t RowView::GetInt64(size_t idx) const {
+ return inner_ ? inner_->sv_get_i64(record_idx_, idx) : 0;
+}
+float RowView::GetFloat32(size_t idx) const {
+ return inner_ ? inner_->sv_get_f32(record_idx_, idx) : 0.0f;
+}
+double RowView::GetFloat64(size_t idx) const {
+ return inner_ ? inner_->sv_get_f64(record_idx_, idx) : 0.0;
+}
+
+std::string_view RowView::GetString(size_t idx) const {
+ if (!inner_) return {};
+ auto s = inner_->sv_get_str(record_idx_, idx);
+ return std::string_view(s.data(), s.size());
+}
+
+std::pair<const uint8_t*, size_t> RowView::GetBytes(size_t idx) const {
+ if (!inner_) return {nullptr, 0};
+ auto bytes = inner_->sv_get_bytes(record_idx_, idx);
+ return {bytes.data(), bytes.size()};
+}
+
+Date RowView::GetDate(size_t idx) const {
+ return inner_ ? Date{inner_->sv_get_date_days(record_idx_, idx)} : Date{};
+}
+
+Time RowView::GetTime(size_t idx) const {
+ return inner_ ? Time{inner_->sv_get_time_millis(record_idx_, idx)} :
Time{};
+}
+
+Timestamp RowView::GetTimestamp(size_t idx) const {
+ if (!inner_) return {};
+ return Timestamp{inner_->sv_get_ts_millis(record_idx_, idx),
+ inner_->sv_get_ts_nanos(record_idx_, idx)};
+}
+
+bool RowView::IsDecimal(size_t idx) const { return GetType(idx) ==
TypeId::Decimal; }
+
+std::string RowView::GetDecimalString(size_t idx) const {
+ if (!inner_) return {};
+ return std::string(inner_->sv_get_decimal_str(record_idx_, idx));
+}
+
+// ============================================================================
+// ScanRecords — backed by opaque Rust ScanResultInner
+// ============================================================================
+
+ScanRecords::ScanRecords() noexcept = default;
+
+ScanRecords::~ScanRecords() noexcept { Destroy(); }
+
+void ScanRecords::Destroy() noexcept {
+ if (inner_) {
+ rust::Box<ffi::ScanResultInner>::from_raw(inner_);
+ inner_ = nullptr;
+ }
+}
+
+ScanRecords::ScanRecords(ScanRecords&& other) noexcept
+ : inner_(other.inner_), column_map_(std::move(other.column_map_)) {
+ other.inner_ = nullptr;
+}
+
+ScanRecords& ScanRecords::operator=(ScanRecords&& other) noexcept {
+ if (this != &other) {
+ Destroy();
+ inner_ = other.inner_;
+ column_map_ = std::move(other.column_map_);
+ other.inner_ = nullptr;
+ }
+ return *this;
+}
+
+size_t ScanRecords::Size() const { return inner_ ? inner_->sv_record_count() :
0; }
+
+bool ScanRecords::Empty() const { return Size() == 0; }
+
+void ScanRecords::BuildColumnMap() const {
+ if (!inner_) return;
+ auto map = std::make_shared<detail::ColumnMap>();
+ auto count = inner_->sv_column_count();
+ for (size_t i = 0; i < count; ++i) {
+ auto name = inner_->sv_column_name(i);
+ (*map)[std::string(name.data(), name.size())] = {
+ i, static_cast<TypeId>(inner_->sv_column_type(i))};
+ }
+ column_map_ = std::move(map);
+}
+
+const std::shared_ptr<detail::ColumnMap>& ScanRecords::GetColumnMap() const {
+ if (!column_map_) {
+ BuildColumnMap();
+ }
+ return column_map_;
+}
+
+ScanRecord ScanRecords::operator[](size_t idx) const {
+ if (!inner_ || idx >= inner_->sv_record_count())
+ return ScanRecord{0, std::nullopt, 0,
+ 0, ChangeType::AppendOnly, RowView(nullptr, 0,
nullptr)};
+ return ScanRecord{inner_->sv_bucket_id(idx),
+ inner_->sv_has_partition_id(idx)
+ ?
std::optional<int64_t>(inner_->sv_partition_id(idx))
+ : std::nullopt,
+ inner_->sv_offset(idx),
+ inner_->sv_timestamp(idx),
+ static_cast<ChangeType>(inner_->sv_change_type(idx)),
+ RowView(inner_, idx, GetColumnMap().get())};
+}
+
+ScanRecord ScanRecords::Iterator::operator*() const { return
owner_->operator[](idx_); }
+
+// ============================================================================
+// LookupResult — backed by opaque Rust LookupResultInner
+// ============================================================================
+
+LookupResult::LookupResult() noexcept = default;
+
+LookupResult::~LookupResult() noexcept { Destroy(); }
+
+void LookupResult::Destroy() noexcept {
+ if (inner_) {
+ rust::Box<ffi::LookupResultInner>::from_raw(inner_);
+ inner_ = nullptr;
+ }
+}
+
+LookupResult::LookupResult(LookupResult&& other) noexcept
+ : inner_(other.inner_), column_map_(std::move(other.column_map_)) {
+ other.inner_ = nullptr;
+}
+
+LookupResult& LookupResult::operator=(LookupResult&& other) noexcept {
+ if (this != &other) {
+ Destroy();
+ inner_ = other.inner_;
+ column_map_ = std::move(other.column_map_);
+ other.inner_ = nullptr;
+ }
+ return *this;
+}
+
+void LookupResult::BuildColumnMap() const {
+ if (!inner_) return;
+ auto map = std::make_shared<detail::ColumnMap>();
+ auto count = inner_->lv_field_count();
+ for (size_t i = 0; i < count; ++i) {
+ auto name = inner_->lv_column_name(i);
+ (*map)[std::string(name.data(), name.size())] = {
+ i, static_cast<TypeId>(inner_->lv_column_type(i))};
+ }
+ column_map_ = std::move(map);
+}
+
+bool LookupResult::Found() const { return inner_ && inner_->lv_found(); }
+
+size_t LookupResult::FieldCount() const { return inner_ ?
inner_->lv_field_count() : 0; }
+
+TypeId LookupResult::GetType(size_t idx) const {
+ return inner_ ? static_cast<TypeId>(inner_->lv_column_type(idx)) :
TypeId::Int;
+}
Review Comment:
row_reader::column_type() now returns Result<i32, String> and uses
get_column(), which bounds-checks the index. On the C++ side, CXX converts
errors to exceptions, so an out-of-range access throws instead of returning
an invalid TypeId. The TypeId::Unknown = 0 sentinel is a good idea for a
follow-up.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]