This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 1d74c4a feat: Introduce UpsertWriter (#169)
1d74c4a is described below
commit 1d74c4a6bbc0e421b1b95b84d998401ca30cea39
Author: Keith Lee <[email protected]>
AuthorDate: Tue Jan 20 13:09:49 2026 +0000
feat: Introduce UpsertWriter (#169)
---
crates/fluss/Cargo.toml | 1 +
crates/fluss/src/client/table/log_fetch_buffer.rs | 6 +-
crates/fluss/src/client/table/lookup.rs | 9 +-
crates/fluss/src/client/table/mod.rs | 18 +
crates/fluss/src/client/table/partition_getter.rs | 56 +++
crates/fluss/src/client/table/upsert.rs | 522 +++++++++++++++++++++
crates/fluss/src/client/table/writer.rs | 20 +-
crates/fluss/src/client/write/accumulator.rs | 39 +-
crates/fluss/src/client/write/batch.rs | 20 +-
crates/fluss/src/client/write/bucket_assigner.rs | 9 +-
crates/fluss/src/client/write/mod.rs | 54 ++-
crates/fluss/src/client/write/write_format.rs | 1 +
crates/fluss/src/client/write/writer_client.rs | 9 +-
crates/fluss/src/metadata/table.rs | 66 ++-
crates/fluss/src/record/arrow.rs | 62 ++-
crates/fluss/src/record/kv/kv_record_batch.rs | 11 +-
.../fluss/src/record/kv/kv_record_batch_builder.rs | 80 ++--
.../fluss/src/record/kv/kv_record_read_context.rs | 17 +-
crates/fluss/src/row/compacted/compacted_row.rs | 15 +-
.../src/row/compacted/compacted_row_writer.rs | 12 +
.../fluss/src/row/encode/compacted_row_encoder.rs | 13 +-
crates/fluss/src/row/encode/mod.rs | 23 +-
crates/fluss/src/row/mod.rs | 27 +-
23 files changed, 895 insertions(+), 195 deletions(-)
diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index c3bdd44..9aeee72 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -33,6 +33,7 @@ integration_tests = []
[dependencies]
arrow = { workspace = true }
arrow-schema = "57.0.0"
+bitvec = "1"
byteorder = "1.5"
futures = "0.3"
clap = { workspace = true }
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs
b/crates/fluss/src/client/table/log_fetch_buffer.rs
index ac44cc1..ca0a253 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -651,14 +651,14 @@ mod tests {
use crate::compression::{
ArrowCompressionInfo, ArrowCompressionType,
DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
};
- use crate::metadata::{DataField, DataTypes, TablePath};
+ use crate::metadata::{DataField, DataTypes, RowType, TablePath};
use crate::record::{MemoryLogRecordsArrowBuilder, ReadContext,
to_arrow_schema};
use crate::row::GenericRow;
use std::sync::Arc;
use std::time::Duration;
fn test_read_context() -> ReadContext {
- let row_type = DataTypes::row(vec![DataField::new(
+ let row_type = RowType::new(vec![DataField::new(
"id".to_string(),
DataTypes::int(),
None,
@@ -714,7 +714,7 @@ mod tests {
#[test]
fn default_completed_fetch_reads_records() -> Result<()> {
- let row_type = DataTypes::row(vec![
+ let row_type = RowType::new(vec![
DataField::new("id".to_string(), DataTypes::int(), None),
DataField::new("name".to_string(), DataTypes::string(), None),
]);
diff --git a/crates/fluss/src/client/table/lookup.rs
b/crates/fluss/src/client/table/lookup.rs
index 1d32ebd..cd23503 100644
--- a/crates/fluss/src/client/table/lookup.rs
+++ b/crates/fluss/src/client/table/lookup.rs
@@ -22,7 +22,7 @@ use crate::error::{Error, Result};
use crate::metadata::{RowType, TableBucket, TableInfo};
use crate::row::InternalRow;
use crate::row::compacted::CompactedRow;
-use crate::row::encode::KeyEncoder;
+use crate::row::encode::{KeyEncoder, KeyEncoderFactory};
use crate::rpc::ApiError;
use crate::rpc::message::LookupRequest;
use std::sync::Arc;
@@ -130,8 +130,11 @@ impl<'a> TableLookup<'a> {
// Create key encoder for the primary key fields
let pk_fields = self.table_info.get_physical_primary_keys().to_vec();
- let key_encoder =
- <dyn KeyEncoder>::of(self.table_info.row_type(), pk_fields,
data_lake_format)?;
+ let key_encoder = KeyEncoderFactory::of(
+ self.table_info.row_type(),
+ pk_fields.as_slice(),
+ &data_lake_format,
+ )?;
Ok(Lookuper {
conn: self.conn,
diff --git a/crates/fluss/src/client/table/mod.rs
b/crates/fluss/src/client/table/mod.rs
index 7356be2..2bfa054 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -27,13 +27,17 @@ mod append;
mod lookup;
mod log_fetch_buffer;
+mod partition_getter;
mod remote_log;
mod scanner;
+mod upsert;
mod writer;
+use crate::client::table::upsert::TableUpsert;
pub use append::{AppendWriter, TableAppend};
pub use lookup::{LookupResult, Lookuper, TableLookup};
pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan};
+pub use writer::{TableWriter, UpsertWriter};
#[allow(dead_code)]
pub struct FlussTable<'a> {
@@ -119,6 +123,20 @@ impl<'a> FlussTable<'a> {
self.metadata.clone(),
))
}
+
+ pub fn new_upsert(&self) -> Result<TableUpsert> {
+ if !self.has_primary_key {
+ return Err(Error::UnsupportedOperation {
+ message: "Upsert is only supported for primary key
tables".to_string(),
+ });
+ }
+
+ Ok(TableUpsert::new(
+ self.table_path.clone(),
+ self.table_info.clone(),
+ self.conn.get_or_create_writer_client()?,
+ ))
+ }
}
impl<'a> Drop for FlussTable<'a> {
diff --git a/crates/fluss/src/client/table/partition_getter.rs
b/crates/fluss/src/client/table/partition_getter.rs
new file mode 100644
index 0000000..4529d86
--- /dev/null
+++ b/crates/fluss/src/client/table/partition_getter.rs
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+use crate::metadata::{DataType, RowType};
+use crate::row::field_getter::FieldGetter;
+
+#[allow(dead_code)]
+pub struct PartitionGetter<'a> {
+ partitions: Vec<(&'a String, &'a DataType, FieldGetter)>,
+}
+
+#[allow(dead_code)]
+impl<'a> PartitionGetter<'a> {
+ pub fn new(row_type: &'a RowType, partition_keys: &'a Vec<String>) ->
Result<Self> {
+ let mut partitions = Vec::with_capacity(partition_keys.len());
+
+ for partition_key in partition_keys {
+ if let Some(partition_col_index) =
row_type.get_field_index(partition_key.as_str()) {
+ let data_type = &row_type
+ .fields()
+ .get(partition_col_index)
+ .unwrap()
+ .data_type;
+ let field_getter = FieldGetter::create(data_type,
partition_col_index);
+
+ partitions.push((partition_key, data_type, field_getter));
+ } else {
+ return Err(IllegalArgument {
+ message: format!(
+ "The partition column {partition_key} is not in the
row {row_type}."
+ ),
+ });
+ };
+ }
+
+ Ok(Self { partitions })
+ }
+
+ // TODO Implement get partition
+}
diff --git a/crates/fluss/src/client/table/upsert.rs
b/crates/fluss/src/client/table/upsert.rs
new file mode 100644
index 0000000..a3909e7
--- /dev/null
+++ b/crates/fluss/src/client/table/upsert.rs
@@ -0,0 +1,522 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::client::table::writer::{DeleteResult, TableWriter, UpsertResult,
UpsertWriter};
+use crate::client::{RowBytes, WriteFormat, WriteRecord, WriterClient};
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+use crate::metadata::{KvFormat, RowType, TableInfo, TablePath};
+use crate::row::InternalRow;
+use crate::row::encode::{KeyEncoder, KeyEncoderFactory, RowEncoder,
RowEncoderFactory};
+use crate::row::field_getter::FieldGetter;
+use std::sync::Arc;
+
+use bitvec::prelude::bitvec;
+use bytes::Bytes;
+
+#[allow(dead_code)]
+pub struct TableUpsert {
+ table_path: TablePath,
+ table_info: TableInfo,
+ writer_client: Arc<WriterClient>,
+ target_columns: Option<Arc<Vec<usize>>>,
+}
+
+#[allow(dead_code)]
+impl TableUpsert {
+ pub fn new(
+ table_path: TablePath,
+ table_info: TableInfo,
+ writer_client: Arc<WriterClient>,
+ ) -> Self {
+ Self {
+ table_path,
+ table_info,
+ writer_client,
+ target_columns: None,
+ }
+ }
+
+ pub fn partial_update(&self, target_columns: Option<Vec<usize>>) ->
Result<Self> {
+ if let Some(columns) = &target_columns {
+ let num_columns = self.table_info.row_type().fields().len();
+
+ if let Some(&invalid_column) = columns.iter().find(|&&col| col >=
num_columns) {
+ return Err(IllegalArgument {
+ message: format!(
+ "Invalid target column index: {invalid_column} for
table {}. The table only has {num_columns} columns.",
+ self.table_path
+ ),
+ });
+ }
+ }
+
+ Ok(Self {
+ table_path: self.table_path.clone(),
+ table_info: self.table_info.clone(),
+ writer_client: self.writer_client.clone(),
+ target_columns: target_columns.map(Arc::new),
+ })
+ }
+
+ pub fn partial_update_with_column_names(&self, target_column_names:
&[&str]) -> Result<Self> {
+ let row_type = self.table_info.row_type();
+ let col_indices: Vec<(&str, Option<usize>)> = target_column_names
+ .iter()
+ .map(|col_name| (*col_name, row_type.get_field_index(col_name)))
+ .collect();
+
+ if let Some((missing_name, _)) = col_indices.iter().find(|(_, ix)|
ix.is_none()) {
+ return Err(IllegalArgument {
+ message: format!(
+ "Cannot find target column `{}` for table {}.",
+ missing_name, self.table_path
+ ),
+ });
+ }
+
+ let valid_col_indices: Vec<usize> = col_indices
+ .into_iter()
+ .map(|(_, index)| index.unwrap())
+ .collect();
+
+ self.partial_update(Some(valid_col_indices))
+ }
+
+ pub fn create_writer(&self) -> Result<impl UpsertWriter> {
+ UpsertWriterFactory::create(
+ Arc::new(self.table_path.clone()),
+ Arc::new(self.table_info.clone()),
+ self.target_columns.clone(),
+ Arc::clone(&self.writer_client),
+ )
+ }
+}
+
+#[allow(dead_code)]
+struct UpsertWriterImpl<RE>
+where
+ RE: RowEncoder,
+{
+ table_path: Arc<TablePath>,
+ writer_client: Arc<WriterClient>,
+ // TODO: Partitioning
+ // partition_field_getter: Option<Box<dyn KeyEncoder>>,
+ primary_key_encoder: Box<dyn KeyEncoder>,
+ target_columns: Option<Arc<Vec<usize>>>,
+ // Use primary key encoder as bucket key encoder when None
+ bucket_key_encoder: Option<Box<dyn KeyEncoder>>,
+ kv_format: KvFormat,
+ write_format: WriteFormat,
+ row_encoder: RE,
+ field_getters: Box<[FieldGetter]>,
+ table_info: Arc<TableInfo>,
+}
+
+#[allow(dead_code)]
+struct UpsertWriterFactory;
+
+#[allow(dead_code)]
+impl UpsertWriterFactory {
+ pub fn create(
+ table_path: Arc<TablePath>,
+ table_info: Arc<TableInfo>,
+ partial_update_columns: Option<Arc<Vec<usize>>>,
+ writer_client: Arc<WriterClient>,
+ ) -> Result<impl UpsertWriter> {
+ let data_lake_format = &table_info.table_config.get_datalake_format()?;
+ let row_type = table_info.row_type();
+ let physical_pks = table_info.get_physical_primary_keys();
+
+ let names = table_info.get_schema().auto_increment_col_names();
+
+ Self::sanity_check(
+ row_type,
+ &table_info.primary_keys,
+ names,
+ &partial_update_columns,
+ )?;
+
+ let primary_key_encoder = KeyEncoderFactory::of(row_type,
physical_pks, data_lake_format)?;
+ let bucket_key_encoder = if !table_info.is_default_bucket_key() {
+ Some(KeyEncoderFactory::of(
+ row_type,
+ table_info.get_bucket_keys(),
+ data_lake_format,
+ )?)
+ } else {
+ // Defaults to using primary key encoder when None for bucket key
+ None
+ };
+
+ let kv_format = table_info.get_table_config().get_kv_format()?;
+ let write_format = WriteFormat::from_kv_format(&kv_format)?;
+
+ let field_getters = FieldGetter::create_field_getters(row_type);
+
+ Ok(UpsertWriterImpl {
+ table_path,
+ writer_client,
+ primary_key_encoder,
+ target_columns: partial_update_columns,
+ bucket_key_encoder,
+ kv_format: kv_format.clone(),
+ write_format,
+ row_encoder: RowEncoderFactory::create(kv_format,
row_type.clone())?,
+ field_getters,
+ table_info: table_info.clone(),
+ })
+ }
+
+ #[allow(dead_code)]
+ fn sanity_check(
+ row_type: &RowType,
+ primary_keys: &Vec<String>,
+ auto_increment_col_names: &Vec<String>,
+ target_columns: &Option<Arc<Vec<usize>>>,
+ ) -> Result<()> {
+ if target_columns.is_none() {
+ if !auto_increment_col_names.is_empty() {
+ return Err(IllegalArgument {
+ message: format!(
+ "This table has auto increment column {}. Explicitly
specifying values for an auto increment column is not allowed. Please Specify
non-auto-increment columns as target columns using partialUpdate first.",
+ auto_increment_col_names.join(", ")
+ ),
+ });
+ }
+ return Ok(());
+ }
+
+ let field_count = row_type.fields().len();
+
+ let mut target_column_set = bitvec![0; field_count];
+
+ let columns = target_columns.as_ref().unwrap().as_ref();
+
+ for &target_index in columns {
+ target_column_set.set(target_index, true);
+ }
+
+ let mut pk_column_set = bitvec![0; field_count];
+
+ // check the target columns contains the primary key
+ for primary_key in primary_keys {
+ let pk_index = row_type.get_field_index(primary_key.as_str());
+ match pk_index {
+ Some(pk_index) => {
+ if !target_column_set[pk_index] {
+ return Err(IllegalArgument {
+ message: format!(
+ "The target write columns {} must contain the
primary key columns {}",
+
row_type.project(columns)?.get_field_names().join(", "),
+ primary_keys.join(", ")
+ ),
+ });
+ }
+ pk_column_set.set(pk_index, true);
+ }
+ None => {
+ return Err(IllegalArgument {
+ message: format!(
+ "The specified primary key {} is not in row type
{}",
+ primary_key, row_type
+ ),
+ });
+ }
+ }
+ }
+
+ let mut auto_increment_column_set = bitvec![0; field_count];
+ // explicitly specifying values for an auto increment column is not
allowed
+ for auto_increment_col_name in auto_increment_col_names {
+ let auto_increment_field_index =
+ row_type.get_field_index(auto_increment_col_name.as_str());
+
+ if let Some(index) = auto_increment_field_index {
+ if target_column_set[index] {
+ return Err(IllegalArgument {
+ message: format!(
+ "Explicitly specifying values for the auto
increment column {} is not allowed.",
+ auto_increment_col_name
+ ),
+ });
+ }
+
+ auto_increment_column_set.set(index, true);
+ }
+ }
+
+ // check the columns not in targetColumns should be nullable
+ for i in 0..field_count {
+ // column not in primary key and not in auto increment column
+ if !pk_column_set[i] && !auto_increment_column_set[i] {
+ // the column should be nullable
+ if !row_type.fields().get(i).unwrap().data_type.is_nullable() {
+ return Err(IllegalArgument {
+ message: format!(
+ "Partial Update requires all columns except
primary key to be nullable, but column {} is NOT NULL.",
+ row_type.fields().get(i).unwrap().name()
+ ),
+ });
+ }
+ }
+ }
+
+ Ok(())
+ }
+}
+
+#[allow(dead_code)]
+impl<RE: RowEncoder> UpsertWriterImpl<RE> {
+ fn check_field_count<R: InternalRow>(&self, row: &R) -> Result<()> {
+ let expected = self.table_info.get_row_type().fields().len();
+ if row.get_field_count() != expected {
+ return Err(IllegalArgument {
+ message: format!(
+ "The field count of the row does not match the table
schema. Expected: {}, Actual: {}",
+ expected,
+ row.get_field_count()
+ ),
+ });
+ }
+ Ok(())
+ }
+
+ fn get_keys(&mut self, row: &dyn InternalRow) -> Result<(Bytes,
Option<Bytes>)> {
+ let key = self.primary_key_encoder.encode_key(row)?;
+ let bucket_key = match &mut self.bucket_key_encoder {
+ Some(bucket_key_encoder) =>
Some(bucket_key_encoder.encode_key(row)?),
+ None => Some(key.clone()),
+ };
+ Ok((key, bucket_key))
+ }
+
+ fn encode_row<R: InternalRow>(&mut self, row: &R) -> Result<Bytes> {
+ self.row_encoder.start_new_row()?;
+ for (pos, field_getter) in self.field_getters.iter().enumerate() {
+ let datum = field_getter.get_field(row);
+ self.row_encoder.encode_field(pos, datum)?;
+ }
+ self.row_encoder.finish_row()
+ }
+}
+
+impl<RE: RowEncoder> TableWriter for UpsertWriterImpl<RE> {
+ /// Flush data written that have not yet been sent to the server, forcing
the client to send the
+ /// requests to server and blocks on the completion of the requests
associated with these
+ /// records. A request is considered completed when it is successfully
acknowledged according to
+ /// the CLIENT_WRITER_ACKS configuration option you have specified or else
it
+ /// results in an error.
+ async fn flush(&self) -> Result<()> {
+ self.writer_client.flush().await
+ }
+}
+
+impl<RE: RowEncoder> UpsertWriter for UpsertWriterImpl<RE> {
+ /// Inserts row into Fluss table if they do not already exist, or updates
them if they do exist.
+ ///
+ /// # Arguments
+ /// * row - the row to upsert.
+ ///
+ /// # Returns
+ /// Ok(UpsertResult) when completed normally
+ async fn upsert<R: InternalRow>(&mut self, row: &R) ->
Result<UpsertResult> {
+ self.check_field_count(row)?;
+
+ let (key, bucket_key) = self.get_keys(row)?;
+
+ let row_bytes: RowBytes<'_> = match
row.as_encoded_bytes(self.write_format) {
+ Some(bytes) => RowBytes::Borrowed(bytes),
+ None => RowBytes::Owned(self.encode_row(row)?),
+ };
+
+ let write_record = WriteRecord::for_upsert(
+ Arc::clone(&self.table_path),
+ self.table_info.schema_id,
+ key,
+ bucket_key,
+ self.write_format,
+ self.target_columns.clone(),
+ Some(row_bytes),
+ );
+
+ let result_handle = self.writer_client.send(&write_record).await?;
+ let result = result_handle.wait().await?;
+
+ result_handle.result(result).map(|_| UpsertResult)
+ }
+
+ /// Delete certain row by the input row in Fluss table, the input row must
contain the primary
+ /// key.
+ ///
+ /// # Arguments
+ /// * row - the row to delete.
+ ///
+ /// # Returns
+ /// Ok(DeleteResult) when completed normally
+ async fn delete<R: InternalRow>(&mut self, row: &R) ->
Result<DeleteResult> {
+ self.check_field_count(row)?;
+
+ let (key, bucket_key) = self.get_keys(row)?;
+
+ let write_record = WriteRecord::for_upsert(
+ Arc::clone(&self.table_path),
+ self.table_info.schema_id,
+ key,
+ bucket_key,
+ self.write_format,
+ self.target_columns.clone(),
+ None,
+ );
+
+ let result_handle = self.writer_client.send(&write_record).await?;
+ let result = result_handle.wait().await?;
+
+ result_handle.result(result).map(|_| DeleteResult)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::metadata::{DataField, DataTypes};
+
+ #[test]
+ fn sanity_check() {
+ // No target columns specified but table has auto-increment column
+ let fields = vec![
+ DataField::new("id".to_string(),
DataTypes::int().as_non_nullable(), None),
+ DataField::new("name".to_string(), DataTypes::string(), None),
+ ];
+ let row_type = RowType::new(fields);
+ let primary_keys = vec!["id".to_string()];
+ let auto_increment_col_names = vec!["id".to_string()];
+ let target_columns = None;
+
+ let result = UpsertWriterFactory::sanity_check(
+ &row_type,
+ &primary_keys,
+ &auto_increment_col_names,
+ &target_columns,
+ );
+
+ assert!(result.unwrap_err().to_string().contains(
+ "This table has auto increment column id. Explicitly specifying
values for an auto increment column is not allowed. Please Specify
non-auto-increment columns as target columns using partialUpdate first."
+ ));
+
+ // Target columns do not contain primary key
+ let fields = vec![
+ DataField::new("id".to_string(),
DataTypes::int().as_non_nullable(), None),
+ DataField::new("name".to_string(), DataTypes::string(), None),
+ DataField::new("value".to_string(), DataTypes::int(), None),
+ ];
+ let row_type = RowType::new(fields);
+ let primary_keys = vec!["id".to_string()];
+ let auto_increment_col_names = vec![];
+ let target_columns = Some(Arc::new(vec![1usize]));
+
+ let result = UpsertWriterFactory::sanity_check(
+ &row_type,
+ &primary_keys,
+ &auto_increment_col_names,
+ &target_columns,
+ );
+
+ assert!(
+ result
+ .unwrap_err()
+ .to_string()
+ .contains("The target write columns name must contain the
primary key columns id")
+ );
+
+ // Primary key column not found in row type
+ let fields = vec![
+ DataField::new("id".to_string(),
DataTypes::int().as_non_nullable(), None),
+ DataField::new("name".to_string(), DataTypes::string(), None),
+ ];
+ let row_type = RowType::new(fields);
+ let primary_keys = vec!["nonexistent_pk".to_string()];
+ let auto_increment_col_names = vec![];
+ let target_columns = Some(Arc::new(vec![0usize, 1]));
+
+ let result = UpsertWriterFactory::sanity_check(
+ &row_type,
+ &primary_keys,
+ &auto_increment_col_names,
+ &target_columns,
+ );
+
+ assert!(
+ result
+ .unwrap_err()
+ .to_string()
+ .contains("The specified primary key nonexistent_pk is not in
row type")
+ );
+
+ // Target columns include auto-increment column
+ let fields = vec![
+ DataField::new("id".to_string(),
DataTypes::int().as_non_nullable(), None),
+ DataField::new(
+ "seq".to_string(),
+ DataTypes::bigint().as_non_nullable(),
+ None,
+ ),
+ DataField::new("name".to_string(), DataTypes::string(), None),
+ ];
+ let row_type = RowType::new(fields);
+ let primary_keys = vec!["id".to_string()];
+ let auto_increment_col_names = vec!["seq".to_string()];
+ let target_columns = Some(Arc::new(vec![0usize, 1, 2]));
+
+ let result = UpsertWriterFactory::sanity_check(
+ &row_type,
+ &primary_keys,
+ &auto_increment_col_names,
+ &target_columns,
+ );
+
+ assert!(result.unwrap_err().to_string().contains(
+ "Explicitly specifying values for the auto increment column seq is
not allowed."
+ ));
+
+ // Non-nullable column not in target columns (partial update requires
nullable)
+ let fields = vec![
+ DataField::new("id".to_string(),
DataTypes::int().as_non_nullable(), None),
+ DataField::new(
+ "required_field".to_string(),
+ DataTypes::string().as_non_nullable(),
+ None,
+ ),
+ DataField::new("optional_field".to_string(), DataTypes::int(),
None),
+ ];
+ let row_type = RowType::new(fields);
+ let primary_keys = vec!["id".to_string()];
+ let auto_increment_col_names = vec![];
+ let target_columns = Some(Arc::new(vec![0usize]));
+
+ let result = UpsertWriterFactory::sanity_check(
+ &row_type,
+ &primary_keys,
+ &auto_increment_col_names,
+ &target_columns,
+ );
+
+ assert!(result.unwrap_err().to_string().contains(
+ "Partial Update requires all columns except primary key to be
nullable, but column required_field is NOT NULL."
+ ));
+ }
+}
diff --git a/crates/fluss/src/client/table/writer.rs
b/crates/fluss/src/client/table/writer.rs
index 8a83b5e..8276545 100644
--- a/crates/fluss/src/client/table/writer.rs
+++ b/crates/fluss/src/client/table/writer.rs
@@ -16,13 +16,13 @@
// under the License.
use crate::client::{WriteRecord, WriterClient};
-use crate::row::GenericRow;
+use crate::row::{GenericRow, InternalRow};
use std::sync::Arc;
use crate::error::Result;
use crate::metadata::{TableInfo, TablePath};
-#[allow(dead_code)]
+#[allow(dead_code, async_fn_in_trait)]
pub trait TableWriter {
async fn flush(&self) -> Result<()>;
}
@@ -32,12 +32,22 @@ pub trait AppendWriter: TableWriter {
async fn append(&self, row: GenericRow) -> Result<()>;
}
-#[allow(dead_code)]
+#[allow(dead_code, async_fn_in_trait)]
pub trait UpsertWriter: TableWriter {
- async fn upsert(&self, row: GenericRow) -> Result<()>;
- async fn delete(&self, row: GenericRow) -> Result<()>;
+ async fn upsert<R: InternalRow>(&mut self, row: &R) ->
Result<UpsertResult>;
+ async fn delete<R: InternalRow>(&mut self, row: &R) ->
Result<DeleteResult>;
}
+/// The result of upserting a record
+/// Currently this is an empty struct to allow for compatible evolution in the
future
+#[derive(Default)]
+pub struct UpsertResult;
+
+/// The result of deleting a record
+/// Currently this is an empty struct to allow for compatible evolution in the
future
+#[derive(Default)]
+pub struct DeleteResult;
+
#[allow(dead_code)]
pub struct AbstractTableWriter {
table_path: Arc<TablePath>,
diff --git a/crates/fluss/src/client/write/accumulator.rs
b/crates/fluss/src/client/write/accumulator.rs
index 0afc9d4..fb7b544 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-use crate::client::write::batch::WriteBatch::ArrowLog;
-use crate::client::write::batch::{ArrowLogWriteBatch, WriteBatch};
+use crate::client::write::batch::WriteBatch::{ArrowLog, Kv};
+use crate::client::write::batch::{ArrowLogWriteBatch, KvWriteBatch,
WriteBatch};
use crate::client::{LogWriteRecord, Record, ResultHandle, WriteRecord};
use crate::cluster::{BucketLocation, Cluster, ServerNode};
use crate::config::Config;
@@ -102,16 +102,29 @@ impl RecordAccumulator {
let schema_id = table_info.schema_id;
- let mut batch = ArrowLog(ArrowLogWriteBatch::new(
- self.batch_id.fetch_add(1, Ordering::Relaxed),
- table_path.as_ref().clone(),
- schema_id,
- arrow_compression_info,
- row_type,
- bucket_id,
- current_time_ms(),
- matches!(&record.record,
Record::Log(LogWriteRecord::RecordBatch(_))),
- ));
+ let mut batch: WriteBatch = match record.record() {
+ Record::Log(_) => ArrowLog(ArrowLogWriteBatch::new(
+ self.batch_id.fetch_add(1, Ordering::Relaxed),
+ table_path.as_ref().clone(),
+ schema_id,
+ arrow_compression_info,
+ row_type,
+ bucket_id,
+ current_time_ms(),
+ matches!(&record.record,
Record::Log(LogWriteRecord::RecordBatch(_))),
+ )),
+ Record::Kv(kv_record) => Kv(KvWriteBatch::new(
+ self.batch_id.fetch_add(1, Ordering::Relaxed),
+ table_path.as_ref().clone(),
+ schema_id,
+ // TODO: Decide how to derive write limit in the absence of
java's equivalent of PreAllocatedPagedOutputView
+ KvWriteBatch::DEFAULT_WRITE_LIMIT,
+ record.write_format.to_kv_format()?,
+ bucket_id,
+ kv_record.target_columns.clone(),
+ current_time_ms(),
+ )),
+ };
let batch_id = batch.batch_id();
@@ -142,6 +155,8 @@ impl RecordAccumulator {
) -> Result<RecordAppendResult> {
let table_path = &record.table_path;
+ // TODO: Implement partitioning
+
let dq = {
let mut binding = self
.write_batches
diff --git a/crates/fluss/src/client/write/batch.rs
b/crates/fluss/src/client/write/batch.rs
index 0159753..2ddf519 100644
--- a/crates/fluss/src/client/write/batch.rs
+++ b/crates/fluss/src/client/write/batch.rs
@@ -20,11 +20,12 @@ use crate::client::broadcast::{BatchWriteResult,
BroadcastOnce};
use crate::client::{Record, ResultHandle, WriteRecord};
use crate::compression::ArrowCompressionInfo;
use crate::error::{Error, Result};
-use crate::metadata::{DataType, KvFormat, TablePath};
+use crate::metadata::{KvFormat, RowType, TablePath};
use crate::record::MemoryLogRecordsArrowBuilder;
use crate::record::kv::KvRecordBatchBuilder;
use bytes::Bytes;
use std::cmp::max;
+use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
#[allow(dead_code)]
@@ -192,7 +193,7 @@ impl ArrowLogWriteBatch {
table_path: TablePath,
schema_id: i32,
arrow_compression_info: ArrowCompressionInfo,
- row_type: &DataType,
+ row_type: &RowType,
bucket_id: BucketId,
create_ms: i64,
to_append_record_batch: bool,
@@ -249,11 +250,12 @@ impl ArrowLogWriteBatch {
pub struct KvWriteBatch {
write_batch: InnerWriteBatch,
kv_batch_builder: KvRecordBatchBuilder,
- target_columns: Option<Vec<usize>>,
+ target_columns: Option<Arc<Vec<usize>>>,
schema_id: i32,
}
impl KvWriteBatch {
+ pub const DEFAULT_WRITE_LIMIT: usize = 256;
#[allow(clippy::too_many_arguments)]
pub fn new(
batch_id: i64,
@@ -262,7 +264,7 @@ impl KvWriteBatch {
write_limit: usize,
kv_format: KvFormat,
bucket_id: BucketId,
- target_columns: Option<Vec<usize>>,
+ target_columns: Option<Arc<Vec<usize>>>,
create_ms: i64,
) -> Self {
let base = InnerWriteBatch::new(batch_id, table_path, create_ms,
bucket_id);
@@ -284,7 +286,7 @@ impl KvWriteBatch {
}
};
- let key = kv_write_record.key;
+ let key = kv_write_record.key.as_ref();
if self.schema_id != write_record.schema_id {
return Err(Error::UnexpectedError {
@@ -296,7 +298,7 @@ impl KvWriteBatch {
});
};
- if self.target_columns.as_deref() != kv_write_record.target_columns {
+ if self.target_columns != kv_write_record.target_columns {
return Err(Error::UnexpectedError {
message: format!(
"target columns {:?} of the write record to append are not
the same as the current target columns {:?} in the batch.",
@@ -307,14 +309,14 @@ impl KvWriteBatch {
});
}
- let row = kv_write_record.compacted_row.as_ref();
+ let row_bytes = kv_write_record.row_bytes();
- if self.is_closed() || !self.kv_batch_builder.has_room_for_row(key,
row) {
+ if self.is_closed() || !self.kv_batch_builder.has_room_for_row(key,
row_bytes) {
Ok(None)
} else {
// append successfully
self.kv_batch_builder
- .append_row(key, row)
+ .append_row(key, row_bytes)
.map_err(|e| Error::UnexpectedError {
message: "Failed to append row to
KvWriteBatch".to_string(),
source: Some(Box::new(e)),
diff --git a/crates/fluss/src/client/write/bucket_assigner.rs
b/crates/fluss/src/client/write/bucket_assigner.rs
index 2370719..817101a 100644
--- a/crates/fluss/src/client/write/bucket_assigner.rs
+++ b/crates/fluss/src/client/write/bucket_assigner.rs
@@ -20,6 +20,7 @@ use crate::cluster::Cluster;
use crate::error::Error::IllegalArgument;
use crate::error::Result;
use crate::metadata::TablePath;
+use bytes::Bytes;
use rand::Rng;
use std::sync::atomic::{AtomicI32, Ordering};
@@ -28,7 +29,7 @@ pub trait BucketAssigner: Sync + Send {
fn on_new_batch(&self, cluster: &Cluster, prev_bucket_id: i32);
- fn assign_bucket(&self, bucket_key: Option<&[u8]>, cluster: &Cluster) ->
Result<i32>;
+ fn assign_bucket(&self, bucket_key: Option<&Bytes>, cluster: &Cluster) ->
Result<i32>;
}
#[derive(Debug)]
@@ -94,7 +95,7 @@ impl BucketAssigner for StickyBucketAssigner {
self.next_bucket(cluster, prev_bucket_id);
}
- fn assign_bucket(&self, _bucket_key: Option<&[u8]>, cluster: &Cluster) ->
Result<i32> {
+ fn assign_bucket(&self, _bucket_key: Option<&Bytes>, cluster: &Cluster) ->
Result<i32> {
let bucket_id = self.current_bucket_id.load(Ordering::Relaxed);
if bucket_id < 0 {
Ok(self.next_bucket(cluster, bucket_id))
@@ -139,7 +140,7 @@ impl BucketAssigner for HashBucketAssigner {
// do nothing
}
- fn assign_bucket(&self, bucket_key: Option<&[u8]>, _: &Cluster) ->
Result<i32> {
+ fn assign_bucket(&self, bucket_key: Option<&Bytes>, _: &Cluster) ->
Result<i32> {
let key = bucket_key.ok_or_else(|| IllegalArgument {
message: "no bucket key provided".to_string(),
})?;
@@ -181,7 +182,7 @@ mod tests {
let assigner = HashBucketAssigner::new(4, <dyn
BucketingFunction>::of(None));
let cluster = Cluster::default();
let bucket = assigner
- .assign_bucket(Some(b"key"), &cluster)
+ .assign_bucket(Some(&Bytes::from_static(b"key")), &cluster)
.expect("bucket");
assert!((0..4).contains(&bucket));
}
diff --git a/crates/fluss/src/client/write/mod.rs
b/crates/fluss/src/client/write/mod.rs
index 248218e..dcc6795 100644
--- a/crates/fluss/src/client/write/mod.rs
+++ b/crates/fluss/src/client/write/mod.rs
@@ -21,9 +21,10 @@ mod batch;
use crate::client::broadcast::{self as client_broadcast, BatchWriteResult,
BroadcastOnceReceiver};
use crate::error::Error;
use crate::metadata::TablePath;
-use crate::row::{CompactedRow, GenericRow};
+use crate::row::GenericRow;
pub use accumulator::*;
use arrow::array::RecordBatch;
+use bytes::Bytes;
use std::sync::Arc;
pub(crate) mod broadcast;
@@ -40,7 +41,7 @@ pub use writer_client::WriterClient;
pub struct WriteRecord<'a> {
record: Record<'a>,
table_path: Arc<TablePath>,
- bucket_key: Option<&'a [u8]>,
+ bucket_key: Option<Bytes>,
schema_id: i32,
write_format: WriteFormat,
}
@@ -61,25 +62,43 @@ pub enum LogWriteRecord<'a> {
RecordBatch(Arc<RecordBatch>),
}
+#[derive(Clone)]
+pub enum RowBytes<'a> {
+ Borrowed(&'a [u8]),
+ Owned(Bytes),
+}
+
+impl<'a> RowBytes<'a> {
+ pub fn as_slice(&self) -> &[u8] {
+ match self {
+ RowBytes::Borrowed(slice) => slice,
+ RowBytes::Owned(bytes) => bytes.as_ref(),
+ }
+ }
+}
+
pub struct KvWriteRecord<'a> {
- // only valid for primary key table
- key: &'a [u8],
- target_columns: Option<&'a [usize]>,
- compacted_row: Option<CompactedRow<'a>>,
+ key: Bytes,
+ target_columns: Option<Arc<Vec<usize>>>,
+ row_bytes: Option<RowBytes<'a>>,
}
impl<'a> KvWriteRecord<'a> {
fn new(
- key: &'a [u8],
- target_columns: Option<&'a [usize]>,
- compacted_row: Option<CompactedRow<'a>>,
+ key: Bytes,
+ target_columns: Option<Arc<Vec<usize>>>,
+ row_bytes: Option<RowBytes<'a>>,
) -> Self {
KvWriteRecord {
key,
target_columns,
- compacted_row,
+ row_bytes,
}
}
+
+ pub fn row_bytes(&self) -> Option<&[u8]> {
+ self.row_bytes.as_ref().map(|rb| rb.as_slice())
+ }
}
impl<'a> WriteRecord<'a> {
@@ -110,17 +129,18 @@ impl<'a> WriteRecord<'a> {
pub fn for_upsert(
table_path: Arc<TablePath>,
schema_id: i32,
- bucket_key: &'a [u8],
- key: &'a [u8],
- target_columns: Option<&'a [usize]>,
- row: CompactedRow<'a>,
+ key: Bytes,
+ bucket_key: Option<Bytes>,
+ write_format: WriteFormat,
+ target_columns: Option<Arc<Vec<usize>>>,
+ row_bytes: Option<RowBytes<'a>>,
) -> Self {
Self {
- record: Record::Kv(KvWriteRecord::new(key, target_columns,
Some(row))),
+ record: Record::Kv(KvWriteRecord::new(key, target_columns,
row_bytes)),
table_path,
- bucket_key: Some(bucket_key),
+ bucket_key,
schema_id,
- write_format: WriteFormat::CompactedKv,
+ write_format,
}
}
}
diff --git a/crates/fluss/src/client/write/write_format.rs
b/crates/fluss/src/client/write/write_format.rs
index 4a0c0d8..147152c 100644
--- a/crates/fluss/src/client/write/write_format.rs
+++ b/crates/fluss/src/client/write/write_format.rs
@@ -20,6 +20,7 @@ use crate::error::Result;
use crate::metadata::KvFormat;
use std::fmt::Display;
+#[derive(Copy, Clone)]
pub enum WriteFormat {
ArrowLog,
CompactedLog,
diff --git a/crates/fluss/src/client/write/writer_client.rs
b/crates/fluss/src/client/write/writer_client.rs
index 22e0397..65b04f5 100644
--- a/crates/fluss/src/client/write/writer_client.rs
+++ b/crates/fluss/src/client/write/writer_client.rs
@@ -21,6 +21,7 @@ use crate::client::write::sender::Sender;
use crate::client::{RecordAccumulator, ResultHandle, WriteRecord};
use crate::config::Config;
use crate::metadata::TablePath;
+use bytes::Bytes;
use dashmap::DashMap;
use std::sync::Arc;
use tokio::sync::mpsc;
@@ -90,8 +91,9 @@ impl WriterClient {
pub async fn send(&self, record: &WriteRecord<'_>) -> Result<ResultHandle>
{
let table_path = &record.table_path;
let cluster = self.metadata.get_cluster();
+ let bucket_key = record.bucket_key.as_ref();
- let (bucket_assigner, bucket_id) = self.assign_bucket(table_path)?;
+ let (bucket_assigner, bucket_id) = self.assign_bucket(bucket_key,
table_path)?;
let mut result = self
.accumulate
@@ -101,7 +103,7 @@ impl WriterClient {
if result.abort_record_for_new_batch {
let prev_bucket_id = bucket_id;
bucket_assigner.on_new_batch(&cluster, prev_bucket_id);
- let bucket_id = bucket_assigner.assign_bucket(None, &cluster)?;
+ let bucket_id = bucket_assigner.assign_bucket(bucket_key,
&cluster)?;
result = self
.accumulate
.append(record, bucket_id, &cluster, false)
@@ -116,6 +118,7 @@ impl WriterClient {
}
fn assign_bucket(
&self,
+ bucket_key: Option<&Bytes>,
table_path: &Arc<TablePath>,
) -> Result<(Arc<Box<dyn BucketAssigner>>, i32)> {
let cluster = self.metadata.get_cluster();
@@ -129,7 +132,7 @@ impl WriterClient {
assigner
}
};
- let bucket_id = bucket_assigner.assign_bucket(None, &cluster)?;
+ let bucket_id = bucket_assigner.assign_bucket(bucket_key, &cluster)?;
Ok((bucket_assigner, bucket_id))
}
diff --git a/crates/fluss/src/metadata/table.rs
b/crates/fluss/src/metadata/table.rs
index da85b0c..8204e7c 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::compression::ArrowCompressionInfo;
-use crate::error::Error::InvalidTableError;
+use crate::error::Error::{IllegalArgument, InvalidTableError};
use crate::error::{Error, Result};
use crate::metadata::DataLakeFormat;
use crate::metadata::datatype::{DataField, DataType, RowType};
@@ -97,8 +97,8 @@ impl PrimaryKey {
pub struct Schema {
columns: Vec<Column>,
primary_key: Option<PrimaryKey>,
- // must be Row data type kind
- row_type: DataType,
+ row_type: RowType,
+ auto_increment_col_names: Vec<String>,
}
impl Schema {
@@ -118,7 +118,7 @@ impl Schema {
self.primary_key.as_ref()
}
- pub fn row_type(&self) -> &DataType {
+ pub fn row_type(&self) -> &RowType {
&self.row_type
}
@@ -144,12 +144,17 @@ impl Schema {
pub fn column_names(&self) -> Vec<&str> {
self.columns.iter().map(|c| c.name.as_str()).collect()
}
+
+ pub fn auto_increment_col_names(&self) -> &Vec<String> {
+ &self.auto_increment_col_names
+ }
}
#[derive(Debug, Default)]
pub struct SchemaBuilder {
columns: Vec<Column>,
primary_key: Option<PrimaryKey>,
+ auto_increment_col_names: Vec<String>,
}
impl SchemaBuilder {
@@ -198,9 +203,36 @@ impl SchemaBuilder {
self
}
+    /// Declares a column to be auto-incremented. With an auto-increment column in the table,
+    /// whenever a new row is inserted into the table, the new row will be assigned the next
+    /// available value from the auto-increment sequence. A table can have at most one
+    /// auto-increment column.
+ pub fn enable_auto_increment(mut self, column_name: &str) -> Result<Self> {
+ if !self.auto_increment_col_names.is_empty() {
+ return Err(IllegalArgument {
+ message: "Multiple auto increment columns are not supported
yet.".to_string(),
+ });
+ }
+
+ self.auto_increment_col_names.push(column_name.to_string());
+ Ok(self)
+ }
+
pub fn build(&mut self) -> Result<Schema> {
let columns = Self::normalize_columns(&mut self.columns,
self.primary_key.as_ref())?;
+ let column_names: HashSet<_> = columns.iter().map(|c|
&c.name).collect();
+ for auto_inc_col in &self.auto_increment_col_names {
+ if !column_names.contains(auto_inc_col) {
+ return Err(IllegalArgument {
+ message: format!(
+ "Auto increment column '{}' is not found in the schema
columns.",
+ auto_inc_col
+ ),
+ });
+ }
+ }
+
let data_fields = columns
.iter()
.map(|c| DataField {
@@ -213,7 +245,8 @@ impl SchemaBuilder {
Ok(Schema {
columns,
primary_key: self.primary_key.clone(),
- row_type: DataType::Row(RowType::new(data_fields)),
+ row_type: RowType::new(data_fields),
+ auto_increment_col_names: self.auto_increment_col_names.clone(),
})
}
@@ -500,7 +533,7 @@ impl TableDescriptor {
bucket_keys.retain(|k| !partition_keys.contains(k));
if bucket_keys.is_empty() {
- return Err(Error::InvalidTableError {
+ return Err(InvalidTableError {
message: format!(
"Primary Key constraint {:?} should not be same with
partition fields {:?}.",
schema.primary_key().unwrap().column_names(),
@@ -580,7 +613,7 @@ pub enum LogFormat {
}
impl Display for LogFormat {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
LogFormat::ARROW => {
write!(f, "ARROW")?;
@@ -612,7 +645,7 @@ pub enum KvFormat {
}
impl Display for KvFormat {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
KvFormat::COMPACTED => write!(f, "COMPACTED")?,
KvFormat::INDEXED => write!(f, "INDEXED")?,
@@ -626,7 +659,7 @@ impl KvFormat {
match s.to_uppercase().as_str() {
"INDEXED" => Ok(KvFormat::INDEXED),
"COMPACTED" => Ok(KvFormat::COMPACTED),
- _ => Err(Error::InvalidTableError {
+ _ => Err(InvalidTableError {
message: format!("Unknown kv format: {s}"),
}),
}
@@ -692,7 +725,7 @@ pub struct TableInfo {
pub table_id: i64,
pub schema_id: i32,
pub schema: Schema,
- pub row_type: DataType,
+ pub row_type: RowType,
pub primary_keys: Vec<String>,
pub physical_primary_keys: Vec<String>,
pub bucket_keys: Vec<String>,
@@ -708,10 +741,7 @@ pub struct TableInfo {
impl TableInfo {
pub fn row_type(&self) -> &RowType {
- match &self.row_type {
- DataType::Row(row_type) => row_type,
- _ => panic!("should be a row type"),
- }
+ &self.row_type
}
}
@@ -847,7 +877,7 @@ impl TableInfo {
&self.schema
}
- pub fn get_row_type(&self) -> &DataType {
+ pub fn get_row_type(&self) -> &RowType {
&self.row_type
}
@@ -946,8 +976,8 @@ impl TableInfo {
}
}
-impl fmt::Display for TableInfo {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+impl Display for TableInfo {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"TableInfo{{ table_path={:?}, table_id={}, schema_id={},
schema={:?}, physical_primary_keys={:?}, bucket_keys={:?}, partition_keys={:?},
num_buckets={}, properties={:?}, custom_properties={:?}, comment={:?},
created_time={}, modified_time={} }}",
@@ -998,7 +1028,7 @@ impl TableBucket {
}
impl Display for TableBucket {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
if let Some(partition_id) = self.partition_id {
write!(
f,
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 3c46f9b..3c94b72 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -18,7 +18,7 @@
use crate::client::{LogWriteRecord, Record, WriteRecord};
use crate::compression::ArrowCompressionInfo;
use crate::error::{Error, Result};
-use crate::metadata::DataType;
+use crate::metadata::{DataType, RowType};
use crate::record::{ChangeType, ScanRecord};
use crate::row::{ColumnarRow, GenericRow};
use arrow::array::{
@@ -48,6 +48,7 @@ use std::{
sync::Arc,
};
+use crate::error::Error::IllegalArgument;
use arrow::ipc::writer::IpcWriteOptions;
/// const for record batch
pub const BASE_OFFSET_LENGTH: usize = 8;
@@ -171,7 +172,7 @@ pub struct RowAppendRecordBatchBuilder {
}
impl RowAppendRecordBatchBuilder {
- pub fn new(row_type: &DataType) -> Self {
+ pub fn new(row_type: &RowType) -> Self {
let schema_ref = to_arrow_schema(row_type);
let builders = Mutex::new(
schema_ref
@@ -251,7 +252,7 @@ impl ArrowRecordBatchInnerBuilder for
RowAppendRecordBatchBuilder {
impl MemoryLogRecordsArrowBuilder {
pub fn new(
schema_id: i32,
- row_type: &DataType,
+ row_type: &RowType,
to_append_record_batch: bool,
arrow_compression_info: ArrowCompressionInfo,
) -> Self {
@@ -329,7 +330,7 @@ impl MemoryLogRecordsArrowBuilder {
// write arrow batch bytes
let mut cursor = Cursor::new(&mut batch_bytes[..]);
cursor.set_position(RECORD_BATCH_HEADER_SIZE as u64);
- cursor.write_all(real_arrow_batch_bytes).unwrap();
+ cursor.write_all(real_arrow_batch_bytes)?;
let calcute_crc_bytes = &cursor.get_ref()[SCHEMA_ID_OFFSET..];
// then update crc
@@ -562,16 +563,17 @@ impl LogRecordBatch {
return
Ok(RecordBatch::new_empty(read_context.target_schema.clone()));
}
- let data = self.data.get(RECORDS_OFFSET..).ok_or_else(|| {
- crate::error::Error::UnexpectedError {
+ let data = self
+ .data
+ .get(RECORDS_OFFSET..)
+ .ok_or_else(|| Error::UnexpectedError {
message: format!(
"Corrupt log record batch: data length {} is less than
RECORDS_OFFSET {}",
self.data.len(),
RECORDS_OFFSET
),
source: None,
- }
- })?;
+ })?;
read_context.record_batch(data)
}
}
@@ -639,27 +641,20 @@ fn parse_ipc_message(
Ok((batch_metadata, body_buffer, message.version()))
}
-pub fn to_arrow_schema(fluss_schema: &DataType) -> SchemaRef {
- match &fluss_schema {
- DataType::Row(row_type) => {
- let fields: Vec<Field> = row_type
- .fields()
- .iter()
- .map(|f| {
- Field::new(
- f.name(),
- to_arrow_type(f.data_type()),
- f.data_type().is_nullable(),
- )
- })
- .collect();
+pub fn to_arrow_schema(fluss_schema: &RowType) -> SchemaRef {
+ let fields: Vec<Field> = fluss_schema
+ .fields()
+ .iter()
+ .map(|f| {
+ Field::new(
+ f.name(),
+ to_arrow_type(f.data_type()),
+ f.data_type().is_nullable(),
+ )
+ })
+ .collect();
- SchemaRef::new(arrow_schema::Schema::new(fields))
- }
- _ => {
- panic!("must be row data type.")
- }
- }
+ SchemaRef::new(arrow_schema::Schema::new(fields))
}
pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType {
@@ -813,7 +808,7 @@ impl ReadContext {
let mut reordering_indexes =
Vec::with_capacity(projected_fields.len());
for &original_idx in &projected_fields {
let pos =
sorted_fields.binary_search(&original_idx).map_err(|_| {
- Error::IllegalArgument {
+ IllegalArgument {
message: format!(
"Projection index {original_idx} is invalid
for the current schema."
),
@@ -857,7 +852,7 @@ impl ReadContext {
let field_count = schema.fields().len();
for &index in projected_fields {
if index >= field_count {
- return Err(Error::IllegalArgument {
+ return Err(IllegalArgument {
message: format!(
"Projection index {index} is out of bounds for schema
with {field_count} fields."
),
@@ -869,7 +864,7 @@ impl ReadContext {
pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) ->
Result<SchemaRef> {
Ok(SchemaRef::new(schema.project(projected_fields).map_err(
- |e| Error::IllegalArgument {
+ |e| IllegalArgument {
message: format!("Invalid projection: {e}"),
},
)?))
@@ -1060,7 +1055,6 @@ pub struct MyVec<T>(pub StreamReader<T>);
#[cfg(test)]
mod tests {
use super::*;
- use crate::error::Error;
use crate::metadata::{DataField, DataTypes};
#[test]
@@ -1217,14 +1211,14 @@ mod tests {
#[test]
fn projection_rejects_out_of_bounds_index() {
- let row_type = DataTypes::row(vec![
+ let row_type = RowType::new(vec![
DataField::new("id".to_string(), DataTypes::int(), None),
DataField::new("name".to_string(), DataTypes::string(), None),
]);
let schema = to_arrow_schema(&row_type);
let result = ReadContext::with_projection_pushdown(schema, vec![0, 2],
false);
- assert!(matches!(result, Err(Error::IllegalArgument { .. })));
+ assert!(matches!(result, Err(IllegalArgument { .. })));
}
#[test]
diff --git a/crates/fluss/src/record/kv/kv_record_batch.rs
b/crates/fluss/src/record/kv/kv_record_batch.rs
index 32f712f..eb89d69 100644
--- a/crates/fluss/src/record/kv/kv_record_batch.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch.rs
@@ -370,12 +370,12 @@ impl Iterator for KvRecordIterator {
#[cfg(test)]
mod tests {
use super::*;
- use crate::metadata::{DataTypes, KvFormat, RowType};
+ use crate::metadata::{DataTypes, KvFormat};
use crate::record::kv::test_util::TestReadContext;
use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, KvRecordBatchBuilder};
use crate::row::InternalRow;
use crate::row::binary::BinaryWriter;
- use crate::row::compacted::CompactedRow;
+
use bytes::{BufMut, BytesMut};
#[test]
@@ -417,12 +417,11 @@ mod tests {
let mut value1_writer = CompactedRowWriter::new(1);
value1_writer.write_bytes(&[1, 2, 3, 4, 5]);
- let row_type = RowType::with_data_types([DataTypes::bytes()].to_vec());
- let row = &CompactedRow::from_bytes(&row_type, value1_writer.buffer());
- builder.append_row(key1, Some(row)).unwrap();
+ let row_bytes = value1_writer.buffer();
+ builder.append_row(key1, Some(row_bytes)).unwrap();
let key2 = b"key2";
- builder.append_row::<CompactedRow>(key2, None).unwrap();
+ builder.append_row(key2, None).unwrap();
let bytes = builder.build().unwrap();
diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
index e3da864..0b65500 100644
--- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
@@ -28,7 +28,6 @@ use crate::record::kv::kv_record_batch::{
WRITE_CLIENT_ID_OFFSET,
};
use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, NO_BATCH_SEQUENCE,
NO_WRITER_ID};
-use crate::row::BinaryRow;
use bytes::{Bytes, BytesMut};
use std::io;
@@ -88,14 +87,13 @@ impl KvRecordBatchBuilder {
}
}
- /// Check if there is room for a new record containing the given key and
row.
+ /// Check if there is room for a new record containing the given key and
row bytes.
/// If no records have been appended, this always returns true.
- pub fn has_room_for_row<R: BinaryRow>(&self, key: &[u8], row: Option<&R>)
-> bool {
- let value = row.map(|r| r.as_bytes());
- self.size_in_bytes + KvRecord::size_of(key, value) <= self.write_limit
+ pub fn has_room_for_row(&self, key: &[u8], row_bytes: Option<&[u8]>) ->
bool {
+ self.size_in_bytes + KvRecord::size_of(key, row_bytes) <=
self.write_limit
}
- /// Append a KV record with a row value to the batch.
+ /// Append a KV record with row bytes to the batch.
///
/// Returns an error if:
/// - The builder has been aborted
@@ -103,7 +101,7 @@ impl KvRecordBatchBuilder {
/// - Adding this record would exceed the write limit
/// - The maximum number of records is exceeded
/// - The KV format is not COMPACTED
- pub fn append_row<R: BinaryRow>(&mut self, key: &[u8], row: Option<&R>) ->
io::Result<()> {
+ pub fn append_row(&mut self, key: &[u8], row_bytes: Option<&[u8]>) ->
io::Result<()> {
if self.kv_format != KvFormat::COMPACTED {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
@@ -134,8 +132,7 @@ impl KvRecordBatchBuilder {
));
}
- let value = row.map(|r| r.as_bytes());
- let record_size = KvRecord::size_of(key, value);
+ let record_size = KvRecord::size_of(key, row_bytes);
if self.size_in_bytes + record_size > self.write_limit {
return Err(io::Error::new(
io::ErrorKind::WriteZero,
@@ -146,7 +143,7 @@ impl KvRecordBatchBuilder {
));
}
- let record_byte_size = KvRecord::write_to_buf(&mut self.buffer, key,
value)?;
+ let record_byte_size = KvRecord::write_to_buf(&mut self.buffer, key,
row_bytes)?;
debug_assert_eq!(record_byte_size, record_size, "Record size
mismatch");
self.current_record_number += 1;
@@ -349,12 +346,12 @@ mod tests {
let key1 = b"key1";
let value1 = create_test_row(b"value1");
- assert!(builder.has_room_for_row(key1, Some(&value1)));
- builder.append_row(key1, Some(&value1)).unwrap();
+ assert!(builder.has_room_for_row(key1, Some(value1.as_bytes())));
+ builder.append_row(key1, Some(value1.as_bytes())).unwrap();
let key2 = b"key2";
- assert!(builder.has_room_for_row::<CompactedRow>(key2, None));
- builder.append_row::<CompactedRow>(key2, None).unwrap();
+ assert!(builder.has_room_for_row(key2, None));
+ builder.append_row(key2, None).unwrap();
builder.close().unwrap();
assert!(builder.is_closed());
@@ -369,35 +366,34 @@ mod tests {
// Test lifecycle: abort behavior
let mut builder = KvRecordBatchBuilder::new(1, 4096,
KvFormat::COMPACTED);
let value = create_test_row(b"value");
- builder.append_row(b"key", Some(&value)).unwrap();
+ builder.append_row(b"key", Some(value.as_bytes())).unwrap();
builder.abort();
- assert!(builder.append_row::<CompactedRow>(b"key2", None).is_err());
+ assert!(builder.append_row(b"key2", None).is_err());
assert!(builder.build().is_err());
assert!(builder.close().is_err());
// Test lifecycle: close behavior
let mut builder = KvRecordBatchBuilder::new(1, 4096,
KvFormat::COMPACTED);
let value = create_test_row(b"value");
- builder.append_row(b"key", Some(&value)).unwrap();
+ builder.append_row(b"key", Some(value.as_bytes())).unwrap();
builder.close().unwrap();
- assert!(builder.append_row::<CompactedRow>(b"key2", None).is_err());
+ assert!(builder.append_row(b"key2", None).is_err());
assert!(builder.build().is_ok());
// Test KvFormat validation
let mut row_writer = CompactedRowWriter::new(1);
row_writer.write_int(42);
- let row_type = RowType::with_data_types(vec![DataTypes::int()]);
- let row = &CompactedRow::from_bytes(&row_type, row_writer.buffer());
+ let row_bytes = row_writer.buffer();
// INDEXED format should reject append_row
let mut indexed_builder = KvRecordBatchBuilder::new(1, 4096,
KvFormat::INDEXED);
- let result = indexed_builder.append_row(b"key", Some(row));
+ let result = indexed_builder.append_row(b"key", Some(row_bytes));
assert!(result.is_err());
assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput);
// COMPACTED format should accept append_row
let mut compacted_builder = KvRecordBatchBuilder::new(1, 4096,
KvFormat::COMPACTED);
- let result = compacted_builder.append_row(b"key", Some(row));
+ let result = compacted_builder.append_row(b"key", Some(row_bytes));
assert!(result.is_ok());
}
@@ -410,15 +406,17 @@ mod tests {
let large_key = vec![0u8; 1000];
let large_value = vec![1u8; 1000];
let large_row = create_test_row(&large_value);
- assert!(!builder.has_room_for_row(&large_key, Some(&large_row)));
+ assert!(!builder.has_room_for_row(&large_key,
Some(large_row.as_bytes())));
let small_value = create_test_row(b"value");
- assert!(builder.has_room_for_row(b"key", Some(&small_value)));
+ assert!(builder.has_room_for_row(b"key",
Some(small_value.as_bytes())));
// Test append enforcement - add small record first
- builder.append_row(b"key", Some(&small_value)).unwrap();
+ builder
+ .append_row(b"key", Some(small_value.as_bytes()))
+ .unwrap();
// Try to add large record that exceeds limit (reuse large_row from
above)
- let result = builder.append_row(b"key2", Some(&large_row));
+ let result = builder.append_row(b"key2", Some(large_row.as_bytes()));
assert!(result.is_err());
assert_eq!(result.unwrap_err().kind(), io::ErrorKind::WriteZero);
}
@@ -429,10 +427,12 @@ mod tests {
builder.current_record_number = i32::MAX - 1;
let value1 = create_test_row(b"value1");
- builder.append_row(b"key1", Some(&value1)).unwrap();
+ builder
+ .append_row(b"key1", Some(value1.as_bytes()))
+ .unwrap();
let value2 = create_test_row(b"value2");
- let result = builder.append_row(b"key2", Some(&value2));
+ let result = builder.append_row(b"key2", Some(value2.as_bytes()));
assert!(result.is_err());
assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
@@ -452,13 +452,17 @@ mod tests {
builder.set_writer_state(100, 5);
let value1 = create_test_row(b"value1");
- builder.append_row(b"key1", Some(&value1)).unwrap();
+ builder
+ .append_row(b"key1", Some(value1.as_bytes()))
+ .unwrap();
let bytes1 = builder.build().unwrap();
let len1 = bytes1.len();
// Append another record - this should invalidate the cache
let value2 = create_test_row(b"value2");
- builder.append_row(b"key2", Some(&value2)).unwrap();
+ builder
+ .append_row(b"key2", Some(value2.as_bytes()))
+ .unwrap();
let bytes2 = builder.build().unwrap();
let len2 = bytes2.len();
@@ -472,7 +476,7 @@ mod tests {
let mut builder = KvRecordBatchBuilder::new(1, 4096,
KvFormat::COMPACTED);
builder.set_writer_state(100, 5);
let value = create_test_row(b"value");
- builder.append_row(b"key", Some(&value)).unwrap();
+ builder.append_row(b"key", Some(value.as_bytes())).unwrap();
let bytes1 = builder.build().unwrap();
// Change writer state - this should invalidate the cache
@@ -494,7 +498,6 @@ mod tests {
fn test_builder_with_compacted_row_writer() -> crate::error::Result<()> {
use crate::record::kv::KvRecordBatch;
use crate::row::InternalRow;
- use crate::row::compacted::CompactedRow;
let mut builder = KvRecordBatchBuilder::new(1, 100000,
KvFormat::COMPACTED);
builder.set_writer_state(100, 5);
@@ -504,26 +507,25 @@ mod tests {
row_writer1.write_int(42);
row_writer1.write_string("hello");
- let row_type = RowType::with_data_types([DataTypes::int(),
DataTypes::string()].to_vec());
- let row1 = &CompactedRow::from_bytes(&row_type, row_writer1.buffer());
+ let row_bytes1 = row_writer1.buffer();
let key1 = b"key1";
- assert!(builder.has_room_for_row(key1, Some(row1)));
- builder.append_row(key1, Some(row1))?;
+ assert!(builder.has_room_for_row(key1, Some(row_bytes1)));
+ builder.append_row(key1, Some(row_bytes1))?;
// Create and append second record
let mut row_writer2 = CompactedRowWriter::new(2);
row_writer2.write_int(100);
row_writer2.write_string("world");
- let row2 = &CompactedRow::from_bytes(&row_type, row_writer2.buffer());
+ let row_bytes2 = row_writer2.buffer();
let key2 = b"key2";
- builder.append_row(key2, Some(row2))?;
+ builder.append_row(key2, Some(row_bytes2))?;
// Append a deletion record
let key3 = b"key3";
- builder.append_row::<CompactedRow>(key3, None)?;
+ builder.append_row(key3, None)?;
// Build and verify
builder.close()?;
diff --git a/crates/fluss/src/record/kv/kv_record_read_context.rs
b/crates/fluss/src/record/kv/kv_record_read_context.rs
index fe6c6f0..9236321 100644
--- a/crates/fluss/src/record/kv/kv_record_read_context.rs
+++ b/crates/fluss/src/record/kv/kv_record_read_context.rs
@@ -18,7 +18,7 @@
//! Default implementation of ReadContext with decoder caching.
use super::ReadContext;
-use crate::error::{Error, Result};
+use crate::error::Result;
use crate::metadata::{KvFormat, Schema};
use crate::row::{RowDecoder, RowDecoderFactory};
use std::collections::HashMap;
@@ -85,20 +85,7 @@ impl ReadContext for KvRecordReadContext {
// Build decoder outside the lock to avoid blocking other threads
let schema = self.schema_getter.get_schema(schema_id)?;
- let row_type = match schema.row_type() {
- crate::metadata::DataType::Row(row_type) => row_type.clone(),
- other => {
- return Err(Error::IoUnexpectedError {
- message: format!(
- "Schema {schema_id} has invalid row type: expected
Row, got {other:?}"
- ),
- source: std::io::Error::new(
- std::io::ErrorKind::InvalidData,
- "Invalid row type",
- ),
- });
- }
- };
+ let row_type = schema.row_type().clone();
// Create decoder outside lock
let decoder = RowDecoderFactory::create(self.kv_format.clone(),
row_type)?;
diff --git a/crates/fluss/src/row/compacted/compacted_row.rs
b/crates/fluss/src/row/compacted/compacted_row.rs
index bc68ea1..35d684d 100644
--- a/crates/fluss/src/row/compacted/compacted_row.rs
+++ b/crates/fluss/src/row/compacted/compacted_row.rs
@@ -15,9 +15,10 @@
// specific language governing permissions and limitations
// under the License.
+use crate::client::WriteFormat;
use crate::metadata::RowType;
use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer,
CompactedRowReader};
-use crate::row::{BinaryRow, GenericRow, InternalRow};
+use crate::row::{GenericRow, InternalRow};
use std::sync::{Arc, OnceLock};
// Reference implementation:
@@ -69,10 +70,8 @@ impl<'a> CompactedRow<'a> {
self.decoded_row
.get_or_init(|| self.deserializer.deserialize(&self.reader))
}
-}
-impl BinaryRow for CompactedRow<'_> {
- fn as_bytes(&self) -> &[u8] {
+ pub fn as_bytes(&self) -> &[u8] {
self.data
}
}
@@ -153,6 +152,14 @@ impl<'a> InternalRow for CompactedRow<'a> {
fn get_timestamp_ltz(&self, pos: usize, precision: u32) ->
crate::row::datum::TimestampLtz {
self.decoded_row().get_timestamp_ltz(pos, precision)
}
+
+ fn as_encoded_bytes(&self, write_format: WriteFormat) -> Option<&[u8]> {
+ match write_format {
+ WriteFormat::CompactedKv => Some(self.as_bytes()),
+ WriteFormat::ArrowLog => None,
+ WriteFormat::CompactedLog => None,
+ }
+ }
}
#[cfg(test)]
diff --git a/crates/fluss/src/row/compacted/compacted_row_writer.rs
b/crates/fluss/src/row/compacted/compacted_row_writer.rs
index d1ad047..ac0100e 100644
--- a/crates/fluss/src/row/compacted/compacted_row_writer.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_writer.rs
@@ -63,6 +63,18 @@ impl CompactedRowWriter {
Bytes::copy_from_slice(&self.buffer[..self.position])
}
+    /// Flushes the writer's `BytesMut`, resetting the writer's inner state, and returns the flushed state as `Bytes`.
+ pub fn flush_bytes(&mut self) -> Bytes {
+ let used = self.buffer.split_to(self.position);
+ self.position = self.header_size_in_bytes;
+ if self.buffer.len() < self.header_size_in_bytes {
+ self.buffer.resize(self.header_size_in_bytes.max(64), 0);
+ } else {
+ self.buffer[..self.header_size_in_bytes].fill(0);
+ }
+ used.freeze()
+ }
+
fn ensure_capacity(&mut self, need_len: usize) {
if (self.buffer.len() - self.position) < need_len {
let new_len = cmp::max(self.buffer.len() * 2, self.buffer.len() +
need_len);
diff --git a/crates/fluss/src/row/encode/compacted_row_encoder.rs
b/crates/fluss/src/row/encode/compacted_row_encoder.rs
index 48b9f3f..20f2882 100644
--- a/crates/fluss/src/row/encode/compacted_row_encoder.rs
+++ b/crates/fluss/src/row/encode/compacted_row_encoder.rs
@@ -20,8 +20,9 @@ use crate::error::Result;
use crate::metadata::RowType;
use crate::row::Datum;
use crate::row::binary::{BinaryRowFormat, BinaryWriter, ValueWriter};
-use crate::row::compacted::{CompactedRow, CompactedRowDeserializer,
CompactedRowWriter};
-use crate::row::encode::{BinaryRow, RowEncoder};
+use crate::row::compacted::{CompactedRowDeserializer, CompactedRowWriter};
+use crate::row::encode::RowEncoder;
+use bytes::Bytes;
use std::sync::Arc;
#[allow(dead_code)]
@@ -65,12 +66,8 @@ impl RowEncoder for CompactedRowEncoder<'_> {
.write_value(&mut self.writer, pos, &value)
}
- fn finish_row(&mut self) -> Result<impl BinaryRow> {
- Ok(CompactedRow::deserialize(
- Arc::clone(&self.compacted_row_deserializer),
- self.arity,
- self.writer.buffer(),
- ))
+ fn finish_row(&mut self) -> Result<Bytes> {
+ Ok(self.writer.flush_bytes())
}
fn close(&mut self) -> Result<()> {
diff --git a/crates/fluss/src/row/encode/mod.rs
b/crates/fluss/src/row/encode/mod.rs
index c294ecf..468d4d1 100644
--- a/crates/fluss/src/row/encode/mod.rs
+++ b/crates/fluss/src/row/encode/mod.rs
@@ -22,7 +22,7 @@ use crate::error::Result;
use crate::metadata::{DataLakeFormat, KvFormat, RowType};
use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder;
use crate::row::encode::compacted_row_encoder::CompactedRowEncoder;
-use crate::row::{BinaryRow, Datum, InternalRow};
+use crate::row::{Datum, InternalRow};
use bytes::Bytes;
/// An interface for encoding key of row into bytes.
@@ -31,8 +31,9 @@ pub trait KeyEncoder {
fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes>;
}
-#[allow(dead_code)]
-impl dyn KeyEncoder {
+pub struct KeyEncoderFactory;
+
+impl KeyEncoderFactory {
/// Create a key encoder to encode the key bytes of the input row.
/// # Arguments
/// * `row_type` - the row type of the input row
@@ -43,23 +44,21 @@ impl dyn KeyEncoder {
/// key encoder
pub fn of(
row_type: &RowType,
- key_fields: Vec<String>,
- data_lake_format: Option<DataLakeFormat>,
+ key_fields: &[String],
+ data_lake_format: &Option<DataLakeFormat>,
) -> Result<Box<dyn KeyEncoder>> {
match data_lake_format {
Some(DataLakeFormat::Paimon) => {
unimplemented!("KeyEncoder for Paimon format is currently
unimplemented")
}
Some(DataLakeFormat::Lance) =>
Ok(Box::new(CompactedKeyEncoder::create_key_encoder(
- row_type,
- key_fields.as_slice(),
+ row_type, key_fields,
)?)),
Some(DataLakeFormat::Iceberg) => {
unimplemented!("KeyEncoder for Iceberg format is currently
unimplemented")
}
None => Ok(Box::new(CompactedKeyEncoder::create_key_encoder(
- row_type,
- key_fields.as_slice(),
+ row_type, key_fields,
)?)),
}
}
@@ -96,7 +95,7 @@ pub trait RowEncoder {
///
/// # Returns
/// * the written row
- fn finish_row(&mut self) -> Result<impl BinaryRow>;
+ fn finish_row(&mut self) -> Result<Bytes>;
/// Closes the row encoder
///
@@ -110,8 +109,8 @@ pub struct RowEncoderFactory {}
#[allow(dead_code)]
impl RowEncoderFactory {
- pub fn create(kv_format: KvFormat, row_type: &RowType) -> Result<impl
RowEncoder> {
- Self::create_for_field_types(kv_format, row_type.clone())
+ pub fn create(kv_format: KvFormat, row_type: RowType) -> Result<impl
RowEncoder> {
+ Self::create_for_field_types(kv_format, row_type)
}
pub fn create_for_field_types(
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index d2f640e..bc8134d 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -23,9 +23,11 @@ mod decimal;
pub mod binary;
pub mod compacted;
pub mod encode;
-mod field_getter;
+pub mod field_getter;
mod row_decoder;
+use crate::client::WriteFormat;
+use bytes::Bytes;
pub use column::*;
pub use compacted::CompactedRow;
pub use datum::*;
@@ -33,9 +35,23 @@ pub use decimal::{Decimal, MAX_COMPACT_PRECISION};
pub use encode::KeyEncoder;
pub use row_decoder::{CompactedRowDecoder, RowDecoder, RowDecoderFactory};
-pub trait BinaryRow: InternalRow {
+pub struct BinaryRow<'a> {
+ data: BinaryDataWrapper<'a>,
+}
+
+pub enum BinaryDataWrapper<'a> {
+ Bytes(Bytes),
+ Ref(&'a [u8]),
+}
+
+impl<'a> BinaryRow<'a> {
/// Returns the binary representation of this row as a byte slice.
- fn as_bytes(&self) -> &[u8];
+ pub fn as_bytes(&'a self) -> &'a [u8] {
+ match &self.data {
+ BinaryDataWrapper::Bytes(bytes) => bytes.as_ref(),
+ BinaryDataWrapper::Ref(r) => r,
+ }
+ }
}
// TODO make functions return Result<?> for better error handling
@@ -99,6 +115,11 @@ pub trait InternalRow {
/// Returns the binary value at the given position
fn get_bytes(&self, pos: usize) -> &[u8];
+
+ /// Returns encoded bytes if already encoded
+ fn as_encoded_bytes(&self, _write_format: WriteFormat) -> Option<&[u8]> {
+ None
+ }
}
pub struct GenericRow<'a> {