This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new eadfa4b chore: Introduce WriteFormat and various small changes to
unblock Upsert implementation (#158)
eadfa4b is described below
commit eadfa4bd5a9d75f7676ad9e0b2212f88b332ad20
Author: Keith Lee <[email protected]>
AuthorDate: Thu Jan 15 09:25:31 2026 +0000
chore: Introduce WriteFormat and various small changes to unblock Upsert
implementation (#158)
---
crates/fluss/Cargo.toml | 2 +
crates/fluss/src/client/write/mod.rs | 2 +
crates/fluss/src/client/write/write_format.rs | 65 +++++++++++++++++++++++++++
crates/fluss/src/error.rs | 9 ++++
crates/fluss/src/metadata/data_lake_format.rs | 3 ++
crates/fluss/src/metadata/datatype.rs | 23 ++++++++++
crates/fluss/src/metadata/table.rs | 22 ++++++++-
crates/fluss/src/row/field_getter.rs | 14 +++++-
8 files changed, 138 insertions(+), 2 deletions(-)
diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index e8c851f..8942ffc 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -60,6 +60,8 @@ tempfile = "3.23.0"
snafu = "0.8.3"
scopeguard = "1.2.0"
delegate = "0.13.5"
+strum = "0.26"
+strum_macros = "0.26"
[target.'cfg(target_arch = "wasm32")'.dependencies]
jiff = { workspace = true, features = ["js"] }
diff --git a/crates/fluss/src/client/write/mod.rs b/crates/fluss/src/client/write/mod.rs
index d79418b..00a71c5 100644
--- a/crates/fluss/src/client/write/mod.rs
+++ b/crates/fluss/src/client/write/mod.rs
@@ -30,8 +30,10 @@ pub(crate) mod broadcast;
mod bucket_assigner;
mod sender;
+mod write_format;
mod writer_client;
+pub use write_format::WriteFormat;
pub use writer_client::WriterClient;
pub struct WriteRecord<'a> {
diff --git a/crates/fluss/src/client/write/write_format.rs b/crates/fluss/src/client/write/write_format.rs
new file mode 100644
index 0000000..d65e42d
--- /dev/null
+++ b/crates/fluss/src/client/write/write_format.rs
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+use crate::metadata::KvFormat;
+use std::fmt::Display;
+
+pub enum WriteFormat {
+ ArrowLog,
+ CompactedLog,
+ CompactedKv,
+}
+
+impl WriteFormat {
+ pub const fn is_log(&self) -> bool {
+ matches!(self, Self::ArrowLog | Self::CompactedLog)
+ }
+
+ pub fn is_kv(&self) -> bool {
+ !self.is_log()
+ }
+
+ pub fn to_kv_format(&self) -> Result<KvFormat> {
+ match self {
+ WriteFormat::CompactedKv => Ok(KvFormat::COMPACTED),
+ other => Err(IllegalArgument {
+ message: format!("WriteFormat `{}` is not a KvFormat", other),
+ }),
+ }
+ }
+
+ pub fn from_kv_format(kv_format: &KvFormat) -> Result<Self> {
+ match kv_format {
+ KvFormat::COMPACTED => Ok(WriteFormat::CompactedKv),
+ other => Err(IllegalArgument {
+ message: format!("Unknown KvFormat: `{}`", other),
+ }),
+ }
+ }
+}
+
+impl Display for WriteFormat {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ WriteFormat::ArrowLog => f.write_str("ArrowLog"),
+ WriteFormat::CompactedLog => f.write_str("CompactedLog"),
+ WriteFormat::CompactedKv => f.write_str("CompactedKv"),
+ }
+ }
+}
diff --git a/crates/fluss/src/error.rs b/crates/fluss/src/error.rs
index e04fde1..0a368b7 100644
--- a/crates/fluss/src/error.rs
+++ b/crates/fluss/src/error.rs
@@ -21,6 +21,7 @@ pub use crate::rpc::{ApiError, FlussError};
use arrow_schema::ArrowError;
use snafu::Snafu;
use std::{io, result};
+use strum::ParseError;
pub type Result<T> = result::Result<T, Error>;
@@ -155,3 +156,11 @@ impl From<ApiError> for Error {
Error::FlussAPIError { api_error: value }
}
}
+
+impl From<ParseError> for Error {
+ fn from(value: ParseError) -> Self {
+ Error::IllegalArgument {
+ message: value.to_string(),
+ }
+ }
+}
diff --git a/crates/fluss/src/metadata/data_lake_format.rs b/crates/fluss/src/metadata/data_lake_format.rs
index 76a23f8..c186109 100644
--- a/crates/fluss/src/metadata/data_lake_format.rs
+++ b/crates/fluss/src/metadata/data_lake_format.rs
@@ -15,11 +15,14 @@
// specific language governing permissions and limitations
// under the License.
+use strum_macros::{Display, EnumString};
+
/// Identifies the logical format of a data lake table supported by Fluss.
///
/// This enum is typically used in metadata and configuration to distinguish
/// between different table formats so that the appropriate integration and
/// semantics can be applied.
+#[derive(Debug, EnumString, Display, PartialEq)]
pub enum DataLakeFormat {
/// Apache Paimon data lake table format.
Paimon,
diff --git a/crates/fluss/src/metadata/datatype.rs b/crates/fluss/src/metadata/datatype.rs
index c53cd27..dc1f407 100644
--- a/crates/fluss/src/metadata/datatype.rs
+++ b/crates/fluss/src/metadata/datatype.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
@@ -857,6 +859,27 @@ impl RowType {
self.fields.iter().position(|f| f.name == field_name)
}
+ pub fn get_field_names(&self) -> Vec<&str> {
+ self.fields.iter().map(|f| f.name.as_str()).collect()
+ }
+
+ pub fn project(&self, project_field_positions: &[usize]) -> Result<RowType> {
+ Ok(RowType::with_nullable(
+ self.nullable,
+ project_field_positions
+ .iter()
+ .map(|pos| {
+ self.fields
+ .get(*pos)
+ .cloned()
+ .ok_or_else(|| IllegalArgument {
+ message: format!("invalid field position: {}", *pos),
+ })
+ })
+ .collect::<Result<Vec<_>>>()?,
+ ))
+ }
+
#[cfg(test)]
pub fn with_data_types(data_types: Vec<DataType>) -> Self {
let mut fields: Vec<DataField> = Vec::new();
diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs
index 4f6c04b..b1e8a90 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -18,11 +18,13 @@
use crate::compression::ArrowCompressionInfo;
use crate::error::Error::InvalidTableError;
use crate::error::{Error, Result};
+use crate::metadata::DataLakeFormat;
use crate::metadata::datatype::{DataField, DataType, RowType};
use core::fmt;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
+use strum_macros::EnumString;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Column {
@@ -603,7 +605,7 @@ impl LogFormat {
}
}
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, EnumString)]
pub enum KvFormat {
INDEXED,
COMPACTED,
@@ -726,6 +728,24 @@ impl TableConfig {
pub fn get_arrow_compression_info(&self) -> Result<ArrowCompressionInfo> {
ArrowCompressionInfo::from_conf(&self.properties)
}
+
+ pub fn get_datalake_format(&self) -> Result<Option<DataLakeFormat>> {
+ self.properties
+ .get("table.datalake.format")
+ .map(|f| f.parse().map_err(Error::from))
+ .transpose()
+ }
+
+ pub fn get_kv_format(&self) -> Result<KvFormat> {
+ // TODO: Consolidate configurations logic, constants, defaults in a single place
+ const DEFAULT_KV_FORMAT: &str = "COMPACTED";
+ let kv_format = self
+ .properties
+ .get("table.kv.format")
+ .map(String::as_str)
+ .unwrap_or(DEFAULT_KV_FORMAT);
+ kv_format.parse().map_err(Into::into)
+ }
}
impl TableInfo {
diff --git a/crates/fluss/src/row/field_getter.rs b/crates/fluss/src/row/field_getter.rs
index 8e529e5..97f9e39 100644
--- a/crates/fluss/src/row/field_getter.rs
+++ b/crates/fluss/src/row/field_getter.rs
@@ -15,9 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use crate::metadata::DataType;
+use crate::metadata::{DataType, RowType};
use crate::row::{Datum, InternalRow};
+#[derive(Clone)]
pub enum FieldGetter {
Nullable(InnerFieldGetter),
NonNullable(InnerFieldGetter),
@@ -36,6 +37,16 @@ impl FieldGetter {
}
}
+ #[allow(dead_code)]
+ pub fn create_field_getters(row_type: &RowType) -> Box<[FieldGetter]> {
+ row_type
+ .fields()
+ .iter()
+ .enumerate()
+ .map(|(pos, field)| Self::create(field.data_type(), pos))
+ .collect()
+ }
+
pub fn create(data_type: &DataType, pos: usize) -> FieldGetter {
let inner_field_getter = match data_type {
DataType::Char(t) => InnerFieldGetter::Char {
@@ -66,6 +77,7 @@ impl FieldGetter {
}
}
+#[derive(Clone)]
pub enum InnerFieldGetter {
Char { pos: usize, len: usize },
String { pos: usize },