Copilot commented on code in PR #302:
URL: https://github.com/apache/fluss-rust/pull/302#discussion_r2798974794


##########
bindings/python/README.md:
##########
@@ -356,7 +356,7 @@ Same as non-partitioned tables — include partition column 
values in each row:
 
 ```python
 table = await conn.get_table(table_path)
-writer = await table.new_append_writer()
+writer = await table.new_append()

Review Comment:
   This partitioned-table writing snippet still treats `new_append()` as an 
async `AppendWriter` factory (`await table.new_append()`), but the API now 
returns a synchronous `TableAppend` builder. Update to `writer = 
table.new_append().create_writer()` (no `await`) before calling 
`append()`/`flush()`.
   ```suggestion
   writer = table.new_append().create_writer()
   ```



##########
crates/fluss/src/metadata/table.rs:
##########
@@ -506,13 +506,9 @@ impl TableDescriptor {
     pub fn replication_factor(&self) -> Result<i32> {
         self.properties
             .get("table.replication.factor")
-            .ok_or_else(|| InvalidTableError {
-                message: "Replication factor is not set".to_string(),
-            })?
+            .ok_or_else(|| Error::invalid_table("Replication factor is not 
set"))?
             .parse()
-            .map_err(|_e| InvalidTableError {
-                message: "Replication factor can't be convert into 
int".to_string(),
-            })
+            .map_err(|_e| Error::invalid_table("Replication factor can't be 
convert into int"))
     }

Review Comment:
   Typo in error message: "can't be convert into int" → "can't be converted 
into int" (or "cannot be converted to an int"). This string is user-facing and 
was modified in this PR.



##########
bindings/python/src/table.rs:
##########
@@ -517,6 +507,141 @@ impl FlussTable {
     }
 }
 
+/// Builder for creating an AppendWriter.
+///
+/// Obtain via `FlussTable.new_append()`, then call `create_writer()`.
+#[pyclass]
+pub struct TableAppend {
+    inner: fcore::client::TableAppend,
+    table_info: fcore::metadata::TableInfo,
+}
+
+#[pymethods]
+impl TableAppend {
+    /// Create an AppendWriter from this builder.
+    pub fn create_writer(&self) -> PyResult<AppendWriter> {
+        let rust_writer = self
+            .inner
+            .create_writer()
+            .map_err(|e| FlussError::from_core_error(&e))?;
+        Ok(AppendWriter::from_core(
+            rust_writer,
+            self.table_info.clone(),
+        ))
+    }
+
+    fn __repr__(&self) -> String {
+        "TableAppend()".to_string()
+    }
+}
+
+/// Builder for creating an UpsertWriter, with optional partial update 
configuration.
+///
+/// Obtain via `FlussTable.new_upsert()`, then optionally call
+/// `partial_update_by_name()` or `partial_update_by_index()`,
+/// then call `create_writer()`.
+#[pyclass]
+pub struct TableUpsert {
+    inner: fcore::client::TableUpsert,
+    table_info: fcore::metadata::TableInfo,
+    /// Column indices for partial updates, tracked for Python's 
dict→GenericRow conversion.
+    target_columns: Option<Vec<usize>>,
+}
+
+#[pymethods]
+impl TableUpsert {
+    /// Configure partial update by column names.
+    ///
+    /// Only the specified columns will be updated on upsert.
+    ///
+    /// Args:
+    ///     columns: List of column names to update.
+    ///
+    /// Returns:
+    ///     A new TableUpsert configured for partial update.
+    pub fn partial_update_by_name(&self, columns: Vec<String>) -> 
PyResult<TableUpsert> {
+        let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
+        // Core validates and resolves names → indices internally
+        let updated = self
+            .inner
+            .partial_update_with_column_names(&col_refs)
+            .map_err(|e| FlussError::from_core_error(&e))?;
+        // Resolve indices for Python's row conversion layer (core validated 
names above)
+        let row_type = self.table_info.row_type();
+        let indices: Vec<usize> = columns
+            .iter()
+            .map(|name| row_type.get_field_index(name).unwrap())
+            .collect();

Review Comment:
   `row_type.get_field_index(name).unwrap()` can panic if this lookup ever 
diverges from the core's name validation (for example, if core validation 
changes). Prefer converting the `Option` into a Python error (`FlussError`) 
instead of unwrapping.
   ```suggestion
               .map(|name| {
                   row_type
                       .get_field_index(name)
                       .ok_or_else(|| {
                           FlussError::new_err(format!(
                               "Unknown column name '{}' for partial update",
                               name
                           ))
                       })
               })
               .collect::<PyResult<Vec<usize>>>()?;
   ```



##########
docs/rust-client.md:
##########
@@ -454,7 +454,7 @@ let log_scanner = table.new_scan().create_log_scanner()?;
 // Subscribe to each partition's buckets
 for partition_info in &partitions {
     let partition_id = partition_info.get_partition_id();
-    let num_buckets = table.get_table_info().get_num_buckets();
+    let num_buckets = table.table_info().get_num_buckets();
 
     for bucket_id in 0..num_buckets {
         log_scanner.subscribe_partition(partition_id, bucket_id, 0).await?;

Review Comment:
   Same issue as in the other `docs/rust-client.md` comment (the "Subscribe to 
all buckets" example): `table.table_info()` is no longer a valid API, so this 
partitioned-table example won't compile. Use the current table-info accessor 
(e.g. `get_table_info()`).



##########
docs/rust-client.md:
##########
@@ -366,7 +366,7 @@ log_scanner.subscribe(0, LATEST_OFFSET).await?;
 log_scanner.subscribe(0, 42).await?;
 
 // Subscribe to all buckets
-let num_buckets = table.get_table_info().get_num_buckets();
+let num_buckets = table.table_info().get_num_buckets();
 for bucket_id in 0..num_buckets {
     log_scanner.subscribe(bucket_id, 0).await?;
 }

Review Comment:
   The Rust client now exposes `get_table_info()` on `FlussTable` (and 
`table_info()` no longer exists). These examples will not compile as written; 
update to `table.get_table_info().get_num_buckets()` (or the current accessor 
name used elsewhere in the crate).



##########
bindings/python/README.md:
##########
@@ -68,14 +68,14 @@ async def main():
 
     # Write
     table = await conn.get_table(table_path)
-    writer = await table.new_append_writer()
+    writer = await table.new_append()

Review Comment:
   `FlussTable.new_append()` now returns a `TableAppend` builder (non-async) 
and you must call `.create_writer()` to get an `AppendWriter`. This quick start 
still uses `await table.new_append()` and then calls `append/flush` on the 
builder, which won’t work.
   ```suggestion
       append_builder = table.new_append()
       writer = append_builder.create_writer()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to