This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 8a9a0b2 [feat] Create Python bindings for table writing and reading
(#9)
8a9a0b2 is described below
commit 8a9a0b293afacdc855c96b089791f908bcd5ba6c
Author: naivedogger <[email protected]>
AuthorDate: Thu Oct 16 19:32:50 2025 +0800
[feat] Create Python bindings for table writing and reading (#9)
---------
Co-authored-by: luoyuxia <[email protected]>
---
Cargo.toml | 6 +-
bindings/python/Cargo.toml | 3 +-
bindings/python/fluss/__init__.py | 2 +-
bindings/python/pyproject.toml | 2 +-
bindings/python/src/admin.rs | 26 +-
bindings/python/src/config.rs | 33 +--
bindings/python/src/connection.rs | 45 ++-
bindings/python/src/error.rs | 8 +-
bindings/python/src/lib.rs | 23 +-
bindings/python/src/metadata.rs | 119 ++++----
bindings/python/src/table.rs | 412 +++++++++++++++++++++++++++
bindings/python/src/utils.rs | 105 ++++---
crates/fluss/Cargo.toml | 7 +-
crates/fluss/src/client/table/mod.rs | 20 +-
crates/fluss/src/client/table/scanner.rs | 103 ++++++-
crates/fluss/src/proto/fluss_api.proto | 20 ++
crates/fluss/src/record/mod.rs | 5 +
crates/fluss/src/row/column.rs | 9 +
crates/fluss/src/rpc/api_key.rs | 3 +
crates/fluss/src/rpc/message/list_offsets.rs | 124 ++++++++
crates/fluss/src/rpc/message/mod.rs | 2 +
21 files changed, 909 insertions(+), 168 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 059236f..54436ac 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,9 +28,11 @@ rust-version = "1.85"
[workspace]
resolver = "2"
-members = ["crates/fluss", "crates/examples"]
+members = ["crates/fluss", "crates/examples", "bindings/python"]
[workspace.dependencies]
fluss = { version = "0.1.0", path = "./crates/fluss" }
tokio = { version = "1.44.2", features = ["full"] }
-clap = { version = "4.5.37", features = ["derive"] }
\ No newline at end of file
+clap = { version = "4.5.37", features = ["derive"] }
+arrow = "55.1.0"
+chrono = { version = "0.4", features = ["clock", "std", "wasmbind"] }
diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index aee1a21..04826fb 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -26,8 +26,6 @@ rust-version = "1.85"
name = "fluss"
crate-type = ["cdylib"]
-[workspace]
-
[dependencies]
pyo3 = { version = "0.24", features = ["extension-module"] }
fluss = { path = "../../crates/fluss" }
@@ -36,3 +34,4 @@ arrow = { workspace = true }
arrow-pyarrow = "55.1.0"
pyo3-async-runtimes = { version = "0.24.0", features = ["tokio-runtime"] }
chrono = { workspace = true }
+once_cell = "1.21.3"
diff --git a/bindings/python/fluss/__init__.py
b/bindings/python/fluss/__init__.py
index cceee10..098014a 100644
--- a/bindings/python/fluss/__init__.py
+++ b/bindings/python/fluss/__init__.py
@@ -15,6 +15,6 @@
# specific language governing permissions and limitations
# under the License.
-from .fluss_python import *
+from ._fluss import *
__version__ = "0.1.0"
diff --git a/bindings/python/pyproject.toml b/bindings/python/pyproject.toml
index fe9d588..e28b3d2 100644
--- a/bindings/python/pyproject.toml
+++ b/bindings/python/pyproject.toml
@@ -57,7 +57,7 @@ docs = [
]
[tool.maturin]
-python-source = "python"
+python-source = "."
module-name = "fluss._fluss"
features = ["pyo3/extension-module"]
diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs
index 7ec6eee..73b2dd3 100644
--- a/bindings/python/src/admin.rs
+++ b/bindings/python/src/admin.rs
@@ -15,9 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-use pyo3::prelude::*;
-use pyo3_async_runtimes::tokio::future_into_py;
use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
use std::sync::Arc;
/// Administrative client for managing Fluss tables
@@ -38,16 +37,17 @@ impl FlussAdmin {
ignore_if_exists: Option<bool>,
) -> PyResult<Bound<'py, PyAny>> {
let ignore = ignore_if_exists.unwrap_or(false);
-
+
let core_table_path = table_path.to_core().clone();
let core_descriptor = table_descriptor.to_core().clone();
let admin = self.__admin.clone();
future_into_py(py, async move {
- admin.create_table(&core_table_path, &core_descriptor, ignore)
+ admin
+ .create_table(&core_table_path, &core_descriptor, ignore)
.await
.map_err(|e| FlussError::new_err(e.to_string()))?;
-
+
Python::with_gil(|py| Ok(py.None()))
})
}
@@ -60,10 +60,12 @@ impl FlussAdmin {
) -> PyResult<Bound<'py, PyAny>> {
let core_table_path = table_path.to_core().clone();
let admin = self.__admin.clone();
-
+
future_into_py(py, async move {
- let core_table_info = admin.get_table(&core_table_path).await
- .map_err(|e| FlussError::new_err(format!("Failed to get table:
{}", e)))?;
+ let core_table_info = admin
+ .get_table(&core_table_path)
+ .await
+ .map_err(|e| FlussError::new_err(format!("Failed to get table:
{e}")))?;
Python::with_gil(|py| {
let table_info = TableInfo::from_core(core_table_info);
@@ -80,10 +82,12 @@ impl FlussAdmin {
) -> PyResult<Bound<'py, PyAny>> {
let core_table_path = table_path.to_core().clone();
let admin = self.__admin.clone();
-
+
future_into_py(py, async move {
- let core_lake_snapshot =
admin.get_latest_lake_snapshot(&core_table_path).await
- .map_err(|e| FlussError::new_err(format!("Failed to get lake
snapshot: {}", e)))?;
+ let core_lake_snapshot = admin
+ .get_latest_lake_snapshot(&core_table_path)
+ .await
+ .map_err(|e| FlussError::new_err(format!("Failed to get lake
snapshot: {e}")))?;
Python::with_gil(|py| {
let lake_snapshot =
LakeSnapshot::from_core(core_lake_snapshot);
diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs
index 08b20b4..70bd9cd 100644
--- a/bindings/python/src/config.rs
+++ b/bindings/python/src/config.rs
@@ -15,9 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-use pyo3::prelude::*;
-use pyo3::types::PyDict;
use crate::*;
+use pyo3::types::PyDict;
/// Configuration for Fluss client
#[pyclass]
@@ -33,7 +32,7 @@ impl Config {
#[pyo3(signature = (properties = None))]
fn new(properties: Option<&Bound<'_, PyDict>>) -> PyResult<Self> {
let mut config = fcore::config::Config::default();
-
+
if let Some(props) = properties {
for item in props.iter() {
let key: String = item.0.extract()?;
@@ -42,67 +41,65 @@ impl Config {
match key.as_str() {
"bootstrap.servers" => {
config.bootstrap_server = Some(value);
- },
+ }
"request.max.size" => {
if let Ok(size) = value.parse::<i32>() {
config.request_max_size = size;
}
- },
+ }
"writer.acks" => {
config.writer_acks = value;
- },
+ }
"writer.retries" => {
if let Ok(retries) = value.parse::<i32>() {
config.writer_retries = retries;
}
- },
+ }
"writer.batch.size" => {
if let Ok(size) = value.parse::<i32>() {
config.writer_batch_size = size;
}
- },
+ }
_ => {
- return Err(FlussError::new_err(format!("Unknown
property: {}", key)));
+ return Err(FlussError::new_err(format!("Unknown
property: {key}")));
}
}
}
}
- Ok(Self {
- inner: config,
- })
+ Ok(Self { inner: config })
}
-
+
/// Get the bootstrap server
#[getter]
fn bootstrap_server(&self) -> Option<String> {
self.inner.bootstrap_server.clone()
}
-
+
/// Set the bootstrap server
#[setter]
fn set_bootstrap_server(&mut self, server: String) {
self.inner.bootstrap_server = Some(server);
}
-
+
/// Get the request max size
#[getter]
fn request_max_size(&self) -> i32 {
self.inner.request_max_size
}
-
+
/// Set the request max size
#[setter]
fn set_request_max_size(&mut self, size: i32) {
self.inner.request_max_size = size;
}
-
+
/// Get the writer batch size
#[getter]
fn writer_batch_size(&self) -> i32 {
self.inner.writer_batch_size
}
-
+
/// Set the writer batch size
#[setter]
fn set_writer_batch_size(&mut self, size: i32) {
diff --git a/bindings/python/src/connection.rs
b/bindings/python/src/connection.rs
index ba1fa50..aeb8410 100644
--- a/bindings/python/src/connection.rs
+++ b/bindings/python/src/connection.rs
@@ -15,10 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-use pyo3::prelude::*;
use crate::*;
-use std::sync::Arc;
use pyo3_async_runtimes::tokio::future_into_py;
+use std::sync::Arc;
/// Connection to a Fluss cluster
#[pyclass]
@@ -37,55 +36,55 @@ impl FlussConnection {
let connection = fcore::client::FlussConnection::new(rust_config)
.await
.map_err(|e| FlussError::new_err(e.to_string()))?;
-
+
let py_connection = FlussConnection {
inner: Arc::new(connection),
};
- Python::with_gil(|py| {
- Py::new(py, py_connection)
- })
+ Python::with_gil(|py| Py::new(py, py_connection))
})
}
-
+
/// Get admin interface
fn get_admin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let client = self.inner.clone();
future_into_py(py, async move {
- let admin = client.get_admin()
+ let admin = client
+ .get_admin()
.await
.map_err(|e| FlussError::new_err(e.to_string()))?;
let py_admin = FlussAdmin::from_core(admin);
- Python::with_gil(|py| {
- Py::new(py, py_admin)
- })
+ Python::with_gil(|py| Py::new(py, py_admin))
})
}
/// Get a table
- fn get_table<'py>(&self, py: Python<'py>, table_path: &TablePath) ->
PyResult<Bound<'py, PyAny>> {
+ fn get_table<'py>(
+ &self,
+ py: Python<'py>,
+ table_path: &TablePath,
+ ) -> PyResult<Bound<'py, PyAny>> {
let client = self.inner.clone();
let core_path = table_path.to_core().clone();
future_into_py(py, async move {
- let core_table = client.get_table(&core_path)
+ let core_table = client
+ .get_table(&core_path)
.await
.map_err(|e| FlussError::new_err(e.to_string()))?;
-
+
let py_table = FlussTable::new_table(
- client,
- core_table.metadata,
- core_table.table_info,
- core_table.table_path,
- core_table.has_primary_key,
+ client.clone(),
+ core_table.metadata().clone(),
+ core_table.table_info().clone(),
+ core_table.table_path().clone(),
+ core_table.has_primary_key(),
);
- Python::with_gil(|py| {
- Py::new(py, py_table)
- })
+ Python::with_gil(|py| Py::new(py, py_table))
})
}
@@ -98,7 +97,7 @@ impl FlussConnection {
fn __enter__(slf: PyRef<Self>) -> PyRef<Self> {
slf
}
-
+
// Exit the runtime context (for 'with' statement)
#[pyo3(signature = (_exc_type=None, _exc_value=None, _traceback=None))]
fn __exit__(
diff --git a/bindings/python/src/error.rs b/bindings/python/src/error.rs
index 2db2991..35d9d91 100644
--- a/bindings/python/src/error.rs
+++ b/bindings/python/src/error.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use pyo3::exceptions::PyException;
use pyo3::prelude::*;
/// Fluss errors
@@ -27,6 +28,11 @@ pub struct FlussError {
#[pymethods]
impl FlussError {
+ #[new]
+ fn new(message: String) -> Self {
+ Self { message }
+ }
+
fn __str__(&self) -> String {
format!("FlussError: {}", self.message)
}
@@ -36,4 +42,4 @@ impl FlussError {
pub fn new_err(message: impl ToString) -> PyErr {
PyErr::new::<FlussError, _>(message.to_string())
}
-}
\ No newline at end of file
+}
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index 0d8b7a5..63e84b1 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -16,24 +16,24 @@
// under the License.
pub use ::fluss as fcore;
-use pyo3::prelude::*;
use once_cell::sync::Lazy;
+use pyo3::prelude::*;
use tokio::runtime::Runtime;
+mod admin;
mod config;
mod connection;
-mod table;
-mod admin;
-mod types;
mod error;
+mod metadata;
+mod table;
mod utils;
+pub use admin::*;
pub use config::*;
pub use connection::*;
-pub use table::*;
-pub use admin::*;
-pub use types::*;
pub use error::*;
+pub use metadata::*;
+pub use table::*;
pub use utils::*;
static TOKIO_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
@@ -44,7 +44,7 @@ static TOKIO_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
});
#[pymodule]
-fn fluss_python(m: &Bound<'_, PyModule>) -> PyResult<()> {
+fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
// Register all classes
m.add_class::<Config>()?;
m.add_class::<FlussConnection>()?;
@@ -58,10 +58,9 @@ fn fluss_python(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<LogScanner>()?;
m.add_class::<LakeSnapshot>()?;
m.add_class::<TableBucket>()?;
-
+
// Register exception types
- // TODO: maybe implement a separate module for exceptions
- m.add("FlussError", m.py().get_type::<FlussError>())?;
-
+ m.add_class::<FlussError>()?;
+
Ok(())
}
diff --git a/bindings/python/src/metadata.rs b/bindings/python/src/metadata.rs
index 238dde2..66748ab 100644
--- a/bindings/python/src/metadata.rs
+++ b/bindings/python/src/metadata.rs
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use pyo3::prelude::*;
use crate::*;
use pyo3::types::PyDict;
use std::collections::HashMap;
@@ -38,13 +37,13 @@ impl TablePath {
table_name,
}
}
-
+
/// Get the database name
#[getter]
pub fn database_name(&self) -> String {
self.database_name.clone()
}
-
+
/// Get the table name
#[getter]
pub fn table_name(&self) -> String {
@@ -59,7 +58,7 @@ impl TablePath {
pub fn __str__(&self) -> String {
self.table_path_str()
}
-
+
fn __repr__(&self) -> String {
format!("TablePath('{}', '{}')", self.database_name, self.table_name)
}
@@ -68,7 +67,7 @@ impl TablePath {
pub fn __hash__(&self) -> u64 {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
-
+
let mut hasher = DefaultHasher::new();
self.database_name.hash(&mut hasher);
self.table_name.hash(&mut hasher);
@@ -77,8 +76,7 @@ impl TablePath {
/// Equality implementation for Python
pub fn __eq__(&self, other: &TablePath) -> bool {
- self.database_name == other.database_name
- && self.table_name == other.table_name
+ self.database_name == other.database_name && self.table_name ==
other.table_name
}
}
@@ -112,27 +110,28 @@ impl Schema {
primary_keys: Option<Vec<String>>,
) -> PyResult<Self> {
let arrow_schema =
crate::utils::Utils::pyarrow_to_arrow_schema(&schema)?;
-
+
let mut builder = fcore::metadata::Schema::builder();
-
+
for field in arrow_schema.fields() {
let fluss_data_type =
crate::utils::Utils::arrow_type_to_fluss_type(field.data_type())?;
builder = builder.column(field.name(), fluss_data_type);
-
+
if let Some(comment) = field.metadata().get("comment") {
builder = builder.with_comment(comment);
}
}
-
+
if let Some(pk_columns) = primary_keys {
if !pk_columns.is_empty() {
builder = builder.primary_key(pk_columns);
}
}
-
- let fluss_schema = builder.build()
- .map_err(|e| FlussError::new_err(format!("Failed to build schema:
{}", e)))?;
-
+
+ let fluss_schema = builder
+ .build()
+ .map_err(|e| FlussError::new_err(format!("Failed to build schema:
{e}")))?;
+
Ok(Self {
__schema: fluss_schema,
})
@@ -140,20 +139,33 @@ impl Schema {
/// Get column names
fn get_column_names(&self) -> Vec<String> {
- self.__schema.columns().iter().map(|col|
col.name().to_string()).collect()
+ self.__schema
+ .columns()
+ .iter()
+ .map(|col| col.name().to_string())
+ .collect()
}
/// Get column types
fn get_column_types(&self) -> Vec<String> {
- self.__schema.columns().iter()
+ self.__schema
+ .columns()
+ .iter()
.map(|col| Utils::datatype_to_string(col.data_type()))
.collect()
}
/// Get columns as (name, type) pairs
fn get_columns(&self) -> Vec<(String, String)> {
- self.__schema.columns().iter()
- .map(|col| (col.name().to_string(),
Utils::datatype_to_string(col.data_type())))
+ self.__schema
+ .columns()
+ .iter()
+ .map(|col| {
+ (
+ col.name().to_string(),
+ Utils::datatype_to_string(col.data_type()),
+ )
+ })
.collect()
}
@@ -190,7 +202,6 @@ impl TableDistribution {
}
}
-
/// Table descriptor containing schema and metadata
#[pyclass]
#[derive(Clone)]
@@ -204,7 +215,7 @@ impl TableDescriptor {
#[new]
#[pyo3(signature = (schema, **kwargs))]
pub fn new(
- schema: &Schema, // fluss schema
+ schema: &Schema, // fluss schema
kwargs: Option<&Bound<'_, PyDict>>,
) -> PyResult<Self> {
let mut partition_keys = Vec::new();
@@ -237,18 +248,22 @@ impl TableDescriptor {
}
if let Ok(Some(lformat)) = kwargs.get_item("log_format") {
let format_str: String = lformat.extract()?;
- log_format =
Some(fcore::metadata::LogFormat::parse(&format_str)
- .map_err(|e| FlussError::new_err(e.to_string()))?);
+ log_format = Some(
+ fcore::metadata::LogFormat::parse(&format_str)
+ .map_err(|e| FlussError::new_err(e.to_string()))?,
+ );
}
if let Ok(Some(kformat)) = kwargs.get_item("kv_format") {
let format_str: String = kformat.extract()?;
- kv_format = Some(fcore::metadata::KvFormat::parse(&format_str)
- .map_err(|e| FlussError::new_err(e.to_string()))?);
+ kv_format = Some(
+ fcore::metadata::KvFormat::parse(&format_str)
+ .map_err(|e| FlussError::new_err(e.to_string()))?,
+ );
}
}
let fluss_schema = schema.to_core().clone();
-
+
let mut builder = fcore::metadata::TableDescriptor::builder()
.schema(fluss_schema)
.properties(properties)
@@ -266,8 +281,9 @@ impl TableDescriptor {
builder = builder.kv_format(kv_format);
}
- let core_descriptor = builder.build()
- .map_err(|e| FlussError::new_err(format!("Failed to build
TableDescriptor: {}", e)))?;
+ let core_descriptor = builder
+ .build()
+ .map_err(|e| FlussError::new_err(format!("Failed to build
TableDescriptor: {e}")))?;
Ok(Self {
__tbl_desc: core_descriptor,
@@ -303,13 +319,13 @@ impl TableInfo {
pub fn table_id(&self) -> i64 {
self.__table_info.get_table_id()
}
-
+
/// Get the schema ID
#[getter]
pub fn schema_id(&self) -> i32 {
self.__table_info.get_schema_id()
}
-
+
/// Get the table path
#[getter]
pub fn table_path(&self) -> TablePath {
@@ -321,13 +337,13 @@ impl TableInfo {
pub fn created_time(&self) -> i64 {
self.__table_info.get_created_time()
}
-
+
/// Get the modified time
#[getter]
pub fn modified_time(&self) -> i64 {
self.__table_info.get_modified_time()
}
-
+
/// Get the primary keys
pub fn get_primary_keys(&self) -> Vec<String> {
self.__table_info.get_primary_keys().clone()
@@ -384,7 +400,10 @@ impl TableInfo {
/// Get column names
pub fn get_column_names(&self) -> Vec<String> {
- self.__table_info.get_schema().columns().iter()
+ self.__table_info
+ .get_schema()
+ .columns()
+ .iter()
.map(|col| col.name().to_string())
.collect()
}
@@ -398,9 +417,7 @@ impl TableInfo {
impl TableInfo {
/// Create from core TableInfo (internal use)
pub fn from_core(info: fcore::metadata::TableInfo) -> Self {
- Self {
- __table_info: info,
- }
+ Self { __table_info: info }
}
}
@@ -414,7 +431,7 @@ pub struct LakeSnapshot {
/// Represents a table bucket with table ID, partition ID, and bucket ID
#[pyclass]
-#[derive(Clone)]
+#[derive(Eq, Hash, PartialEq, Clone)]
pub struct TableBucket {
table_id: i64,
partition_id: Option<i64>,
@@ -464,11 +481,15 @@ impl TableBucket {
/// String representation
pub fn __str__(&self) -> String {
if let Some(partition_id) = self.partition_id {
- format!("TableBucket(table_id={}, partition_id={}, bucket={})",
- self.table_id, partition_id, self.bucket)
+ format!(
+ "TableBucket(table_id={}, partition_id={}, bucket={})",
+ self.table_id, partition_id, self.bucket
+ )
} else {
- format!("TableBucket(table_id={}, bucket={})",
- self.table_id, self.bucket)
+ format!(
+ "TableBucket(table_id={}, bucket={})",
+ self.table_id, self.bucket
+ )
}
}
@@ -481,7 +502,7 @@ impl TableBucket {
pub fn __hash__(&self) -> u64 {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
-
+
let mut hasher = DefaultHasher::new();
self.table_id.hash(&mut hasher);
self.partition_id.hash(&mut hasher);
@@ -491,8 +512,8 @@ impl TableBucket {
/// Equality implementation for Python
pub fn __eq__(&self, other: &TableBucket) -> bool {
- self.table_id == other.table_id
- && self.partition_id == other.partition_id
+ self.table_id == other.table_id
+ && self.partition_id == other.partition_id
&& self.bucket == other.bucket
}
}
@@ -509,7 +530,7 @@ impl TableBucket {
/// Convert to core TableBucket (internal use)
pub fn to_core(&self) -> fcore::metadata::TableBucket {
- fcore::metadata::TableBucket::new(self.table_id, self.partition_id,
self.bucket)
+ fcore::metadata::TableBucket::new(self.table_id, self.bucket)
}
}
@@ -559,8 +580,11 @@ impl LakeSnapshot {
/// String representation
pub fn __str__(&self) -> String {
- format!("LakeSnapshot(snapshot_id={}, buckets_count={})",
- self.snapshot_id, self.table_buckets_offset.len())
+ format!(
+ "LakeSnapshot(snapshot_id={}, buckets_count={})",
+ self.snapshot_id,
+ self.table_buckets_offset.len()
+ )
}
/// String representation
@@ -578,4 +602,3 @@ impl LakeSnapshot {
}
}
}
-
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
new file mode 100644
index 0000000..98943b9
--- /dev/null
+++ b/bindings/python/src/table.rs
@@ -0,0 +1,412 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::TOKIO_RUNTIME;
+use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use std::collections::HashSet;
+use std::sync::Arc;
+
+const EARLIEST_OFFSET: i64 = -2;
+
+/// Represents a Fluss table for data operations
+#[pyclass]
+pub struct FlussTable {
+ connection: Arc<fcore::client::FlussConnection>,
+ metadata: Arc<fcore::client::Metadata>,
+ table_info: fcore::metadata::TableInfo,
+ table_path: fcore::metadata::TablePath,
+ has_primary_key: bool,
+}
+
+#[pymethods]
+impl FlussTable {
+ /// Create a new append writer for the table
+ fn new_append_writer<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py,
PyAny>> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ future_into_py(py, async move {
+ let fluss_table = fcore::client::FlussTable::new(&conn, metadata,
table_info);
+
+ let table_append = fluss_table
+ .new_append()
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+ let rust_writer = table_append.create_writer();
+
+ let py_writer = AppendWriter::from_core(rust_writer);
+
+ Python::with_gil(|py| Py::new(py, py_writer))
+ })
+ }
+
+ /// Create a new log scanner for the table
+ fn new_log_scanner<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py,
PyAny>> {
+ let conn = self.connection.clone();
+ let metadata = self.metadata.clone();
+ let table_info = self.table_info.clone();
+
+ future_into_py(py, async move {
+ let fluss_table =
+ fcore::client::FlussTable::new(&conn, metadata.clone(),
table_info.clone());
+
+ let table_scan = fluss_table.new_scan();
+
+ let rust_scanner = table_scan.create_log_scanner();
+
+ let py_scanner = LogScanner::from_core(rust_scanner,
table_info.clone());
+
+ Python::with_gil(|py| Py::new(py, py_scanner))
+ })
+ }
+
+ /// Get table information
+ pub fn get_table_info(&self) -> TableInfo {
+ TableInfo::from_core(self.table_info.clone())
+ }
+
+ /// Get table path
+ pub fn get_table_path(&self) -> TablePath {
+ TablePath::from_core(self.table_path.clone())
+ }
+
+ /// Check if table has primary key
+ pub fn has_primary_key(&self) -> bool {
+ self.has_primary_key
+ }
+
+ fn __repr__(&self) -> String {
+ format!(
+ "FlussTable(path={}.{})",
+ self.table_path.database(),
+ self.table_path.table()
+ )
+ }
+}
+
+impl FlussTable {
+ /// Create a FlussTable
+ pub fn new_table(
+ connection: Arc<fcore::client::FlussConnection>,
+ metadata: Arc<fcore::client::Metadata>,
+ table_info: fcore::metadata::TableInfo,
+ table_path: fcore::metadata::TablePath,
+ has_primary_key: bool,
+ ) -> Self {
+ Self {
+ connection,
+ metadata,
+ table_info,
+ table_path,
+ has_primary_key,
+ }
+ }
+}
+
+/// Writer for appending data to a Fluss table
+#[pyclass]
+pub struct AppendWriter {
+ inner: fcore::client::AppendWriter,
+}
+
+#[pymethods]
+impl AppendWriter {
+ /// Write Arrow table data
+ pub fn write_arrow(&mut self, py: Python, table: PyObject) -> PyResult<()>
{
+ // Convert Arrow Table to batches and write each batch
+ let batches = table.call_method0(py, "to_batches")?;
+ let batch_list: Vec<PyObject> = batches.extract(py)?;
+
+ for batch in batch_list {
+ self.write_arrow_batch(py, batch)?;
+ }
+ Ok(())
+ }
+
+ /// Write Arrow batch data
+ pub fn write_arrow_batch(&mut self, py: Python, batch: PyObject) ->
PyResult<()> {
+ // Extract number of rows and columns from the Arrow batch
+ let num_rows: usize = batch.getattr(py, "num_rows")?.extract(py)?;
+ let num_columns: usize = batch.getattr(py,
"num_columns")?.extract(py)?;
+
+ // Process each row in the batch
+ for row_idx in 0..num_rows {
+ let mut generic_row = fcore::row::GenericRow::new();
+
+ // Extract values for each column in this row
+ for col_idx in 0..num_columns {
+ let column = batch.call_method1(py, "column", (col_idx,))?;
+ let value = column.call_method1(py, "__getitem__",
(row_idx,))?;
+
+ // Convert the Python value to a Datum and add to the row
+ let datum = self.convert_python_value_to_datum(py, value)?;
+ generic_row.set_field(col_idx, datum);
+ }
+
+ // Append this row using the async append method
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .append(generic_row)
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })?;
+ }
+
+ Ok(())
+ }
+
+ /// Write Pandas DataFrame data
+ pub fn write_pandas(&mut self, py: Python, df: PyObject) -> PyResult<()> {
+ // Import pyarrow module
+ let pyarrow = py.import("pyarrow")?;
+
+ // Get the Table class from pyarrow module
+ let table_class = pyarrow.getattr("Table")?;
+
+ // Call Table.from_pandas(df) - from_pandas is a class method
+ let pa_table = table_class.call_method1("from_pandas", (df,))?;
+
+ // Then call write_arrow with the converted table
+ self.write_arrow(py, pa_table.into())
+ }
+
+ /// Flush any pending data
+ pub fn flush(&mut self) -> PyResult<()> {
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .flush()
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })
+ }
+
+ fn __repr__(&self) -> String {
+ "AppendWriter()".to_string()
+ }
+}
+
+impl AppendWriter {
+    /// Create an AppendWriter from a core append writer
+ pub fn from_core(append: fcore::client::AppendWriter) -> Self {
+ Self { inner: append }
+ }
+
+ fn convert_python_value_to_datum(
+ &self,
+ py: Python,
+ value: PyObject,
+ ) -> PyResult<fcore::row::Datum<'static>> {
+ use fcore::row::{Blob, Datum, F32, F64};
+
+ // Check for None (null)
+ if value.is_none(py) {
+ return Ok(Datum::Null);
+ }
+
+ // Try to extract different types
+ if let Ok(type_name) = value.bind(py).get_type().name() {
+ if type_name == "StringScalar" {
+ if let Ok(py_value) = value.call_method0(py, "as_py") {
+ if let Ok(str_val) = py_value.extract::<String>(py) {
+ let leaked_str: &'static str =
Box::leak(str_val.into_boxed_str());
+ return Ok(Datum::String(leaked_str));
+ }
+ }
+ }
+ }
+
+ if let Ok(bool_val) = value.extract::<bool>(py) {
+ return Ok(Datum::Bool(bool_val));
+ }
+
+ if let Ok(int_val) = value.extract::<i32>(py) {
+ return Ok(Datum::Int32(int_val));
+ }
+
+ if let Ok(int_val) = value.extract::<i64>(py) {
+ return Ok(Datum::Int64(int_val));
+ }
+
+ if let Ok(float_val) = value.extract::<f32>(py) {
+ return Ok(Datum::Float32(F32::from(float_val)));
+ }
+
+ if let Ok(float_val) = value.extract::<f64>(py) {
+ return Ok(Datum::Float64(F64::from(float_val)));
+ }
+
+ if let Ok(str_val) = value.extract::<String>(py) {
+ // Convert String to &'static str by leaking memory
+ // This is a simplified approach - in production, you might want
better lifetime management
+ let leaked_str: &'static str = Box::leak(str_val.into_boxed_str());
+ return Ok(Datum::String(leaked_str));
+ }
+
+ if let Ok(bytes_val) = value.extract::<Vec<u8>>(py) {
+ let blob = Blob::from(bytes_val);
+ return Ok(Datum::Blob(blob));
+ }
+
+ // If we can't convert, return an error
+ let type_name = value.bind(py).get_type().name()?;
+ Err(FlussError::new_err(format!(
+ "Cannot convert Python value to Datum: {type_name:?}"
+ )))
+ }
+}
+
+/// Scanner for reading log data from a Fluss table
+#[pyclass]
+pub struct LogScanner {
+ inner: fcore::client::LogScanner,
+ table_info: fcore::metadata::TableInfo,
+ #[allow(dead_code)]
+ start_timestamp: Option<i64>,
+ #[allow(dead_code)]
+ end_timestamp: Option<i64>,
+}
+
+#[pymethods]
+impl LogScanner {
+ /// Subscribe to log data with timestamp range
+ fn subscribe(
+ &mut self,
+ _start_timestamp: Option<i64>,
+ _end_timestamp: Option<i64>,
+ ) -> PyResult<()> {
+ if _start_timestamp.is_some() {
+ return Err(FlussError::new_err(
+ "Specifying start_timestamp is not yet supported. Please use
None.".to_string(),
+ ));
+ }
+ if _end_timestamp.is_some() {
+ return Err(FlussError::new_err(
+ "Specifying end_timestamp is not yet supported. Please use
None.".to_string(),
+ ));
+ }
+
+ let num_buckets = self.table_info.get_num_buckets();
+ for bucket_id in 0..num_buckets {
+ let start_offset = EARLIEST_OFFSET;
+
+ TOKIO_RUNTIME.block_on(async {
+ self.inner
+ .subscribe(bucket_id, start_offset)
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })?;
+ }
+
+ Ok(())
+ }
+
+ /// Convert all data to Arrow Table
+ fn to_arrow(&self, py: Python) -> PyResult<PyObject> {
+ use std::collections::HashMap;
+ use std::time::Duration;
+
+ let mut all_batches = Vec::new();
+
+ let num_buckets = self.table_info.get_num_buckets();
+ let bucket_ids: Vec<i32> = (0..num_buckets).collect();
+
+ // todo: after supporting list_offsets with timestamp, we can use
start_timestamp and end_timestamp here
+ let target_offsets: HashMap<i32, i64> = TOKIO_RUNTIME
+ .block_on(async { self.inner.list_offsets_latest(bucket_ids).await
})
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+ let mut current_offsets: HashMap<i32, i64> = HashMap::new();
+ let mut completed_buckets: HashSet<i32> = HashSet::new();
+
+ if !target_offsets.is_empty() {
+ loop {
+ let batch_result = TOKIO_RUNTIME
+ .block_on(async {
self.inner.poll(Duration::from_millis(500)).await });
+
+ match batch_result {
+ Ok(scan_records) => {
+ let mut filtered_records: HashMap<
+ fcore::metadata::TableBucket,
+ Vec<fcore::record::ScanRecord>,
+ > = HashMap::new();
+ for (bucket, records) in
scan_records.records_by_buckets() {
+ let bucket_id = bucket.bucket_id();
+ if completed_buckets.contains(&bucket_id) {
+ continue;
+ }
+ if let Some(last_record) = records.last() {
+ let offset = last_record.offset();
+ current_offsets.insert(bucket_id, offset);
+ filtered_records.insert(bucket.clone(),
records.clone());
+ if offset >= target_offsets[&bucket_id] - 1 {
+ completed_buckets.insert(bucket_id);
+ }
+ }
+ }
+
+ if !filtered_records.is_empty() {
+ let filtered_scan_records =
+
fcore::record::ScanRecords::new(filtered_records);
+ let arrow_batch =
+
Utils::convert_scan_records_to_arrow(filtered_scan_records);
+ all_batches.extend(arrow_batch);
+ }
+
+                    // once every target bucket has reached its target offset,
+                    // we can stop polling for records
+ if completed_buckets.len() == target_offsets.len() {
+ break;
+ }
+ }
+ Err(e) => return Err(FlussError::new_err(e.to_string())),
+ }
+ }
+ }
+
+ Utils::combine_batches_to_table(py, all_batches)
+ }
+
+ /// Convert all data to Pandas DataFrame
+ fn to_pandas(&self, py: Python) -> PyResult<PyObject> {
+ let arrow_table = self.to_arrow(py)?;
+
+ // Convert Arrow Table to Pandas DataFrame using pyarrow
+ let df = arrow_table.call_method0(py, "to_pandas")?;
+ Ok(df)
+ }
+
+ fn __repr__(&self) -> String {
+ format!("LogScanner(table={})", self.table_info.table_path)
+ }
+}
+
+impl LogScanner {
+ /// Create LogScanner from core LogScanner
+ pub fn from_core(
+ inner: fcore::client::LogScanner,
+ table_info: fcore::metadata::TableInfo,
+ ) -> Self {
+ Self {
+ inner,
+ table_info,
+ start_timestamp: None,
+ end_timestamp: None,
+ }
+ }
+}
diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs
index c40104b..9642e9d 100644
--- a/bindings/python/src/utils.rs
+++ b/bindings/python/src/utils.rs
@@ -15,11 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use pyo3::prelude::*;
+use crate::*;
use arrow::datatypes::{Schema as ArrowSchema, SchemaRef};
-use std::sync::Arc;
use arrow_pyarrow::ToPyArrow;
-use crate::*;
+use std::sync::Arc;
/// Utilities for schema conversion between PyArrow, Arrow, and Fluss.
/// Stateless: all helpers are associated functions on this unit struct.
pub struct Utils;
@@ -29,15 +28,19 @@ impl Utils {
    /// Convert a Python `pyarrow.Schema` object into a Rust Arrow `SchemaRef`.
    ///
    /// Acquires the GIL internally; returns a `FlussError` when the Python
    /// object cannot be interpreted as a PyArrow schema.
    pub fn pyarrow_to_arrow_schema(py_schema: &PyObject) -> PyResult<SchemaRef> {
        Python::with_gil(|py| {
            let schema_bound = py_schema.bind(py);

            let schema: ArrowSchema = arrow_pyarrow::FromPyArrow::from_pyarrow_bound(schema_bound)
                .map_err(|e| {
                    FlussError::new_err(format!("Failed to convert PyArrow schema: {e}"))
                })?;
            Ok(Arc::new(schema))
        })
    }
/// Convert Arrow DataType to Fluss DataType
- pub fn arrow_type_to_fluss_type(arrow_type: &arrow::datatypes::DataType)
-> PyResult<fcore::metadata::DataType> {
+ pub fn arrow_type_to_fluss_type(
+ arrow_type: &arrow::datatypes::DataType,
+ ) -> PyResult<fcore::metadata::DataType> {
use arrow::datatypes::DataType as ArrowDataType;
use fcore::metadata::DataTypes;
@@ -59,10 +62,12 @@ impl Utils {
ArrowDataType::Date64 => DataTypes::date(),
ArrowDataType::Time32(_) | ArrowDataType::Time64(_) =>
DataTypes::time(),
ArrowDataType::Timestamp(_, _) => DataTypes::timestamp(),
- ArrowDataType::Decimal128(precision, scale) =>
DataTypes::decimal(*precision as u32, *scale as u32),
+ ArrowDataType::Decimal128(precision, scale) => {
+ DataTypes::decimal(*precision as u32, *scale as u32)
+ }
_ => {
return Err(FlussError::new_err(format!(
- "Unsupported Arrow data type: {:?}", arrow_type
+ "Unsupported Arrow data type: {arrow_type:?}"
)));
}
};
@@ -89,47 +94,62 @@ impl Utils {
} else {
format!("time({})", t.precision())
}
- },
+ }
fcore::metadata::DataType::Timestamp(t) => {
if t.precision() == 6 {
"timestamp".to_string()
} else {
format!("timestamp({})", t.precision())
}
- },
+ }
fcore::metadata::DataType::TimestampLTz(t) => {
if t.precision() == 6 {
"timestamp_ltz".to_string()
} else {
format!("timestamp_ltz({})", t.precision())
}
- },
+ }
fcore::metadata::DataType::Char(c) => format!("char({})",
c.length()),
- fcore::metadata::DataType::Decimal(d) => format!("decimal({},{})",
d.precision(), d.scale()),
+ fcore::metadata::DataType::Decimal(d) => {
+ format!("decimal({},{})", d.precision(), d.scale())
+ }
fcore::metadata::DataType::Binary(b) => format!("binary({})",
b.length()),
- fcore::metadata::DataType::Array(arr) => format!("array<{}>",
Utils::datatype_to_string(arr.get_element_type())),
- fcore::metadata::DataType::Map(map) => format!("map<{},{}>",
-
Utils::datatype_to_string(map.key_type()),
-
Utils::datatype_to_string(map.value_type())),
+ fcore::metadata::DataType::Array(arr) => format!(
+ "array<{}>",
+ Utils::datatype_to_string(arr.get_element_type())
+ ),
+ fcore::metadata::DataType::Map(map) => format!(
+ "map<{},{}>",
+ Utils::datatype_to_string(map.key_type()),
+ Utils::datatype_to_string(map.value_type())
+ ),
fcore::metadata::DataType::Row(row) => {
- let fields: Vec<String> = row.fields().iter()
- .map(|field| format!("{}: {}", field.name(),
Utils::datatype_to_string(field.data_type())))
+ let fields: Vec<String> = row
+ .fields()
+ .iter()
+ .map(|field| {
+ format!(
+ "{}: {}",
+ field.name(),
+ Utils::datatype_to_string(field.data_type())
+ )
+ })
.collect();
format!("row<{}>", fields.join(", "))
- },
+ }
}
}
    /// Parse a log format string into a Fluss `LogFormat`.
    ///
    /// Returns a `FlussError` naming the offending string when it does not
    /// match any known format.
    pub fn parse_log_format(format_str: &str) -> PyResult<fcore::metadata::LogFormat> {
        fcore::metadata::LogFormat::parse(format_str)
            .map_err(|e| FlussError::new_err(format!("Invalid log format '{format_str}': {e}")))
    }
    /// Parse a kv format string into a Fluss `KvFormat`.
    ///
    /// Returns a `FlussError` naming the offending string when it does not
    /// match any known format.
    pub fn parse_kv_format(format_str: &str) -> PyResult<fcore::metadata::KvFormat> {
        fcore::metadata::KvFormat::parse(format_str)
            .map_err(|e| FlussError::new_err(format!("Invalid kv format '{format_str}': {e}")))
    }
/// Convert ScanRecords to Arrow RecordBatch
@@ -137,42 +157,41 @@ impl Utils {
_scan_records: fcore::record::ScanRecords,
) -> Vec<Arc<arrow::record_batch::RecordBatch>> {
let mut result = Vec::new();
- for(_, records) in _scan_records.into_records() {
- for record in records {
- let columnar_row = record.row();
- let row_id = columnar_row.get_row_id();
- if row_id == 0 {
- let record_batch = columnar_row.get_record_batch();
- result.push(record_batch.clone());
- }
+ for record in _scan_records {
+ let columnar_row = record.row();
+ let row_id = columnar_row.get_row_id();
+ if row_id == 0 {
+ let record_batch = columnar_row.get_record_batch();
+ result.push(Arc::new(record_batch.clone()));
}
}
result
}
-
+
/// Combine multiple Arrow batches into a single Table
- pub fn combine_batches_to_table(py: Python, batches:
Vec<Arc<arrow::record_batch::RecordBatch>>) -> PyResult<PyObject> {
- if batches.is_empty() {
- return Err(FlussError::new_err("No batches to combine"));
- }
-
+ pub fn combine_batches_to_table(
+ py: Python,
+ batches: Vec<Arc<arrow::record_batch::RecordBatch>>,
+ ) -> PyResult<PyObject> {
// Convert Rust Arrow RecordBatch to PyObject
- let py_batches: Result<Vec<PyObject>, _> = batches.iter()
+ let py_batches: Result<Vec<PyObject>, _> = batches
+ .iter()
.map(|batch| {
- batch.as_ref().to_pyarrow(py)
- .map_err(|e| FlussError::new_err(format!("Failed to
convert RecordBatch to PyObject: {}", e)))
+ batch.as_ref().to_pyarrow(py).map_err(|e| {
+ FlussError::new_err(format!("Failed to convert RecordBatch
to PyObject: {e}"))
+ })
})
.collect();
-
+
let py_batches = py_batches?;
-
+
let pyarrow = py.import("pyarrow")?;
-
+
// Use pyarrow.Table.from_batches to combine batches
let table = pyarrow
.getattr("Table")?
.call_method1("from_batches", (py_batches,))?;
-
+
Ok(table.into())
}
}
diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index a728bd7..ab1efc2 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -23,7 +23,7 @@ name = "fluss"
build = "src/build.rs"
[dependencies]
-arrow = "55.1.0"
+arrow = { workspace = true }
arrow-schema = "55.1.0"
byteorder = "1.5"
futures = "0.3"
@@ -44,7 +44,8 @@ rust_decimal = "1"
ordered-float = { version = "4", features = ["serde"] }
parse-display = "0.10"
ref-cast = "1.0"
-chrono = { version = "0.4", features = ["clock", "std", "wasmbind"] }
+chrono = { workspace = true }
+oneshot = "0.1.11"
[dev-dependencies]
testcontainers = "0.25.0"
@@ -56,4 +57,4 @@ integration_tests = []
[build-dependencies]
-prost-build = { version = "0.13.5" }
\ No newline at end of file
+prost-build = { version = "0.13.5" }
diff --git a/crates/fluss/src/client/table/mod.rs
b/crates/fluss/src/client/table/mod.rs
index 4d6f8f0..07e6494 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -27,8 +27,8 @@ mod append;
mod scanner;
mod writer;
-pub use append::TableAppend;
-pub use scanner::TableScan;
+pub use append::{AppendWriter, TableAppend};
+pub use scanner::{LogScanner, TableScan};
#[allow(dead_code)]
pub struct FlussTable<'a> {
@@ -65,6 +65,22 @@ impl<'a> FlussTable<'a> {
    /// Create a new scan builder for reading from this table.
    pub fn new_scan(&self) -> TableScan<'_> {
        TableScan::new(self.conn, self.table_info.clone(), self.metadata.clone())
    }

    /// Shared metadata handle backing this table's clients.
    pub fn metadata(&self) -> &Arc<Metadata> {
        &self.metadata
    }

    /// Full table information descriptor for this table.
    pub fn table_info(&self) -> &TableInfo {
        &self.table_info
    }

    /// The path (database/table) identifying this table.
    pub fn table_path(&self) -> &TablePath {
        &self.table_path
    }

    /// Whether this table was created with a primary key.
    pub fn has_primary_key(&self) -> bool {
        self.has_primary_key
    }
}
impl<'a> Drop for FlussTable<'a> {
diff --git a/crates/fluss/src/client/table/scanner.rs
b/crates/fluss/src/client/table/scanner.rs
index 41fb17e..cbe7248 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -22,12 +22,14 @@ use crate::metadata::{TableBucket, TableInfo, TablePath};
use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket,
PbFetchLogReqForTable};
use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord, ScanRecords,
to_arrow_schema};
use crate::rpc::RpcClient;
+use crate::rpc::message::{ListOffsetsRequest, OffsetSpec};
use crate::util::FairBucketStatusMap;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::slice::from_ref;
use std::sync::Arc;
use std::time::Duration;
+use tokio::task::JoinHandle;
const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
#[allow(dead_code)]
@@ -65,6 +67,7 @@ pub struct LogScanner {
metadata: Arc<Metadata>,
log_scanner_status: Arc<LogScannerStatus>,
log_fetcher: LogFetcher,
+ conns: Arc<RpcClient>,
}
impl LogScanner {
@@ -81,10 +84,11 @@ impl LogScanner {
log_scanner_status: log_scanner_status.clone(),
log_fetcher: LogFetcher::new(
table_info.clone(),
- connections,
+ connections.clone(),
metadata.clone(),
log_scanner_status.clone(),
),
+ conns: connections.clone(),
}
}
@@ -102,6 +106,103 @@ impl LogScanner {
Ok(())
}
+ pub async fn list_offsets_latest(&self, buckets: Vec<i32>) ->
Result<HashMap<i32, i64>> {
+ // TODO: support partition_id
+ let partition_id = None;
+ let offset_spec = OffsetSpec::Latest;
+
+ self.metadata
+ .check_and_update_table_metadata(from_ref(&self.table_path))
+ .await?;
+
+ let cluster = self.metadata.get_cluster();
+ let table_id = cluster.get_table(&self.table_path).table_id;
+
+ // Prepare requests
+ let requests_by_server = self.prepare_list_offsets_requests(
+ table_id,
+ partition_id,
+ buckets.clone(),
+ offset_spec,
+ )?;
+
+ // Send Requests
+ let response_futures =
self.send_list_offsets_request(requests_by_server).await?;
+
+ let mut results = HashMap::new();
+
+ for response_future in response_futures {
+ let offsets = response_future.await.map_err(
+ // todo: consider use suitable error
+ |e| crate::error::Error::WriteError(format!("Fail to get
result: {e}")),
+ )?;
+ results.extend(offsets?);
+ }
+ Ok(results)
+ }
+
+ fn prepare_list_offsets_requests(
+ &self,
+ table_id: i64,
+ partition_id: Option<i64>,
+ buckets: Vec<i32>,
+ offset_spec: OffsetSpec,
+ ) -> Result<HashMap<i32, ListOffsetsRequest>> {
+ let cluster = self.metadata.get_cluster();
+ let mut node_for_bucket_list: HashMap<i32, Vec<i32>> = HashMap::new();
+
+ for bucket_id in buckets {
+ let table_bucket = TableBucket::new(table_id, bucket_id);
+ let leader = cluster.leader_for(&table_bucket).ok_or_else(|| {
+ // todo: consider use another suitable error
+ crate::error::Error::InvalidTableError(format!(
+ "No leader found for table bucket: table_id={table_id},
bucket_id={bucket_id}"
+ ))
+ })?;
+
+ node_for_bucket_list
+ .entry(leader.id())
+ .or_default()
+ .push(bucket_id);
+ }
+
+ let mut list_offsets_requests = HashMap::new();
+ for (leader_id, bucket_ids) in node_for_bucket_list {
+ let request =
+ ListOffsetsRequest::new(table_id, partition_id, bucket_ids,
offset_spec.clone());
+ list_offsets_requests.insert(leader_id, request);
+ }
+ Ok(list_offsets_requests)
+ }
+
    /// Spawn one task per leader server that sends its `ListOffsetsRequest`
    /// and resolves to a map of bucket id -> offset.
    ///
    /// Returns the join handles; the caller awaits them to collect results.
    /// Each task looks up its server in the cached cluster view and fails if
    /// the server is no longer known.
    async fn send_list_offsets_request(
        &self,
        request_map: HashMap<i32, ListOffsetsRequest>,
    ) -> Result<Vec<JoinHandle<Result<HashMap<i32, i64>>>>> {
        let mut tasks = Vec::new();

        for (leader_id, request) in request_map {
            // Cheap Arc clones so each spawned task owns its handles.
            let rpc_client = self.conns.clone();
            let metadata = self.metadata.clone();

            let task = tokio::spawn(async move {
                let cluster = metadata.get_cluster();
                let tablet_server = cluster.get_tablet_server(leader_id).ok_or_else(|| {
                    // todo: consider use more suitable error
                    crate::error::Error::InvalidTableError(format!(
                        "Tablet server {leader_id} not found"
                    ))
                })?;
                let connection = rpc_client.get_connection(tablet_server).await?;
                let list_offsets_response = connection.request(request).await?;
                list_offsets_response.offsets()
            });
            tasks.push(task);
        }

        Ok(tasks)
    }
+
    /// Issue pending fetch requests and collect the returned records,
    /// grouped by table bucket.
    async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
        self.log_fetcher.send_fetches_and_collect().await
    }
diff --git a/crates/fluss/src/proto/fluss_api.proto
b/crates/fluss/src/proto/fluss_api.proto
index d71197b..ef460fc 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -202,6 +202,19 @@ message ListDatabasesResponse {
repeated string database_name = 1;
}
// list offsets request and response
message ListOffsetsRequest {
  required int32 follower_server_id = 1; // value -1 indicates the request comes from a client.
  required int32 offset_type = 2; // 0 = earliest, 1 = latest, 2 = from timestamp (see ListOffsetsParam for more details)
  required int64 table_id = 3;
  optional int64 partition_id = 4;
  repeated int32 bucket_id = 5 [packed = true]; // packed encoding is more efficient for repeated numerics
  // NOTE(review): camelCase is inconsistent with the snake_case fields above;
  // kept as-is to match the server-side definition — confirm before renaming.
  optional int64 startTimestamp = 6;
}
message ListOffsetsResponse {
  repeated PbListOffsetsRespForBucket buckets_resp = 1;
}
+
// fetch log request and response
message FetchLogRequest {
@@ -262,6 +275,13 @@ message PbRemoteLogSegment {
required int32 segment_size_in_bytes = 4;
}
// Per-bucket result for ListOffsetsResponse: carries an offset on success,
// or an error code/message on failure.
message PbListOffsetsRespForBucket {
  required int32 bucket_id = 1;
  optional int32 error_code = 2;
  optional string error_message = 3;
  optional int64 offset = 4;
}
+
// fetch latest lake snapshot
message GetLatestLakeSnapshotRequest {
required PbTablePath table_path = 1;
diff --git a/crates/fluss/src/record/mod.rs b/crates/fluss/src/record/mod.rs
index d787205..07fbe08 100644
--- a/crates/fluss/src/record/mod.rs
+++ b/crates/fluss/src/record/mod.rs
@@ -84,6 +84,7 @@ impl fmt::Display for ChangeType {
}
}
+#[derive(Clone)]
pub struct ScanRecord {
pub row: ColumnarRow,
offset: i64,
@@ -158,6 +159,10 @@ impl ScanRecords {
pub fn is_empty(&self) -> bool {
self.records.is_empty()
}
+
    /// Borrow the underlying map of scan records grouped by table bucket.
    pub fn records_by_buckets(&self) -> &HashMap<TableBucket, Vec<ScanRecord>> {
        &self.records
    }
}
impl IntoIterator for ScanRecords {
diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs
index 44ca640..6d47836 100644
--- a/crates/fluss/src/row/column.rs
+++ b/crates/fluss/src/row/column.rs
@@ -22,6 +22,7 @@ use arrow::array::{
};
use std::sync::Arc;
+#[derive(Clone)]
pub struct ColumnarRow {
record_batch: Arc<RecordBatch>,
row_id: usize,
@@ -45,6 +46,14 @@ impl ColumnarRow {
pub fn set_row_id(&mut self, row_id: usize) {
self.row_id = row_id
}
+
    /// The row index within the backing record batch that this row views.
    // NOTE(review): the `get_` prefix is non-idiomatic Rust (`row_id()` would
    // be conventional) but is kept to avoid breaking existing callers.
    pub fn get_row_id(&self) -> usize {
        self.row_id
    }

    /// Borrow the Arrow record batch backing this row.
    pub fn get_record_batch(&self) -> &RecordBatch {
        &self.record_batch
    }
}
impl InternalRow for ColumnarRow {
diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs
index 18ce44f..215bb39 100644
--- a/crates/fluss/src/rpc/api_key.rs
+++ b/crates/fluss/src/rpc/api_key.rs
@@ -31,6 +31,7 @@ pub enum ApiKey {
MetaData,
ProduceLog,
FetchLog,
+ ListOffsets,
GetDatabaseInfo,
GetLatestLakeSnapshot,
Unknown(i16),
@@ -51,6 +52,7 @@ impl From<i16> for ApiKey {
1012 => ApiKey::MetaData,
1014 => ApiKey::ProduceLog,
1015 => ApiKey::FetchLog,
+ 1021 => ApiKey::ListOffsets,
1032 => ApiKey::GetLatestLakeSnapshot,
1035 => ApiKey::GetDatabaseInfo,
_ => Unknown(key),
@@ -73,6 +75,7 @@ impl From<ApiKey> for i16 {
ApiKey::MetaData => 1012,
ApiKey::ProduceLog => 1014,
ApiKey::FetchLog => 1015,
+ ApiKey::ListOffsets => 1021,
ApiKey::GetLatestLakeSnapshot => 1032,
ApiKey::GetDatabaseInfo => 1035,
Unknown(x) => x,
diff --git a/crates/fluss/src/rpc/message/list_offsets.rs
b/crates/fluss/src/rpc/message/list_offsets.rs
new file mode 100644
index 0000000..500db33
--- /dev/null
+++ b/crates/fluss/src/rpc/message/list_offsets.rs
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+
+use crate::error::Error;
+use crate::error::Result as FlussResult;
+use crate::proto::ListOffsetsResponse;
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use std::collections::HashMap;
+
+use bytes::{Buf, BufMut};
+use prost::Message;
+
/// Offset type constants as encoded in the proto `offset_type` field.
pub const LIST_EARLIEST_OFFSET: i32 = 0;
pub const LIST_LATEST_OFFSET: i32 = 1;
pub const LIST_OFFSET_FROM_TIMESTAMP: i32 = 2;

/// `follower_server_id` value that marks a request as coming from a client.
pub const CLIENT_FOLLOWER_SERVER_ID: i32 = -1;

/// Which offset a list-offsets request should resolve for each bucket.
#[derive(Debug, Clone)]
pub enum OffsetSpec {
    /// Resolve the earliest available offset.
    Earliest,
    /// Resolve the latest offset.
    Latest,
    /// Resolve an offset from the given timestamp.
    Timestamp(i64),
}

impl OffsetSpec {
    /// Wire value for the proto `offset_type` field.
    pub fn offset_type(&self) -> i32 {
        match self {
            Self::Earliest => LIST_EARLIEST_OFFSET,
            Self::Latest => LIST_LATEST_OFFSET,
            Self::Timestamp(_) => LIST_OFFSET_FROM_TIMESTAMP,
        }
    }

    /// Timestamp payload for the proto `startTimestamp` field, when present.
    pub fn start_timestamp(&self) -> Option<i64> {
        if let Self::Timestamp(ts) = self {
            Some(*ts)
        } else {
            None
        }
    }
}
+
/// Typed wrapper around the generated `proto::ListOffsetsRequest`.
#[derive(Debug)]
pub struct ListOffsetsRequest {
    pub inner_request: proto::ListOffsetsRequest,
}

impl ListOffsetsRequest {
    /// Build a list-offsets request for the given buckets of a table.
    ///
    /// The request is always marked as client-originated
    /// (`CLIENT_FOLLOWER_SERVER_ID`); the start timestamp is populated only
    /// for `OffsetSpec::Timestamp`.
    pub fn new(
        table_id: i64,
        partition_id: Option<i64>,
        bucket_ids: Vec<i32>,
        offset_spec: OffsetSpec,
    ) -> Self {
        ListOffsetsRequest {
            inner_request: proto::ListOffsetsRequest {
                follower_server_id: CLIENT_FOLLOWER_SERVER_ID,
                offset_type: offset_spec.offset_type(),
                table_id,
                partition_id,
                bucket_id: bucket_ids,
                start_timestamp: offset_spec.start_timestamp(),
            },
        }
    }
}
+
impl RequestBody for ListOffsetsRequest {
    type ResponseBody = ListOffsetsResponse;

    // RPC routing key for list-offsets (wire id 1021, see rpc/api_key.rs).
    const API_KEY: ApiKey = ApiKey::ListOffsets;

    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
}

// Generate versioned frame (de)serialization for the request/response pair.
impl_write_version_type!(ListOffsetsRequest);
impl_read_version_type!(ListOffsetsResponse);
+
+impl ListOffsetsResponse {
+ pub fn offsets(&self) -> FlussResult<HashMap<i32, i64>> {
+ self.buckets_resp
+ .iter()
+ .map(|resp| {
+ if resp.error_code.is_some() {
+ // todo: consider use another suitable error
+ Err(Error::WriteError(format!(
+ "Missing offset, error message: {}",
+ resp.error_message
+ .as_deref()
+ .unwrap_or("unknown server exception")
+ )))
+ } else {
+ // if no error msg, offset must exists
+ Ok((resp.bucket_id, resp.offset.unwrap()))
+ }
+ })
+ .collect()
+ }
+}
diff --git a/crates/fluss/src/rpc/message/mod.rs
b/crates/fluss/src/rpc/message/mod.rs
index d5f8ebd..230d971 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -31,6 +31,7 @@ mod get_latest_lake_snapshot;
mod get_table;
mod header;
mod list_databases;
+mod list_offsets;
mod list_tables;
mod produce_log;
mod table_exists;
@@ -47,6 +48,7 @@ pub use get_latest_lake_snapshot::*;
pub use get_table::*;
pub use header::*;
pub use list_databases::*;
+pub use list_offsets::*;
pub use list_tables::*;
pub use produce_log::*;
pub use table_exists::*;