Copilot commented on code in PR #9:
URL: https://github.com/apache/fluss-rust/pull/9#discussion_r2431986134


##########
bindings/python/src/table.rs:
##########
@@ -0,0 +1,433 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::TOKIO_RUNTIME;
+use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+/// Represents a Fluss table for data operations
+#[pyclass]
+pub struct FlussTable {
+    connection: Arc<fcore::client::FlussConnection>,
+    metadata: Arc<fcore::client::Metadata>,
+    table_info: fcore::metadata::TableInfo,
+    table_path: fcore::metadata::TablePath,
+    has_primary_key: bool,
+}
+
+#[pymethods]
+impl FlussTable {
+    /// Create a new append writer for the table
+    fn new_append_writer<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, 
PyAny>> {
+        let conn = self.connection.clone();
+        let metadata = self.metadata.clone();
+        let table_info = self.table_info.clone();
+
+        future_into_py(py, async move {
+            let fluss_table = fcore::client::FlussTable::new(&conn, metadata, 
table_info);
+
+            let table_append = fluss_table
+                .new_append()
+                .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+            let rust_writer = table_append.create_writer();
+
+            let py_writer = AppendWriter::from_core(rust_writer);
+
+            Python::with_gil(|py| Py::new(py, py_writer))
+        })
+    }
+
+    /// Create a new log scanner for the table
+    fn new_log_scanner<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, 
PyAny>> {
+        let conn = self.connection.clone();
+        let metadata = self.metadata.clone();
+        let table_info = self.table_info.clone();
+
+        future_into_py(py, async move {
+            let fluss_table =
+                fcore::client::FlussTable::new(&conn, metadata.clone(), 
table_info.clone());
+
+            let table_scan = fluss_table.new_scan();
+
+            let rust_scanner = table_scan.create_log_scanner();
+
+            let py_scanner = LogScanner::from_core(rust_scanner, 
table_info.clone());
+
+            Python::with_gil(|py| Py::new(py, py_scanner))
+        })
+    }
+
+    /// synchronous version of new_log_scanner
+    fn new_log_scanner_sync(&self) -> PyResult<LogScanner> {
+        let conn = self.connection.clone();
+        let metadata = self.metadata.clone();
+        let table_info = self.table_info.clone();
+
+        let rust_scanner = TOKIO_RUNTIME.block_on(async {
+            let fluss_table =
+                fcore::client::FlussTable::new(&conn, metadata.clone(), 
table_info.clone());
+
+            let table_scan = fluss_table.new_scan();
+            table_scan.create_log_scanner()
+        });
+
+        let py_scanner = LogScanner::from_core(rust_scanner, 
table_info.clone());
+
+        Ok(py_scanner)
+    }
+
+    /// Get table information
+    pub fn get_table_info(&self) -> TableInfo {
+        TableInfo::from_core(self.table_info.clone())
+    }
+
+    /// Get table path
+    pub fn get_table_path(&self) -> TablePath {
+        TablePath::from_core(self.table_path.clone())
+    }
+
+    /// Check if table has primary key
+    pub fn has_primary_key(&self) -> bool {
+        self.has_primary_key
+    }
+
+    fn __repr__(&self) -> String {
+        format!(
+            "FlussTable(path={}.{})",
+            self.table_path.database(),
+            self.table_path.table()
+        )
+    }
+}
+
+impl FlussTable {
+    /// Create a FlussTable
+    pub fn new_table(
+        connection: Arc<fcore::client::FlussConnection>,
+        metadata: Arc<fcore::client::Metadata>,
+        table_info: fcore::metadata::TableInfo,
+        table_path: fcore::metadata::TablePath,
+        has_primary_key: bool,
+    ) -> Self {
+        Self {
+            connection,
+            metadata,
+            table_info,
+            table_path,
+            has_primary_key,
+        }
+    }
+}
+
+/// Writer for appending data to a Fluss table
+#[pyclass]
+pub struct AppendWriter {
+    inner: fcore::client::AppendWriter,
+}
+
+#[pymethods]
+impl AppendWriter {
+    /// Write Arrow table data
+    pub fn write_arrow(&mut self, py: Python, table: PyObject) -> PyResult<()> 
{
+        // Convert Arrow Table to batches and write each batch
+        let batches = table.call_method0(py, "to_batches")?;
+        let batch_list: Vec<PyObject> = batches.extract(py)?;
+
+        for batch in batch_list {
+            self.write_arrow_batch(py, batch)?;
+        }
+        Ok(())
+    }
+
+    /// Write Arrow batch data
+    pub fn write_arrow_batch(&mut self, py: Python, batch: PyObject) -> 
PyResult<()> {
+        // Extract number of rows and columns from the Arrow batch
+        let num_rows: usize = batch.getattr(py, "num_rows")?.extract(py)?;
+        let num_columns: usize = batch.getattr(py, 
"num_columns")?.extract(py)?;
+
+        // Process each row in the batch
+        for row_idx in 0..num_rows {
+            let mut generic_row = fcore::row::GenericRow::new();
+
+            // Extract values for each column in this row
+            for col_idx in 0..num_columns {
+                let column = batch.call_method1(py, "column", (col_idx,))?;
+                let value = column.call_method1(py, "__getitem__", 
(row_idx,))?;
+
+                // Convert the Python value to a Datum and add to the row
+                let datum = self.convert_python_value_to_datum(py, value)?;
+                generic_row.set_field(col_idx, datum);
+            }
+
+            // Append this row using the async append method
+            TOKIO_RUNTIME.block_on(async {
+                self.inner
+                    .append(generic_row)
+                    .await
+                    .map_err(|e| FlussError::new_err(e.to_string()))
+            })?;
+        }
+
+        Ok(())
+    }
+
+    /// Write Pandas DataFrame data
+    pub fn write_pandas(&mut self, py: Python, df: PyObject) -> PyResult<()> {
+        // Import pyarrow module
+        let pyarrow = py.import("pyarrow")?;
+
+        // Get the Table class from pyarrow module
+        let table_class = pyarrow.getattr("Table")?;
+
+        // Call Table.from_pandas(df) - from_pandas is a class method
+        let pa_table = table_class.call_method1("from_pandas", (df,))?;
+
+        // Then call write_arrow with the converted table
+        self.write_arrow(py, pa_table.into())
+    }
+
+    /// Flush any pending data
+    pub fn flush(&mut self) -> PyResult<()> {
+        TOKIO_RUNTIME.block_on(async {
+            self.inner
+                .flush()
+                .await
+                .map_err(|e| FlussError::new_err(e.to_string()))
+        })
+    }
+
+    fn __repr__(&self) -> String {
+        "AppendWriter()".to_string()
+    }
+}
+
+impl AppendWriter {
+    /// Create a AppendWriter from a core append writer
+    pub fn from_core(append: fcore::client::AppendWriter) -> Self {
+        Self { inner: append }
+    }
+
+    fn convert_python_value_to_datum(
+        &self,
+        py: Python,
+        value: PyObject,
+    ) -> PyResult<fcore::row::Datum<'static>> {
+        use fcore::row::{Blob, Datum, F32, F64};
+
+        // Check for None (null)
+        if value.is_none(py) {
+            return Ok(Datum::Null);
+        }
+
+        // Try to extract different types
+        if let Ok(type_name) = value.bind(py).get_type().name() {
+            if type_name == "StringScalar" {
+                if let Ok(py_value) = value.call_method0(py, "as_py") {
+                    if let Ok(str_val) = py_value.extract::<String>(py) {
+                        let leaked_str: &'static str = 
Box::leak(str_val.into_boxed_str());
+                        return Ok(Datum::String(leaked_str));
+                    }
+                }
+            }
+        }
+
+        if let Ok(bool_val) = value.extract::<bool>(py) {
+            return Ok(Datum::Bool(bool_val));
+        }
+
+        if let Ok(int_val) = value.extract::<i32>(py) {
+            return Ok(Datum::Int32(int_val));
+        }
+
+        if let Ok(int_val) = value.extract::<i64>(py) {
+            return Ok(Datum::Int64(int_val));
+        }
+
+        if let Ok(float_val) = value.extract::<f32>(py) {
+            return Ok(Datum::Float32(F32::from(float_val)));
+        }
+
+        if let Ok(float_val) = value.extract::<f64>(py) {
+            return Ok(Datum::Float64(F64::from(float_val)));
+        }
+
+        if let Ok(str_val) = value.extract::<String>(py) {
+            // Convert String to &'static str by leaking memory
+            // This is a simplified approach - in production, you might want 
better lifetime management
+            let leaked_str: &'static str = Box::leak(str_val.into_boxed_str());

Review Comment:
   Memory leak: Box::leak is used here only to obtain a &'static str that
   satisfies the Datum<'static> return type, so each string converted by this
   function is leaked and never freed. Consider using Cow<'static, str> or a
   proper string pool to avoid accumulating leaked memory in long-running
   processes.



##########
bindings/python/src/table.rs:
##########
@@ -0,0 +1,433 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::TOKIO_RUNTIME;
+use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+/// Represents a Fluss table for data operations
+#[pyclass]
+pub struct FlussTable {
+    connection: Arc<fcore::client::FlussConnection>,
+    metadata: Arc<fcore::client::Metadata>,
+    table_info: fcore::metadata::TableInfo,
+    table_path: fcore::metadata::TablePath,
+    has_primary_key: bool,
+}
+
+#[pymethods]
+impl FlussTable {
+    /// Create a new append writer for the table
+    fn new_append_writer<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, 
PyAny>> {
+        let conn = self.connection.clone();
+        let metadata = self.metadata.clone();
+        let table_info = self.table_info.clone();
+
+        future_into_py(py, async move {
+            let fluss_table = fcore::client::FlussTable::new(&conn, metadata, 
table_info);
+
+            let table_append = fluss_table
+                .new_append()
+                .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+            let rust_writer = table_append.create_writer();
+
+            let py_writer = AppendWriter::from_core(rust_writer);
+
+            Python::with_gil(|py| Py::new(py, py_writer))
+        })
+    }
+
+    /// Create a new log scanner for the table
+    fn new_log_scanner<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, 
PyAny>> {
+        let conn = self.connection.clone();
+        let metadata = self.metadata.clone();
+        let table_info = self.table_info.clone();
+
+        future_into_py(py, async move {
+            let fluss_table =
+                fcore::client::FlussTable::new(&conn, metadata.clone(), 
table_info.clone());
+
+            let table_scan = fluss_table.new_scan();
+
+            let rust_scanner = table_scan.create_log_scanner();
+
+            let py_scanner = LogScanner::from_core(rust_scanner, 
table_info.clone());
+
+            Python::with_gil(|py| Py::new(py, py_scanner))
+        })
+    }
+
+    /// synchronous version of new_log_scanner
+    fn new_log_scanner_sync(&self) -> PyResult<LogScanner> {
+        let conn = self.connection.clone();
+        let metadata = self.metadata.clone();
+        let table_info = self.table_info.clone();
+
+        let rust_scanner = TOKIO_RUNTIME.block_on(async {
+            let fluss_table =
+                fcore::client::FlussTable::new(&conn, metadata.clone(), 
table_info.clone());
+
+            let table_scan = fluss_table.new_scan();
+            table_scan.create_log_scanner()
+        });
+
+        let py_scanner = LogScanner::from_core(rust_scanner, 
table_info.clone());
+
+        Ok(py_scanner)
+    }
+
+    /// Get table information
+    pub fn get_table_info(&self) -> TableInfo {
+        TableInfo::from_core(self.table_info.clone())
+    }
+
+    /// Get table path
+    pub fn get_table_path(&self) -> TablePath {
+        TablePath::from_core(self.table_path.clone())
+    }
+
+    /// Check if table has primary key
+    pub fn has_primary_key(&self) -> bool {
+        self.has_primary_key
+    }
+
+    fn __repr__(&self) -> String {
+        format!(
+            "FlussTable(path={}.{})",
+            self.table_path.database(),
+            self.table_path.table()
+        )
+    }
+}
+
+impl FlussTable {
+    /// Create a FlussTable
+    pub fn new_table(
+        connection: Arc<fcore::client::FlussConnection>,
+        metadata: Arc<fcore::client::Metadata>,
+        table_info: fcore::metadata::TableInfo,
+        table_path: fcore::metadata::TablePath,
+        has_primary_key: bool,
+    ) -> Self {
+        Self {
+            connection,
+            metadata,
+            table_info,
+            table_path,
+            has_primary_key,
+        }
+    }
+}
+
+/// Writer for appending data to a Fluss table
+#[pyclass]
+pub struct AppendWriter {
+    inner: fcore::client::AppendWriter,
+}
+
+#[pymethods]
+impl AppendWriter {
+    /// Write Arrow table data
+    pub fn write_arrow(&mut self, py: Python, table: PyObject) -> PyResult<()> 
{
+        // Convert Arrow Table to batches and write each batch
+        let batches = table.call_method0(py, "to_batches")?;
+        let batch_list: Vec<PyObject> = batches.extract(py)?;
+
+        for batch in batch_list {
+            self.write_arrow_batch(py, batch)?;
+        }
+        Ok(())
+    }
+
+    /// Write Arrow batch data
+    pub fn write_arrow_batch(&mut self, py: Python, batch: PyObject) -> 
PyResult<()> {
+        // Extract number of rows and columns from the Arrow batch
+        let num_rows: usize = batch.getattr(py, "num_rows")?.extract(py)?;
+        let num_columns: usize = batch.getattr(py, 
"num_columns")?.extract(py)?;
+
+        // Process each row in the batch
+        for row_idx in 0..num_rows {
+            let mut generic_row = fcore::row::GenericRow::new();
+
+            // Extract values for each column in this row
+            for col_idx in 0..num_columns {
+                let column = batch.call_method1(py, "column", (col_idx,))?;
+                let value = column.call_method1(py, "__getitem__", 
(row_idx,))?;
+
+                // Convert the Python value to a Datum and add to the row
+                let datum = self.convert_python_value_to_datum(py, value)?;
+                generic_row.set_field(col_idx, datum);
+            }
+
+            // Append this row using the async append method
+            TOKIO_RUNTIME.block_on(async {
+                self.inner
+                    .append(generic_row)
+                    .await
+                    .map_err(|e| FlussError::new_err(e.to_string()))
+            })?;
+        }
+
+        Ok(())
+    }
+
+    /// Write Pandas DataFrame data
+    pub fn write_pandas(&mut self, py: Python, df: PyObject) -> PyResult<()> {
+        // Import pyarrow module
+        let pyarrow = py.import("pyarrow")?;
+
+        // Get the Table class from pyarrow module
+        let table_class = pyarrow.getattr("Table")?;
+
+        // Call Table.from_pandas(df) - from_pandas is a class method
+        let pa_table = table_class.call_method1("from_pandas", (df,))?;
+
+        // Then call write_arrow with the converted table
+        self.write_arrow(py, pa_table.into())
+    }
+
+    /// Flush any pending data
+    pub fn flush(&mut self) -> PyResult<()> {
+        TOKIO_RUNTIME.block_on(async {
+            self.inner
+                .flush()
+                .await
+                .map_err(|e| FlussError::new_err(e.to_string()))
+        })
+    }
+
+    fn __repr__(&self) -> String {
+        "AppendWriter()".to_string()
+    }
+}
+
+impl AppendWriter {
+    /// Create a AppendWriter from a core append writer
+    pub fn from_core(append: fcore::client::AppendWriter) -> Self {
+        Self { inner: append }
+    }
+
+    fn convert_python_value_to_datum(
+        &self,
+        py: Python,
+        value: PyObject,
+    ) -> PyResult<fcore::row::Datum<'static>> {
+        use fcore::row::{Blob, Datum, F32, F64};
+
+        // Check for None (null)
+        if value.is_none(py) {
+            return Ok(Datum::Null);
+        }
+
+        // Try to extract different types
+        if let Ok(type_name) = value.bind(py).get_type().name() {
+            if type_name == "StringScalar" {
+                if let Ok(py_value) = value.call_method0(py, "as_py") {
+                    if let Ok(str_val) = py_value.extract::<String>(py) {
+                        let leaked_str: &'static str = 
Box::leak(str_val.into_boxed_str());

Review Comment:
   Memory leak: Box::leak is used here only to obtain a &'static str that
   satisfies the Datum<'static> return type, so each string converted by this
   function is leaked and never freed. Consider using Cow<'static, str> or a
   proper string pool to avoid accumulating leaked memory in long-running
   processes.



##########
bindings/python/src/metadata.rs:
##########
@@ -509,7 +530,7 @@ impl TableBucket {
 
     /// Convert to core TableBucket (internal use)
     pub fn to_core(&self) -> fcore::metadata::TableBucket {
-        fcore::metadata::TableBucket::new(self.table_id, self.partition_id, 
self.bucket)
+        fcore::metadata::TableBucket::new(self.table_id, self.bucket)

Review Comment:
   The partition_id argument has been dropped from the TableBucket::new call.
   The struct still has a partition_id field, but it is no longer passed to
   the constructor, which may cause data loss or incorrect bucket
   identification.
   ```suggestion
           fcore::metadata::TableBucket::new(self.table_id, self.partition_id, 
self.bucket)
   ```



##########
crates/fluss/src/record/mod.rs:
##########
@@ -158,6 +158,10 @@ impl ScanRecords {
     pub fn is_empty(&self) -> bool {
         self.records.is_empty()
     }
+
+    pub fn buckets(&self) -> &HashMap<TableBucket, Vec<ScanRecord>> {

Review Comment:
   [nitpick] Method name 'buckets' is ambiguous. Consider renaming to 
'records_by_bucket' or 'bucket_records' to clearly indicate it returns a 
mapping of buckets to their records.
   ```suggestion
       pub fn records_by_bucket(&self) -> &HashMap<TableBucket, 
Vec<ScanRecord>> {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to