leekeiabstraction commented on code in PR #330:
URL: https://github.com/apache/fluss-rust/pull/330#discussion_r2809051764


##########
bindings/cpp/src/lib.rs:
##########
@@ -1417,3 +1510,494 @@ impl LogScanner {
         }
     }
 }
+
+// ============================================================================
+// Opaque types: GenericRowInner (write path)
+// ============================================================================
+
+pub struct GenericRowInner {
+    row: fcore::row::GenericRow<'static>,
+}
+
+fn new_generic_row(field_count: usize) -> Box<GenericRowInner> {
+    Box::new(GenericRowInner {
+        row: fcore::row::GenericRow::new(field_count),
+    })
+}
+
+impl GenericRowInner {
+    fn gr_reset(&mut self) {
+        let len = self.row.values.len();
+        self.row = fcore::row::GenericRow::new(len);
+    }
+
+    fn gr_set_null(&mut self, idx: usize) {
+        self.ensure_size(idx);
+        self.row.set_field(idx, fcore::row::Datum::Null);
+    }
+
+    fn gr_set_bool(&mut self, idx: usize, val: bool) {
+        self.ensure_size(idx);
+        self.row.set_field(idx, fcore::row::Datum::Bool(val));
+    }
+
+    fn gr_set_i32(&mut self, idx: usize, val: i32) {
+        self.ensure_size(idx);
+        self.row.set_field(idx, fcore::row::Datum::Int32(val));
+    }
+
+    fn gr_set_i64(&mut self, idx: usize, val: i64) {
+        self.ensure_size(idx);
+        self.row.set_field(idx, fcore::row::Datum::Int64(val));
+    }
+
+    fn gr_set_f32(&mut self, idx: usize, val: f32) {
+        self.ensure_size(idx);
+        self.row
+            .set_field(idx, fcore::row::Datum::Float32(val.into()));
+    }
+
+    fn gr_set_f64(&mut self, idx: usize, val: f64) {
+        self.ensure_size(idx);
+        self.row
+            .set_field(idx, fcore::row::Datum::Float64(val.into()));
+    }
+
+    fn gr_set_str(&mut self, idx: usize, val: &str) {
+        self.ensure_size(idx);
+        self.row.set_field(
+            idx,
+            
fcore::row::Datum::String(std::borrow::Cow::Owned(val.to_string())),
+        );
+    }
+
+    fn gr_set_bytes(&mut self, idx: usize, val: &[u8]) {
+        self.ensure_size(idx);
+        self.row.set_field(
+            idx,
+            fcore::row::Datum::Blob(std::borrow::Cow::Owned(val.to_vec())),
+        );
+    }
+
+    fn gr_set_date(&mut self, idx: usize, days: i32) {
+        self.ensure_size(idx);
+        self.row
+            .set_field(idx, 
fcore::row::Datum::Date(fcore::row::Date::new(days)));
+    }
+
+    fn gr_set_time(&mut self, idx: usize, millis: i32) {
+        self.ensure_size(idx);
+        self.row
+            .set_field(idx, 
fcore::row::Datum::Time(fcore::row::Time::new(millis)));
+    }
+
+    fn gr_set_ts_ntz(&mut self, idx: usize, millis: i64, nanos: i32) {
+        self.ensure_size(idx);
+        // Use from_millis_nanos, falling back to millis-only on error
+        let ts = fcore::row::TimestampNtz::from_millis_nanos(millis, nanos)
+            .unwrap_or_else(|_| fcore::row::TimestampNtz::new(millis));
+        self.row.set_field(idx, fcore::row::Datum::TimestampNtz(ts));
+    }
+
+    fn gr_set_ts_ltz(&mut self, idx: usize, millis: i64, nanos: i32) {
+        self.ensure_size(idx);
+        let ts = fcore::row::TimestampLtz::from_millis_nanos(millis, nanos)
+            .unwrap_or_else(|_| fcore::row::TimestampLtz::new(millis));
+        self.row.set_field(idx, fcore::row::Datum::TimestampLtz(ts));
+    }
+
+    fn gr_set_decimal_str(&mut self, idx: usize, val: &str) {
+        self.ensure_size(idx);
+        // Store as string; resolve_row_types() will parse and validate 
against schema
+        self.row.set_field(
+            idx,
+            
fcore::row::Datum::String(std::borrow::Cow::Owned(val.to_string())),
+        );
+    }
+
+    fn ensure_size(&mut self, idx: usize) {
+        if self.row.values.len() <= idx {
+            self.row.values.resize(idx + 1, fcore::row::Datum::Null);
+        }
+    }
+}
+
+// ============================================================================
+// Shared row-reading helpers (used by both ScanResultInner and 
LookupResultInner)
+// ============================================================================
+
+mod row_reader {
+    use fcore::row::InternalRow;
+    use fluss as fcore;
+
+    use crate::types;
+
+    pub fn column_type(columns: &[fcore::metadata::Column], field: usize) -> 
i32 {
+        columns
+            .get(field)
+            .map_or(0, |c| types::core_data_type_to_ffi(c.data_type()))
+    }
+
+    pub fn column_name(columns: &[fcore::metadata::Column], field: usize) -> 
&str {
+        columns.get(field).map_or("", |c| c.name())
+    }
+
+    pub fn is_null(row: &dyn InternalRow, _field: usize) -> bool {
+        row.is_null_at(_field)

Review Comment:
   Rename `_field` to `field`: the underscore prefix marks a parameter as unused, but this one is passed to `is_null_at`.



##########
bindings/cpp/src/table.cpp:
##########
@@ -79,6 +79,321 @@ int Date::Day() const {
     return tm.tm_mday;
 }
 
+// ============================================================================
+// GenericRow — write-only row backed by opaque Rust GenericRowInner
+// ============================================================================
+
+GenericRow::GenericRow() {
+    auto box = ffi::new_generic_row(0);
+    inner_ = box.into_raw();
+}
+
+GenericRow::GenericRow(size_t field_count) {
+    auto box = ffi::new_generic_row(field_count);
+    inner_ = box.into_raw();
+}
+
+GenericRow::~GenericRow() noexcept { Destroy(); }
+
+void GenericRow::Destroy() noexcept {
+    if (inner_) {
+        rust::Box<ffi::GenericRowInner>::from_raw(inner_);
+        inner_ = nullptr;
+    }
+}
+
+GenericRow::GenericRow(GenericRow&& other) noexcept
+    : inner_(other.inner_), column_map_(std::move(other.column_map_)) {
+    other.inner_ = nullptr;
+}
+
+GenericRow& GenericRow::operator=(GenericRow&& other) noexcept {
+    if (this != &other) {
+        Destroy();
+        inner_ = other.inner_;
+        column_map_ = std::move(other.column_map_);
+        other.inner_ = nullptr;
+    }
+    return *this;
+}
+
+bool GenericRow::Available() const { return inner_ != nullptr; }
+
+void GenericRow::Reset() {
+    if (inner_) inner_->gr_reset();
+}
+
+void GenericRow::SetNull(size_t idx) {
+    if (inner_) inner_->gr_set_null(idx);
+}
+void GenericRow::SetBool(size_t idx, bool v) {
+    if (inner_) inner_->gr_set_bool(idx, v);
+}
+void GenericRow::SetInt32(size_t idx, int32_t v) {
+    if (inner_) inner_->gr_set_i32(idx, v);
+}
+void GenericRow::SetInt64(size_t idx, int64_t v) {
+    if (inner_) inner_->gr_set_i64(idx, v);
+}
+void GenericRow::SetFloat32(size_t idx, float v) {
+    if (inner_) inner_->gr_set_f32(idx, v);
+}
+void GenericRow::SetFloat64(size_t idx, double v) {
+    if (inner_) inner_->gr_set_f64(idx, v);
+}
+
+void GenericRow::SetString(size_t idx, std::string v) {
+    if (inner_) inner_->gr_set_str(idx, v);
+}
+
+void GenericRow::SetBytes(size_t idx, std::vector<uint8_t> v) {
+    if (inner_) inner_->gr_set_bytes(idx, rust::Slice<const uint8_t>(v.data(), 
v.size()));
+}
+
+void GenericRow::SetDate(size_t idx, fluss::Date d) {
+    if (inner_) inner_->gr_set_date(idx, d.days_since_epoch);
+}
+
+void GenericRow::SetTime(size_t idx, fluss::Time t) {
+    if (inner_) inner_->gr_set_time(idx, t.millis_since_midnight);
+}
+
+void GenericRow::SetTimestampNtz(size_t idx, fluss::Timestamp ts) {
+    if (inner_) inner_->gr_set_ts_ntz(idx, ts.epoch_millis, 
ts.nano_of_millisecond);
+}
+
+void GenericRow::SetTimestampLtz(size_t idx, fluss::Timestamp ts) {
+    if (inner_) inner_->gr_set_ts_ltz(idx, ts.epoch_millis, 
ts.nano_of_millisecond);
+}
+
+void GenericRow::SetDecimal(size_t idx, const std::string& value) {
+    if (inner_) inner_->gr_set_decimal_str(idx, value);
+}
+
+// ============================================================================
+// RowView — zero-copy read-only row view for scan results
+// ============================================================================
+
+size_t RowView::FieldCount() const { return inner_ ? 
inner_->sv_field_count(record_idx_) : 0; }
+
+TypeId RowView::GetType(size_t idx) const {
+    return inner_ ? static_cast<TypeId>(inner_->sv_column_type(idx)) : 
TypeId::Int;
+}
+
+bool RowView::IsNull(size_t idx) const {
+    return inner_ ? inner_->sv_is_null(record_idx_, idx) : true;
+}
+bool RowView::GetBool(size_t idx) const {
+    return inner_ ? inner_->sv_get_bool(record_idx_, idx) : false;
+}
+int32_t RowView::GetInt32(size_t idx) const {
+    return inner_ ? inner_->sv_get_i32(record_idx_, idx) : 0;
+}
+int64_t RowView::GetInt64(size_t idx) const {
+    return inner_ ? inner_->sv_get_i64(record_idx_, idx) : 0;
+}
+float RowView::GetFloat32(size_t idx) const {
+    return inner_ ? inner_->sv_get_f32(record_idx_, idx) : 0.0f;
+}
+double RowView::GetFloat64(size_t idx) const {
+    return inner_ ? inner_->sv_get_f64(record_idx_, idx) : 0.0;
+}
+
+std::string_view RowView::GetString(size_t idx) const {
+    if (!inner_) return {};
+    auto s = inner_->sv_get_str(record_idx_, idx);
+    return std::string_view(s.data(), s.size());
+}
+
+std::pair<const uint8_t*, size_t> RowView::GetBytes(size_t idx) const {
+    if (!inner_) return {nullptr, 0};
+    auto bytes = inner_->sv_get_bytes(record_idx_, idx);
+    return {bytes.data(), bytes.size()};
+}
+
+Date RowView::GetDate(size_t idx) const {
+    return inner_ ? Date{inner_->sv_get_date_days(record_idx_, idx)} : Date{};
+}
+
+Time RowView::GetTime(size_t idx) const {
+    return inner_ ? Time{inner_->sv_get_time_millis(record_idx_, idx)} : 
Time{};
+}
+
+Timestamp RowView::GetTimestamp(size_t idx) const {
+    if (!inner_) return {};
+    return Timestamp{inner_->sv_get_ts_millis(record_idx_, idx),
+                     inner_->sv_get_ts_nanos(record_idx_, idx)};
+}
+
+bool RowView::IsDecimal(size_t idx) const { return GetType(idx) == 
TypeId::Decimal; }
+
+std::string RowView::GetDecimalString(size_t idx) const {
+    if (!inner_) return {};
+    return std::string(inner_->sv_get_decimal_str(record_idx_, idx));
+}
+
+// ============================================================================
+// ScanRecords — backed by opaque Rust ScanResultInner
+// ============================================================================
+
+ScanRecords::ScanRecords() noexcept = default;
+
+ScanRecords::~ScanRecords() noexcept { Destroy(); }
+
+void ScanRecords::Destroy() noexcept {
+    if (inner_) {
+        rust::Box<ffi::ScanResultInner>::from_raw(inner_);
+        inner_ = nullptr;
+    }
+}
+
+ScanRecords::ScanRecords(ScanRecords&& other) noexcept
+    : inner_(other.inner_), column_map_(std::move(other.column_map_)) {
+    other.inner_ = nullptr;
+}
+
+ScanRecords& ScanRecords::operator=(ScanRecords&& other) noexcept {
+    if (this != &other) {
+        Destroy();
+        inner_ = other.inner_;
+        column_map_ = std::move(other.column_map_);
+        other.inner_ = nullptr;
+    }
+    return *this;
+}
+
+size_t ScanRecords::Size() const { return inner_ ? inner_->sv_record_count() : 
0; }
+
+bool ScanRecords::Empty() const { return Size() == 0; }
+
+void ScanRecords::BuildColumnMap() const {
+    if (!inner_) return;
+    auto map = std::make_shared<detail::ColumnMap>();
+    auto count = inner_->sv_column_count();
+    for (size_t i = 0; i < count; ++i) {
+        auto name = inner_->sv_column_name(i);
+        (*map)[std::string(name.data(), name.size())] = {
+            i, static_cast<TypeId>(inner_->sv_column_type(i))};
+    }
+    column_map_ = std::move(map);
+}
+
+const std::shared_ptr<detail::ColumnMap>& ScanRecords::GetColumnMap() const {
+    if (!column_map_) {
+        BuildColumnMap();
+    }
+    return column_map_;
+}
+
+ScanRecord ScanRecords::operator[](size_t idx) const {
+    if (!inner_ || idx >= inner_->sv_record_count())
+        return ScanRecord{0, std::nullopt,           0,
+                          0, ChangeType::AppendOnly, RowView(nullptr, 0, 
nullptr)};

Review Comment:
   Should we consider failing fast here by throwing an exception instead? 
It looks like `operator[]` is public and users can call it directly.



##########
bindings/cpp/include/fluss.hpp:
##########
@@ -232,6 +238,14 @@ struct Timestamp {
     }
 };
 
+enum class ChangeType {
+    AppendOnly = 0,
+    Insert = 1,
+    UpdateBefore = 2,
+    UpdateAfter = 3,
+    Delete = 4,
+};
+
 enum class TypeId {

Review Comment:
   I wonder if we can define this in the ffi mod instead, so that we can get rid of 
the enum declaration here as well as the constants in `types.rs`?



##########
bindings/cpp/include/fluss.hpp:
##########
@@ -799,48 +608,122 @@ struct GenericRow {
         return it->second;
     }
 
-    const Datum& GetField(size_t idx) const {
-        if (idx >= fields.size()) {
-            throw std::runtime_error("GenericRow: index " + 
std::to_string(idx) +
-                                     " out of bounds (size=" + 
std::to_string(fields.size()) + ")");
-        }
-        return fields[idx];
-    }
+    void Destroy() noexcept;
 
-    const Datum& GetTypedField(size_t idx, DatumType expected) const {
-        const auto& d = GetField(idx);
-        if (d.GetType() != expected) {
-            throw std::runtime_error("GenericRow: field " + 
std::to_string(idx) +
-                                     " type mismatch: expected " +
-                                     
std::to_string(static_cast<int>(expected)) + ", got " +
-                                     
std::to_string(static_cast<int>(d.GetType())));
-        }
-        return d;
-    }
+    ffi::GenericRowInner* inner_{nullptr};
+    std::shared_ptr<ColumnMap> column_map_;
+};
 
-    void EnsureSize(size_t idx) {
-        if (fields.size() <= idx) {
-            fields.resize(idx + 1);
+/// Read-only row view for scan results. Zero-copy access to string and bytes 
data.
+///
+/// WARNING: RowView borrows from ScanRecords. It must not outlive the 
ScanRecords
+/// that produced it (similar to std::string_view borrowing from std::string).
+class RowView : public detail::NamedGetters<RowView> {

Review Comment:
   I think we should also update the documentation with this specific warning 
where appropriate. 
   
   What do you think about providing some guidance or an example in the docs for 
users who want to use RowView data beyond the ScanRecords lifetime, e.g. manually 
copying the values before the ScanRecords goes out of scope?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to