Copilot commented on code in PR #296:
URL: https://github.com/apache/fluss-rust/pull/296#discussion_r2788120823
##########
bindings/cpp/include/fluss.hpp:
##########
@@ -932,6 +936,61 @@ class Table {
mutable std::shared_ptr<GenericRow::ColumnMap> column_map_;
};
+class TableAppend {
+ public:
+ TableAppend(const TableAppend&) = delete;
+ TableAppend& operator=(const TableAppend&) = delete;
+ TableAppend(TableAppend&&) noexcept = default;
+ TableAppend& operator=(TableAppend&&) noexcept = default;
+
+ Result CreateWriter(AppendWriter& out);
+
+ private:
+ friend class Table;
+ explicit TableAppend(ffi::Table* table) noexcept;
+
+ ffi::Table* table_{nullptr};
+};
Review Comment:
`TableAppend`/`TableUpsert`/`TableLookup` each store a raw `ffi::Table*`.
Because `Table` owns and frees that pointer and is movable, a builder can
easily outlive the originating `Table` (or survive a move-assignment of it) and
end up calling into a dangling pointer (undefined behavior). Consider tying the
builder lifetime to `Table` (e.g., hold a `std::shared_ptr`/ref-counted handle
to the underlying table, or store `Table*` plus a generation/availability
check), or at minimum document the required lifetime constraint prominently in
the public header.
##########
bindings/cpp/src/table.cpp:
##########
@@ -124,7 +135,80 @@ Result Table::NewAppendWriter(AppendWriter& out) {
}
}
-TableScan Table::NewScan() { return TableScan(table_); }
+// TableUpsert implementation
+TableUpsert::TableUpsert(ffi::Table* table) noexcept : table_(table) {}
+
+TableUpsert& TableUpsert::PartialUpdateByIndex(std::vector<size_t>
column_indices) {
+ column_indices_ = std::move(column_indices);
+ column_names_.clear();
+ return *this;
+}
+
+TableUpsert& TableUpsert::PartialUpdateByName(std::vector<std::string>
column_names) {
+ column_names_ = std::move(column_names);
+ column_indices_.clear();
+ return *this;
+}
+
+std::vector<size_t> TableUpsert::ResolveNameProjection() const {
+ auto ffi_info = table_->get_table_info_from_table();
+ const auto& columns = ffi_info.schema.columns;
+
+ std::vector<size_t> indices;
+ for (const auto& name : column_names_) {
+ bool found = false;
+ for (size_t i = 0; i < columns.size(); ++i) {
+ if (std::string(columns[i].name) == name) {
+ indices.push_back(i);
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ throw std::runtime_error("Column '" + name + "' not found");
+ }
+ }
+ return indices;
+}
+
+Result TableUpsert::CreateWriter(UpsertWriter& out) {
+ if (table_ == nullptr) {
+ return utils::make_error(1, "Table not available");
+ }
+
+ try {
+ auto resolved_indices = !column_names_.empty() ?
ResolveNameProjection() : column_indices_;
+
+ rust::Vec<size_t> rust_indices;
+ for (size_t idx : resolved_indices) {
+ rust_indices.push_back(idx);
+ }
+ out =
UpsertWriter(table_->create_upsert_writer(std::move(rust_indices)));
+ return utils::make_ok();
Review Comment:
`TableUpsert::CreateWriter` cannot distinguish between (a) no partial update
requested and (b) a partial update explicitly requested with an empty column
list (e.g., `PartialUpdateByIndex({})`). With the current logic, an empty list
falls back to a full upsert writer, which is a behavioral change from the
previous overload-based API, where an empty list would still attempt a partial
update and fail with an error in core. Track an explicit "partial update
requested" flag in the builder and/or return an error when `PartialUpdate*` is
called with an empty list, so callers don't silently get full-update semantics.
##########
bindings/cpp/src/lib.rs:
##########
@@ -1084,79 +1007,24 @@ impl Table {
self.has_pk
}
- fn new_upsert_writer(&self) -> Result<*mut UpsertWriter, String> {
- let _enter = RUNTIME.enter();
-
- let fluss_table = fcore::client::FlussTable::new(
- &self.connection,
- self.metadata.clone(),
- self.table_info.clone(),
- );
-
- let table_upsert = fluss_table
- .new_upsert()
- .map_err(|e| format!("Failed to create upsert: {e}"))?;
-
- let writer = table_upsert
- .create_writer()
- .map_err(|e| format!("Failed to create upsert writer: {e}"))?;
-
- Ok(Box::into_raw(Box::new(UpsertWriter {
- inner: writer,
- table_info: self.table_info.clone(),
- })))
- }
-
- fn new_upsert_writer_with_column_names(
- &self,
- column_names: Vec<String>,
- ) -> Result<*mut UpsertWriter, String> {
- let _enter = RUNTIME.enter();
-
- let fluss_table = fcore::client::FlussTable::new(
- &self.connection,
- self.metadata.clone(),
- self.table_info.clone(),
- );
-
- let table_upsert = fluss_table
- .new_upsert()
- .map_err(|e| format!("Failed to create upsert: {e}"))?;
-
- let col_refs: Vec<&str> = column_names.iter().map(|s|
s.as_str()).collect();
- let table_upsert = table_upsert
- .partial_update_with_column_names(&col_refs)
- .map_err(|e| format!("Failed to set partial update columns:
{e}"))?;
-
- let writer = table_upsert
- .create_writer()
- .map_err(|e| format!("Failed to create upsert writer: {e}"))?;
-
- Ok(Box::into_raw(Box::new(UpsertWriter {
- inner: writer,
- table_info: self.table_info.clone(),
- })))
- }
-
- fn new_upsert_writer_with_column_indices(
+ fn create_upsert_writer(
&self,
column_indices: Vec<usize>,
) -> Result<*mut UpsertWriter, String> {
let _enter = RUNTIME.enter();
- let fluss_table = fcore::client::FlussTable::new(
- &self.connection,
- self.metadata.clone(),
- self.table_info.clone(),
- );
-
- let table_upsert = fluss_table
+ let table_upsert = self
+ .fluss_table()
.new_upsert()
.map_err(|e| format!("Failed to create upsert: {e}"))?;
- let table_upsert = table_upsert
- .partial_update(Some(column_indices))
- .map_err(|e| format!("Failed to set partial update columns:
{e}"))?;
+ let table_upsert = if column_indices.is_empty() {
+ table_upsert
+ } else {
+ table_upsert
+ .partial_update(Some(column_indices))
+ .map_err(|e| format!("Failed to set partial update columns:
{e}"))?
+ };
Review Comment:
`create_upsert_writer` treats an empty `column_indices` vec as "no partial
update" and skips `partial_update(...)`. This changes semantics from the
previous FFI function (`new_upsert_writer_with_column_indices`), where the
partial-update path was taken even for an empty vec and would fail core
validation (primary key missing from the target columns) instead of silently
creating a full-update writer. Consider preserving the ability to distinguish
`Some(vec![])` from `None` (e.g., add a separate `partial_update` boolean, or
keep a dedicated FFI function for the partial-update case) to avoid accidental
full updates when the caller intended a partial update.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]