Copilot commented on code in PR #296:
URL: https://github.com/apache/fluss-rust/pull/296#discussion_r2788120823


##########
bindings/cpp/include/fluss.hpp:
##########
@@ -932,6 +936,61 @@ class Table {
     mutable std::shared_ptr<GenericRow::ColumnMap> column_map_;
 };
 
+class TableAppend {
+   public:
+    TableAppend(const TableAppend&) = delete;
+    TableAppend& operator=(const TableAppend&) = delete;
+    TableAppend(TableAppend&&) noexcept = default;
+    TableAppend& operator=(TableAppend&&) noexcept = default;
+
+    Result CreateWriter(AppendWriter& out);
+
+   private:
+    friend class Table;
+    explicit TableAppend(ffi::Table* table) noexcept;
+
+    ffi::Table* table_{nullptr};
+};

Review Comment:
   TableAppend/TableUpsert/TableLookup store a raw `ffi::Table*`. Because 
`Table` owns and frees that pointer and is movable, a builder can easily 
outlive the originating `Table` (or survive a move-assignment) and end up 
calling into a dangling pointer (UB). Consider tying the builder lifetime to 
`Table` (e.g., hold a `std::shared_ptr`/ref-counted handle to the underlying 
table, or store `Table*` plus a generation/availability check), or at minimum 
document the required lifetime constraint prominently in the public header.



##########
bindings/cpp/src/table.cpp:
##########
@@ -124,7 +135,80 @@ Result Table::NewAppendWriter(AppendWriter& out) {
     }
 }
 
-TableScan Table::NewScan() { return TableScan(table_); }
+// TableUpsert implementation
+TableUpsert::TableUpsert(ffi::Table* table) noexcept : table_(table) {}
+
+TableUpsert& TableUpsert::PartialUpdateByIndex(std::vector<size_t> 
column_indices) {
+    column_indices_ = std::move(column_indices);
+    column_names_.clear();
+    return *this;
+}
+
+TableUpsert& TableUpsert::PartialUpdateByName(std::vector<std::string> 
column_names) {
+    column_names_ = std::move(column_names);
+    column_indices_.clear();
+    return *this;
+}
+
+std::vector<size_t> TableUpsert::ResolveNameProjection() const {
+    auto ffi_info = table_->get_table_info_from_table();
+    const auto& columns = ffi_info.schema.columns;
+
+    std::vector<size_t> indices;
+    for (const auto& name : column_names_) {
+        bool found = false;
+        for (size_t i = 0; i < columns.size(); ++i) {
+            if (std::string(columns[i].name) == name) {
+                indices.push_back(i);
+                found = true;
+                break;
+            }
+        }
+        if (!found) {
+            throw std::runtime_error("Column '" + name + "' not found");
+        }
+    }
+    return indices;
+}
+
+Result TableUpsert::CreateWriter(UpsertWriter& out) {
+    if (table_ == nullptr) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        auto resolved_indices = !column_names_.empty() ? 
ResolveNameProjection() : column_indices_;
+
+        rust::Vec<size_t> rust_indices;
+        for (size_t idx : resolved_indices) {
+            rust_indices.push_back(idx);
+        }
+        out = 
UpsertWriter(table_->create_upsert_writer(std::move(rust_indices)));
+        return utils::make_ok();

Review Comment:
   `TableUpsert::CreateWriter` cannot distinguish between (a) no partial update 
requested and (b) partial update explicitly requested with an empty column list 
(e.g., `PartialUpdateByIndex({})`). With the current logic, an empty list falls 
back to a full upsert writer, which is a behavioral change from the previous 
overload-based API (empty list would still attempt a partial update and error 
in core). Track an explicit "partial update requested" flag in the builder 
and/or return an error when PartialUpdate* is called with an empty list, so 
callers don't silently get full-update semantics.



##########
bindings/cpp/src/lib.rs:
##########
@@ -1084,79 +1007,24 @@ impl Table {
         self.has_pk
     }
 
-    fn new_upsert_writer(&self) -> Result<*mut UpsertWriter, String> {
-        let _enter = RUNTIME.enter();
-
-        let fluss_table = fcore::client::FlussTable::new(
-            &self.connection,
-            self.metadata.clone(),
-            self.table_info.clone(),
-        );
-
-        let table_upsert = fluss_table
-            .new_upsert()
-            .map_err(|e| format!("Failed to create upsert: {e}"))?;
-
-        let writer = table_upsert
-            .create_writer()
-            .map_err(|e| format!("Failed to create upsert writer: {e}"))?;
-
-        Ok(Box::into_raw(Box::new(UpsertWriter {
-            inner: writer,
-            table_info: self.table_info.clone(),
-        })))
-    }
-
-    fn new_upsert_writer_with_column_names(
-        &self,
-        column_names: Vec<String>,
-    ) -> Result<*mut UpsertWriter, String> {
-        let _enter = RUNTIME.enter();
-
-        let fluss_table = fcore::client::FlussTable::new(
-            &self.connection,
-            self.metadata.clone(),
-            self.table_info.clone(),
-        );
-
-        let table_upsert = fluss_table
-            .new_upsert()
-            .map_err(|e| format!("Failed to create upsert: {e}"))?;
-
-        let col_refs: Vec<&str> = column_names.iter().map(|s| 
s.as_str()).collect();
-        let table_upsert = table_upsert
-            .partial_update_with_column_names(&col_refs)
-            .map_err(|e| format!("Failed to set partial update columns: 
{e}"))?;
-
-        let writer = table_upsert
-            .create_writer()
-            .map_err(|e| format!("Failed to create upsert writer: {e}"))?;
-
-        Ok(Box::into_raw(Box::new(UpsertWriter {
-            inner: writer,
-            table_info: self.table_info.clone(),
-        })))
-    }
-
-    fn new_upsert_writer_with_column_indices(
+    fn create_upsert_writer(
         &self,
         column_indices: Vec<usize>,
     ) -> Result<*mut UpsertWriter, String> {
         let _enter = RUNTIME.enter();
 
-        let fluss_table = fcore::client::FlussTable::new(
-            &self.connection,
-            self.metadata.clone(),
-            self.table_info.clone(),
-        );
-
-        let table_upsert = fluss_table
+        let table_upsert = self
+            .fluss_table()
             .new_upsert()
             .map_err(|e| format!("Failed to create upsert: {e}"))?;
 
-        let table_upsert = table_upsert
-            .partial_update(Some(column_indices))
-            .map_err(|e| format!("Failed to set partial update columns: 
{e}"))?;
+        let table_upsert = if column_indices.is_empty() {
+            table_upsert
+        } else {
+            table_upsert
+                .partial_update(Some(column_indices))
+                .map_err(|e| format!("Failed to set partial update columns: 
{e}"))?
+        };

Review Comment:
   `create_upsert_writer` treats an empty `column_indices` vec as "no partial 
update" and skips `partial_update(...)`. This changes semantics from the 
previous FFI (`new_upsert_writer_with_column_indices`) where the partial-update 
path was taken even for an empty vec, which would fail core validation (missing 
PK in target columns) instead of silently creating a full-update writer. 
Consider preserving the ability to represent `Some(vec![])` vs `None` (e.g., 
add a separate `partial_update` boolean, or keep a dedicated FFI function for 
the partial-update case) to avoid accidental full updates when the caller 
intended partial update.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to