zhaohaidao commented on code in PR #330:
URL: https://github.com/apache/fluss-rust/pull/330#discussion_r2809226540
##########
bindings/cpp/src/lib.rs:
##########
@@ -1417,3 +1510,494 @@ impl LogScanner {
}
}
}
+
+// ============================================================================
+// Opaque types: GenericRowInner (write path)
+// ============================================================================
+
+pub struct GenericRowInner {
+ row: fcore::row::GenericRow<'static>,
+}
+
+fn new_generic_row(field_count: usize) -> Box<GenericRowInner> {
+ Box::new(GenericRowInner {
+ row: fcore::row::GenericRow::new(field_count),
+ })
+}
+
+impl GenericRowInner {
+ fn gr_reset(&mut self) {
+ let len = self.row.values.len();
+ self.row = fcore::row::GenericRow::new(len);
+ }
+
+ fn gr_set_null(&mut self, idx: usize) {
+ self.ensure_size(idx);
+ self.row.set_field(idx, fcore::row::Datum::Null);
+ }
+
+ fn gr_set_bool(&mut self, idx: usize, val: bool) {
+ self.ensure_size(idx);
+ self.row.set_field(idx, fcore::row::Datum::Bool(val));
+ }
+
+ fn gr_set_i32(&mut self, idx: usize, val: i32) {
+ self.ensure_size(idx);
+ self.row.set_field(idx, fcore::row::Datum::Int32(val));
+ }
+
+ fn gr_set_i64(&mut self, idx: usize, val: i64) {
+ self.ensure_size(idx);
+ self.row.set_field(idx, fcore::row::Datum::Int64(val));
+ }
+
+ fn gr_set_f32(&mut self, idx: usize, val: f32) {
+ self.ensure_size(idx);
+ self.row
+ .set_field(idx, fcore::row::Datum::Float32(val.into()));
+ }
+
+ fn gr_set_f64(&mut self, idx: usize, val: f64) {
+ self.ensure_size(idx);
+ self.row
+ .set_field(idx, fcore::row::Datum::Float64(val.into()));
+ }
+
+ fn gr_set_str(&mut self, idx: usize, val: &str) {
+ self.ensure_size(idx);
+ self.row.set_field(
+ idx,
+
fcore::row::Datum::String(std::borrow::Cow::Owned(val.to_string())),
+ );
+ }
+
+ fn gr_set_bytes(&mut self, idx: usize, val: &[u8]) {
+ self.ensure_size(idx);
+ self.row.set_field(
+ idx,
+ fcore::row::Datum::Blob(std::borrow::Cow::Owned(val.to_vec())),
+ );
+ }
+
+ fn gr_set_date(&mut self, idx: usize, days: i32) {
+ self.ensure_size(idx);
+ self.row
+ .set_field(idx,
fcore::row::Datum::Date(fcore::row::Date::new(days)));
+ }
+
+ fn gr_set_time(&mut self, idx: usize, millis: i32) {
+ self.ensure_size(idx);
+ self.row
+ .set_field(idx,
fcore::row::Datum::Time(fcore::row::Time::new(millis)));
+ }
+
+ fn gr_set_ts_ntz(&mut self, idx: usize, millis: i64, nanos: i32) {
+ self.ensure_size(idx);
+ // Use from_millis_nanos, falling back to millis-only on error
+ let ts = fcore::row::TimestampNtz::from_millis_nanos(millis, nanos)
+ .unwrap_or_else(|_| fcore::row::TimestampNtz::new(millis));
+ self.row.set_field(idx, fcore::row::Datum::TimestampNtz(ts));
+ }
+
+ fn gr_set_ts_ltz(&mut self, idx: usize, millis: i64, nanos: i32) {
+ self.ensure_size(idx);
+ let ts = fcore::row::TimestampLtz::from_millis_nanos(millis, nanos)
+ .unwrap_or_else(|_| fcore::row::TimestampLtz::new(millis));
+ self.row.set_field(idx, fcore::row::Datum::TimestampLtz(ts));
+ }
+
+ fn gr_set_decimal_str(&mut self, idx: usize, val: &str) {
+ self.ensure_size(idx);
+ // Store as string; resolve_row_types() will parse and validate
against schema
+ self.row.set_field(
+ idx,
+
fcore::row::Datum::String(std::borrow::Cow::Owned(val.to_string())),
+ );
+ }
+
+ fn ensure_size(&mut self, idx: usize) {
+ if self.row.values.len() <= idx {
+ self.row.values.resize(idx + 1, fcore::row::Datum::Null);
+ }
+ }
+}
+
+// ============================================================================
+// Shared row-reading helpers (used by both ScanResultInner and
LookupResultInner)
+// ============================================================================
+
+mod row_reader {
Review Comment:
The InternalRow::get_* accessors rely on unwrap/expect, so an out-of-bounds
index, a type mismatch, or a read of a null field will panic. A panic across
the CXX FFI boundary can terminate the process.
Could you add unified pre-checks in row_reader: field < columns.len(), field <
row.get_field_count(), type compatibility, and null checks where needed?
```rust
impl<'a> InternalRow for GenericRow<'a> {
fn get_field_count(&self) -> usize {
self.values.len()
}
....
fn get_boolean(&self, pos: usize) -> bool {
self.values.get(pos).unwrap().try_into().unwrap()
}
fn get_byte(&self, pos: usize) -> i8 {
self.values.get(pos).unwrap().try_into().unwrap()
}
fn get_short(&self, pos: usize) -> i16 {
self.values.get(pos).unwrap().try_into().unwrap()
}
fn get_int(&self, pos: usize) -> i32 {
self.values.get(pos).unwrap().try_into().unwrap()
}
fn get_long(&self, _pos: usize) -> i64 {
self.values.get(_pos).unwrap().try_into().unwrap()
}
fn get_float(&self, pos: usize) -> f32 {
self.values.get(pos).unwrap().try_into().unwrap()
}
fn get_double(&self, pos: usize) -> f64 {
self.values.get(pos).unwrap().try_into().unwrap()
}
```
##########
bindings/cpp/src/table.cpp:
##########
@@ -79,6 +79,321 @@ int Date::Day() const {
return tm.tm_mday;
}
+// ============================================================================
+// GenericRow — write-only row backed by opaque Rust GenericRowInner
+// ============================================================================
+
+GenericRow::GenericRow() {
+ auto box = ffi::new_generic_row(0);
+ inner_ = box.into_raw();
+}
+
+GenericRow::GenericRow(size_t field_count) {
+ auto box = ffi::new_generic_row(field_count);
+ inner_ = box.into_raw();
+}
+
+GenericRow::~GenericRow() noexcept { Destroy(); }
+
+void GenericRow::Destroy() noexcept {
+ if (inner_) {
+ rust::Box<ffi::GenericRowInner>::from_raw(inner_);
+ inner_ = nullptr;
+ }
+}
+
+GenericRow::GenericRow(GenericRow&& other) noexcept
+ : inner_(other.inner_), column_map_(std::move(other.column_map_)) {
+ other.inner_ = nullptr;
+}
+
+GenericRow& GenericRow::operator=(GenericRow&& other) noexcept {
+ if (this != &other) {
+ Destroy();
+ inner_ = other.inner_;
+ column_map_ = std::move(other.column_map_);
+ other.inner_ = nullptr;
+ }
+ return *this;
+}
+
+bool GenericRow::Available() const { return inner_ != nullptr; }
+
+void GenericRow::Reset() {
+ if (inner_) inner_->gr_reset();
+}
+
+void GenericRow::SetNull(size_t idx) {
+ if (inner_) inner_->gr_set_null(idx);
+}
+void GenericRow::SetBool(size_t idx, bool v) {
+ if (inner_) inner_->gr_set_bool(idx, v);
+}
+void GenericRow::SetInt32(size_t idx, int32_t v) {
+ if (inner_) inner_->gr_set_i32(idx, v);
+}
+void GenericRow::SetInt64(size_t idx, int64_t v) {
+ if (inner_) inner_->gr_set_i64(idx, v);
+}
+void GenericRow::SetFloat32(size_t idx, float v) {
+ if (inner_) inner_->gr_set_f32(idx, v);
+}
+void GenericRow::SetFloat64(size_t idx, double v) {
+ if (inner_) inner_->gr_set_f64(idx, v);
+}
+
+void GenericRow::SetString(size_t idx, std::string v) {
+ if (inner_) inner_->gr_set_str(idx, v);
+}
+
+void GenericRow::SetBytes(size_t idx, std::vector<uint8_t> v) {
+ if (inner_) inner_->gr_set_bytes(idx, rust::Slice<const uint8_t>(v.data(),
v.size()));
+}
+
+void GenericRow::SetDate(size_t idx, fluss::Date d) {
+ if (inner_) inner_->gr_set_date(idx, d.days_since_epoch);
+}
+
+void GenericRow::SetTime(size_t idx, fluss::Time t) {
+ if (inner_) inner_->gr_set_time(idx, t.millis_since_midnight);
+}
+
+void GenericRow::SetTimestampNtz(size_t idx, fluss::Timestamp ts) {
+ if (inner_) inner_->gr_set_ts_ntz(idx, ts.epoch_millis,
ts.nano_of_millisecond);
+}
+
+void GenericRow::SetTimestampLtz(size_t idx, fluss::Timestamp ts) {
+ if (inner_) inner_->gr_set_ts_ltz(idx, ts.epoch_millis,
ts.nano_of_millisecond);
+}
+
+void GenericRow::SetDecimal(size_t idx, const std::string& value) {
+ if (inner_) inner_->gr_set_decimal_str(idx, value);
+}
+
+// ============================================================================
+// RowView — zero-copy read-only row view for scan results
+// ============================================================================
+
+size_t RowView::FieldCount() const { return inner_ ?
inner_->sv_field_count(record_idx_) : 0; }
+
+TypeId RowView::GetType(size_t idx) const {
+ return inner_ ? static_cast<TypeId>(inner_->sv_column_type(idx)) :
TypeId::Int;
+}
+
+bool RowView::IsNull(size_t idx) const {
+ return inner_ ? inner_->sv_is_null(record_idx_, idx) : true;
+}
+bool RowView::GetBool(size_t idx) const {
+ return inner_ ? inner_->sv_get_bool(record_idx_, idx) : false;
+}
+int32_t RowView::GetInt32(size_t idx) const {
+ return inner_ ? inner_->sv_get_i32(record_idx_, idx) : 0;
+}
+int64_t RowView::GetInt64(size_t idx) const {
+ return inner_ ? inner_->sv_get_i64(record_idx_, idx) : 0;
+}
+float RowView::GetFloat32(size_t idx) const {
+ return inner_ ? inner_->sv_get_f32(record_idx_, idx) : 0.0f;
+}
+double RowView::GetFloat64(size_t idx) const {
+ return inner_ ? inner_->sv_get_f64(record_idx_, idx) : 0.0;
+}
+
+std::string_view RowView::GetString(size_t idx) const {
+ if (!inner_) return {};
+ auto s = inner_->sv_get_str(record_idx_, idx);
+ return std::string_view(s.data(), s.size());
+}
+
+std::pair<const uint8_t*, size_t> RowView::GetBytes(size_t idx) const {
+ if (!inner_) return {nullptr, 0};
+ auto bytes = inner_->sv_get_bytes(record_idx_, idx);
+ return {bytes.data(), bytes.size()};
+}
+
+Date RowView::GetDate(size_t idx) const {
+ return inner_ ? Date{inner_->sv_get_date_days(record_idx_, idx)} : Date{};
+}
+
+Time RowView::GetTime(size_t idx) const {
+ return inner_ ? Time{inner_->sv_get_time_millis(record_idx_, idx)} :
Time{};
+}
+
+Timestamp RowView::GetTimestamp(size_t idx) const {
+ if (!inner_) return {};
+ return Timestamp{inner_->sv_get_ts_millis(record_idx_, idx),
+ inner_->sv_get_ts_nanos(record_idx_, idx)};
+}
+
+bool RowView::IsDecimal(size_t idx) const { return GetType(idx) ==
TypeId::Decimal; }
+
+std::string RowView::GetDecimalString(size_t idx) const {
+ if (!inner_) return {};
+ return std::string(inner_->sv_get_decimal_str(record_idx_, idx));
+}
+
+// ============================================================================
+// ScanRecords — backed by opaque Rust ScanResultInner
+// ============================================================================
+
+ScanRecords::ScanRecords() noexcept = default;
+
+ScanRecords::~ScanRecords() noexcept { Destroy(); }
+
+void ScanRecords::Destroy() noexcept {
+ if (inner_) {
+ rust::Box<ffi::ScanResultInner>::from_raw(inner_);
+ inner_ = nullptr;
Review Comment:
Destroy() releases inner_ but does not clear column_map_, so a reused object
keeps stale column mappings.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]