luoyuxia commented on code in PR #9:
URL: https://github.com/apache/fluss-rust/pull/9#discussion_r2432355293
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -102,6 +105,18 @@ impl LogScanner {
Ok(())
}
+    pub async fn list_offsets_latest(&self, bucket_ids: Vec<i32>) -> Result<HashMap<i32, i64>> {
+ let cluster = self.metadata.get_cluster();
+ let tablet_server = cluster.get_one_available_server();
Review Comment:
We can't just get any available server here; the list offsets request has to go to the leader server of each bucket. Refer to the Java client code for details. The quick fix is to iterate over the bucket_ids one by one, find the leader server for each bucket, and send the requests one by one.
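A rough sketch of that quick fix (not the actual fluss-rust API: `get_leader_server`, `list_offsets_for_bucket`, and `self.table_id` are placeholder names here):
```
pub async fn list_offsets_latest(&self, bucket_ids: Vec<i32>) -> Result<HashMap<i32, i64>> {
    let cluster = self.metadata.get_cluster();
    let mut offsets = HashMap::new();
    for bucket_id in bucket_ids {
        // placeholder: look up the leader tablet server of this bucket from the cluster metadata
        let leader = cluster.get_leader_server(self.table_id, bucket_id)?;
        // placeholder: send a ListOffsets request for just this bucket to its leader
        let resp = self.list_offsets_for_bucket(&leader, bucket_id, OffsetSpec::Latest).await?;
        offsets.extend(resp);
    }
    Ok(offsets)
}
```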
##########
bindings/python/src/table.rs:
##########
@@ -0,0 +1,433 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::TOKIO_RUNTIME;
+use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+/// Represents a Fluss table for data operations
+#[pyclass]
+pub struct FlussTable {
+ connection: Arc<fcore::client::FlussConnection>,
+ metadata: Arc<fcore::client::Metadata>,
+ table_info: fcore::metadata::TableInfo,
+ table_path: fcore::metadata::TablePath,
+ has_primary_key: bool,
+}
+
+#[pymethods]
+impl FlussTable {
+ /// Create a new append writer for the table
+ fn new_append_writer<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py,
PyAny>> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ future_into_py(py, async move {
+ let fluss_table = fcore::client::FlussTable::new(&conn, metadata,
table_info);
+
+ let table_append = fluss_table
+ .new_append()
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+ let rust_writer = table_append.create_writer();
+
+ let py_writer = AppendWriter::from_core(rust_writer);
+
+ Python::with_gil(|py| Py::new(py, py_writer))
+ })
+ }
+
+ /// Create a new log scanner for the table
+ fn new_log_scanner<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py,
PyAny>> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ future_into_py(py, async move {
+ let fluss_table =
+ fcore::client::FlussTable::new(&conn, metadata.clone(),
table_info.clone());
+
+ let table_scan = fluss_table.new_scan();
+
+ let rust_scanner = table_scan.create_log_scanner();
+
+ let py_scanner = LogScanner::from_core(rust_scanner,
table_info.clone());
+
+ Python::with_gil(|py| Py::new(py, py_scanner))
+ })
+ }
+
+ /// synchronous version of new_log_scanner
+ fn new_log_scanner_sync(&self) -> PyResult<LogScanner> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ let rust_scanner = TOKIO_RUNTIME.block_on(async {
+ let fluss_table =
+ fcore::client::FlussTable::new(&conn, metadata.clone(),
table_info.clone());
+
+ let table_scan = fluss_table.new_scan();
+ table_scan.create_log_scanner()
+ });
+
+ let py_scanner = LogScanner::from_core(rust_scanner,
table_info.clone());
+
+ Ok(py_scanner)
+ }
+
+ /// Get table information
+ pub fn get_table_info(&self) -> TableInfo {
+ TableInfo::from_core(self.table_info.clone())
+ }
+
+ /// Get table path
+ pub fn get_table_path(&self) -> TablePath {
+ TablePath::from_core(self.table_path.clone())
+ }
+
+ /// Check if table has primary key
+ pub fn has_primary_key(&self) -> bool {
+ self.has_primary_key
+ }
+
+ fn __repr__(&self) -> String {
+ format!(
+ "FlussTable(path={}.{})",
+ self.table_path.database(),
+ self.table_path.table()
+ )
+ }
+}
+
+impl FlussTable {
+ /// Create a FlussTable
+ pub fn new_table(
+ connection: Arc<fcore::client::FlussConnection>,
+ metadata: Arc<fcore::client::Metadata>,
+ table_info: fcore::metadata::TableInfo,
+ table_path: fcore::metadata::TablePath,
+ has_primary_key: bool,
+ ) -> Self {
+ Self {
+ connection,
+ metadata,
+ table_info,
+ table_path,
+ has_primary_key,
+ }
+ }
+}
+
+/// Writer for appending data to a Fluss table
+#[pyclass]
+pub struct AppendWriter {
+ inner: fcore::client::AppendWriter,
+}
+
+#[pymethods]
+impl AppendWriter {
+ /// Write Arrow table data
+ pub fn write_arrow(&mut self, py: Python, table: PyObject) -> PyResult<()>
{
+ // Convert Arrow Table to batches and write each batch
+ let batches = table.call_method0(py, "to_batches")?;
+ let batch_list: Vec<PyObject> = batches.extract(py)?;
+
+ for batch in batch_list {
+ self.write_arrow_batch(py, batch)?;
+ }
+ Ok(())
+ }
+
+ /// Write Arrow batch data
+ pub fn write_arrow_batch(&mut self, py: Python, batch: PyObject) ->
PyResult<()> {
+ // Extract number of rows and columns from the Arrow batch
+ let num_rows: usize = batch.getattr(py, "num_rows")?.extract(py)?;
+ let num_columns: usize = batch.getattr(py,
"num_columns")?.extract(py)?;
+
+ // Process each row in the batch
+ for row_idx in 0..num_rows {
+ let mut generic_row = fcore::row::GenericRow::new();
+
+ // Extract values for each column in this row
+ for col_idx in 0..num_columns {
+ let column = batch.call_method1(py, "column", (col_idx,))?;
+ let value = column.call_method1(py, "__getitem__",
(row_idx,))?;
+
+ // Convert the Python value to a Datum and add to the row
+ let datum = self.convert_python_value_to_datum(py, value)?;
+ generic_row.set_field(col_idx, datum);
+ }
+
+ // Append this row using the async append method
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .append(generic_row)
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })?;
+ }
+
+ Ok(())
+ }
+
+ /// Write Pandas DataFrame data
+ pub fn write_pandas(&mut self, py: Python, df: PyObject) -> PyResult<()> {
+ // Import pyarrow module
+ let pyarrow = py.import("pyarrow")?;
+
+ // Get the Table class from pyarrow module
+ let table_class = pyarrow.getattr("Table")?;
+
+ // Call Table.from_pandas(df) - from_pandas is a class method
+ let pa_table = table_class.call_method1("from_pandas", (df,))?;
+
+ // Then call write_arrow with the converted table
+ self.write_arrow(py, pa_table.into())
+ }
+
+ /// Flush any pending data
+ pub fn flush(&mut self) -> PyResult<()> {
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .flush()
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })
+ }
+
+ fn __repr__(&self) -> String {
+ "AppendWriter()".to_string()
+ }
+}
+
+impl AppendWriter {
+ /// Create a AppendWriter from a core append writer
+ pub fn from_core(append: fcore::client::AppendWriter) -> Self {
+ Self { inner: append }
+ }
+
+ fn convert_python_value_to_datum(
+ &self,
+ py: Python,
+ value: PyObject,
+ ) -> PyResult<fcore::row::Datum<'static>> {
+ use fcore::row::{Blob, Datum, F32, F64};
+
+ // Check for None (null)
+ if value.is_none(py) {
+ return Ok(Datum::Null);
+ }
+
+ // Try to extract different types
+ if let Ok(type_name) = value.bind(py).get_type().name() {
+ if type_name == "StringScalar" {
+ if let Ok(py_value) = value.call_method0(py, "as_py") {
+ if let Ok(str_val) = py_value.extract::<String>(py) {
+ let leaked_str: &'static str =
Box::leak(str_val.into_boxed_str());
+ return Ok(Datum::String(leaked_str));
+ }
+ }
+ }
+ }
+
+ if let Ok(bool_val) = value.extract::<bool>(py) {
+ return Ok(Datum::Bool(bool_val));
+ }
+
+ if let Ok(int_val) = value.extract::<i32>(py) {
+ return Ok(Datum::Int32(int_val));
+ }
+
+ if let Ok(int_val) = value.extract::<i64>(py) {
+ return Ok(Datum::Int64(int_val));
+ }
+
+ if let Ok(float_val) = value.extract::<f32>(py) {
+ return Ok(Datum::Float32(F32::from(float_val)));
+ }
+
+ if let Ok(float_val) = value.extract::<f64>(py) {
+ return Ok(Datum::Float64(F64::from(float_val)));
+ }
+
+ if let Ok(str_val) = value.extract::<String>(py) {
+ // Convert String to &'static str by leaking memory
+ // This is a simplified approach - in production, you might want
better lifetime management
+ let leaked_str: &'static str = Box::leak(str_val.into_boxed_str());
+ return Ok(Datum::String(leaked_str));
+ }
+
+ if let Ok(bytes_val) = value.extract::<Vec<u8>>(py) {
+ let blob = Blob::from(bytes_val);
+ return Ok(Datum::Blob(blob));
+ }
+
+ // If we can't convert, return an error
+ let type_name = value.bind(py).get_type().name()?;
+ Err(FlussError::new_err(format!(
+ "Cannot convert Python value to Datum: {:?}",
+ type_name
+ )))
+ }
+}
+
+/// Scanner for reading log data from a Fluss table
+#[pyclass]
+pub struct LogScanner {
+ inner: fcore::client::LogScanner,
+ table_info: fcore::metadata::TableInfo,
+ #[allow(dead_code)]
+ start_timestamp: Option<i64>,
+ #[allow(dead_code)]
+ end_timestamp: Option<i64>,
+}
+
+#[pymethods]
+impl LogScanner {
+ /// Subscribe to log data with timestamp range
+ fn subscribe(
+ &mut self,
+ _start_timestamp: Option<i64>,
+ _end_timestamp: Option<i64>,
+ ) -> PyResult<()> {
+ if _start_timestamp.is_some() {
+ return Err(FlussError::new_err(
+ "Specifying start_timestamp is not yet supported. Please use
None.".to_string(),
+ ));
+ }
+ if _end_timestamp.is_some() {
+ return Err(FlussError::new_err(
+ "Specifying end_timestamp is not yet supported. Please use
None.".to_string(),
+ ));
+ }
+
+ let num_buckets = self.table_info.get_num_buckets();
+ for bucket_id in 0..num_buckets {
+ // -2 for the earliest offset.
+ let start_offset = -2;
+
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .subscribe(bucket_id, start_offset)
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })?;
+ }
+
+ Ok(())
+ }
+
+ /// Convert all data to Arrow Table
+ fn to_arrow(&self, py: Python) -> PyResult<PyObject> {
+ use std::collections::HashMap;
+ use std::time::Duration;
+
+ let mut all_batches = Vec::new();
+
+ let num_buckets = self.table_info.get_num_buckets();
+ let bucket_ids: Vec<i32> = (0..num_buckets).collect();
+
+ // todo: after supporting list_offsets with timestamp, we can use
start_timestamp and end_timestamp here
+ let target_offsets: HashMap<i32, i64> = if !bucket_ids.is_empty() {
+ TOKIO_RUNTIME
+ .block_on(async {
self.inner.list_offsets_latest(bucket_ids).await })
+ .map_err(|e| FlussError::new_err(e.to_string()))?
+ } else {
+ HashMap::new()
+ };
+
+ let mut current_offsets: HashMap<i32, i64> = HashMap::new();
+
+ if !target_offsets.is_empty() {
+ loop {
+ let batch_result = TOKIO_RUNTIME
+                    .block_on(async { self.inner.poll(Duration::from_millis(1000)).await });
Review Comment:
```suggestion
                    .block_on(async { self.inner.poll(Duration::from_millis(500)).await });
```
to make it faster
##########
bindings/python/src/table.rs:
##########
@@ -0,0 +1,433 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::TOKIO_RUNTIME;
+use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+/// Represents a Fluss table for data operations
+#[pyclass]
+pub struct FlussTable {
+ connection: Arc<fcore::client::FlussConnection>,
+ metadata: Arc<fcore::client::Metadata>,
+ table_info: fcore::metadata::TableInfo,
+ table_path: fcore::metadata::TablePath,
+ has_primary_key: bool,
+}
+
+#[pymethods]
+impl FlussTable {
+ /// Create a new append writer for the table
+ fn new_append_writer<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py,
PyAny>> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ future_into_py(py, async move {
+ let fluss_table = fcore::client::FlussTable::new(&conn, metadata,
table_info);
+
+ let table_append = fluss_table
+ .new_append()
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+ let rust_writer = table_append.create_writer();
+
+ let py_writer = AppendWriter::from_core(rust_writer);
+
+ Python::with_gil(|py| Py::new(py, py_writer))
+ })
+ }
+
+ /// Create a new log scanner for the table
+ fn new_log_scanner<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py,
PyAny>> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ future_into_py(py, async move {
+ let fluss_table =
+ fcore::client::FlussTable::new(&conn, metadata.clone(),
table_info.clone());
+
+ let table_scan = fluss_table.new_scan();
+
+ let rust_scanner = table_scan.create_log_scanner();
+
+ let py_scanner = LogScanner::from_core(rust_scanner,
table_info.clone());
+
+ Python::with_gil(|py| Py::new(py, py_scanner))
+ })
+ }
+
+ /// synchronous version of new_log_scanner
+ fn new_log_scanner_sync(&self) -> PyResult<LogScanner> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ let rust_scanner = TOKIO_RUNTIME.block_on(async {
+ let fluss_table =
+ fcore::client::FlussTable::new(&conn, metadata.clone(),
table_info.clone());
+
+ let table_scan = fluss_table.new_scan();
+ table_scan.create_log_scanner()
+ });
+
+ let py_scanner = LogScanner::from_core(rust_scanner,
table_info.clone());
+
+ Ok(py_scanner)
+ }
+
+ /// Get table information
+ pub fn get_table_info(&self) -> TableInfo {
+ TableInfo::from_core(self.table_info.clone())
+ }
+
+ /// Get table path
+ pub fn get_table_path(&self) -> TablePath {
+ TablePath::from_core(self.table_path.clone())
+ }
+
+ /// Check if table has primary key
+ pub fn has_primary_key(&self) -> bool {
+ self.has_primary_key
+ }
+
+ fn __repr__(&self) -> String {
+ format!(
+ "FlussTable(path={}.{})",
+ self.table_path.database(),
+ self.table_path.table()
+ )
+ }
+}
+
+impl FlussTable {
+ /// Create a FlussTable
+ pub fn new_table(
+ connection: Arc<fcore::client::FlussConnection>,
+ metadata: Arc<fcore::client::Metadata>,
+ table_info: fcore::metadata::TableInfo,
+ table_path: fcore::metadata::TablePath,
+ has_primary_key: bool,
+ ) -> Self {
+ Self {
+ connection,
+ metadata,
+ table_info,
+ table_path,
+ has_primary_key,
+ }
+ }
+}
+
+/// Writer for appending data to a Fluss table
+#[pyclass]
+pub struct AppendWriter {
+ inner: fcore::client::AppendWriter,
+}
+
+#[pymethods]
+impl AppendWriter {
+ /// Write Arrow table data
+ pub fn write_arrow(&mut self, py: Python, table: PyObject) -> PyResult<()>
{
+ // Convert Arrow Table to batches and write each batch
+ let batches = table.call_method0(py, "to_batches")?;
+ let batch_list: Vec<PyObject> = batches.extract(py)?;
+
+ for batch in batch_list {
+ self.write_arrow_batch(py, batch)?;
+ }
+ Ok(())
+ }
+
+ /// Write Arrow batch data
+ pub fn write_arrow_batch(&mut self, py: Python, batch: PyObject) ->
PyResult<()> {
+ // Extract number of rows and columns from the Arrow batch
+ let num_rows: usize = batch.getattr(py, "num_rows")?.extract(py)?;
+ let num_columns: usize = batch.getattr(py,
"num_columns")?.extract(py)?;
+
+ // Process each row in the batch
+ for row_idx in 0..num_rows {
+ let mut generic_row = fcore::row::GenericRow::new();
+
+ // Extract values for each column in this row
+ for col_idx in 0..num_columns {
+ let column = batch.call_method1(py, "column", (col_idx,))?;
+ let value = column.call_method1(py, "__getitem__",
(row_idx,))?;
+
+ // Convert the Python value to a Datum and add to the row
+ let datum = self.convert_python_value_to_datum(py, value)?;
+ generic_row.set_field(col_idx, datum);
+ }
+
+ // Append this row using the async append method
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .append(generic_row)
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })?;
+ }
+
+ Ok(())
+ }
+
+ /// Write Pandas DataFrame data
+ pub fn write_pandas(&mut self, py: Python, df: PyObject) -> PyResult<()> {
+ // Import pyarrow module
+ let pyarrow = py.import("pyarrow")?;
+
+ // Get the Table class from pyarrow module
+ let table_class = pyarrow.getattr("Table")?;
+
+ // Call Table.from_pandas(df) - from_pandas is a class method
+ let pa_table = table_class.call_method1("from_pandas", (df,))?;
+
+ // Then call write_arrow with the converted table
+ self.write_arrow(py, pa_table.into())
+ }
+
+ /// Flush any pending data
+ pub fn flush(&mut self) -> PyResult<()> {
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .flush()
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })
+ }
+
+ fn __repr__(&self) -> String {
+ "AppendWriter()".to_string()
+ }
+}
+
+impl AppendWriter {
+ /// Create a AppendWriter from a core append writer
+ pub fn from_core(append: fcore::client::AppendWriter) -> Self {
+ Self { inner: append }
+ }
+
+ fn convert_python_value_to_datum(
+ &self,
+ py: Python,
+ value: PyObject,
+ ) -> PyResult<fcore::row::Datum<'static>> {
+ use fcore::row::{Blob, Datum, F32, F64};
+
+ // Check for None (null)
+ if value.is_none(py) {
+ return Ok(Datum::Null);
+ }
+
+ // Try to extract different types
+ if let Ok(type_name) = value.bind(py).get_type().name() {
+ if type_name == "StringScalar" {
+ if let Ok(py_value) = value.call_method0(py, "as_py") {
+ if let Ok(str_val) = py_value.extract::<String>(py) {
+ let leaked_str: &'static str =
Box::leak(str_val.into_boxed_str());
+ return Ok(Datum::String(leaked_str));
+ }
+ }
+ }
+ }
+
+ if let Ok(bool_val) = value.extract::<bool>(py) {
+ return Ok(Datum::Bool(bool_val));
+ }
+
+ if let Ok(int_val) = value.extract::<i32>(py) {
+ return Ok(Datum::Int32(int_val));
+ }
+
+ if let Ok(int_val) = value.extract::<i64>(py) {
+ return Ok(Datum::Int64(int_val));
+ }
+
+ if let Ok(float_val) = value.extract::<f32>(py) {
+ return Ok(Datum::Float32(F32::from(float_val)));
+ }
+
+ if let Ok(float_val) = value.extract::<f64>(py) {
+ return Ok(Datum::Float64(F64::from(float_val)));
+ }
+
+ if let Ok(str_val) = value.extract::<String>(py) {
+ // Convert String to &'static str by leaking memory
+ // This is a simplified approach - in production, you might want
better lifetime management
+ let leaked_str: &'static str = Box::leak(str_val.into_boxed_str());
+ return Ok(Datum::String(leaked_str));
+ }
+
+ if let Ok(bytes_val) = value.extract::<Vec<u8>>(py) {
+ let blob = Blob::from(bytes_val);
+ return Ok(Datum::Blob(blob));
+ }
+
+ // If we can't convert, return an error
+ let type_name = value.bind(py).get_type().name()?;
+ Err(FlussError::new_err(format!(
+ "Cannot convert Python value to Datum: {:?}",
+ type_name
+ )))
+ }
+}
+
+/// Scanner for reading log data from a Fluss table
+#[pyclass]
+pub struct LogScanner {
+ inner: fcore::client::LogScanner,
+ table_info: fcore::metadata::TableInfo,
+ #[allow(dead_code)]
+ start_timestamp: Option<i64>,
+ #[allow(dead_code)]
+ end_timestamp: Option<i64>,
+}
+
+#[pymethods]
+impl LogScanner {
+ /// Subscribe to log data with timestamp range
+ fn subscribe(
+ &mut self,
+ _start_timestamp: Option<i64>,
+ _end_timestamp: Option<i64>,
+ ) -> PyResult<()> {
+ if _start_timestamp.is_some() {
+ return Err(FlussError::new_err(
+ "Specifying start_timestamp is not yet supported. Please use
None.".to_string(),
+ ));
+ }
+ if _end_timestamp.is_some() {
+ return Err(FlussError::new_err(
+ "Specifying end_timestamp is not yet supported. Please use
None.".to_string(),
+ ));
+ }
+
+ let num_buckets = self.table_info.get_num_buckets();
+ for bucket_id in 0..num_buckets {
+ // -2 for the earliest offset.
+ let start_offset = -2;
+
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .subscribe(bucket_id, start_offset)
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })?;
+ }
+
+ Ok(())
+ }
+
+ /// Convert all data to Arrow Table
+ fn to_arrow(&self, py: Python) -> PyResult<PyObject> {
+ use std::collections::HashMap;
+ use std::time::Duration;
+
+ let mut all_batches = Vec::new();
+
+ let num_buckets = self.table_info.get_num_buckets();
+ let bucket_ids: Vec<i32> = (0..num_buckets).collect();
+
+ // todo: after supporting list_offsets with timestamp, we can use
start_timestamp and end_timestamp here
+ let target_offsets: HashMap<i32, i64> = if !bucket_ids.is_empty() {
+ TOKIO_RUNTIME
+ .block_on(async {
self.inner.list_offsets_latest(bucket_ids).await })
+ .map_err(|e| FlussError::new_err(e.to_string()))?
+ } else {
+ HashMap::new()
+ };
+
+ let mut current_offsets: HashMap<i32, i64> = HashMap::new();
+
+ if !target_offsets.is_empty() {
+ loop {
+ let batch_result = TOKIO_RUNTIME
+ .block_on(async {
self.inner.poll(Duration::from_millis(1000)).await });
+
+ match batch_result {
+ Ok(scan_records) => {
+ if !scan_records.is_empty() {
+ for (bucket, records) in
scan_records.buckets().iter() {
+ if let Some(last_record) = records.last() {
+ let max_offset_in_batch =
last_record.offset();
+ let entry =
+
current_offsets.entry(bucket.bucket_id()).or_insert(0);
+ *entry = (*entry).max(max_offset_in_batch);
+ }
+ }
+
+ let arrow_batch =
Utils::convert_scan_records_to_arrow(scan_records);
+ all_batches.extend(arrow_batch);
+ }
+
+ if Self::check_if_done(&target_offsets,
¤t_offsets) {
+ break;
+ }
+ }
+ Err(e) => return Err(FlussError::new_err(e.to_string())),
+ }
+ }
+ }
+
+ Utils::combine_batches_to_table(py, all_batches)
+ }
+
+ /// Convert all data to Pandas DataFrame
+ fn to_pandas(&self, py: Python) -> PyResult<PyObject> {
+ let arrow_table = self.to_arrow(py)?;
+
+ // Convert Arrow Table to Pandas DataFrame using pyarrow
+ let df = arrow_table.call_method0(py, "to_pandas")?;
+ Ok(df)
+ }
+
+ fn __repr__(&self) -> String {
+ format!("LogScanner(table={})", self.table_info.table_path)
+ }
+}
+
+impl LogScanner {
+ /// Create LogScanner from core LogScanner
+ pub fn from_core(
+ inner: fcore::client::LogScanner,
+ table_info: fcore::metadata::TableInfo,
+ ) -> Self {
+ Self {
+ inner,
+ table_info,
+ start_timestamp: None,
+ end_timestamp: None,
+ }
+ }
+
+ #[allow(clippy::unnecessary_map_or)]
Review Comment:
nit:
```
fn check_if_done(
target_offsets: &HashMap<i32, i64>,
current_offsets: &HashMap<i32, i64>,
) -> bool {
target_offsets.iter().all(|(bucket_id, &target_offset)| {
matches!(
current_offsets.get(bucket_id).copied(),
Some(current_offset) if current_offset >= target_offset - 1
)
})
}
```
##########
bindings/python/src/table.rs:
##########
@@ -0,0 +1,433 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::TOKIO_RUNTIME;
+use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+/// Represents a Fluss table for data operations
+#[pyclass]
+pub struct FlussTable {
+ connection: Arc<fcore::client::FlussConnection>,
+ metadata: Arc<fcore::client::Metadata>,
+ table_info: fcore::metadata::TableInfo,
+ table_path: fcore::metadata::TablePath,
+ has_primary_key: bool,
+}
+
+#[pymethods]
+impl FlussTable {
+ /// Create a new append writer for the table
+ fn new_append_writer<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py,
PyAny>> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ future_into_py(py, async move {
+ let fluss_table = fcore::client::FlussTable::new(&conn, metadata,
table_info);
+
+ let table_append = fluss_table
+ .new_append()
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+ let rust_writer = table_append.create_writer();
+
+ let py_writer = AppendWriter::from_core(rust_writer);
+
+ Python::with_gil(|py| Py::new(py, py_writer))
+ })
+ }
+
+ /// Create a new log scanner for the table
+ fn new_log_scanner<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py,
PyAny>> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ future_into_py(py, async move {
+ let fluss_table =
+ fcore::client::FlussTable::new(&conn, metadata.clone(),
table_info.clone());
+
+ let table_scan = fluss_table.new_scan();
+
+ let rust_scanner = table_scan.create_log_scanner();
+
+ let py_scanner = LogScanner::from_core(rust_scanner,
table_info.clone());
+
+ Python::with_gil(|py| Py::new(py, py_scanner))
+ })
+ }
+
+ /// synchronous version of new_log_scanner
+ fn new_log_scanner_sync(&self) -> PyResult<LogScanner> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ let rust_scanner = TOKIO_RUNTIME.block_on(async {
+ let fluss_table =
+ fcore::client::FlussTable::new(&conn, metadata.clone(),
table_info.clone());
+
+ let table_scan = fluss_table.new_scan();
+ table_scan.create_log_scanner()
+ });
+
+ let py_scanner = LogScanner::from_core(rust_scanner,
table_info.clone());
+
+ Ok(py_scanner)
+ }
+
+ /// Get table information
+ pub fn get_table_info(&self) -> TableInfo {
+ TableInfo::from_core(self.table_info.clone())
+ }
+
+ /// Get table path
+ pub fn get_table_path(&self) -> TablePath {
+ TablePath::from_core(self.table_path.clone())
+ }
+
+ /// Check if table has primary key
+ pub fn has_primary_key(&self) -> bool {
+ self.has_primary_key
+ }
+
+ fn __repr__(&self) -> String {
+ format!(
+ "FlussTable(path={}.{})",
+ self.table_path.database(),
+ self.table_path.table()
+ )
+ }
+}
+
+impl FlussTable {
+ /// Create a FlussTable
+ pub fn new_table(
+ connection: Arc<fcore::client::FlussConnection>,
+ metadata: Arc<fcore::client::Metadata>,
+ table_info: fcore::metadata::TableInfo,
+ table_path: fcore::metadata::TablePath,
+ has_primary_key: bool,
+ ) -> Self {
+ Self {
+ connection,
+ metadata,
+ table_info,
+ table_path,
+ has_primary_key,
+ }
+ }
+}
+
+/// Writer for appending data to a Fluss table
+#[pyclass]
+pub struct AppendWriter {
+ inner: fcore::client::AppendWriter,
+}
+
+#[pymethods]
+impl AppendWriter {
+ /// Write Arrow table data
+ pub fn write_arrow(&mut self, py: Python, table: PyObject) -> PyResult<()>
{
+ // Convert Arrow Table to batches and write each batch
+ let batches = table.call_method0(py, "to_batches")?;
+ let batch_list: Vec<PyObject> = batches.extract(py)?;
+
+ for batch in batch_list {
+ self.write_arrow_batch(py, batch)?;
+ }
+ Ok(())
+ }
+
+ /// Write Arrow batch data
+ pub fn write_arrow_batch(&mut self, py: Python, batch: PyObject) ->
PyResult<()> {
+ // Extract number of rows and columns from the Arrow batch
+ let num_rows: usize = batch.getattr(py, "num_rows")?.extract(py)?;
+ let num_columns: usize = batch.getattr(py,
"num_columns")?.extract(py)?;
+
+ // Process each row in the batch
+ for row_idx in 0..num_rows {
+ let mut generic_row = fcore::row::GenericRow::new();
+
+ // Extract values for each column in this row
+ for col_idx in 0..num_columns {
+ let column = batch.call_method1(py, "column", (col_idx,))?;
+ let value = column.call_method1(py, "__getitem__",
(row_idx,))?;
+
+ // Convert the Python value to a Datum and add to the row
+ let datum = self.convert_python_value_to_datum(py, value)?;
+ generic_row.set_field(col_idx, datum);
+ }
+
+ // Append this row using the async append method
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .append(generic_row)
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })?;
+ }
+
+ Ok(())
+ }
+
+ /// Write Pandas DataFrame data
+ pub fn write_pandas(&mut self, py: Python, df: PyObject) -> PyResult<()> {
+ // Import pyarrow module
+ let pyarrow = py.import("pyarrow")?;
+
+ // Get the Table class from pyarrow module
+ let table_class = pyarrow.getattr("Table")?;
+
+ // Call Table.from_pandas(df) - from_pandas is a class method
+ let pa_table = table_class.call_method1("from_pandas", (df,))?;
+
+ // Then call write_arrow with the converted table
+ self.write_arrow(py, pa_table.into())
+ }
+
+ /// Flush any pending data
+ pub fn flush(&mut self) -> PyResult<()> {
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .flush()
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })
+ }
+
+ fn __repr__(&self) -> String {
+ "AppendWriter()".to_string()
+ }
+}
+
+impl AppendWriter {
+ /// Create a AppendWriter from a core append writer
+ pub fn from_core(append: fcore::client::AppendWriter) -> Self {
+ Self { inner: append }
+ }
+
+ fn convert_python_value_to_datum(
+ &self,
+ py: Python,
+ value: PyObject,
+ ) -> PyResult<fcore::row::Datum<'static>> {
+ use fcore::row::{Blob, Datum, F32, F64};
+
+ // Check for None (null)
+ if value.is_none(py) {
+ return Ok(Datum::Null);
+ }
+
+ // Try to extract different types
+ if let Ok(type_name) = value.bind(py).get_type().name() {
+ if type_name == "StringScalar" {
+ if let Ok(py_value) = value.call_method0(py, "as_py") {
+ if let Ok(str_val) = py_value.extract::<String>(py) {
+ let leaked_str: &'static str =
Box::leak(str_val.into_boxed_str());
+ return Ok(Datum::String(leaked_str));
+ }
+ }
+ }
+ }
+
+ if let Ok(bool_val) = value.extract::<bool>(py) {
+ return Ok(Datum::Bool(bool_val));
+ }
+
+ if let Ok(int_val) = value.extract::<i32>(py) {
+ return Ok(Datum::Int32(int_val));
+ }
+
+ if let Ok(int_val) = value.extract::<i64>(py) {
+ return Ok(Datum::Int64(int_val));
+ }
+
+ if let Ok(float_val) = value.extract::<f32>(py) {
+ return Ok(Datum::Float32(F32::from(float_val)));
+ }
+
+ if let Ok(float_val) = value.extract::<f64>(py) {
+ return Ok(Datum::Float64(F64::from(float_val)));
+ }
+
+ if let Ok(str_val) = value.extract::<String>(py) {
+ // Convert String to &'static str by leaking memory
+ // This is a simplified approach - in production, you might want
better lifetime management
+ let leaked_str: &'static str = Box::leak(str_val.into_boxed_str());
+ return Ok(Datum::String(leaked_str));
+ }
+
+ if let Ok(bytes_val) = value.extract::<Vec<u8>>(py) {
+ let blob = Blob::from(bytes_val);
+ return Ok(Datum::Blob(blob));
+ }
+
+ // If we can't convert, return an error
+ let type_name = value.bind(py).get_type().name()?;
+ Err(FlussError::new_err(format!(
+ "Cannot convert Python value to Datum: {:?}",
+ type_name
+ )))
+ }
+}
+
+/// Scanner for reading log data from a Fluss table
+#[pyclass]
+pub struct LogScanner {
+ inner: fcore::client::LogScanner,
+ table_info: fcore::metadata::TableInfo,
+ #[allow(dead_code)]
+ start_timestamp: Option<i64>,
+ #[allow(dead_code)]
+ end_timestamp: Option<i64>,
+}
+
+#[pymethods]
+impl LogScanner {
+ /// Subscribe to log data with timestamp range
+ fn subscribe(
+ &mut self,
+ _start_timestamp: Option<i64>,
+ _end_timestamp: Option<i64>,
+ ) -> PyResult<()> {
+ if _start_timestamp.is_some() {
+ return Err(FlussError::new_err(
+ "Specifying start_timestamp is not yet supported. Please use
None.".to_string(),
+ ));
+ }
+ if _end_timestamp.is_some() {
+ return Err(FlussError::new_err(
+ "Specifying end_timestamp is not yet supported. Please use
None.".to_string(),
+ ));
+ }
+
+ let num_buckets = self.table_info.get_num_buckets();
+ for bucket_id in 0..num_buckets {
+ // -2 for the earliest offset.
+ let start_offset = -2;
+
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .subscribe(bucket_id, start_offset)
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })?;
+ }
+
+ Ok(())
+ }
+
+ /// Convert all data to Arrow Table
+ fn to_arrow(&self, py: Python) -> PyResult<PyObject> {
+ use std::collections::HashMap;
+ use std::time::Duration;
+
+ let mut all_batches = Vec::new();
+
+ let num_buckets = self.table_info.get_num_buckets();
+ let bucket_ids: Vec<i32> = (0..num_buckets).collect();
+
+ // todo: after supporting list_offsets with timestamp, we can use
start_timestamp and end_timestamp here
+ let target_offsets: HashMap<i32, i64> = if !bucket_ids.is_empty() {
+ TOKIO_RUNTIME
+ .block_on(async {
self.inner.list_offsets_latest(bucket_ids).await })
+ .map_err(|e| FlussError::new_err(e.to_string()))?
+ } else {
+ HashMap::new()
+ };
+
+ let mut current_offsets: HashMap<i32, i64> = HashMap::new();
+
+ if !target_offsets.is_empty() {
+ loop {
+ let batch_result = TOKIO_RUNTIME
+ .block_on(async {
self.inner.poll(Duration::from_millis(1000)).await });
+
+ match batch_result {
+ Ok(scan_records) => {
+ if !scan_records.is_empty() {
+                            for (bucket, records) in scan_records.buckets().iter() {
+ if let Some(last_record) = records.last() {
Review Comment:
can be
```
if let Some(last_record) = records.last() {
    current_offsets.insert(bucket.bucket_id(), last_record.offset());
}
```
The last record's offset is always the maximum seen so far, so we can just insert it directly instead of taking the max.
##########
crates/fluss/src/rpc/message/list_offsets.rs:
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+
+use crate::error::Result as FlussResult;
+use crate::proto::ListOffsetsResponse;
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use futures::future::join_all;
+use std::collections::HashMap;
+use tokio::sync::oneshot;
+
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+/// Offset type constants as per proto comments
+pub const LIST_EARLIEST_OFFSET: i32 = 0;
+pub const LIST_LATEST_OFFSET: i32 = 1;
+pub const LIST_OFFSET_FROM_TIMESTAMP: i32 = 2;
+
+/// Client follower server id constant
+pub const CLIENT_FOLLOWER_SERVER_ID: i32 = -1;
+
+/// Offset specification for list offsets request
+#[derive(Debug, Clone)]
+pub enum OffsetSpec {
+ /// Earliest offset spec
+ Earliest,
+ /// Latest offset spec
+ Latest,
+ /// Timestamp offset spec
+ Timestamp(i64),
+}
+
+impl OffsetSpec {
+ pub fn offset_type(&self) -> i32 {
+ match self {
+ OffsetSpec::Earliest => LIST_EARLIEST_OFFSET,
+ OffsetSpec::Latest => LIST_LATEST_OFFSET,
+ OffsetSpec::Timestamp(_) => LIST_OFFSET_FROM_TIMESTAMP,
+ }
+ }
+
+ pub fn start_timestamp(&self) -> Option<i64> {
+ match self {
+ OffsetSpec::Timestamp(ts) => Some(*ts),
+ _ => None,
+ }
+ }
+}
+
+/// Result container for list offsets operation
Review Comment:
I think we can remove `ListOffsetsResult` in this PR.
##########
crates/fluss/src/rpc/message/list_offsets.rs:
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+
+use crate::error::Result as FlussResult;
+use crate::proto::ListOffsetsResponse;
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use futures::future::join_all;
+use std::collections::HashMap;
+use tokio::sync::oneshot;
+
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+/// Offset type constants as per proto comments
+pub const LIST_EARLIEST_OFFSET: i32 = 0;
+pub const LIST_LATEST_OFFSET: i32 = 1;
+pub const LIST_OFFSET_FROM_TIMESTAMP: i32 = 2;
+
+/// Client follower server id constant
+pub const CLIENT_FOLLOWER_SERVER_ID: i32 = -1;
+
+/// Offset specification for list offsets request
+#[derive(Debug, Clone)]
+pub enum OffsetSpec {
+ /// Earliest offset spec
+ Earliest,
+ /// Latest offset spec
+ Latest,
+ /// Timestamp offset spec
+ Timestamp(i64),
+}
+
+impl OffsetSpec {
+ pub fn offset_type(&self) -> i32 {
+ match self {
+ OffsetSpec::Earliest => LIST_EARLIEST_OFFSET,
+ OffsetSpec::Latest => LIST_LATEST_OFFSET,
+ OffsetSpec::Timestamp(_) => LIST_OFFSET_FROM_TIMESTAMP,
+ }
+ }
+
+ pub fn start_timestamp(&self) -> Option<i64> {
+ match self {
+ OffsetSpec::Timestamp(ts) => Some(*ts),
+ _ => None,
+ }
+ }
+}
+
+/// Result container for list offsets operation
+pub struct ListOffsetsResult {
+ futures: HashMap<i32, oneshot::Receiver<FlussResult<i64>>>,
+}
+
+impl ListOffsetsResult {
+ pub fn new(futures: HashMap<i32, oneshot::Receiver<FlussResult<i64>>>) ->
Self {
+ Self { futures }
+ }
+
+ /// Get the offset result for a specific bucket
+ pub async fn bucket_result(&mut self, bucket: i32) -> FlussResult<i64> {
+ if let Some(receiver) = self.futures.remove(&bucket) {
+ receiver
+ .await
+ .map_err(|_| crate::error::Error::WriteError("Channel
closed".to_string()))?
+ } else {
+ Err(crate::error::Error::IllegalArgument(format!(
+ "Bucket {} not found",
+ bucket
+ )))
+ }
+ }
+
+ /// Wait for all bucket results to complete and return a map
+ pub async fn all(self) -> FlussResult<HashMap<i32, i64>> {
+ let mut results = HashMap::new();
+ let mut tasks = Vec::new();
+
+ // Collect all futures
+ for (bucket_id, receiver) in self.futures {
+ let task = async move {
+ let result = receiver
+ .await
+ .map_err(|_| crate::error::Error::WriteError("Channel
closed".to_string()))?;
+ Ok::<(i32, i64), crate::error::Error>((bucket_id, result?))
+ };
+ tasks.push(task);
+ }
+
+ // Wait for all futures to complete
+ let task_results = join_all(tasks).await;
+
+ // Collect results
+ for task_result in task_results {
+ let (bucket_id, offset) = task_result?;
+ results.insert(bucket_id, offset);
+ }
+
+ Ok(results)
+ }
+}
+
+#[derive(Debug)]
+pub struct ListOffsetsRequest {
+ pub inner_request: proto::ListOffsetsRequest,
+}
+
+impl ListOffsetsRequest {
+ pub fn new(
+ table_id: i64,
+ partition_id: Option<i64>,
+ bucket_ids: Vec<i32>,
+ offset_spec: OffsetSpec,
+ ) -> FlussResult<Self> {
+ Ok(ListOffsetsRequest {
+ inner_request: proto::ListOffsetsRequest {
+ follower_server_id: CLIENT_FOLLOWER_SERVER_ID,
+ offset_type: offset_spec.offset_type(),
+ table_id,
+ partition_id,
+ bucket_id: bucket_ids,
+ start_timestamp: offset_spec.start_timestamp(),
+ },
+ })
+ }
+}
+
+impl RequestBody for ListOffsetsRequest {
+ type ResponseBody = ListOffsetsResponse;
+
+ const API_KEY: ApiKey = ApiKey::ListOffsets;
+
+ const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(ListOffsetsRequest);
+impl_read_version_type!(ListOffsetsResponse);
+
+impl ListOffsetsResponse {
+ pub fn offsets_map(&self) -> HashMap<i32, i64> {
Review Comment:
```suggestion
pub fn offsets(&self) -> FlussResult<HashMap<i32, i64>> {
self.buckets_resp
.iter()
.map(|resp| {
                if resp.error_code.is_some() {
                    // todo: consider using another, more suitable error variant
                    Err(Error::WriteError(format!(
                        "Missing offset, error message: {}",
                        resp.error_message.as_deref().unwrap_or("unknown server exception")
                    )))
                } else {
                    // if there is no error, the offset must exist
                    Ok((resp.bucket_id, resp.offset.unwrap()))
                }
}
})
.collect()
}
```
We need to handle server-side errors here. (This version also needs `crate::error::Error` in scope; the imports at the top of this file don't seem to bring it in yet.)
##########
bindings/python/src/table.rs:
##########
@@ -0,0 +1,433 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::TOKIO_RUNTIME;
+use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+/// Represents a Fluss table for data operations
+#[pyclass]
+pub struct FlussTable {
+ connection: Arc<fcore::client::FlussConnection>,
+ metadata: Arc<fcore::client::Metadata>,
+ table_info: fcore::metadata::TableInfo,
+ table_path: fcore::metadata::TablePath,
+ has_primary_key: bool,
+}
+
+#[pymethods]
+impl FlussTable {
+ /// Create a new append writer for the table
+ fn new_append_writer<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py,
PyAny>> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ future_into_py(py, async move {
+ let fluss_table = fcore::client::FlussTable::new(&conn, metadata,
table_info);
+
+ let table_append = fluss_table
+ .new_append()
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+ let rust_writer = table_append.create_writer();
+
+ let py_writer = AppendWriter::from_core(rust_writer);
+
+ Python::with_gil(|py| Py::new(py, py_writer))
+ })
+ }
+
+ /// Create a new log scanner for the table
+ fn new_log_scanner<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py,
PyAny>> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ future_into_py(py, async move {
+ let fluss_table =
+ fcore::client::FlussTable::new(&conn, metadata.clone(),
table_info.clone());
+
+ let table_scan = fluss_table.new_scan();
+
+ let rust_scanner = table_scan.create_log_scanner();
+
+ let py_scanner = LogScanner::from_core(rust_scanner,
table_info.clone());
+
+ Python::with_gil(|py| Py::new(py, py_scanner))
+ })
+ }
+
+ /// synchronous version of new_log_scanner
+ fn new_log_scanner_sync(&self) -> PyResult<LogScanner> {
Review Comment:
can we remove this method now?
##########
bindings/python/src/table.rs:
##########
@@ -0,0 +1,433 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::TOKIO_RUNTIME;
+use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+/// Represents a Fluss table for data operations
+#[pyclass]
+pub struct FlussTable {
+ connection: Arc<fcore::client::FlussConnection>,
+ metadata: Arc<fcore::client::Metadata>,
+ table_info: fcore::metadata::TableInfo,
+ table_path: fcore::metadata::TablePath,
+ has_primary_key: bool,
+}
+
+#[pymethods]
+impl FlussTable {
+ /// Create a new append writer for the table
+ fn new_append_writer<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py,
PyAny>> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ future_into_py(py, async move {
+ let fluss_table = fcore::client::FlussTable::new(&conn, metadata,
table_info);
+
+ let table_append = fluss_table
+ .new_append()
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+ let rust_writer = table_append.create_writer();
+
+ let py_writer = AppendWriter::from_core(rust_writer);
+
+ Python::with_gil(|py| Py::new(py, py_writer))
+ })
+ }
+
+ /// Create a new log scanner for the table
+ fn new_log_scanner<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py,
PyAny>> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ future_into_py(py, async move {
+ let fluss_table =
+ fcore::client::FlussTable::new(&conn, metadata.clone(),
table_info.clone());
+
+ let table_scan = fluss_table.new_scan();
+
+ let rust_scanner = table_scan.create_log_scanner();
+
+ let py_scanner = LogScanner::from_core(rust_scanner,
table_info.clone());
+
+ Python::with_gil(|py| Py::new(py, py_scanner))
+ })
+ }
+
+ /// synchronous version of new_log_scanner
+ fn new_log_scanner_sync(&self) -> PyResult<LogScanner> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ let rust_scanner = TOKIO_RUNTIME.block_on(async {
+ let fluss_table =
+ fcore::client::FlussTable::new(&conn, metadata.clone(),
table_info.clone());
+
+ let table_scan = fluss_table.new_scan();
+ table_scan.create_log_scanner()
+ });
+
+ let py_scanner = LogScanner::from_core(rust_scanner,
table_info.clone());
+
+ Ok(py_scanner)
+ }
+
+ /// Get table information
+ pub fn get_table_info(&self) -> TableInfo {
+ TableInfo::from_core(self.table_info.clone())
+ }
+
+ /// Get table path
+ pub fn get_table_path(&self) -> TablePath {
+ TablePath::from_core(self.table_path.clone())
+ }
+
+ /// Check if table has primary key
+ pub fn has_primary_key(&self) -> bool {
+ self.has_primary_key
+ }
+
+ fn __repr__(&self) -> String {
+ format!(
+ "FlussTable(path={}.{})",
+ self.table_path.database(),
+ self.table_path.table()
+ )
+ }
+}
+
+impl FlussTable {
+ /// Create a FlussTable
+ pub fn new_table(
+ connection: Arc<fcore::client::FlussConnection>,
+ metadata: Arc<fcore::client::Metadata>,
+ table_info: fcore::metadata::TableInfo,
+ table_path: fcore::metadata::TablePath,
+ has_primary_key: bool,
+ ) -> Self {
+ Self {
+ connection,
+ metadata,
+ table_info,
+ table_path,
+ has_primary_key,
+ }
+ }
+}
+
+/// Writer for appending data to a Fluss table
+#[pyclass]
+pub struct AppendWriter {
+ inner: fcore::client::AppendWriter,
+}
+
+#[pymethods]
+impl AppendWriter {
+ /// Write Arrow table data
+ pub fn write_arrow(&mut self, py: Python, table: PyObject) -> PyResult<()>
{
+ // Convert Arrow Table to batches and write each batch
+ let batches = table.call_method0(py, "to_batches")?;
+ let batch_list: Vec<PyObject> = batches.extract(py)?;
+
+ for batch in batch_list {
+ self.write_arrow_batch(py, batch)?;
+ }
+ Ok(())
+ }
+
+ /// Write Arrow batch data
+ pub fn write_arrow_batch(&mut self, py: Python, batch: PyObject) ->
PyResult<()> {
+ // Extract number of rows and columns from the Arrow batch
+ let num_rows: usize = batch.getattr(py, "num_rows")?.extract(py)?;
+ let num_columns: usize = batch.getattr(py,
"num_columns")?.extract(py)?;
+
+ // Process each row in the batch
+ for row_idx in 0..num_rows {
+ let mut generic_row = fcore::row::GenericRow::new();
+
+ // Extract values for each column in this row
+ for col_idx in 0..num_columns {
+ let column = batch.call_method1(py, "column", (col_idx,))?;
+ let value = column.call_method1(py, "__getitem__",
(row_idx,))?;
+
+ // Convert the Python value to a Datum and add to the row
+ let datum = self.convert_python_value_to_datum(py, value)?;
+ generic_row.set_field(col_idx, datum);
+ }
+
+ // Append this row using the async append method
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .append(generic_row)
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })?;
+ }
+
+ Ok(())
+ }
+
+ /// Write Pandas DataFrame data
+ pub fn write_pandas(&mut self, py: Python, df: PyObject) -> PyResult<()> {
+ // Import pyarrow module
+ let pyarrow = py.import("pyarrow")?;
+
+ // Get the Table class from pyarrow module
+ let table_class = pyarrow.getattr("Table")?;
+
+ // Call Table.from_pandas(df) - from_pandas is a class method
+ let pa_table = table_class.call_method1("from_pandas", (df,))?;
+
+ // Then call write_arrow with the converted table
+ self.write_arrow(py, pa_table.into())
+ }
+
+ /// Flush any pending data
+ pub fn flush(&mut self) -> PyResult<()> {
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .flush()
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })
+ }
+
+ fn __repr__(&self) -> String {
+ "AppendWriter()".to_string()
+ }
+}
+
+impl AppendWriter {
+ /// Create a AppendWriter from a core append writer
+ pub fn from_core(append: fcore::client::AppendWriter) -> Self {
+ Self { inner: append }
+ }
+
+ fn convert_python_value_to_datum(
+ &self,
+ py: Python,
+ value: PyObject,
+ ) -> PyResult<fcore::row::Datum<'static>> {
+ use fcore::row::{Blob, Datum, F32, F64};
+
+ // Check for None (null)
+ if value.is_none(py) {
+ return Ok(Datum::Null);
+ }
+
+ // Try to extract different types
+ if let Ok(type_name) = value.bind(py).get_type().name() {
+ if type_name == "StringScalar" {
+ if let Ok(py_value) = value.call_method0(py, "as_py") {
+ if let Ok(str_val) = py_value.extract::<String>(py) {
+ let leaked_str: &'static str =
Box::leak(str_val.into_boxed_str());
+ return Ok(Datum::String(leaked_str));
+ }
+ }
+ }
+ }
+
+ if let Ok(bool_val) = value.extract::<bool>(py) {
+ return Ok(Datum::Bool(bool_val));
+ }
+
+ if let Ok(int_val) = value.extract::<i32>(py) {
+ return Ok(Datum::Int32(int_val));
+ }
+
+ if let Ok(int_val) = value.extract::<i64>(py) {
+ return Ok(Datum::Int64(int_val));
+ }
+
+ if let Ok(float_val) = value.extract::<f32>(py) {
+ return Ok(Datum::Float32(F32::from(float_val)));
+ }
+
+ if let Ok(float_val) = value.extract::<f64>(py) {
+ return Ok(Datum::Float64(F64::from(float_val)));
+ }
+
+ if let Ok(str_val) = value.extract::<String>(py) {
+ // Convert String to &'static str by leaking memory
+ // This is a simplified approach - in production, you might want
better lifetime management
+ let leaked_str: &'static str = Box::leak(str_val.into_boxed_str());
+ return Ok(Datum::String(leaked_str));
+ }
+
+ if let Ok(bytes_val) = value.extract::<Vec<u8>>(py) {
+ let blob = Blob::from(bytes_val);
+ return Ok(Datum::Blob(blob));
+ }
+
+ // If we can't convert, return an error
+ let type_name = value.bind(py).get_type().name()?;
+ Err(FlussError::new_err(format!(
+ "Cannot convert Python value to Datum: {:?}",
+ type_name
+ )))
+ }
+}
+
+/// Scanner for reading log data from a Fluss table
+#[pyclass]
+pub struct LogScanner {
+ inner: fcore::client::LogScanner,
+ table_info: fcore::metadata::TableInfo,
+ #[allow(dead_code)]
+ start_timestamp: Option<i64>,
+ #[allow(dead_code)]
+ end_timestamp: Option<i64>,
+}
+
+#[pymethods]
+impl LogScanner {
+ /// Subscribe to log data with timestamp range
+ fn subscribe(
+ &mut self,
+ _start_timestamp: Option<i64>,
+ _end_timestamp: Option<i64>,
+ ) -> PyResult<()> {
+ if _start_timestamp.is_some() {
+ return Err(FlussError::new_err(
+ "Specifying start_timestamp is not yet supported. Please use
None.".to_string(),
+ ));
+ }
+ if _end_timestamp.is_some() {
+ return Err(FlussError::new_err(
+ "Specifying end_timestamp is not yet supported. Please use
None.".to_string(),
+ ));
+ }
+
+ let num_buckets = self.table_info.get_num_buckets();
+ for bucket_id in 0..num_buckets {
+ // -2 for the earliest offset.
+ let start_offset = -2;
+
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .subscribe(bucket_id, start_offset)
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })?;
+ }
+
+ Ok(())
+ }
+
+ /// Convert all data to Arrow Table
+ fn to_arrow(&self, py: Python) -> PyResult<PyObject> {
+ use std::collections::HashMap;
+ use std::time::Duration;
+
+ let mut all_batches = Vec::new();
+
+ let num_buckets = self.table_info.get_num_buckets();
+ let bucket_ids: Vec<i32> = (0..num_buckets).collect();
+
+ // todo: after supporting list_offsets with timestamp, we can use
start_timestamp and end_timestamp here
+ let target_offsets: HashMap<i32, i64> = if !bucket_ids.is_empty() {
+ TOKIO_RUNTIME
+ .block_on(async {
self.inner.list_offsets_latest(bucket_ids).await })
+ .map_err(|e| FlussError::new_err(e.to_string()))?
+ } else {
+ HashMap::new()
+ };
+
+ let mut current_offsets: HashMap<i32, i64> = HashMap::new();
+
+ if !target_offsets.is_empty() {
+ loop {
+ let batch_result = TOKIO_RUNTIME
+ .block_on(async {
self.inner.poll(Duration::from_millis(1000)).await });
+
+ match batch_result {
+ Ok(scan_records) => {
Review Comment:
The current behavior doesn't follow **only scan up to target_offset**. If bucket 1 has already reached its target offset but bucket 2 has not, the scanner will keep scanning records for bucket 1, and in the current implementation all of those records, including the extra ones from bucket 1 past its target, will still be included.
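A sketch of one way to respect the target offsets (this just filters the polled records per bucket before converting them; the Arrow conversion would then need to take the filtered records instead of the whole `scan_records`):
```
// keep only records below the bucket's target offset, so buckets that have
// already reached their target contribute no extra rows
for (bucket, records) in scan_records.buckets().iter() {
    let target = target_offsets.get(&bucket.bucket_id()).copied().unwrap_or(i64::MAX);
    let wanted: Vec<_> = records.iter().filter(|r| r.offset() < target).collect();
    if let Some(last) = wanted.last() {
        current_offsets.insert(bucket.bucket_id(), last.offset());
    }
    // convert only `wanted` (not all of scan_records) to Arrow here
}
```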
##########
bindings/python/src/table.rs:
##########
@@ -0,0 +1,433 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::TOKIO_RUNTIME;
+use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+/// Represents a Fluss table for data operations
+#[pyclass]
+pub struct FlussTable {
+ connection: Arc<fcore::client::FlussConnection>,
+ metadata: Arc<fcore::client::Metadata>,
+ table_info: fcore::metadata::TableInfo,
+ table_path: fcore::metadata::TablePath,
+ has_primary_key: bool,
+}
+
+#[pymethods]
+impl FlussTable {
+ /// Create a new append writer for the table
+ fn new_append_writer<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py,
PyAny>> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ future_into_py(py, async move {
+ let fluss_table = fcore::client::FlussTable::new(&conn, metadata,
table_info);
+
+ let table_append = fluss_table
+ .new_append()
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+ let rust_writer = table_append.create_writer();
+
+ let py_writer = AppendWriter::from_core(rust_writer);
+
+ Python::with_gil(|py| Py::new(py, py_writer))
+ })
+ }
+
+ /// Create a new log scanner for the table
+ fn new_log_scanner<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py,
PyAny>> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ future_into_py(py, async move {
+ let fluss_table =
+ fcore::client::FlussTable::new(&conn, metadata.clone(),
table_info.clone());
+
+ let table_scan = fluss_table.new_scan();
+
+ let rust_scanner = table_scan.create_log_scanner();
+
+ let py_scanner = LogScanner::from_core(rust_scanner,
table_info.clone());
+
+ Python::with_gil(|py| Py::new(py, py_scanner))
+ })
+ }
+
+ /// synchronous version of new_log_scanner
+ fn new_log_scanner_sync(&self) -> PyResult<LogScanner> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ let rust_scanner = TOKIO_RUNTIME.block_on(async {
+ let fluss_table =
+ fcore::client::FlussTable::new(&conn, metadata.clone(),
table_info.clone());
+
+ let table_scan = fluss_table.new_scan();
+ table_scan.create_log_scanner()
+ });
+
+ let py_scanner = LogScanner::from_core(rust_scanner,
table_info.clone());
+
+ Ok(py_scanner)
+ }
+
+ /// Get table information
+ pub fn get_table_info(&self) -> TableInfo {
+ TableInfo::from_core(self.table_info.clone())
+ }
+
+ /// Get table path
+ pub fn get_table_path(&self) -> TablePath {
+ TablePath::from_core(self.table_path.clone())
+ }
+
+ /// Check if table has primary key
+ pub fn has_primary_key(&self) -> bool {
+ self.has_primary_key
+ }
+
+ fn __repr__(&self) -> String {
+ format!(
+ "FlussTable(path={}.{})",
+ self.table_path.database(),
+ self.table_path.table()
+ )
+ }
+}
+
+impl FlussTable {
+ /// Create a FlussTable
+ pub fn new_table(
+ connection: Arc<fcore::client::FlussConnection>,
+ metadata: Arc<fcore::client::Metadata>,
+ table_info: fcore::metadata::TableInfo,
+ table_path: fcore::metadata::TablePath,
+ has_primary_key: bool,
+ ) -> Self {
+ Self {
+ connection,
+ metadata,
+ table_info,
+ table_path,
+ has_primary_key,
+ }
+ }
+}
+
+/// Writer for appending data to a Fluss table
+#[pyclass]
+pub struct AppendWriter {
+ inner: fcore::client::AppendWriter,
+}
+
+#[pymethods]
+impl AppendWriter {
+ /// Write Arrow table data
+ pub fn write_arrow(&mut self, py: Python, table: PyObject) -> PyResult<()>
{
+ // Convert Arrow Table to batches and write each batch
+ let batches = table.call_method0(py, "to_batches")?;
+ let batch_list: Vec<PyObject> = batches.extract(py)?;
+
+ for batch in batch_list {
+ self.write_arrow_batch(py, batch)?;
+ }
+ Ok(())
+ }
+
+ /// Write Arrow batch data
+ pub fn write_arrow_batch(&mut self, py: Python, batch: PyObject) ->
PyResult<()> {
+ // Extract number of rows and columns from the Arrow batch
+ let num_rows: usize = batch.getattr(py, "num_rows")?.extract(py)?;
+ let num_columns: usize = batch.getattr(py,
"num_columns")?.extract(py)?;
+
+ // Process each row in the batch
+ for row_idx in 0..num_rows {
+ let mut generic_row = fcore::row::GenericRow::new();
+
+ // Extract values for each column in this row
+ for col_idx in 0..num_columns {
+ let column = batch.call_method1(py, "column", (col_idx,))?;
+ let value = column.call_method1(py, "__getitem__",
(row_idx,))?;
+
+ // Convert the Python value to a Datum and add to the row
+ let datum = self.convert_python_value_to_datum(py, value)?;
+ generic_row.set_field(col_idx, datum);
+ }
+
+ // Append this row using the async append method
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .append(generic_row)
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })?;
+ }
+
+ Ok(())
+ }
+
+ /// Write Pandas DataFrame data
+ pub fn write_pandas(&mut self, py: Python, df: PyObject) -> PyResult<()> {
+ // Import pyarrow module
+ let pyarrow = py.import("pyarrow")?;
+
+ // Get the Table class from pyarrow module
+ let table_class = pyarrow.getattr("Table")?;
+
+ // Call Table.from_pandas(df) - from_pandas is a class method
+ let pa_table = table_class.call_method1("from_pandas", (df,))?;
+
+ // Then call write_arrow with the converted table
+ self.write_arrow(py, pa_table.into())
+ }
+
+ /// Flush any pending data
+ pub fn flush(&mut self) -> PyResult<()> {
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .flush()
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })
+ }
+
+ fn __repr__(&self) -> String {
+ "AppendWriter()".to_string()
+ }
+}
+
+impl AppendWriter {
+ /// Create a AppendWriter from a core append writer
+ pub fn from_core(append: fcore::client::AppendWriter) -> Self {
+ Self { inner: append }
+ }
+
+ fn convert_python_value_to_datum(
+ &self,
+ py: Python,
+ value: PyObject,
+ ) -> PyResult<fcore::row::Datum<'static>> {
+ use fcore::row::{Blob, Datum, F32, F64};
+
+ // Check for None (null)
+ if value.is_none(py) {
+ return Ok(Datum::Null);
+ }
+
+ // Try to extract different types
+ if let Ok(type_name) = value.bind(py).get_type().name() {
+ if type_name == "StringScalar" {
+ if let Ok(py_value) = value.call_method0(py, "as_py") {
+ if let Ok(str_val) = py_value.extract::<String>(py) {
+ let leaked_str: &'static str =
Box::leak(str_val.into_boxed_str());
+ return Ok(Datum::String(leaked_str));
+ }
+ }
+ }
+ }
+
+ if let Ok(bool_val) = value.extract::<bool>(py) {
+ return Ok(Datum::Bool(bool_val));
+ }
+
+ if let Ok(int_val) = value.extract::<i32>(py) {
+ return Ok(Datum::Int32(int_val));
+ }
+
+ if let Ok(int_val) = value.extract::<i64>(py) {
+ return Ok(Datum::Int64(int_val));
+ }
+
+ if let Ok(float_val) = value.extract::<f32>(py) {
+ return Ok(Datum::Float32(F32::from(float_val)));
+ }
+
+ if let Ok(float_val) = value.extract::<f64>(py) {
+ return Ok(Datum::Float64(F64::from(float_val)));
+ }
+
+ if let Ok(str_val) = value.extract::<String>(py) {
+ // Convert String to &'static str by leaking memory
+ // This is a simplified approach - in production, you might want
better lifetime management
+ let leaked_str: &'static str = Box::leak(str_val.into_boxed_str());
+ return Ok(Datum::String(leaked_str));
+ }
+
+ if let Ok(bytes_val) = value.extract::<Vec<u8>>(py) {
+ let blob = Blob::from(bytes_val);
+ return Ok(Datum::Blob(blob));
+ }
+
+ // If we can't convert, return an error
+ let type_name = value.bind(py).get_type().name()?;
+ Err(FlussError::new_err(format!(
+ "Cannot convert Python value to Datum: {:?}",
+ type_name
+ )))
+ }
+}
+
+/// Scanner for reading log data from a Fluss table
+#[pyclass]
+pub struct LogScanner {
+ inner: fcore::client::LogScanner,
+ table_info: fcore::metadata::TableInfo,
+ #[allow(dead_code)]
+ start_timestamp: Option<i64>,
+ #[allow(dead_code)]
+ end_timestamp: Option<i64>,
+}
+
+#[pymethods]
+impl LogScanner {
+ /// Subscribe to log data with timestamp range
+ fn subscribe(
+ &mut self,
+ _start_timestamp: Option<i64>,
+ _end_timestamp: Option<i64>,
+ ) -> PyResult<()> {
+ if _start_timestamp.is_some() {
+ return Err(FlussError::new_err(
+ "Specifying start_timestamp is not yet supported. Please use
None.".to_string(),
+ ));
+ }
+ if _end_timestamp.is_some() {
+ return Err(FlussError::new_err(
+ "Specifying end_timestamp is not yet supported. Please use
None.".to_string(),
+ ));
+ }
+
+ let num_buckets = self.table_info.get_num_buckets();
+ for bucket_id in 0..num_buckets {
+ // -2 for the earliest offset.
Review Comment:
consider using a named constant, e.g. `const EARLIEST_OFFSET: i64 = -2;`
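Roughly (the constant name and where it lives are just a suggestion):

```rust
/// Sentinel passed to subscribe() meaning "start from the earliest log offset".
const EARLIEST_OFFSET: i64 = -2;

// in subscribe():
for bucket_id in 0..num_buckets {
    TOKIO_RUNTIME.block_on(async {
        self.inner
            .subscribe(bucket_id, EARLIEST_OFFSET)
            .await
            .map_err(|e| FlussError::new_err(e.to_string()))
    })?;
}
```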
##########
bindings/python/src/table.rs:
##########
@@ -0,0 +1,433 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::TOKIO_RUNTIME;
+use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+/// Represents a Fluss table for data operations
+#[pyclass]
+pub struct FlussTable {
+ connection: Arc<fcore::client::FlussConnection>,
+ metadata: Arc<fcore::client::Metadata>,
+ table_info: fcore::metadata::TableInfo,
+ table_path: fcore::metadata::TablePath,
+ has_primary_key: bool,
+}
+
+#[pymethods]
+impl FlussTable {
+ /// Create a new append writer for the table
+ fn new_append_writer<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py,
PyAny>> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ future_into_py(py, async move {
+ let fluss_table = fcore::client::FlussTable::new(&conn, metadata,
table_info);
+
+ let table_append = fluss_table
+ .new_append()
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+ let rust_writer = table_append.create_writer();
+
+ let py_writer = AppendWriter::from_core(rust_writer);
+
+ Python::with_gil(|py| Py::new(py, py_writer))
+ })
+ }
+
+ /// Create a new log scanner for the table
+ fn new_log_scanner<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py,
PyAny>> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ future_into_py(py, async move {
+ let fluss_table =
+ fcore::client::FlussTable::new(&conn, metadata.clone(),
table_info.clone());
+
+ let table_scan = fluss_table.new_scan();
+
+ let rust_scanner = table_scan.create_log_scanner();
+
+ let py_scanner = LogScanner::from_core(rust_scanner,
table_info.clone());
+
+ Python::with_gil(|py| Py::new(py, py_scanner))
+ })
+ }
+
+ /// synchronous version of new_log_scanner
+ fn new_log_scanner_sync(&self) -> PyResult<LogScanner> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ let rust_scanner = TOKIO_RUNTIME.block_on(async {
+ let fluss_table =
+ fcore::client::FlussTable::new(&conn, metadata.clone(),
table_info.clone());
+
+ let table_scan = fluss_table.new_scan();
+ table_scan.create_log_scanner()
+ });
+
+ let py_scanner = LogScanner::from_core(rust_scanner,
table_info.clone());
+
+ Ok(py_scanner)
+ }
+
+ /// Get table information
+ pub fn get_table_info(&self) -> TableInfo {
+ TableInfo::from_core(self.table_info.clone())
+ }
+
+ /// Get table path
+ pub fn get_table_path(&self) -> TablePath {
+ TablePath::from_core(self.table_path.clone())
+ }
+
+ /// Check if table has primary key
+ pub fn has_primary_key(&self) -> bool {
+ self.has_primary_key
+ }
+
+ fn __repr__(&self) -> String {
+ format!(
+ "FlussTable(path={}.{})",
+ self.table_path.database(),
+ self.table_path.table()
+ )
+ }
+}
+
+impl FlussTable {
+ /// Create a FlussTable
+ pub fn new_table(
+ connection: Arc<fcore::client::FlussConnection>,
+ metadata: Arc<fcore::client::Metadata>,
+ table_info: fcore::metadata::TableInfo,
+ table_path: fcore::metadata::TablePath,
+ has_primary_key: bool,
+ ) -> Self {
+ Self {
+ connection,
+ metadata,
+ table_info,
+ table_path,
+ has_primary_key,
+ }
+ }
+}
+
+/// Writer for appending data to a Fluss table
+#[pyclass]
+pub struct AppendWriter {
+ inner: fcore::client::AppendWriter,
+}
+
+#[pymethods]
+impl AppendWriter {
+ /// Write Arrow table data
+ pub fn write_arrow(&mut self, py: Python, table: PyObject) -> PyResult<()>
{
+ // Convert Arrow Table to batches and write each batch
+ let batches = table.call_method0(py, "to_batches")?;
+ let batch_list: Vec<PyObject> = batches.extract(py)?;
+
+ for batch in batch_list {
+ self.write_arrow_batch(py, batch)?;
+ }
+ Ok(())
+ }
+
+ /// Write Arrow batch data
+ pub fn write_arrow_batch(&mut self, py: Python, batch: PyObject) ->
PyResult<()> {
+ // Extract number of rows and columns from the Arrow batch
+ let num_rows: usize = batch.getattr(py, "num_rows")?.extract(py)?;
+ let num_columns: usize = batch.getattr(py,
"num_columns")?.extract(py)?;
+
+ // Process each row in the batch
+ for row_idx in 0..num_rows {
+ let mut generic_row = fcore::row::GenericRow::new();
+
+ // Extract values for each column in this row
+ for col_idx in 0..num_columns {
+ let column = batch.call_method1(py, "column", (col_idx,))?;
+ let value = column.call_method1(py, "__getitem__",
(row_idx,))?;
+
+ // Convert the Python value to a Datum and add to the row
+ let datum = self.convert_python_value_to_datum(py, value)?;
+ generic_row.set_field(col_idx, datum);
+ }
+
+ // Append this row using the async append method
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .append(generic_row)
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })?;
+ }
+
+ Ok(())
+ }
+
+ /// Write Pandas DataFrame data
+ pub fn write_pandas(&mut self, py: Python, df: PyObject) -> PyResult<()> {
+ // Import pyarrow module
+ let pyarrow = py.import("pyarrow")?;
+
+ // Get the Table class from pyarrow module
+ let table_class = pyarrow.getattr("Table")?;
+
+ // Call Table.from_pandas(df) - from_pandas is a class method
+ let pa_table = table_class.call_method1("from_pandas", (df,))?;
+
+ // Then call write_arrow with the converted table
+ self.write_arrow(py, pa_table.into())
+ }
+
+ /// Flush any pending data
+ pub fn flush(&mut self) -> PyResult<()> {
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .flush()
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })
+ }
+
+ fn __repr__(&self) -> String {
+ "AppendWriter()".to_string()
+ }
+}
+
+impl AppendWriter {
+ /// Create a AppendWriter from a core append writer
+ pub fn from_core(append: fcore::client::AppendWriter) -> Self {
+ Self { inner: append }
+ }
+
+ fn convert_python_value_to_datum(
+ &self,
+ py: Python,
+ value: PyObject,
+ ) -> PyResult<fcore::row::Datum<'static>> {
+ use fcore::row::{Blob, Datum, F32, F64};
+
+ // Check for None (null)
+ if value.is_none(py) {
+ return Ok(Datum::Null);
+ }
+
+ // Try to extract different types
+ if let Ok(type_name) = value.bind(py).get_type().name() {
+ if type_name == "StringScalar" {
+ if let Ok(py_value) = value.call_method0(py, "as_py") {
+ if let Ok(str_val) = py_value.extract::<String>(py) {
+ let leaked_str: &'static str =
Box::leak(str_val.into_boxed_str());
+ return Ok(Datum::String(leaked_str));
+ }
+ }
+ }
+ }
+
+ if let Ok(bool_val) = value.extract::<bool>(py) {
+ return Ok(Datum::Bool(bool_val));
+ }
+
+ if let Ok(int_val) = value.extract::<i32>(py) {
+ return Ok(Datum::Int32(int_val));
+ }
+
+ if let Ok(int_val) = value.extract::<i64>(py) {
+ return Ok(Datum::Int64(int_val));
+ }
+
+ if let Ok(float_val) = value.extract::<f32>(py) {
+ return Ok(Datum::Float32(F32::from(float_val)));
+ }
+
+ if let Ok(float_val) = value.extract::<f64>(py) {
+ return Ok(Datum::Float64(F64::from(float_val)));
+ }
+
+ if let Ok(str_val) = value.extract::<String>(py) {
+ // Convert String to &'static str by leaking memory
+ // This is a simplified approach - in production, you might want
better lifetime management
+ let leaked_str: &'static str = Box::leak(str_val.into_boxed_str());
+ return Ok(Datum::String(leaked_str));
+ }
+
+ if let Ok(bytes_val) = value.extract::<Vec<u8>>(py) {
+ let blob = Blob::from(bytes_val);
+ return Ok(Datum::Blob(blob));
+ }
+
+ // If we can't convert, return an error
+ let type_name = value.bind(py).get_type().name()?;
+ Err(FlussError::new_err(format!(
+ "Cannot convert Python value to Datum: {:?}",
+ type_name
+ )))
+ }
+}
+
+/// Scanner for reading log data from a Fluss table
+#[pyclass]
+pub struct LogScanner {
+ inner: fcore::client::LogScanner,
+ table_info: fcore::metadata::TableInfo,
+ #[allow(dead_code)]
+ start_timestamp: Option<i64>,
+ #[allow(dead_code)]
+ end_timestamp: Option<i64>,
+}
+
+#[pymethods]
+impl LogScanner {
+ /// Subscribe to log data with timestamp range
+ fn subscribe(
+ &mut self,
+ _start_timestamp: Option<i64>,
+ _end_timestamp: Option<i64>,
+ ) -> PyResult<()> {
+ if _start_timestamp.is_some() {
+ return Err(FlussError::new_err(
+ "Specifying start_timestamp is not yet supported. Please use
None.".to_string(),
+ ));
+ }
+ if _end_timestamp.is_some() {
+ return Err(FlussError::new_err(
+ "Specifying end_timestamp is not yet supported. Please use
None.".to_string(),
+ ));
+ }
+
+ let num_buckets = self.table_info.get_num_buckets();
+ for bucket_id in 0..num_buckets {
+ // -2 for the earliest offset.
+ let start_offset = -2;
+
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .subscribe(bucket_id, start_offset)
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })?;
+ }
+
+ Ok(())
+ }
+
+ /// Convert all data to Arrow Table
+ fn to_arrow(&self, py: Python) -> PyResult<PyObject> {
+ use std::collections::HashMap;
+ use std::time::Duration;
+
+ let mut all_batches = Vec::new();
+
+ let num_buckets = self.table_info.get_num_buckets();
+ let bucket_ids: Vec<i32> = (0..num_buckets).collect();
+
+ // todo: after supporting list_offsets with timestamp, we can use
start_timestamp and end_timestamp here
+ let target_offsets: HashMap<i32, i64> = if !bucket_ids.is_empty() {
Review Comment:
`bucket_ids` should never be empty (there is always at least one bucket), so the `if`/`else` is unnecessary.
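i.e. something like this (sketch; it just drops the empty-list branch):

```rust
let bucket_ids: Vec<i32> = (0..num_buckets).collect();

// A table always has at least one bucket, so fetch the latest offsets
// unconditionally instead of special-casing an empty bucket list.
let target_offsets: HashMap<i32, i64> = TOKIO_RUNTIME
    .block_on(async { self.inner.list_offsets_latest(bucket_ids).await })
    .map_err(|e| FlussError::new_err(e.to_string()))?;
```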
##########
bindings/python/src/table.rs:
##########
@@ -0,0 +1,433 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::TOKIO_RUNTIME;
+use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+/// Represents a Fluss table for data operations
+#[pyclass]
+pub struct FlussTable {
+ connection: Arc<fcore::client::FlussConnection>,
+ metadata: Arc<fcore::client::Metadata>,
+ table_info: fcore::metadata::TableInfo,
+ table_path: fcore::metadata::TablePath,
+ has_primary_key: bool,
+}
+
+#[pymethods]
+impl FlussTable {
+ /// Create a new append writer for the table
+ fn new_append_writer<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py,
PyAny>> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ future_into_py(py, async move {
+ let fluss_table = fcore::client::FlussTable::new(&conn, metadata,
table_info);
+
+ let table_append = fluss_table
+ .new_append()
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+ let rust_writer = table_append.create_writer();
+
+ let py_writer = AppendWriter::from_core(rust_writer);
+
+ Python::with_gil(|py| Py::new(py, py_writer))
+ })
+ }
+
+ /// Create a new log scanner for the table
+ fn new_log_scanner<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py,
PyAny>> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ future_into_py(py, async move {
+ let fluss_table =
+ fcore::client::FlussTable::new(&conn, metadata.clone(),
table_info.clone());
+
+ let table_scan = fluss_table.new_scan();
+
+ let rust_scanner = table_scan.create_log_scanner();
+
+ let py_scanner = LogScanner::from_core(rust_scanner,
table_info.clone());
+
+ Python::with_gil(|py| Py::new(py, py_scanner))
+ })
+ }
+
+ /// synchronous version of new_log_scanner
+ fn new_log_scanner_sync(&self) -> PyResult<LogScanner> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ let rust_scanner = TOKIO_RUNTIME.block_on(async {
+ let fluss_table =
+ fcore::client::FlussTable::new(&conn, metadata.clone(),
table_info.clone());
+
+ let table_scan = fluss_table.new_scan();
+ table_scan.create_log_scanner()
+ });
+
+ let py_scanner = LogScanner::from_core(rust_scanner,
table_info.clone());
+
+ Ok(py_scanner)
+ }
+
+ /// Get table information
+ pub fn get_table_info(&self) -> TableInfo {
+ TableInfo::from_core(self.table_info.clone())
+ }
+
+ /// Get table path
+ pub fn get_table_path(&self) -> TablePath {
+ TablePath::from_core(self.table_path.clone())
+ }
+
+ /// Check if table has primary key
+ pub fn has_primary_key(&self) -> bool {
+ self.has_primary_key
+ }
+
+ fn __repr__(&self) -> String {
+ format!(
+ "FlussTable(path={}.{})",
+ self.table_path.database(),
+ self.table_path.table()
+ )
+ }
+}
+
+impl FlussTable {
+ /// Create a FlussTable
+ pub fn new_table(
+ connection: Arc<fcore::client::FlussConnection>,
+ metadata: Arc<fcore::client::Metadata>,
+ table_info: fcore::metadata::TableInfo,
+ table_path: fcore::metadata::TablePath,
+ has_primary_key: bool,
+ ) -> Self {
+ Self {
+ connection,
+ metadata,
+ table_info,
+ table_path,
+ has_primary_key,
+ }
+ }
+}
+
+/// Writer for appending data to a Fluss table
+#[pyclass]
+pub struct AppendWriter {
+ inner: fcore::client::AppendWriter,
+}
+
+#[pymethods]
+impl AppendWriter {
+ /// Write Arrow table data
+ pub fn write_arrow(&mut self, py: Python, table: PyObject) -> PyResult<()>
{
+ // Convert Arrow Table to batches and write each batch
+ let batches = table.call_method0(py, "to_batches")?;
+ let batch_list: Vec<PyObject> = batches.extract(py)?;
+
+ for batch in batch_list {
+ self.write_arrow_batch(py, batch)?;
+ }
+ Ok(())
+ }
+
+ /// Write Arrow batch data
+ pub fn write_arrow_batch(&mut self, py: Python, batch: PyObject) ->
PyResult<()> {
+ // Extract number of rows and columns from the Arrow batch
+ let num_rows: usize = batch.getattr(py, "num_rows")?.extract(py)?;
+ let num_columns: usize = batch.getattr(py,
"num_columns")?.extract(py)?;
+
+ // Process each row in the batch
+ for row_idx in 0..num_rows {
+ let mut generic_row = fcore::row::GenericRow::new();
+
+ // Extract values for each column in this row
+ for col_idx in 0..num_columns {
+ let column = batch.call_method1(py, "column", (col_idx,))?;
+ let value = column.call_method1(py, "__getitem__",
(row_idx,))?;
+
+ // Convert the Python value to a Datum and add to the row
+ let datum = self.convert_python_value_to_datum(py, value)?;
+ generic_row.set_field(col_idx, datum);
+ }
+
+ // Append this row using the async append method
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .append(generic_row)
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })?;
+ }
+
+ Ok(())
+ }
+
+ /// Write Pandas DataFrame data
+ pub fn write_pandas(&mut self, py: Python, df: PyObject) -> PyResult<()> {
+ // Import pyarrow module
+ let pyarrow = py.import("pyarrow")?;
+
+ // Get the Table class from pyarrow module
+ let table_class = pyarrow.getattr("Table")?;
+
+ // Call Table.from_pandas(df) - from_pandas is a class method
+ let pa_table = table_class.call_method1("from_pandas", (df,))?;
+
+ // Then call write_arrow with the converted table
+ self.write_arrow(py, pa_table.into())
+ }
+
+ /// Flush any pending data
+ pub fn flush(&mut self) -> PyResult<()> {
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .flush()
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })
+ }
+
+ fn __repr__(&self) -> String {
+ "AppendWriter()".to_string()
+ }
+}
+
+impl AppendWriter {
+ /// Create a AppendWriter from a core append writer
+ pub fn from_core(append: fcore::client::AppendWriter) -> Self {
+ Self { inner: append }
+ }
+
+ fn convert_python_value_to_datum(
+ &self,
+ py: Python,
+ value: PyObject,
+ ) -> PyResult<fcore::row::Datum<'static>> {
+ use fcore::row::{Blob, Datum, F32, F64};
+
+ // Check for None (null)
+ if value.is_none(py) {
+ return Ok(Datum::Null);
+ }
+
+ // Try to extract different types
+ if let Ok(type_name) = value.bind(py).get_type().name() {
+ if type_name == "StringScalar" {
+ if let Ok(py_value) = value.call_method0(py, "as_py") {
+ if let Ok(str_val) = py_value.extract::<String>(py) {
+ let leaked_str: &'static str =
Box::leak(str_val.into_boxed_str());
+ return Ok(Datum::String(leaked_str));
+ }
+ }
+ }
+ }
+
+ if let Ok(bool_val) = value.extract::<bool>(py) {
+ return Ok(Datum::Bool(bool_val));
+ }
+
+ if let Ok(int_val) = value.extract::<i32>(py) {
+ return Ok(Datum::Int32(int_val));
+ }
+
+ if let Ok(int_val) = value.extract::<i64>(py) {
+ return Ok(Datum::Int64(int_val));
+ }
+
+ if let Ok(float_val) = value.extract::<f32>(py) {
+ return Ok(Datum::Float32(F32::from(float_val)));
+ }
+
+ if let Ok(float_val) = value.extract::<f64>(py) {
+ return Ok(Datum::Float64(F64::from(float_val)));
+ }
+
+ if let Ok(str_val) = value.extract::<String>(py) {
+ // Convert String to &'static str by leaking memory
+ // This is a simplified approach - in production, you might want
better lifetime management
+ let leaked_str: &'static str = Box::leak(str_val.into_boxed_str());
+ return Ok(Datum::String(leaked_str));
+ }
+
+ if let Ok(bytes_val) = value.extract::<Vec<u8>>(py) {
+ let blob = Blob::from(bytes_val);
+ return Ok(Datum::Blob(blob));
+ }
+
+ // If we can't convert, return an error
+ let type_name = value.bind(py).get_type().name()?;
+ Err(FlussError::new_err(format!(
+ "Cannot convert Python value to Datum: {:?}",
+ type_name
+ )))
+ }
+}
+
+/// Scanner for reading log data from a Fluss table
+#[pyclass]
+pub struct LogScanner {
+ inner: fcore::client::LogScanner,
+ table_info: fcore::metadata::TableInfo,
+ #[allow(dead_code)]
+ start_timestamp: Option<i64>,
+ #[allow(dead_code)]
+ end_timestamp: Option<i64>,
+}
+
+#[pymethods]
+impl LogScanner {
+ /// Subscribe to log data with timestamp range
+ fn subscribe(
+ &mut self,
+ _start_timestamp: Option<i64>,
+ _end_timestamp: Option<i64>,
+ ) -> PyResult<()> {
+ if _start_timestamp.is_some() {
+ return Err(FlussError::new_err(
+ "Specifying start_timestamp is not yet supported. Please use
None.".to_string(),
+ ));
+ }
+ if _end_timestamp.is_some() {
+ return Err(FlussError::new_err(
+ "Specifying end_timestamp is not yet supported. Please use
None.".to_string(),
+ ));
+ }
+
+ let num_buckets = self.table_info.get_num_buckets();
+ for bucket_id in 0..num_buckets {
+ // -2 for the earliest offset.
+ let start_offset = -2;
+
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .subscribe(bucket_id, start_offset)
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })?;
+ }
+
+ Ok(())
+ }
+
+ /// Convert all data to Arrow Table
+ fn to_arrow(&self, py: Python) -> PyResult<PyObject> {
+ use std::collections::HashMap;
+ use std::time::Duration;
+
+ let mut all_batches = Vec::new();
+
+ let num_buckets = self.table_info.get_num_buckets();
+ let bucket_ids: Vec<i32> = (0..num_buckets).collect();
+
+ // todo: after supporting list_offsets with timestamp, we can use
start_timestamp and end_timestamp here
+ let target_offsets: HashMap<i32, i64> = if !bucket_ids.is_empty() {
+ TOKIO_RUNTIME
+ .block_on(async {
self.inner.list_offsets_latest(bucket_ids).await })
+ .map_err(|e| FlussError::new_err(e.to_string()))?
+ } else {
+ HashMap::new()
+ };
+
+ let mut current_offsets: HashMap<i32, i64> = HashMap::new();
+
+ if !target_offsets.is_empty() {
+ loop {
+ let batch_result = TOKIO_RUNTIME
+ .block_on(async {
self.inner.poll(Duration::from_millis(1000)).await });
+
+ match batch_result {
+ Ok(scan_records) => {
+ if !scan_records.is_empty() {
Review Comment:
The `if !scan_records.is_empty()` check isn't needed here; processing an empty poll result should already be a no-op.
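Sketch of the same `Ok` arm without the guard (the body is elided in the
diff, so the inner comment is only a placeholder):

```rust
Ok(scan_records) => {
    // No emptiness guard needed: iterating an empty ScanRecords yields
    // no records, so the per-record handling below is simply a no-op.
    // ... existing record-to-Arrow conversion and offset tracking ...
}
```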
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]