This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new eadfa4b  chore: Introduce WriteFormat and various small changes to unblock Upsert implementation (#158)
eadfa4b is described below

commit eadfa4bd5a9d75f7676ad9e0b2212f88b332ad20
Author: Keith Lee <[email protected]>
AuthorDate: Thu Jan 15 09:25:31 2026 +0000

    chore: Introduce WriteFormat and various small changes to unblock Upsert implementation (#158)
---
 crates/fluss/Cargo.toml                       |  2 +
 crates/fluss/src/client/write/mod.rs          |  2 +
 crates/fluss/src/client/write/write_format.rs | 65 +++++++++++++++++++++++++++
 crates/fluss/src/error.rs                     |  9 ++++
 crates/fluss/src/metadata/data_lake_format.rs |  3 ++
 crates/fluss/src/metadata/datatype.rs         | 23 ++++++++++
 crates/fluss/src/metadata/table.rs            | 22 ++++++++-
 crates/fluss/src/row/field_getter.rs          | 14 +++++-
 8 files changed, 138 insertions(+), 2 deletions(-)

diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index e8c851f..8942ffc 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -60,6 +60,8 @@ tempfile = "3.23.0"
 snafu = "0.8.3"
 scopeguard = "1.2.0"
 delegate = "0.13.5"
+strum = "0.26"
+strum_macros = "0.26"
 
 [target.'cfg(target_arch = "wasm32")'.dependencies]
 jiff = { workspace = true, features = ["js"] }
diff --git a/crates/fluss/src/client/write/mod.rs b/crates/fluss/src/client/write/mod.rs
index d79418b..00a71c5 100644
--- a/crates/fluss/src/client/write/mod.rs
+++ b/crates/fluss/src/client/write/mod.rs
@@ -30,8 +30,10 @@ pub(crate) mod broadcast;
 mod bucket_assigner;
 
 mod sender;
+mod write_format;
 mod writer_client;
 
+pub use write_format::WriteFormat;
 pub use writer_client::WriterClient;
 
 pub struct WriteRecord<'a> {
diff --git a/crates/fluss/src/client/write/write_format.rs b/crates/fluss/src/client/write/write_format.rs
new file mode 100644
index 0000000..d65e42d
--- /dev/null
+++ b/crates/fluss/src/client/write/write_format.rs
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+use crate::metadata::KvFormat;
+use std::fmt::Display;
+
+/// Record format used on the client write path.
+///
+/// `ArrowLog` and `CompactedLog` are log formats (see [`WriteFormat::is_log`]);
+/// `CompactedKv` is the only kv format and the only variant that converts to and
+/// from a [`KvFormat`].
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum WriteFormat {
+    ArrowLog,
+    CompactedLog,
+    CompactedKv,
+}
+
+impl WriteFormat {
+    /// Returns `true` for the log formats `ArrowLog` and `CompactedLog`.
+    pub const fn is_log(&self) -> bool {
+        matches!(self, Self::ArrowLog | Self::CompactedLog)
+    }
+
+    /// Returns `true` for the kv format `CompactedKv` (the negation of [`Self::is_log`]).
+    pub const fn is_kv(&self) -> bool {
+        !self.is_log()
+    }
+
+    /// Converts this write format into its [`KvFormat`].
+    ///
+    /// # Errors
+    ///
+    /// Returns an `IllegalArgument` error for the log formats, which have no kv counterpart.
+    pub fn to_kv_format(&self) -> Result<KvFormat> {
+        match self {
+            WriteFormat::CompactedKv => Ok(KvFormat::COMPACTED),
+            WriteFormat::ArrowLog | WriteFormat::CompactedLog => Err(IllegalArgument {
+                message: format!("WriteFormat `{}` is not a KvFormat", self),
+            }),
+        }
+    }
+
+    /// Maps a [`KvFormat`] to its corresponding write format.
+    ///
+    /// # Errors
+    ///
+    /// Returns an `IllegalArgument` error for any kv format other than `COMPACTED`
+    /// (such as `INDEXED`), since no `WriteFormat` variant corresponds to it.
+    pub fn from_kv_format(kv_format: &KvFormat) -> Result<Self> {
+        match kv_format {
+            KvFormat::COMPACTED => Ok(WriteFormat::CompactedKv),
+            other => Err(IllegalArgument {
+                message: format!("Unsupported KvFormat: `{}`", other),
+            }),
+        }
+    }
+}
+
+impl Display for WriteFormat {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            WriteFormat::ArrowLog => f.write_str("ArrowLog"),
+            WriteFormat::CompactedLog => f.write_str("CompactedLog"),
+            WriteFormat::CompactedKv => f.write_str("CompactedKv"),
+        }
+    }
+}
diff --git a/crates/fluss/src/error.rs b/crates/fluss/src/error.rs
index e04fde1..0a368b7 100644
--- a/crates/fluss/src/error.rs
+++ b/crates/fluss/src/error.rs
@@ -21,6 +21,7 @@ pub use crate::rpc::{ApiError, FlussError};
 use arrow_schema::ArrowError;
 use snafu::Snafu;
 use std::{io, result};
