This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a9a0b2  [feat] Create Python bindings for table writing and reading 
(#9)
8a9a0b2 is described below

commit 8a9a0b293afacdc855c96b089791f908bcd5ba6c
Author: naivedogger <[email protected]>
AuthorDate: Thu Oct 16 19:32:50 2025 +0800

    [feat] Create Python bindings for table writing and reading (#9)
    
    ---------
    
    Co-authored-by: luoyuxia <[email protected]>
---
 Cargo.toml                                   |   6 +-
 bindings/python/Cargo.toml                   |   3 +-
 bindings/python/fluss/__init__.py            |   2 +-
 bindings/python/pyproject.toml               |   2 +-
 bindings/python/src/admin.rs                 |  26 +-
 bindings/python/src/config.rs                |  33 +--
 bindings/python/src/connection.rs            |  45 ++-
 bindings/python/src/error.rs                 |   8 +-
 bindings/python/src/lib.rs                   |  23 +-
 bindings/python/src/metadata.rs              | 119 ++++----
 bindings/python/src/table.rs                 | 412 +++++++++++++++++++++++++++
 bindings/python/src/utils.rs                 | 105 ++++---
 crates/fluss/Cargo.toml                      |   7 +-
 crates/fluss/src/client/table/mod.rs         |  20 +-
 crates/fluss/src/client/table/scanner.rs     | 103 ++++++-
 crates/fluss/src/proto/fluss_api.proto       |  20 ++
 crates/fluss/src/record/mod.rs               |   5 +
 crates/fluss/src/row/column.rs               |   9 +
 crates/fluss/src/rpc/api_key.rs              |   3 +
 crates/fluss/src/rpc/message/list_offsets.rs | 124 ++++++++
 crates/fluss/src/rpc/message/mod.rs          |   2 +
 21 files changed, 909 insertions(+), 168 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 059236f..54436ac 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,9 +28,11 @@ rust-version = "1.85"
 
 [workspace]
 resolver = "2"
-members = ["crates/fluss", "crates/examples"]
+members = ["crates/fluss", "crates/examples", "bindings/python"]
 
 [workspace.dependencies]
 fluss = { version = "0.1.0", path = "./crates/fluss" }
 tokio = { version = "1.44.2", features = ["full"] }
-clap = { version = "4.5.37", features = ["derive"] }
\ No newline at end of file
+clap = { version = "4.5.37", features = ["derive"] }
+arrow = "55.1.0"
+chrono = { version = "0.4", features = ["clock", "std", "wasmbind"] }
diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index aee1a21..04826fb 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -26,8 +26,6 @@ rust-version = "1.85"
 name = "fluss"
 crate-type = ["cdylib"]
 
-[workspace]
-
 [dependencies]
 pyo3 = { version = "0.24", features = ["extension-module"] }
 fluss = { path = "../../crates/fluss" }
@@ -36,3 +34,4 @@ arrow = { workspace = true }
 arrow-pyarrow = "55.1.0"
 pyo3-async-runtimes = { version = "0.24.0", features = ["tokio-runtime"] }
 chrono = { workspace = true }
+once_cell = "1.21.3"
diff --git a/bindings/python/fluss/__init__.py 
b/bindings/python/fluss/__init__.py
index cceee10..098014a 100644
--- a/bindings/python/fluss/__init__.py
+++ b/bindings/python/fluss/__init__.py
@@ -15,6 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from .fluss_python import *
+from ._fluss import *
 
 __version__ = "0.1.0"
diff --git a/bindings/python/pyproject.toml b/bindings/python/pyproject.toml
index fe9d588..e28b3d2 100644
--- a/bindings/python/pyproject.toml
+++ b/bindings/python/pyproject.toml
@@ -57,7 +57,7 @@ docs = [
 ]
 
 [tool.maturin]
-python-source = "python"
+python-source = "."
 module-name = "fluss._fluss"
 features = ["pyo3/extension-module"]
 
diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs
index 7ec6eee..73b2dd3 100644
--- a/bindings/python/src/admin.rs
+++ b/bindings/python/src/admin.rs
@@ -15,9 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use pyo3::prelude::*;
-use pyo3_async_runtimes::tokio::future_into_py;
 use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
 use std::sync::Arc;
 
 /// Administrative client for managing Fluss tables
@@ -38,16 +37,17 @@ impl FlussAdmin {
         ignore_if_exists: Option<bool>,
     ) -> PyResult<Bound<'py, PyAny>> {
         let ignore = ignore_if_exists.unwrap_or(false);
-        
+
         let core_table_path = table_path.to_core().clone();
         let core_descriptor = table_descriptor.to_core().clone();
         let admin = self.__admin.clone();
 
         future_into_py(py, async move {
-            admin.create_table(&core_table_path, &core_descriptor, ignore)
+            admin
+                .create_table(&core_table_path, &core_descriptor, ignore)
                 .await
                 .map_err(|e| FlussError::new_err(e.to_string()))?;
-        
+
             Python::with_gil(|py| Ok(py.None()))
         })
     }
@@ -60,10 +60,12 @@ impl FlussAdmin {
     ) -> PyResult<Bound<'py, PyAny>> {
         let core_table_path = table_path.to_core().clone();
         let admin = self.__admin.clone();
-        
+
         future_into_py(py, async move {
-            let core_table_info = admin.get_table(&core_table_path).await
-                .map_err(|e| FlussError::new_err(format!("Failed to get table: 
{}", e)))?;
+            let core_table_info = admin
+                .get_table(&core_table_path)
+                .await
+                .map_err(|e| FlussError::new_err(format!("Failed to get table: 
{e}")))?;
 
             Python::with_gil(|py| {
                 let table_info = TableInfo::from_core(core_table_info);
@@ -80,10 +82,12 @@ impl FlussAdmin {
     ) -> PyResult<Bound<'py, PyAny>> {
         let core_table_path = table_path.to_core().clone();
         let admin = self.__admin.clone();
-        
+
         future_into_py(py, async move {
-            let core_lake_snapshot = 
admin.get_latest_lake_snapshot(&core_table_path).await
-                .map_err(|e| FlussError::new_err(format!("Failed to get lake 
snapshot: {}", e)))?;
+            let core_lake_snapshot = admin
+                .get_latest_lake_snapshot(&core_table_path)
+                .await
+                .map_err(|e| FlussError::new_err(format!("Failed to get lake 
snapshot: {e}")))?;
 
             Python::with_gil(|py| {
                 let lake_snapshot = 
LakeSnapshot::from_core(core_lake_snapshot);
diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs
index 08b20b4..70bd9cd 100644
--- a/bindings/python/src/config.rs
+++ b/bindings/python/src/config.rs
@@ -15,9 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use pyo3::prelude::*;
-use pyo3::types::PyDict;
 use crate::*;
+use pyo3::types::PyDict;
 
 /// Configuration for Fluss client
 #[pyclass]
@@ -33,7 +32,7 @@ impl Config {
     #[pyo3(signature = (properties = None))]
     fn new(properties: Option<&Bound<'_, PyDict>>) -> PyResult<Self> {
         let mut config = fcore::config::Config::default();
-        
+
         if let Some(props) = properties {
             for item in props.iter() {
                 let key: String = item.0.extract()?;
@@ -42,67 +41,65 @@ impl Config {
                 match key.as_str() {
                     "bootstrap.servers" => {
                         config.bootstrap_server = Some(value);
-                    },
+                    }
                     "request.max.size" => {
                         if let Ok(size) = value.parse::<i32>() {
                             config.request_max_size = size;
                         }
-                    },
+                    }
                     "writer.acks" => {
                         config.writer_acks = value;
-                    },
+                    }
                     "writer.retries" => {
                         if let Ok(retries) = value.parse::<i32>() {
                             config.writer_retries = retries;
                         }
-                    },
+                    }
                     "writer.batch.size" => {
                         if let Ok(size) = value.parse::<i32>() {
                             config.writer_batch_size = size;
                         }
-                    },
+                    }
                     _ => {
-                        return Err(FlussError::new_err(format!("Unknown 
property: {}", key)));
+                        return Err(FlussError::new_err(format!("Unknown 
property: {key}")));
                     }
                 }
             }
         }
 
-        Ok(Self {
-            inner: config,
-        })
+        Ok(Self { inner: config })
     }
-    
+
     /// Get the bootstrap server
     #[getter]
     fn bootstrap_server(&self) -> Option<String> {
         self.inner.bootstrap_server.clone()
     }
-    
+
     /// Set the bootstrap server
     #[setter]
     fn set_bootstrap_server(&mut self, server: String) {
         self.inner.bootstrap_server = Some(server);
     }
-    
+
     /// Get the request max size
     #[getter]
     fn request_max_size(&self) -> i32 {
         self.inner.request_max_size
     }
-    
+
     /// Set the request max size
     #[setter]
     fn set_request_max_size(&mut self, size: i32) {
         self.inner.request_max_size = size;
     }
-    
+
     /// Get the writer batch size
     #[getter]
     fn writer_batch_size(&self) -> i32 {
         self.inner.writer_batch_size
     }
-    
+
     /// Set the writer batch size
     #[setter]
     fn set_writer_batch_size(&mut self, size: i32) {
diff --git a/bindings/python/src/connection.rs 
b/bindings/python/src/connection.rs
index ba1fa50..aeb8410 100644
--- a/bindings/python/src/connection.rs
+++ b/bindings/python/src/connection.rs
@@ -15,10 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use pyo3::prelude::*;
 use crate::*;
-use std::sync::Arc;
 use pyo3_async_runtimes::tokio::future_into_py;
+use std::sync::Arc;
 
 /// Connection to a Fluss cluster
 #[pyclass]
@@ -37,55 +36,55 @@ impl FlussConnection {
             let connection = fcore::client::FlussConnection::new(rust_config)
                 .await
                 .map_err(|e| FlussError::new_err(e.to_string()))?;
-        
+
             let py_connection = FlussConnection {
                 inner: Arc::new(connection),
             };
 
-            Python::with_gil(|py| {
-                Py::new(py, py_connection)
-            })
+            Python::with_gil(|py| Py::new(py, py_connection))
         })
     }
-    
+
     /// Get admin interface
     fn get_admin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
         let client = self.inner.clone();
 
         future_into_py(py, async move {
-            let admin = client.get_admin()
+            let admin = client
+                .get_admin()
                 .await
                 .map_err(|e| FlussError::new_err(e.to_string()))?;
 
             let py_admin = FlussAdmin::from_core(admin);
 
-            Python::with_gil(|py| {
-                Py::new(py, py_admin)
-            })
+            Python::with_gil(|py| Py::new(py, py_admin))
         })
     }
 
     /// Get a table
-    fn get_table<'py>(&self, py: Python<'py>, table_path: &TablePath) -> 
PyResult<Bound<'py, PyAny>> {
+    fn get_table<'py>(
+        &self,
+        py: Python<'py>,
+        table_path: &TablePath,
+    ) -> PyResult<Bound<'py, PyAny>> {
         let client = self.inner.clone();
         let core_path = table_path.to_core().clone();
 
         future_into_py(py, async move {
-            let core_table = client.get_table(&core_path)
+            let core_table = client
+                .get_table(&core_path)
                 .await
                 .map_err(|e| FlussError::new_err(e.to_string()))?;
-        
+
             let py_table = FlussTable::new_table(
-                client,
-                core_table.metadata,
-                core_table.table_info,
-                core_table.table_path,
-                core_table.has_primary_key,
+                client.clone(),
+                core_table.metadata().clone(),
+                core_table.table_info().clone(),
+                core_table.table_path().clone(),
+                core_table.has_primary_key(),
             );
 
-            Python::with_gil(|py| {
-                Py::new(py, py_table)
-            })
+            Python::with_gil(|py| Py::new(py, py_table))
         })
     }
 
@@ -98,7 +97,7 @@ impl FlussConnection {
     fn __enter__(slf: PyRef<Self>) -> PyRef<Self> {
         slf
     }
-    
+
     // Exit the runtime context (for 'with' statement)
     #[pyo3(signature = (_exc_type=None, _exc_value=None, _traceback=None))]
     fn __exit__(
diff --git a/bindings/python/src/error.rs b/bindings/python/src/error.rs
index 2db2991..35d9d91 100644
--- a/bindings/python/src/error.rs
+++ b/bindings/python/src/error.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use pyo3::exceptions::PyException;
 use pyo3::prelude::*;
 
 /// Fluss errors
@@ -27,6 +28,11 @@ pub struct FlussError {
 
 #[pymethods]
 impl FlussError {
+    #[new]
+    fn new(message: String) -> Self {
+        Self { message }
+    }
+
     fn __str__(&self) -> String {
         format!("FlussError: {}", self.message)
     }
@@ -36,4 +42,4 @@ impl FlussError {
     pub fn new_err(message: impl ToString) -> PyErr {
         PyErr::new::<FlussError, _>(message.to_string())
     }
-}
\ No newline at end of file
+}
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index 0d8b7a5..63e84b1 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -16,24 +16,24 @@
 // under the License.
 
 pub use ::fluss as fcore;
-use pyo3::prelude::*;
 use once_cell::sync::Lazy;
+use pyo3::prelude::*;
 use tokio::runtime::Runtime;
 
+mod admin;
 mod config;
 mod connection;
-mod table;
-mod admin;
-mod types;
 mod error;
+mod metadata;
+mod table;
 mod utils;
 
+pub use admin::*;
 pub use config::*;
 pub use connection::*;
-pub use table::*;
-pub use admin::*;
-pub use types::*;
 pub use error::*;
+pub use metadata::*;
+pub use table::*;
 pub use utils::*;
 
 static TOKIO_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
@@ -44,7 +44,7 @@ static TOKIO_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
 });
 
 #[pymodule]
-fn fluss_python(m: &Bound<'_, PyModule>) -> PyResult<()> {
+fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
     // Register all classes
     m.add_class::<Config>()?;
     m.add_class::<FlussConnection>()?;
@@ -58,10 +58,9 @@ fn fluss_python(m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::<LogScanner>()?;
     m.add_class::<LakeSnapshot>()?;
     m.add_class::<TableBucket>()?;
-    
+
     // Register exception types
-    // TODO: maybe implement a separate module for exceptions
-    m.add("FlussError", m.py().get_type::<FlussError>())?;
-    
+    m.add_class::<FlussError>()?;
+
     Ok(())
 }
diff --git a/bindings/python/src/metadata.rs b/bindings/python/src/metadata.rs
index 238dde2..66748ab 100644
--- a/bindings/python/src/metadata.rs
+++ b/bindings/python/src/metadata.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use pyo3::prelude::*;
 use crate::*;
 use pyo3::types::PyDict;
 use std::collections::HashMap;
@@ -38,13 +37,13 @@ impl TablePath {
             table_name,
         }
     }
-    
+
     /// Get the database name
     #[getter]
     pub fn database_name(&self) -> String {
         self.database_name.clone()
     }
-    
+
     /// Get the table name  
     #[getter]
     pub fn table_name(&self) -> String {
@@ -59,7 +58,7 @@ impl TablePath {
     pub fn __str__(&self) -> String {
         self.table_path_str()
     }
-    
+
     fn __repr__(&self) -> String {
         format!("TablePath('{}', '{}')", self.database_name, self.table_name)
     }
@@ -68,7 +67,7 @@ impl TablePath {
     pub fn __hash__(&self) -> u64 {
         use std::collections::hash_map::DefaultHasher;
         use std::hash::{Hash, Hasher};
-        
+
         let mut hasher = DefaultHasher::new();
         self.database_name.hash(&mut hasher);
         self.table_name.hash(&mut hasher);
@@ -77,8 +76,7 @@ impl TablePath {
 
     /// Equality implementation for Python
     pub fn __eq__(&self, other: &TablePath) -> bool {
-        self.database_name == other.database_name 
-            && self.table_name == other.table_name
+        self.database_name == other.database_name && self.table_name == 
other.table_name
     }
 }
 
@@ -112,27 +110,28 @@ impl Schema {
         primary_keys: Option<Vec<String>>,
     ) -> PyResult<Self> {
         let arrow_schema = 
crate::utils::Utils::pyarrow_to_arrow_schema(&schema)?;
-        
+
         let mut builder = fcore::metadata::Schema::builder();
-        
+
         for field in arrow_schema.fields() {
             let fluss_data_type = 
crate::utils::Utils::arrow_type_to_fluss_type(field.data_type())?;
             builder = builder.column(field.name(), fluss_data_type);
-            
+
             if let Some(comment) = field.metadata().get("comment") {
                 builder = builder.with_comment(comment);
             }
         }
-        
+
         if let Some(pk_columns) = primary_keys {
             if !pk_columns.is_empty() {
                 builder = builder.primary_key(pk_columns);
             }
         }
-        
-        let fluss_schema = builder.build()
-            .map_err(|e| FlussError::new_err(format!("Failed to build schema: 
{}", e)))?;
-        
+
+        let fluss_schema = builder
+            .build()
+            .map_err(|e| FlussError::new_err(format!("Failed to build schema: 
{e}")))?;
+
         Ok(Self {
             __schema: fluss_schema,
         })
@@ -140,20 +139,33 @@ impl Schema {
 
     /// Get column names
     fn get_column_names(&self) -> Vec<String> {
-        self.__schema.columns().iter().map(|col| 
col.name().to_string()).collect()
+        self.__schema
+            .columns()
+            .iter()
+            .map(|col| col.name().to_string())
+            .collect()
     }
 
     /// Get column types
     fn get_column_types(&self) -> Vec<String> {
-        self.__schema.columns().iter()
+        self.__schema
+            .columns()
+            .iter()
             .map(|col| Utils::datatype_to_string(col.data_type()))
             .collect()
     }
 
     /// Get columns as (name, type) pairs
     fn get_columns(&self) -> Vec<(String, String)> {
-        self.__schema.columns().iter()
-            .map(|col| (col.name().to_string(), 
Utils::datatype_to_string(col.data_type())))
+        self.__schema
+            .columns()
+            .iter()
+            .map(|col| {
+                (
+                    col.name().to_string(),
+                    Utils::datatype_to_string(col.data_type()),
+                )
+            })
             .collect()
     }
 
@@ -190,7 +202,6 @@ impl TableDistribution {
     }
 }
 
-
 /// Table descriptor containing schema and metadata
 #[pyclass]
 #[derive(Clone)]
@@ -204,7 +215,7 @@ impl TableDescriptor {
     #[new]
     #[pyo3(signature = (schema, **kwargs))]
     pub fn new(
-        schema: &Schema,  // fluss schema
+        schema: &Schema, // fluss schema
         kwargs: Option<&Bound<'_, PyDict>>,
     ) -> PyResult<Self> {
         let mut partition_keys = Vec::new();
@@ -237,18 +248,22 @@ impl TableDescriptor {
             }
             if let Ok(Some(lformat)) = kwargs.get_item("log_format") {
                 let format_str: String = lformat.extract()?;
-                log_format = 
Some(fcore::metadata::LogFormat::parse(&format_str)
-                    .map_err(|e| FlussError::new_err(e.to_string()))?);
+                log_format = Some(
+                    fcore::metadata::LogFormat::parse(&format_str)
+                        .map_err(|e| FlussError::new_err(e.to_string()))?,
+                );
             }
             if let Ok(Some(kformat)) = kwargs.get_item("kv_format") {
                 let format_str: String = kformat.extract()?;
-                kv_format = Some(fcore::metadata::KvFormat::parse(&format_str)
-                    .map_err(|e| FlussError::new_err(e.to_string()))?);
+                kv_format = Some(
+                    fcore::metadata::KvFormat::parse(&format_str)
+                        .map_err(|e| FlussError::new_err(e.to_string()))?,
+                );
             }
         }
 
         let fluss_schema = schema.to_core().clone();
-        
+
         let mut builder = fcore::metadata::TableDescriptor::builder()
             .schema(fluss_schema)
             .properties(properties)
@@ -266,8 +281,9 @@ impl TableDescriptor {
             builder = builder.kv_format(kv_format);
         }
 
-        let core_descriptor = builder.build()
-            .map_err(|e| FlussError::new_err(format!("Failed to build 
TableDescriptor: {}", e)))?;
+        let core_descriptor = builder
+            .build()
+            .map_err(|e| FlussError::new_err(format!("Failed to build 
TableDescriptor: {e}")))?;
 
         Ok(Self {
             __tbl_desc: core_descriptor,
@@ -303,13 +319,13 @@ impl TableInfo {
     pub fn table_id(&self) -> i64 {
         self.__table_info.get_table_id()
     }
-    
+
     /// Get the schema ID
     #[getter]
     pub fn schema_id(&self) -> i32 {
         self.__table_info.get_schema_id()
     }
-    
+
     /// Get the table path
     #[getter]
     pub fn table_path(&self) -> TablePath {
@@ -321,13 +337,13 @@ impl TableInfo {
     pub fn created_time(&self) -> i64 {
         self.__table_info.get_created_time()
     }
-    
+
     /// Get the modified time
     #[getter]
     pub fn modified_time(&self) -> i64 {
         self.__table_info.get_modified_time()
     }
-    
+
     /// Get the primary keys
     pub fn get_primary_keys(&self) -> Vec<String> {
         self.__table_info.get_primary_keys().clone()
@@ -384,7 +400,10 @@ impl TableInfo {
 
     /// Get column names
     pub fn get_column_names(&self) -> Vec<String> {
-        self.__table_info.get_schema().columns().iter()
+        self.__table_info
+            .get_schema()
+            .columns()
+            .iter()
             .map(|col| col.name().to_string())
             .collect()
     }
@@ -398,9 +417,7 @@ impl TableInfo {
 impl TableInfo {
     /// Create from core TableInfo (internal use)
     pub fn from_core(info: fcore::metadata::TableInfo) -> Self {
-        Self {
-            __table_info: info,
-        }
+        Self { __table_info: info }
     }
 }
 
@@ -414,7 +431,7 @@ pub struct LakeSnapshot {
 
 /// Represents a table bucket with table ID, partition ID, and bucket ID
 #[pyclass]
-#[derive(Clone)]
+#[derive(Eq, Hash, PartialEq, Clone)]
 pub struct TableBucket {
     table_id: i64,
     partition_id: Option<i64>,
@@ -464,11 +481,15 @@ impl TableBucket {
     /// String representation
     pub fn __str__(&self) -> String {
         if let Some(partition_id) = self.partition_id {
-            format!("TableBucket(table_id={}, partition_id={}, bucket={})", 
-                    self.table_id, partition_id, self.bucket)
+            format!(
+                "TableBucket(table_id={}, partition_id={}, bucket={})",
+                self.table_id, partition_id, self.bucket
+            )
         } else {
-            format!("TableBucket(table_id={}, bucket={})", 
-                    self.table_id, self.bucket)
+            format!(
+                "TableBucket(table_id={}, bucket={})",
+                self.table_id, self.bucket
+            )
         }
     }
 
@@ -481,7 +502,7 @@ impl TableBucket {
     pub fn __hash__(&self) -> u64 {
         use std::collections::hash_map::DefaultHasher;
         use std::hash::{Hash, Hasher};
-        
+
         let mut hasher = DefaultHasher::new();
         self.table_id.hash(&mut hasher);
         self.partition_id.hash(&mut hasher);
@@ -491,8 +512,8 @@ impl TableBucket {
 
     /// Equality implementation for Python
     pub fn __eq__(&self, other: &TableBucket) -> bool {
-        self.table_id == other.table_id 
-            && self.partition_id == other.partition_id 
+        self.table_id == other.table_id
+            && self.partition_id == other.partition_id
             && self.bucket == other.bucket
     }
 }
@@ -509,7 +530,7 @@ impl TableBucket {
 
     /// Convert to core TableBucket (internal use)
     pub fn to_core(&self) -> fcore::metadata::TableBucket {
-        fcore::metadata::TableBucket::new(self.table_id, self.partition_id, 
self.bucket)
+        fcore::metadata::TableBucket::new(self.table_id, self.bucket)
     }
 }
 
@@ -559,8 +580,11 @@ impl LakeSnapshot {
 
     /// String representation
     pub fn __str__(&self) -> String {
-        format!("LakeSnapshot(snapshot_id={}, buckets_count={})", 
-                self.snapshot_id, self.table_buckets_offset.len())
+        format!(
+            "LakeSnapshot(snapshot_id={}, buckets_count={})",
+            self.snapshot_id,
+            self.table_buckets_offset.len()
+        )
     }
 
     /// String representation
@@ -578,4 +602,3 @@ impl LakeSnapshot {
         }
     }
 }
-
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
new file mode 100644
index 0000000..98943b9
--- /dev/null
+++ b/bindings/python/src/table.rs
@@ -0,0 +1,412 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::TOKIO_RUNTIME;
+use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use std::collections::HashSet;
+use std::sync::Arc;
+
+const EARLIEST_OFFSET: i64 = -2;
+
/// Represents a Fluss table for data operations
#[pyclass]
pub struct FlussTable {
    // Shared connection handle; cloned into futures created by the writer/scanner factories.
    connection: Arc<fcore::client::FlussConnection>,
    // Cluster metadata cache shared with the core client.
    metadata: Arc<fcore::client::Metadata>,
    // Immutable table descriptor (schema, bucket count, ...).
    table_info: fcore::metadata::TableInfo,
    // Fully-qualified path (database + table name), used for repr and accessors.
    table_path: fcore::metadata::TablePath,
    // Whether the table was declared with a primary key.
    has_primary_key: bool,
}
+
#[pymethods]
impl FlussTable {
    /// Create a new append writer for the table.
    ///
    /// Returns a Python awaitable that resolves to an `AppendWriter`.
    fn new_append_writer<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        // Clone the shared handles so the async block owns them
        // (the future must be 'static).
        let conn = self.connection.clone();
        let metadata = self.metadata.clone();
        let table_info = self.table_info.clone();

        future_into_py(py, async move {
            // The core table borrows `conn`; both live only inside this future.
            let fluss_table = fcore::client::FlussTable::new(&conn, metadata, table_info);

            let table_append = fluss_table
                .new_append()
                .map_err(|e| FlussError::new_err(e.to_string()))?;

            let rust_writer = table_append.create_writer();

            let py_writer = AppendWriter::from_core(rust_writer);

            // Re-acquire the GIL to allocate the Python-side wrapper object.
            Python::with_gil(|py| Py::new(py, py_writer))
        })
    }

    /// Create a new log scanner for the table.
    ///
    /// Returns a Python awaitable that resolves to a `LogScanner`.
    fn new_log_scanner<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        let conn = self.connection.clone();
        let metadata = self.metadata.clone();
        let table_info = self.table_info.clone();

        future_into_py(py, async move {
            let fluss_table =
                fcore::client::FlussTable::new(&conn, metadata.clone(), table_info.clone());

            let table_scan = fluss_table.new_scan();

            let rust_scanner = table_scan.create_log_scanner();

            // The Python scanner keeps its own copy of the table info;
            // subscribe()/to_arrow() need the bucket count from it.
            let py_scanner = LogScanner::from_core(rust_scanner, table_info.clone());

            Python::with_gil(|py| Py::new(py, py_scanner))
        })
    }

    /// Get table information (cloned into the Python wrapper type).
    pub fn get_table_info(&self) -> TableInfo {
        TableInfo::from_core(self.table_info.clone())
    }

    /// Get table path (cloned into the Python wrapper type).
    pub fn get_table_path(&self) -> TablePath {
        TablePath::from_core(self.table_path.clone())
    }

    /// Check if table has primary key
    pub fn has_primary_key(&self) -> bool {
        self.has_primary_key
    }

    fn __repr__(&self) -> String {
        format!(
            "FlussTable(path={}.{})",
            self.table_path.database(),
            self.table_path.table()
        )
    }
}
+
+impl FlussTable {
+    /// Create a FlussTable
+    pub fn new_table(
+        connection: Arc<fcore::client::FlussConnection>,
+        metadata: Arc<fcore::client::Metadata>,
+        table_info: fcore::metadata::TableInfo,
+        table_path: fcore::metadata::TablePath,
+        has_primary_key: bool,
+    ) -> Self {
+        Self {
+            connection,
+            metadata,
+            table_info,
+            table_path,
+            has_primary_key,
+        }
+    }
+}
+
/// Writer for appending data to a Fluss table
#[pyclass]
pub struct AppendWriter {
    // Underlying core writer; all Python-facing methods delegate to it.
    inner: fcore::client::AppendWriter,
}
+
+#[pymethods]
+impl AppendWriter {
+    /// Write Arrow table data
+    pub fn write_arrow(&mut self, py: Python, table: PyObject) -> PyResult<()> 
{
+        // Convert Arrow Table to batches and write each batch
+        let batches = table.call_method0(py, "to_batches")?;
+        let batch_list: Vec<PyObject> = batches.extract(py)?;
+
+        for batch in batch_list {
+            self.write_arrow_batch(py, batch)?;
+        }
+        Ok(())
+    }
+
+    /// Write Arrow batch data
+    pub fn write_arrow_batch(&mut self, py: Python, batch: PyObject) -> 
PyResult<()> {
+        // Extract number of rows and columns from the Arrow batch
+        let num_rows: usize = batch.getattr(py, "num_rows")?.extract(py)?;
+        let num_columns: usize = batch.getattr(py, 
"num_columns")?.extract(py)?;
+
+        // Process each row in the batch
+        for row_idx in 0..num_rows {
+            let mut generic_row = fcore::row::GenericRow::new();
+
+            // Extract values for each column in this row
+            for col_idx in 0..num_columns {
+                let column = batch.call_method1(py, "column", (col_idx,))?;
+                let value = column.call_method1(py, "__getitem__", 
(row_idx,))?;
+
+                // Convert the Python value to a Datum and add to the row
+                let datum = self.convert_python_value_to_datum(py, value)?;
+                generic_row.set_field(col_idx, datum);
+            }
+
+            // Append this row using the async append method
+            TOKIO_RUNTIME.block_on(async {
+                self.inner
+                    .append(generic_row)
+                    .await
+                    .map_err(|e| FlussError::new_err(e.to_string()))
+            })?;
+        }
+
+        Ok(())
+    }
+
+    /// Write Pandas DataFrame data
+    pub fn write_pandas(&mut self, py: Python, df: PyObject) -> PyResult<()> {
+        // Import pyarrow module
+        let pyarrow = py.import("pyarrow")?;
+
+        // Get the Table class from pyarrow module
+        let table_class = pyarrow.getattr("Table")?;
+
+        // Call Table.from_pandas(df) - from_pandas is a class method
+        let pa_table = table_class.call_method1("from_pandas", (df,))?;
+
+        // Then call write_arrow with the converted table
+        self.write_arrow(py, pa_table.into())
+    }
+
+    /// Flush any pending data
+    pub fn flush(&mut self) -> PyResult<()> {
+        TOKIO_RUNTIME.block_on(async {
+            self.inner
+                .flush()
+                .await
+                .map_err(|e| FlussError::new_err(e.to_string()))
+        })
+    }
+
+    fn __repr__(&self) -> String {
+        "AppendWriter()".to_string()
+    }
+}
+
impl AppendWriter {
    /// Create a AppendWriter from a core append writer
    pub fn from_core(append: fcore::client::AppendWriter) -> Self {
        Self { inner: append }
    }

    /// Best-effort conversion of a Python / PyArrow scalar into a core `Datum`.
    ///
    /// Tries, in order: null, PyArrow `StringScalar` (unwrapped via `as_py`),
    /// bool, i32, i64, f32, f64, str, bytes. `bool` is checked before the
    /// integer extractions so that Python booleans do not match as ints.
    ///
    /// NOTE(review): strings become `&'static str` by leaking the allocation
    /// (`Box::leak`), so every string cell written leaks memory for the
    /// lifetime of the process — revisit once `Datum` can carry owned strings.
    fn convert_python_value_to_datum(
        &self,
        py: Python,
        value: PyObject,
    ) -> PyResult<fcore::row::Datum<'static>> {
        use fcore::row::{Blob, Datum, F32, F64};

        // Check for None (null)
        if value.is_none(py) {
            return Ok(Datum::Null);
        }

        // PyArrow string scalars are unwrapped through `as_py` first, since
        // they are not plain Python str objects.
        if let Ok(type_name) = value.bind(py).get_type().name() {
            if type_name == "StringScalar" {
                if let Ok(py_value) = value.call_method0(py, "as_py") {
                    if let Ok(str_val) = py_value.extract::<String>(py) {
                        let leaked_str: &'static str = Box::leak(str_val.into_boxed_str());
                        return Ok(Datum::String(leaked_str));
                    }
                }
            }
        }

        if let Ok(bool_val) = value.extract::<bool>(py) {
            return Ok(Datum::Bool(bool_val));
        }

        // NOTE(review): i32 is tried before i64, so any integer fitting in 32
        // bits becomes Int32 even when the target column is 64-bit — confirm
        // the core writer widens this as needed.
        if let Ok(int_val) = value.extract::<i32>(py) {
            return Ok(Datum::Int32(int_val));
        }

        if let Ok(int_val) = value.extract::<i64>(py) {
            return Ok(Datum::Int64(int_val));
        }

        if let Ok(float_val) = value.extract::<f32>(py) {
            return Ok(Datum::Float32(F32::from(float_val)));
        }

        if let Ok(float_val) = value.extract::<f64>(py) {
            return Ok(Datum::Float64(F64::from(float_val)));
        }

        if let Ok(str_val) = value.extract::<String>(py) {
            // Convert String to &'static str by leaking memory
            // This is a simplified approach - in production, you might want better lifetime management
            let leaked_str: &'static str = Box::leak(str_val.into_boxed_str());
            return Ok(Datum::String(leaked_str));
        }

        if let Ok(bytes_val) = value.extract::<Vec<u8>>(py) {
            let blob = Blob::from(bytes_val);
            return Ok(Datum::Blob(blob));
        }

        // If we can't convert, return an error
        let type_name = value.bind(py).get_type().name()?;
        Err(FlussError::new_err(format!(
            "Cannot convert Python value to Datum: {type_name:?}"
        )))
    }
}
+
/// Scanner for reading log data from a Fluss table
#[pyclass]
pub struct LogScanner {
    // Core scanner that performs the actual fetches.
    inner: fcore::client::LogScanner,
    // Table descriptor; supplies the bucket count and the repr text.
    table_info: fcore::metadata::TableInfo,
    // Reserved for future timestamp-bounded scans; currently always None.
    #[allow(dead_code)]
    start_timestamp: Option<i64>,
    #[allow(dead_code)]
    end_timestamp: Option<i64>,
}
+
+#[pymethods]
+impl LogScanner {
+    /// Subscribe to log data with timestamp range
+    fn subscribe(
+        &mut self,
+        _start_timestamp: Option<i64>,
+        _end_timestamp: Option<i64>,
+    ) -> PyResult<()> {
+        if _start_timestamp.is_some() {
+            return Err(FlussError::new_err(
+                "Specifying start_timestamp is not yet supported. Please use 
None.".to_string(),
+            ));
+        }
+        if _end_timestamp.is_some() {
+            return Err(FlussError::new_err(
+                "Specifying end_timestamp is not yet supported. Please use 
None.".to_string(),
+            ));
+        }
+
+        let num_buckets = self.table_info.get_num_buckets();
+        for bucket_id in 0..num_buckets {
+            let start_offset = EARLIEST_OFFSET;
+
+            TOKIO_RUNTIME.block_on(async {
+                self.inner
+                    .subscribe(bucket_id, start_offset)
+                    .await
+                    .map_err(|e| FlussError::new_err(e.to_string()))
+            })?;
+        }
+
+        Ok(())
+    }
+
+    /// Convert all data to Arrow Table
+    fn to_arrow(&self, py: Python) -> PyResult<PyObject> {
+        use std::collections::HashMap;
+        use std::time::Duration;
+
+        let mut all_batches = Vec::new();
+
+        let num_buckets = self.table_info.get_num_buckets();
+        let bucket_ids: Vec<i32> = (0..num_buckets).collect();
+
+        // todo: after supporting list_offsets with timestamp, we can use 
start_timestamp and end_timestamp here
+        let target_offsets: HashMap<i32, i64> = TOKIO_RUNTIME
+            .block_on(async { self.inner.list_offsets_latest(bucket_ids).await 
})
+            .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+        let mut current_offsets: HashMap<i32, i64> = HashMap::new();
+        let mut completed_buckets: HashSet<i32> = HashSet::new();
+
+        if !target_offsets.is_empty() {
+            loop {
+                let batch_result = TOKIO_RUNTIME
+                    .block_on(async { 
self.inner.poll(Duration::from_millis(500)).await });
+
+                match batch_result {
+                    Ok(scan_records) => {
+                        let mut filtered_records: HashMap<
+                            fcore::metadata::TableBucket,
+                            Vec<fcore::record::ScanRecord>,
+                        > = HashMap::new();
+                        for (bucket, records) in 
scan_records.records_by_buckets() {
+                            let bucket_id = bucket.bucket_id();
+                            if completed_buckets.contains(&bucket_id) {
+                                continue;
+                            }
+                            if let Some(last_record) = records.last() {
+                                let offset = last_record.offset();
+                                current_offsets.insert(bucket_id, offset);
+                                filtered_records.insert(bucket.clone(), 
records.clone());
+                                if offset >= target_offsets[&bucket_id] - 1 {
+                                    completed_buckets.insert(bucket_id);
+                                }
+                            }
+                        }
+
+                        if !filtered_records.is_empty() {
+                            let filtered_scan_records =
+                                
fcore::record::ScanRecords::new(filtered_records);
+                            let arrow_batch =
+                                
Utils::convert_scan_records_to_arrow(filtered_scan_records);
+                            all_batches.extend(arrow_batch);
+                        }
+
+                        // completed bucket is equal to all target buckets,
+                        // we can break scan records
+                        if completed_buckets.len() == target_offsets.len() {
+                            break;
+                        }
+                    }
+                    Err(e) => return Err(FlussError::new_err(e.to_string())),
+                }
+            }
+        }
+
+        Utils::combine_batches_to_table(py, all_batches)
+    }
+
+    /// Convert all data to Pandas DataFrame
+    fn to_pandas(&self, py: Python) -> PyResult<PyObject> {
+        let arrow_table = self.to_arrow(py)?;
+
+        // Convert Arrow Table to Pandas DataFrame using pyarrow
+        let df = arrow_table.call_method0(py, "to_pandas")?;
+        Ok(df)
+    }
+
+    fn __repr__(&self) -> String {
+        format!("LogScanner(table={})", self.table_info.table_path)
+    }
+}
+
+impl LogScanner {
+    /// Create LogScanner from core LogScanner
+    pub fn from_core(
+        inner: fcore::client::LogScanner,
+        table_info: fcore::metadata::TableInfo,
+    ) -> Self {
+        Self {
+            inner,
+            table_info,
+            start_timestamp: None,
+            end_timestamp: None,
+        }
+    }
+}
diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs
index c40104b..9642e9d 100644
--- a/bindings/python/src/utils.rs
+++ b/bindings/python/src/utils.rs
@@ -15,11 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use pyo3::prelude::*;
+use crate::*;
 use arrow::datatypes::{Schema as ArrowSchema, SchemaRef};
-use std::sync::Arc;
 use arrow_pyarrow::ToPyArrow;
-use crate::*;
+use std::sync::Arc;
 
 /// Utilities for schema conversion between PyArrow, Arrow, and Fluss
 pub struct Utils;
@@ -29,15 +28,19 @@ impl Utils {
     pub fn pyarrow_to_arrow_schema(py_schema: &PyObject) -> 
PyResult<SchemaRef> {
         Python::with_gil(|py| {
             let schema_bound = py_schema.bind(py);
-            
-            let schema: ArrowSchema = 
arrow_pyarrow::FromPyArrow::from_pyarrow_bound(&schema_bound)
-                .map_err(|e| FlussError::new_err(format!("Failed to convert 
PyArrow schema: {}", e)))?;
+
+            let schema: ArrowSchema = 
arrow_pyarrow::FromPyArrow::from_pyarrow_bound(schema_bound)
+                .map_err(|e| {
+                    FlussError::new_err(format!("Failed to convert PyArrow 
schema: {e}"))
+                })?;
             Ok(Arc::new(schema))
         })
     }
 
     /// Convert Arrow DataType to Fluss DataType
-    pub fn arrow_type_to_fluss_type(arrow_type: &arrow::datatypes::DataType) 
-> PyResult<fcore::metadata::DataType> {
+    pub fn arrow_type_to_fluss_type(
+        arrow_type: &arrow::datatypes::DataType,
+    ) -> PyResult<fcore::metadata::DataType> {
         use arrow::datatypes::DataType as ArrowDataType;
         use fcore::metadata::DataTypes;
 
@@ -59,10 +62,12 @@ impl Utils {
             ArrowDataType::Date64 => DataTypes::date(),
             ArrowDataType::Time32(_) | ArrowDataType::Time64(_) => 
DataTypes::time(),
             ArrowDataType::Timestamp(_, _) => DataTypes::timestamp(),
-            ArrowDataType::Decimal128(precision, scale) => 
DataTypes::decimal(*precision as u32, *scale as u32),
+            ArrowDataType::Decimal128(precision, scale) => {
+                DataTypes::decimal(*precision as u32, *scale as u32)
+            }
             _ => {
                 return Err(FlussError::new_err(format!(
-                    "Unsupported Arrow data type: {:?}", arrow_type
+                    "Unsupported Arrow data type: {arrow_type:?}"
                 )));
             }
         };
@@ -89,47 +94,62 @@ impl Utils {
                 } else {
                     format!("time({})", t.precision())
                 }
-            },
+            }
             fcore::metadata::DataType::Timestamp(t) => {
                 if t.precision() == 6 {
                     "timestamp".to_string()
                 } else {
                     format!("timestamp({})", t.precision())
                 }
-            },
+            }
             fcore::metadata::DataType::TimestampLTz(t) => {
                 if t.precision() == 6 {
                     "timestamp_ltz".to_string()
                 } else {
                     format!("timestamp_ltz({})", t.precision())
                 }
-            },
+            }
             fcore::metadata::DataType::Char(c) => format!("char({})", 
c.length()),
-            fcore::metadata::DataType::Decimal(d) => format!("decimal({},{})", 
d.precision(), d.scale()),
+            fcore::metadata::DataType::Decimal(d) => {
+                format!("decimal({},{})", d.precision(), d.scale())
+            }
             fcore::metadata::DataType::Binary(b) => format!("binary({})", 
b.length()),
-            fcore::metadata::DataType::Array(arr) => format!("array<{}>", 
Utils::datatype_to_string(arr.get_element_type())),
-            fcore::metadata::DataType::Map(map) => format!("map<{},{}>", 
-                                        
Utils::datatype_to_string(map.key_type()), 
-                                        
Utils::datatype_to_string(map.value_type())),
+            fcore::metadata::DataType::Array(arr) => format!(
+                "array<{}>",
+                Utils::datatype_to_string(arr.get_element_type())
+            ),
+            fcore::metadata::DataType::Map(map) => format!(
+                "map<{},{}>",
+                Utils::datatype_to_string(map.key_type()),
+                Utils::datatype_to_string(map.value_type())
+            ),
             fcore::metadata::DataType::Row(row) => {
-                let fields: Vec<String> = row.fields().iter()
-                    .map(|field| format!("{}: {}", field.name(), 
Utils::datatype_to_string(field.data_type())))
+                let fields: Vec<String> = row
+                    .fields()
+                    .iter()
+                    .map(|field| {
+                        format!(
+                            "{}: {}",
+                            field.name(),
+                            Utils::datatype_to_string(field.data_type())
+                        )
+                    })
                     .collect();
                 format!("row<{}>", fields.join(", "))
-            },
+            }
         }
     }
 
     /// Parse log format string to LogFormat enum
     pub fn parse_log_format(format_str: &str) -> 
PyResult<fcore::metadata::LogFormat> {
         fcore::metadata::LogFormat::parse(format_str)
-            .map_err(|e| FlussError::new_err(format!("Invalid log format '{}': 
{}", format_str, e)))
+            .map_err(|e| FlussError::new_err(format!("Invalid log format 
'{format_str}': {e}")))
     }
 
     /// Parse kv format string to KvFormat enum
     pub fn parse_kv_format(format_str: &str) -> 
PyResult<fcore::metadata::KvFormat> {
         fcore::metadata::KvFormat::parse(format_str)
-            .map_err(|e| FlussError::new_err(format!("Invalid kv format '{}': 
{}", format_str, e)))
+            .map_err(|e| FlussError::new_err(format!("Invalid kv format 
'{format_str}': {e}")))
     }
 
     /// Convert ScanRecords to Arrow RecordBatch
@@ -137,42 +157,41 @@ impl Utils {
         _scan_records: fcore::record::ScanRecords,
     ) -> Vec<Arc<arrow::record_batch::RecordBatch>> {
         let mut result = Vec::new();
-        for(_, records) in _scan_records.into_records() {
-            for record in records {
-                let columnar_row = record.row();
-                let row_id = columnar_row.get_row_id();
-                if row_id == 0 {
-                    let record_batch = columnar_row.get_record_batch();
-                    result.push(record_batch.clone());
-                }
+        for record in _scan_records {
+            let columnar_row = record.row();
+            let row_id = columnar_row.get_row_id();
+            if row_id == 0 {
+                let record_batch = columnar_row.get_record_batch();
+                result.push(Arc::new(record_batch.clone()));
             }
         }
         result
     }
-    
+
     /// Combine multiple Arrow batches into a single Table
-    pub fn combine_batches_to_table(py: Python, batches: 
Vec<Arc<arrow::record_batch::RecordBatch>>) -> PyResult<PyObject> {
-        if batches.is_empty() {
-            return Err(FlussError::new_err("No batches to combine"));
-        }
-        
+    pub fn combine_batches_to_table(
+        py: Python,
+        batches: Vec<Arc<arrow::record_batch::RecordBatch>>,
+    ) -> PyResult<PyObject> {
         // Convert Rust Arrow RecordBatch to PyObject
-        let py_batches: Result<Vec<PyObject>, _> = batches.iter()
+        let py_batches: Result<Vec<PyObject>, _> = batches
+            .iter()
             .map(|batch| {
-                batch.as_ref().to_pyarrow(py)
-                    .map_err(|e| FlussError::new_err(format!("Failed to 
convert RecordBatch to PyObject: {}", e)))
+                batch.as_ref().to_pyarrow(py).map_err(|e| {
+                    FlussError::new_err(format!("Failed to convert RecordBatch 
to PyObject: {e}"))
+                })
             })
             .collect();
-        
+
         let py_batches = py_batches?;
-        
+
         let pyarrow = py.import("pyarrow")?;
-        
+
         // Use pyarrow.Table.from_batches to combine batches
         let table = pyarrow
             .getattr("Table")?
             .call_method1("from_batches", (py_batches,))?;
-        
+
         Ok(table.into())
     }
 }
diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index a728bd7..ab1efc2 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -23,7 +23,7 @@ name = "fluss"
 build = "src/build.rs"
 
 [dependencies]
-arrow = "55.1.0"
+arrow = { workspace = true }
 arrow-schema = "55.1.0"
 byteorder = "1.5"
 futures = "0.3"
@@ -44,7 +44,8 @@ rust_decimal = "1"
 ordered-float = { version = "4", features = ["serde"] }
 parse-display = "0.10"
 ref-cast = "1.0"
-chrono = { version = "0.4", features = ["clock", "std", "wasmbind"] }
+chrono = { workspace = true }
+oneshot = "0.1.11"
 
 [dev-dependencies]
 testcontainers = "0.25.0"
@@ -56,4 +57,4 @@ integration_tests = []
 
 
 [build-dependencies]
-prost-build = {  version = "0.13.5"  }
\ No newline at end of file
+prost-build = {  version = "0.13.5"  }
diff --git a/crates/fluss/src/client/table/mod.rs 
b/crates/fluss/src/client/table/mod.rs
index 4d6f8f0..07e6494 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -27,8 +27,8 @@ mod append;
 mod scanner;
 mod writer;
 
-pub use append::TableAppend;
-pub use scanner::TableScan;
+pub use append::{AppendWriter, TableAppend};
+pub use scanner::{LogScanner, TableScan};
 
 #[allow(dead_code)]
 pub struct FlussTable<'a> {
@@ -65,6 +65,22 @@ impl<'a> FlussTable<'a> {
     pub fn new_scan(&self) -> TableScan<'_> {
         TableScan::new(self.conn, self.table_info.clone(), 
self.metadata.clone())
     }
+
    /// Shared metadata handle used by scans and writers.
    pub fn metadata(&self) -> &Arc<Metadata> {
        &self.metadata
    }

    /// Immutable descriptor of this table.
    pub fn table_info(&self) -> &TableInfo {
        &self.table_info
    }

    /// Fully-qualified database/table path.
    pub fn table_path(&self) -> &TablePath {
        &self.table_path
    }

    /// Whether the table defines a primary key.
    pub fn has_primary_key(&self) -> bool {
        self.has_primary_key
    }
 }
 
 impl<'a> Drop for FlussTable<'a> {
diff --git a/crates/fluss/src/client/table/scanner.rs 
b/crates/fluss/src/client/table/scanner.rs
index 41fb17e..cbe7248 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -22,12 +22,14 @@ use crate::metadata::{TableBucket, TableInfo, TablePath};
 use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, 
PbFetchLogReqForTable};
 use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord, ScanRecords, 
to_arrow_schema};
 use crate::rpc::RpcClient;
+use crate::rpc::message::{ListOffsetsRequest, OffsetSpec};
 use crate::util::FairBucketStatusMap;
 use parking_lot::RwLock;
 use std::collections::HashMap;
 use std::slice::from_ref;
 use std::sync::Arc;
 use std::time::Duration;
+use tokio::task::JoinHandle;
 
 const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
 #[allow(dead_code)]
@@ -65,6 +67,7 @@ pub struct LogScanner {
     metadata: Arc<Metadata>,
     log_scanner_status: Arc<LogScannerStatus>,
     log_fetcher: LogFetcher,
+    conns: Arc<RpcClient>,
 }
 
 impl LogScanner {
@@ -81,10 +84,11 @@ impl LogScanner {
             log_scanner_status: log_scanner_status.clone(),
             log_fetcher: LogFetcher::new(
                 table_info.clone(),
-                connections,
+                connections.clone(),
                 metadata.clone(),
                 log_scanner_status.clone(),
             ),
+            conns: connections.clone(),
         }
     }
 
@@ -102,6 +106,103 @@ impl LogScanner {
         Ok(())
     }
 
+    pub async fn list_offsets_latest(&self, buckets: Vec<i32>) -> 
Result<HashMap<i32, i64>> {
+        // TODO: support partition_id
+        let partition_id = None;
+        let offset_spec = OffsetSpec::Latest;
+
+        self.metadata
+            .check_and_update_table_metadata(from_ref(&self.table_path))
+            .await?;
+
+        let cluster = self.metadata.get_cluster();
+        let table_id = cluster.get_table(&self.table_path).table_id;
+
+        // Prepare requests
+        let requests_by_server = self.prepare_list_offsets_requests(
+            table_id,
+            partition_id,
+            buckets.clone(),
+            offset_spec,
+        )?;
+
+        // Send Requests
+        let response_futures = 
self.send_list_offsets_request(requests_by_server).await?;
+
+        let mut results = HashMap::new();
+
+        for response_future in response_futures {
+            let offsets = response_future.await.map_err(
+                // todo: consider use suitable error
+                |e| crate::error::Error::WriteError(format!("Fail to get 
result: {e}")),
+            )?;
+            results.extend(offsets?);
+        }
+        Ok(results)
+    }
+
+    fn prepare_list_offsets_requests(
+        &self,
+        table_id: i64,
+        partition_id: Option<i64>,
+        buckets: Vec<i32>,
+        offset_spec: OffsetSpec,
+    ) -> Result<HashMap<i32, ListOffsetsRequest>> {
+        let cluster = self.metadata.get_cluster();
+        let mut node_for_bucket_list: HashMap<i32, Vec<i32>> = HashMap::new();
+
+        for bucket_id in buckets {
+            let table_bucket = TableBucket::new(table_id, bucket_id);
+            let leader = cluster.leader_for(&table_bucket).ok_or_else(|| {
+                // todo: consider use another suitable error
+                crate::error::Error::InvalidTableError(format!(
+                    "No leader found for table bucket: table_id={table_id}, 
bucket_id={bucket_id}"
+                ))
+            })?;
+
+            node_for_bucket_list
+                .entry(leader.id())
+                .or_default()
+                .push(bucket_id);
+        }
+
+        let mut list_offsets_requests = HashMap::new();
+        for (leader_id, bucket_ids) in node_for_bucket_list {
+            let request =
+                ListOffsetsRequest::new(table_id, partition_id, bucket_ids, 
offset_spec.clone());
+            list_offsets_requests.insert(leader_id, request);
+        }
+        Ok(list_offsets_requests)
+    }
+
    /// Spawn one task per leader server that sends its `ListOffsetsRequest`
    /// and resolves the per-bucket offsets from the response.
    ///
    /// Returns the join handles so the caller can await the servers
    /// concurrently: a handle error means the task itself failed (panic or
    /// cancellation), while the inner `Result` carries RPC-level errors.
    async fn send_list_offsets_request(
        &self,
        request_map: HashMap<i32, ListOffsetsRequest>,
    ) -> Result<Vec<JoinHandle<Result<HashMap<i32, i64>>>>> {
        let mut tasks = Vec::new();

        for (leader_id, request) in request_map {
            // Clone the Arc handles so each spawned task owns its own copies.
            let rpc_client = self.conns.clone();
            let metadata = self.metadata.clone();

            let task = tokio::spawn(async move {
                let cluster = metadata.get_cluster();
                let tablet_server = cluster.get_tablet_server(leader_id).ok_or_else(|| {
                    // todo: consider use more suitable error
                    crate::error::Error::InvalidTableError(format!(
                        "Tablet server {leader_id} not found"
                    ))
                })?;
                let connection = rpc_client.get_connection(tablet_server).await?;
                let list_offsets_response = connection.request(request).await?;
                list_offsets_response.offsets()
            });
            tasks.push(task);
        }

        Ok(tasks)
    }
+
     async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket, 
Vec<ScanRecord>>> {
         self.log_fetcher.send_fetches_and_collect().await
     }
diff --git a/crates/fluss/src/proto/fluss_api.proto 
b/crates/fluss/src/proto/fluss_api.proto
index d71197b..ef460fc 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -202,6 +202,19 @@ message ListDatabasesResponse {
   repeated string database_name = 1;
 }
 
+// list offsets request and response
+message ListOffsetsRequest {
+  required int32 follower_server_id = 1;  // a value of -1 indicates the request is from a client.
+  required int32 offset_type = 2; // value can be 0,1,2 (see ListOffsetsParam 
for more details)
+  required int64 table_id = 3;
+  optional int64 partition_id = 4;
+  repeated int32 bucket_id = 5 [packed = true]; // it is recommended to use 
packed for repeated numerics to get more efficient encoding
+  optional int64 startTimestamp = 6;
+}
+message ListOffsetsResponse {
+  repeated PbListOffsetsRespForBucket buckets_resp = 1;
+}
+
 
 // fetch log request and response
 message FetchLogRequest {
@@ -262,6 +275,13 @@ message PbRemoteLogSegment {
   required int32 segment_size_in_bytes = 4;
 }
 
+message PbListOffsetsRespForBucket {
+  required int32 bucket_id = 1;
+  optional int32 error_code = 2;
+  optional string error_message = 3;
+  optional int64 offset = 4;
+}
+
 // fetch latest lake snapshot
 message GetLatestLakeSnapshotRequest {
   required PbTablePath table_path = 1;
diff --git a/crates/fluss/src/record/mod.rs b/crates/fluss/src/record/mod.rs
index d787205..07fbe08 100644
--- a/crates/fluss/src/record/mod.rs
+++ b/crates/fluss/src/record/mod.rs
@@ -84,6 +84,7 @@ impl fmt::Display for ChangeType {
     }
 }
 
+#[derive(Clone)]
 pub struct ScanRecord {
     pub row: ColumnarRow,
     offset: i64,
@@ -158,6 +159,10 @@ impl ScanRecords {
     pub fn is_empty(&self) -> bool {
         self.records.is_empty()
     }
+
    /// Borrow the records grouped by table bucket, without consuming `self`.
    pub fn records_by_buckets(&self) -> &HashMap<TableBucket, Vec<ScanRecord>> {
        &self.records
    }
 }
 
 impl IntoIterator for ScanRecords {
diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs
index 44ca640..6d47836 100644
--- a/crates/fluss/src/row/column.rs
+++ b/crates/fluss/src/row/column.rs
@@ -22,6 +22,7 @@ use arrow::array::{
 };
 use std::sync::Arc;
 
+#[derive(Clone)]
 pub struct ColumnarRow {
     record_batch: Arc<RecordBatch>,
     row_id: usize,
@@ -45,6 +46,14 @@ impl ColumnarRow {
     pub fn set_row_id(&mut self, row_id: usize) {
         self.row_id = row_id
     }
+
+    /// Index of the row this view currently points at inside the backing batch.
+    // NOTE(review): Rust convention favors `row_id()` over `get_row_id()` for
+    // plain getters; kept as-is since callers in the bindings use these names.
+    pub fn get_row_id(&self) -> usize {
+        self.row_id
+    }
+
+    /// Borrows the Arrow `RecordBatch` backing this row view.
+    pub fn get_record_batch(&self) -> &RecordBatch {
+        &self.record_batch
+    }
 }
 
 impl InternalRow for ColumnarRow {
diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs
index 18ce44f..215bb39 100644
--- a/crates/fluss/src/rpc/api_key.rs
+++ b/crates/fluss/src/rpc/api_key.rs
@@ -31,6 +31,7 @@ pub enum ApiKey {
     MetaData,
     ProduceLog,
     FetchLog,
+    ListOffsets,
     GetDatabaseInfo,
     GetLatestLakeSnapshot,
     Unknown(i16),
@@ -51,6 +52,7 @@ impl From<i16> for ApiKey {
             1012 => ApiKey::MetaData,
             1014 => ApiKey::ProduceLog,
             1015 => ApiKey::FetchLog,
+            1021 => ApiKey::ListOffsets,
             1032 => ApiKey::GetLatestLakeSnapshot,
             1035 => ApiKey::GetDatabaseInfo,
             _ => Unknown(key),
@@ -73,6 +75,7 @@ impl From<ApiKey> for i16 {
             ApiKey::MetaData => 1012,
             ApiKey::ProduceLog => 1014,
             ApiKey::FetchLog => 1015,
+            ApiKey::ListOffsets => 1021,
             ApiKey::GetLatestLakeSnapshot => 1032,
             ApiKey::GetDatabaseInfo => 1035,
             Unknown(x) => x,
diff --git a/crates/fluss/src/rpc/message/list_offsets.rs b/crates/fluss/src/rpc/message/list_offsets.rs
new file mode 100644
index 0000000..500db33
--- /dev/null
+++ b/crates/fluss/src/rpc/message/list_offsets.rs
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+
+use crate::error::Error;
+use crate::error::Result as FlussResult;
+use crate::proto::ListOffsetsResponse;
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use std::collections::HashMap;
+
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+/// Wire values for `ListOffsetsRequest.offset_type`; must match the proto contract.
+pub const LIST_EARLIEST_OFFSET: i32 = 0;
+pub const LIST_LATEST_OFFSET: i32 = 1;
+pub const LIST_OFFSET_FROM_TIMESTAMP: i32 = 2;
+
+/// Sentinel `follower_server_id` marking the request as coming from a client
+/// rather than a follower server (see the proto field comment).
+pub const CLIENT_FOLLOWER_SERVER_ID: i32 = -1;
+
+/// Offset specification for a list-offsets request.
+#[derive(Debug, Clone)]
+pub enum OffsetSpec {
+    /// List the earliest available offset.
+    Earliest,
+    /// List the latest offset.
+    Latest,
+    /// List the offset for the given timestamp (epoch value — presumably
+    /// milliseconds, matching the server; TODO confirm unit).
+    Timestamp(i64),
+}
+
+impl OffsetSpec {
+    /// Maps this spec to the wire `offset_type` value expected by the server.
+    pub fn offset_type(&self) -> i32 {
+        match self {
+            OffsetSpec::Earliest => LIST_EARLIEST_OFFSET,
+            OffsetSpec::Latest => LIST_LATEST_OFFSET,
+            OffsetSpec::Timestamp(_) => LIST_OFFSET_FROM_TIMESTAMP,
+        }
+    }
+
+    /// Timestamp to send with the request; `None` unless this is `Timestamp`.
+    pub fn start_timestamp(&self) -> Option<i64> {
+        match self {
+            OffsetSpec::Timestamp(ts) => Some(*ts),
+            _ => None,
+        }
+    }
+}
+
+/// Typed wrapper around the protobuf `ListOffsetsRequest` payload.
+#[derive(Debug)]
+pub struct ListOffsetsRequest {
+    pub inner_request: proto::ListOffsetsRequest,
+}
+
+impl ListOffsetsRequest {
+    /// Builds a client-side list-offsets request.
+    ///
+    /// * `table_id` – id of the table to query.
+    /// * `partition_id` – `Some` for partitioned tables, `None` otherwise.
+    /// * `bucket_ids` – buckets whose offsets are requested.
+    /// * `offset_spec` – which offset to list (earliest, latest, or by timestamp).
+    pub fn new(
+        table_id: i64,
+        partition_id: Option<i64>,
+        bucket_ids: Vec<i32>,
+        offset_spec: OffsetSpec,
+    ) -> Self {
+        ListOffsetsRequest {
+            inner_request: proto::ListOffsetsRequest {
+                // -1 marks the request as coming from a client, not a follower.
+                follower_server_id: CLIENT_FOLLOWER_SERVER_ID,
+                offset_type: offset_spec.offset_type(),
+                table_id,
+                partition_id,
+                bucket_id: bucket_ids,
+                // only populated for OffsetSpec::Timestamp
+                start_timestamp: offset_spec.start_timestamp(),
+            },
+        }
+    }
+}
+
+impl RequestBody for ListOffsetsRequest {
+    type ResponseBody = ListOffsetsResponse;
+
+    // API key 1021, registered in rpc::api_key.
+    const API_KEY: ApiKey = ApiKey::ListOffsets;
+
+    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+// Generate the versioned (de)serialization plumbing for this request/response pair.
+impl_write_version_type!(ListOffsetsRequest);
+impl_read_version_type!(ListOffsetsResponse);
+
+impl ListOffsetsResponse {
+    /// Flattens the per-bucket responses into a `bucket_id -> offset` map.
+    ///
+    /// Returns an error if any bucket reported a server-side error, or if a
+    /// success response is missing its offset (malformed reply).
+    pub fn offsets(&self) -> FlussResult<HashMap<i32, i64>> {
+        self.buckets_resp
+            .iter()
+            .map(|resp| {
+                if resp.error_code.is_some() {
+                    // todo: consider use another suitable error
+                    Err(Error::WriteError(format!(
+                        "Missing offset, error message: {}",
+                        resp.error_message
+                            .as_deref()
+                            .unwrap_or("unknown server exception")
+                    )))
+                } else {
+                    // `offset` is optional on the wire, so guard against a
+                    // malformed success response instead of panicking.
+                    let offset = resp.offset.ok_or_else(|| {
+                        Error::WriteError(format!(
+                            "Missing offset for bucket {} in successful response",
+                            resp.bucket_id
+                        ))
+                    })?;
+                    Ok((resp.bucket_id, offset))
+                }
+            })
+            .collect()
+    }
+}
diff --git a/crates/fluss/src/rpc/message/mod.rs b/crates/fluss/src/rpc/message/mod.rs
index d5f8ebd..230d971 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -31,6 +31,7 @@ mod get_latest_lake_snapshot;
 mod get_table;
 mod header;
 mod list_databases;
+mod list_offsets;
 mod list_tables;
 mod produce_log;
 mod table_exists;
@@ -47,6 +48,7 @@ pub use get_latest_lake_snapshot::*;
 pub use get_table::*;
 pub use header::*;
 pub use list_databases::*;
+pub use list_offsets::*;
 pub use list_tables::*;
 pub use produce_log::*;
 pub use table_exists::*;

Reply via email to