+use strum::ParseError;
 
 pub type Result<T> = result::Result<T, Error>;
 
@@ -155,3 +156,11 @@ impl From<ApiError> for Error {
         Error::FlussAPIError { api_error: value }
     }
 }
+
+impl From<ParseError> for Error {
+    /// Maps a `strum` enum-parsing failure to `Error::IllegalArgument`, keeping its message.
+    fn from(err: ParseError) -> Self {
+        let message = err.to_string();
+        Self::IllegalArgument { message }
+    }
+}
diff --git a/crates/fluss/src/metadata/data_lake_format.rs b/crates/fluss/src/metadata/data_lake_format.rs
index 76a23f8..c186109 100644
--- a/crates/fluss/src/metadata/data_lake_format.rs
+++ b/crates/fluss/src/metadata/data_lake_format.rs
@@ -15,11 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use strum_macros::{Display, EnumString};
+
 /// Identifies the logical format of a data lake table supported by Fluss.
 ///
 /// This enum is typically used in metadata and configuration to distinguish
 /// between different table formats so that the appropriate integration and
 /// semantics can be applied.
+#[derive(Debug, EnumString, Display, PartialEq)]
 pub enum DataLakeFormat {
     /// Apache Paimon data lake table format.
     Paimon,
diff --git a/crates/fluss/src/metadata/datatype.rs b/crates/fluss/src/metadata/datatype.rs
index c53cd27..dc1f407 100644
--- a/crates/fluss/src/metadata/datatype.rs
+++ b/crates/fluss/src/metadata/datatype.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
 use serde::{Deserialize, Serialize};
 use std::fmt::{Display, Formatter};
 
@@ -857,6 +859,34 @@ impl RowType {
         self.fields.iter().position(|f| f.name == field_name)
     }

+    /// Returns the name of every field, in field order, borrowed from this row type.
+    pub fn get_field_names(&self) -> Vec<&str> {
+        self.fields.iter().map(|f| f.name.as_str()).collect()
+    }
+
+    /// Returns a new `RowType` made of the fields at `project_field_positions`, in the
+    /// order given, with the same nullability as `self`.
+    ///
+    /// Positions are not deduplicated: a repeated position clones that field again.
+    ///
+    /// # Errors
+    ///
+    /// Returns an `IllegalArgument` error if any position is out of bounds.
+    pub fn project(&self, project_field_positions: &[usize]) -> Result<RowType> {
+        let fields = project_field_positions
+            .iter()
+            .map(|&pos| {
+                self.fields
+                    .get(pos)
+                    .cloned()
+                    .ok_or_else(|| IllegalArgument {
+                        message: format!("invalid field position: {}", pos),
+                    })
+            })
+            .collect::<Result<Vec<_>>>()?;
+        Ok(RowType::with_nullable(self.nullable, fields))
+    }
+
     #[cfg(test)]
     pub fn with_data_types(data_types: Vec<DataType>) -> Self {
         let mut fields: Vec<DataField> = Vec::new();
diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs
index 4f6c04b..b1e8a90 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -18,11 +18,13 @@
 use crate::compression::ArrowCompressionInfo;
 use crate::error::Error::InvalidTableError;
 use crate::error::{Error, Result};
+use crate::metadata::DataLakeFormat;
 use crate::metadata::datatype::{DataField, DataType, RowType};
 use core::fmt;
 use serde::{Deserialize, Serialize};
 use std::collections::{HashMap, HashSet};
 use std::fmt::{Display, Formatter};
+use strum_macros::EnumString;
 
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct Column {
@@ -603,7 +605,7 @@ impl LogFormat {
     }
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, EnumString)]
 pub enum KvFormat {
     INDEXED,
     COMPACTED,
@@ -726,6 +728,52 @@ impl TableConfig {
     pub fn get_arrow_compression_info(&self) -> Result<ArrowCompressionInfo> {
         ArrowCompressionInfo::from_conf(&self.properties)
     }
+
+    /// Returns the data lake format from `table.datalake.format`, or `None` if it is unset.
+    ///
+    /// # Errors
+    ///
+    /// Returns an `IllegalArgument` error naming the key and value if the property is set
+    /// but does not parse as a [`DataLakeFormat`].
+    ///
+    /// NOTE(review): `DataLakeFormat` derives `EnumString` without `ascii_case_insensitive`,
+    /// so matching is case-sensitive (`Paimon` parses, `paimon` does not); confirm this
+    /// matches the casing stored in table properties.
+    pub fn get_datalake_format(&self) -> Result<Option<DataLakeFormat>> {
+        const KEY: &str = "table.datalake.format";
+        self.properties
+            .get(KEY)
+            .map(|value| {
+                value
+                    .parse::<DataLakeFormat>()
+                    .map_err(|e| Error::IllegalArgument {
+                        message: format!("invalid value `{}` for `{}`: {}", value, KEY, e),
+                    })
+            })
+            .transpose()
+    }
+
+    /// Returns the kv format from `table.kv.format`, defaulting to `COMPACTED` if unset.
+    ///
+    /// # Errors
+    ///
+    /// Returns an `IllegalArgument` error naming the key and value if the property does
+    /// not parse as a [`KvFormat`].
+    pub fn get_kv_format(&self) -> Result<KvFormat> {
+        // TODO: Consolidate configurations logic, constants, defaults in a single place
+        const KEY: &str = "table.kv.format";
+        const DEFAULT_KV_FORMAT: &str = "COMPACTED";
+        let kv_format = self
+            .properties
+            .get(KEY)
+            .map(String::as_str)
+            .unwrap_or(DEFAULT_KV_FORMAT);
+        kv_format
+            .parse::<KvFormat>()
+            .map_err(|e| Error::IllegalArgument {
+                message: format!("invalid value `{}` for `{}`: {}", kv_format, KEY, e),
+            })
+    }
 
 impl TableInfo {
diff --git a/crates/fluss/src/row/field_getter.rs b/crates/fluss/src/row/field_getter.rs
index 8e529e5..97f9e39 100644
--- a/crates/fluss/src/row/field_getter.rs
+++ b/crates/fluss/src/row/field_getter.rs
@@ -15,9 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::metadata::DataType;
+use crate::metadata::{DataType, RowType};
 use crate::row::{Datum, InternalRow};
 
+#[derive(Clone)]
 pub enum FieldGetter {
     Nullable(InnerFieldGetter),
     NonNullable(InnerFieldGetter),
@@ -36,6 +37,16 @@ impl FieldGetter {
         }
     }
 
+    /// Returns a [`FieldGetter`] for every field of `row_type`, in field order; the
+    /// getter at index `i` is created for row position `i`.
+    #[allow(dead_code)] // NOTE(review): currently unused; remove once a caller exists
+    pub fn create_field_getters(row_type: &RowType) -> Box<[FieldGetter]> {
+        (0..)
+            .zip(row_type.fields())
+            .map(|(index, field)| Self::create(field.data_type(), index))
+            .collect()
+    }
+
     pub fn create(data_type: &DataType, pos: usize) -> FieldGetter {
         let inner_field_getter = match data_type {
             DataType::Char(t) => InnerFieldGetter::Char {
@@ -66,6 +77,7 @@ impl FieldGetter {
     }
 }
 
+#[derive(Clone)]
 pub enum InnerFieldGetter {
     Char { pos: usize, len: usize },
     String { pos: usize },

Reply via email to