This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 81dd6e0 feat: support produce partitioned table (#228)
81dd6e0 is described below
commit 81dd6e05bb2ae2e3bc046c74eac724bb4158d679
Author: yuxia Luo <[email protected]>
AuthorDate: Sat Jan 31 07:54:29 2026 +0800
feat: support produce partitioned table (#228)
---
bindings/cpp/src/lib.rs | 7 +-
bindings/python/src/table.rs | 6 +-
crates/examples/src/example_kv_table.rs | 2 +-
.../examples/src/example_partitioned_kv_table.rs | 2 +-
crates/examples/src/example_table.rs | 8 +-
crates/fluss/src/client/table/append.rs | 55 +++++++--
crates/fluss/src/client/table/log_fetch_buffer.rs | 2 +-
crates/fluss/src/client/table/mod.rs | 4 +-
crates/fluss/src/client/table/partition_getter.rs | 19 ++-
crates/fluss/src/client/table/scanner.rs | 13 +--
crates/fluss/src/client/table/upsert.rs | 67 ++++++-----
crates/fluss/src/client/table/writer.rs | 46 --------
crates/fluss/src/client/write/accumulator.rs | 12 +-
crates/fluss/src/client/write/batch.rs | 2 +-
crates/fluss/src/client/write/mod.rs | 8 +-
crates/fluss/src/client/write/sender.rs | 15 +--
crates/fluss/src/io/storage.rs | 2 +
crates/fluss/src/record/arrow.rs | 54 ++++++---
crates/fluss/src/row/column.rs | 36 +++---
crates/fluss/src/row/compacted/compacted_row.rs | 17 ++-
crates/fluss/src/row/mod.rs | 2 +-
crates/fluss/tests/integration/kv_table.rs | 17 +--
.../tests/integration/{table.rs => log_table.rs} | 128 +++++++++++++++++++--
.../fluss/tests/integration/table_remote_scan.rs | 7 +-
crates/fluss/tests/integration/utils.rs | 26 ++++-
crates/fluss/tests/test_fluss.rs | 2 +-
26 files changed, 350 insertions(+), 209 deletions(-)
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 2d37763..bd38a03 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -508,7 +508,10 @@ impl Table {
Err(e) => return Err(format!("Failed to create append: {e}")),
};
- let writer = table_append.create_writer();
+ let writer = match table_append.create_writer() {
+ Ok(w) => w,
+ Err(e) => return Err(format!("Failed to create writer: {e}")),
+ };
let writer = Box::into_raw(Box::new(AppendWriter { inner: writer }));
Ok(writer)
}
@@ -580,7 +583,7 @@ impl AppendWriter {
fn append(&mut self, row: &ffi::FfiGenericRow) -> ffi::FfiResult {
let generic_row = types::ffi_row_to_core(row);
- let result = RUNTIME.block_on(async {
self.inner.append(generic_row).await });
+ let result = RUNTIME.block_on(async {
self.inner.append(&generic_row).await });
match result {
Ok(_) => ok_result(),
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 4043350..48f09e7 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -68,7 +68,9 @@ impl FlussTable {
.new_append()
.map_err(|e| FlussError::new_err(e.to_string()))?;
- let rust_writer = table_append.create_writer();
+ let rust_writer = table_append
+ .create_writer()
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
let py_writer = AppendWriter::from_core(rust_writer, table_info);
@@ -251,7 +253,7 @@ impl AppendWriter {
future_into_py(py, async move {
inner
- .append(generic_row)
+ .append(&generic_row)
.await
.map_err(|e| FlussError::new_err(e.to_string()))
})
diff --git a/crates/examples/src/example_kv_table.rs
b/crates/examples/src/example_kv_table.rs
index 032691e..437da06 100644
--- a/crates/examples/src/example_kv_table.rs
+++ b/crates/examples/src/example_kv_table.rs
@@ -16,7 +16,7 @@
// under the License.
use clap::Parser;
-use fluss::client::{FlussConnection, UpsertWriter};
+use fluss::client::FlussConnection;
use fluss::config::Config;
use fluss::error::Result;
use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
diff --git a/crates/examples/src/example_partitioned_kv_table.rs
b/crates/examples/src/example_partitioned_kv_table.rs
index a5e76fa..1b0c303 100644
--- a/crates/examples/src/example_partitioned_kv_table.rs
+++ b/crates/examples/src/example_partitioned_kv_table.rs
@@ -16,7 +16,7 @@
// under the License.
use clap::Parser;
-use fluss::client::{FlussAdmin, FlussConnection, UpsertWriter};
+use fluss::client::{FlussAdmin, FlussConnection};
use fluss::config::Config;
use fluss::error::Result;
use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor,
TablePath};
diff --git a/crates/examples/src/example_table.rs
b/crates/examples/src/example_table.rs
index 92055a7..6c74e63 100644
--- a/crates/examples/src/example_table.rs
+++ b/crates/examples/src/example_table.rs
@@ -63,13 +63,13 @@ pub async fn main() -> Result<()> {
row.set_field(2, 123_456_789_123i64);
let table = conn.get_table(&table_path).await?;
- let append_writer = table.new_append()?.create_writer();
- let f1 = append_writer.append(row);
- row = GenericRow::new(3);
+ let append_writer = table.new_append()?.create_writer()?;
+ let f1 = append_writer.append(&row);
+ let mut row = GenericRow::new(3);
row.set_field(0, 233333);
row.set_field(1, "tt44");
row.set_field(2, 987_654_321_987i64);
- let f2 = append_writer.append(row);
+ let f2 = append_writer.append(&row);
try_join!(f1, f2, append_writer.flush())?;
// scan rows
diff --git a/crates/fluss/src/client/table/append.rs
b/crates/fluss/src/client/table/append.rs
index 7fe2023..ace91a6 100644
--- a/crates/fluss/src/client/table/append.rs
+++ b/crates/fluss/src/client/table/append.rs
@@ -15,16 +15,16 @@
// specific language governing permissions and limitations
// under the License.
+use crate::client::table::partition_getter::{PartitionGetter,
get_physical_path};
use crate::client::{WriteRecord, WriterClient};
use crate::error::Result;
use crate::metadata::{PhysicalTablePath, TableInfo, TablePath};
-use crate::row::GenericRow;
+use crate::row::{ColumnarRow, InternalRow};
use arrow::array::RecordBatch;
use std::sync::Arc;
-#[allow(dead_code)]
pub struct TableAppend {
- table_path: TablePath,
+ table_path: Arc<TablePath>,
table_info: Arc<TableInfo>,
writer_client: Arc<WriterClient>,
}
@@ -36,32 +36,48 @@ impl TableAppend {
writer_client: Arc<WriterClient>,
) -> Self {
Self {
- table_path,
+ table_path: Arc::new(table_path),
table_info,
writer_client,
}
}
- pub fn create_writer(&self) -> AppendWriter {
- AppendWriter {
- physical_table_path:
Arc::new(PhysicalTablePath::of(Arc::new(self.table_path.clone()))),
+ pub fn create_writer(&self) -> Result<AppendWriter> {
+ let partition_getter = if self.table_info.is_partitioned() {
+ Some(PartitionGetter::new(
+ self.table_info.row_type(),
+ Arc::clone(self.table_info.get_partition_keys()),
+ )?)
+ } else {
+ None
+ };
+
+ Ok(AppendWriter {
+ table_path: Arc::clone(&self.table_path),
+ partition_getter,
writer_client: self.writer_client.clone(),
table_info: Arc::clone(&self.table_info),
- }
+ })
}
}
pub struct AppendWriter {
- physical_table_path: Arc<PhysicalTablePath>,
+ table_path: Arc<TablePath>,
+ partition_getter: Option<PartitionGetter>,
writer_client: Arc<WriterClient>,
table_info: Arc<TableInfo>,
}
impl AppendWriter {
- pub async fn append(&self, row: GenericRow<'_>) -> Result<()> {
+ pub async fn append<R: InternalRow>(&self, row: &R) -> Result<()> {
+ let physical_table_path = Arc::new(get_physical_path(
+ &self.table_path,
+ self.partition_getter.as_ref(),
+ row,
+ )?);
let record = WriteRecord::for_append(
Arc::clone(&self.table_info),
- Arc::clone(&self.physical_table_path),
+ physical_table_path,
self.table_info.schema_id,
row,
);
@@ -70,10 +86,25 @@ impl AppendWriter {
result_handle.result(result)
}
+ /// Appends an Arrow RecordBatch to the table.
+ ///
+ /// For partitioned tables, the partition is derived from the **first
row** of the batch.
+ /// Callers must ensure all rows in the batch belong to the same partition.
pub async fn append_arrow_batch(&self, batch: RecordBatch) -> Result<()> {
+ let physical_table_path = if self.partition_getter.is_some() &&
batch.num_rows() > 0 {
+ let first_row = ColumnarRow::new(Arc::new(batch.clone()));
+ Arc::new(get_physical_path(
+ &self.table_path,
+ self.partition_getter.as_ref(),
+ &first_row,
+ )?)
+ } else {
+ Arc::new(PhysicalTablePath::of(Arc::clone(&self.table_path)))
+ };
+
let record = WriteRecord::for_append_record_batch(
Arc::clone(&self.table_info),
- Arc::clone(&self.physical_table_path),
+ physical_table_path,
self.table_info.schema_id,
batch,
);
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs
b/crates/fluss/src/client/table/log_fetch_buffer.rs
index 78ee065..b622f19 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -917,7 +917,7 @@ mod tests {
let mut row = GenericRow::new(2);
row.set_field(0, 1_i32);
row.set_field(1, "alice");
- let record = WriteRecord::for_append(table_info, physical_table_path,
1, row);
+ let record = WriteRecord::for_append(table_info, physical_table_path,
1, &row);
builder.append(&record)?;
let data = builder.build()?;
diff --git a/crates/fluss/src/client/table/mod.rs
b/crates/fluss/src/client/table/mod.rs
index 2fbbbc9..6d54933 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -31,16 +31,14 @@ mod partition_getter;
mod remote_log;
mod scanner;
mod upsert;
-mod writer;
-use crate::client::table::upsert::TableUpsert;
pub use append::{AppendWriter, TableAppend};
pub use lookup::{LookupResult, Lookuper, TableLookup};
pub use remote_log::{
DEFAULT_SCANNER_REMOTE_LOG_DOWNLOAD_THREADS,
DEFAULT_SCANNER_REMOTE_LOG_PREFETCH_NUM,
};
pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan};
-pub use writer::{TableWriter, UpsertWriter};
+pub use upsert::{TableUpsert, UpsertWriter};
#[allow(dead_code)]
pub struct FlussTable<'a> {
diff --git a/crates/fluss/src/client/table/partition_getter.rs
b/crates/fluss/src/client/table/partition_getter.rs
index 1a76106..9136801 100644
--- a/crates/fluss/src/client/table/partition_getter.rs
+++ b/crates/fluss/src/client/table/partition_getter.rs
@@ -17,12 +17,29 @@
use crate::error::Error::IllegalArgument;
use crate::error::Result;
-use crate::metadata::{DataType, ResolvedPartitionSpec, RowType};
+use crate::metadata::{DataType, PhysicalTablePath, ResolvedPartitionSpec,
RowType, TablePath};
use crate::row::InternalRow;
use crate::row::field_getter::FieldGetter;
use crate::util::partition;
use std::sync::Arc;
+/// Get the physical table path for a row, handling partitioned vs
non-partitioned tables.
+pub fn get_physical_path<R: InternalRow>(
+ table_path: &Arc<TablePath>,
+ partition_getter: Option<&PartitionGetter>,
+ row: &R,
+) -> Result<PhysicalTablePath> {
+ if let Some(getter) = partition_getter {
+ let partition = getter.get_partition(row)?;
+ Ok(PhysicalTablePath::of_partitioned(
+ Arc::clone(table_path),
+ Some(partition),
+ ))
+ } else {
+ Ok(PhysicalTablePath::of(Arc::clone(table_path)))
+ }
+}
+
/// A getter to get partition name from a row.
#[allow(dead_code)]
pub struct PartitionGetter {
diff --git a/crates/fluss/src/client/table/scanner.rs
b/crates/fluss/src/client/table/scanner.rs
index 14d2841..10e7fff 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -1516,14 +1516,11 @@ mod tests {
},
)?;
let physical_table_path = Arc::new(PhysicalTablePath::of(table_path));
- let record = WriteRecord::for_append(
- Arc::new(table_info.clone()),
- physical_table_path,
- 1,
- GenericRow {
- values: vec![Datum::Int32(1)],
- },
- );
+ let row = GenericRow {
+ values: vec![Datum::Int32(1)],
+ };
+ let record =
+ WriteRecord::for_append(Arc::new(table_info.clone()),
physical_table_path, 1, &row);
builder.append(&record)?;
builder.build()
}
diff --git a/crates/fluss/src/client/table/upsert.rs
b/crates/fluss/src/client/table/upsert.rs
index 269d525..0595397 100644
--- a/crates/fluss/src/client/table/upsert.rs
+++ b/crates/fluss/src/client/table/upsert.rs
@@ -15,17 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-use crate::client::table::writer::{DeleteResult, TableWriter, UpsertResult,
UpsertWriter};
use crate::client::{RowBytes, WriteFormat, WriteRecord, WriterClient};
use crate::error::Error::IllegalArgument;
use crate::error::Result;
-use crate::metadata::{PhysicalTablePath, RowType, TableInfo, TablePath};
+use crate::metadata::{RowType, TableInfo, TablePath};
use crate::row::InternalRow;
use crate::row::encode::{KeyEncoder, KeyEncoderFactory, RowEncoder,
RowEncoderFactory};
use crate::row::field_getter::FieldGetter;
use std::sync::Arc;
-use crate::client::table::partition_getter::PartitionGetter;
+use crate::client::table::partition_getter::{PartitionGetter,
get_physical_path};
use bitvec::prelude::bitvec;
use bytes::Bytes;
@@ -98,7 +97,7 @@ impl TableUpsert {
self.partial_update(Some(valid_col_indices))
}
- pub fn create_writer(&self) -> Result<impl UpsertWriter> {
+ pub fn create_writer(&self) -> Result<UpsertWriter> {
UpsertWriterFactory::create(
Arc::new(self.table_path.clone()),
Arc::new(self.table_info.clone()),
@@ -108,10 +107,7 @@ impl TableUpsert {
}
}
-struct UpsertWriterImpl<RE>
-where
- RE: RowEncoder,
-{
+pub struct UpsertWriter {
table_path: Arc<TablePath>,
writer_client: Arc<WriterClient>,
partition_field_getter: Option<PartitionGetter>,
@@ -120,7 +116,7 @@ where
// Use primary key encoder as bucket key encoder when None
bucket_key_encoder: Option<Box<dyn KeyEncoder>>,
write_format: WriteFormat,
- row_encoder: RE,
+ row_encoder: Box<dyn RowEncoder>,
field_getters: Box<[FieldGetter]>,
table_info: Arc<TableInfo>,
}
@@ -133,7 +129,7 @@ impl UpsertWriterFactory {
table_info: Arc<TableInfo>,
partial_update_columns: Option<Arc<Vec<usize>>>,
writer_client: Arc<WriterClient>,
- ) -> Result<impl UpsertWriter> {
+ ) -> Result<UpsertWriter> {
let data_lake_format = &table_info.table_config.get_datalake_format()?;
let row_type = table_info.row_type();
let physical_pks = table_info.get_physical_primary_keys();
@@ -173,7 +169,7 @@ impl UpsertWriterFactory {
None
};
- Ok(UpsertWriterImpl {
+ Ok(UpsertWriter {
table_path,
partition_field_getter,
writer_client,
@@ -181,7 +177,7 @@ impl UpsertWriterFactory {
target_columns: partial_update_columns,
bucket_key_encoder,
write_format,
- row_encoder: RowEncoderFactory::create(kv_format,
row_type.clone())?,
+ row_encoder: Box::new(RowEncoderFactory::create(kv_format,
row_type.clone())?),
field_getters,
table_info: table_info.clone(),
})
@@ -283,8 +279,7 @@ impl UpsertWriterFactory {
}
}
-#[allow(dead_code)]
-impl<RE: RowEncoder> UpsertWriterImpl<RE> {
+impl UpsertWriter {
fn check_field_count<R: InternalRow>(&self, row: &R) -> Result<()> {
let expected = self.table_info.get_row_type().fields().len();
if row.get_field_count() != expected {
@@ -317,31 +312,15 @@ impl<RE: RowEncoder> UpsertWriterImpl<RE> {
self.row_encoder.finish_row()
}
- fn get_physical_path<R: InternalRow>(&self, row: &R) ->
Result<PhysicalTablePath> {
- if let Some(partition_getter) = &self.partition_field_getter {
- let partition = partition_getter.get_partition(row);
- Ok(PhysicalTablePath::of_partitioned(
- Arc::clone(&self.table_path),
- Some(partition?),
- ))
- } else {
- Ok(PhysicalTablePath::of(Arc::clone(&self.table_path)))
- }
- }
-}
-
-impl<RE: RowEncoder> TableWriter for UpsertWriterImpl<RE> {
/// Flush data written that have not yet been sent to the server, forcing
the client to send the
/// requests to server and blocks on the completion of the requests
associated with these
/// records. A request is considered completed when it is successfully
acknowledged according to
/// the CLIENT_WRITER_ACKS configuration option you have specified or else
it
/// results in an error.
- async fn flush(&self) -> Result<()> {
+ pub async fn flush(&self) -> Result<()> {
self.writer_client.flush().await
}
-}
-impl<RE: RowEncoder> UpsertWriter for UpsertWriterImpl<RE> {
/// Inserts row into Fluss table if they do not already exist, or updates
them if they do exist.
///
/// # Arguments
@@ -349,7 +328,7 @@ impl<RE: RowEncoder> UpsertWriter for UpsertWriterImpl<RE> {
///
/// # Returns
/// Ok(UpsertResult) when completed normally
- async fn upsert<R: InternalRow>(&mut self, row: &R) ->
Result<UpsertResult> {
+ pub async fn upsert<R: InternalRow>(&mut self, row: &R) ->
Result<UpsertResult> {
self.check_field_count(row)?;
let (key, bucket_key) = self.get_keys(row)?;
@@ -361,7 +340,11 @@ impl<RE: RowEncoder> UpsertWriter for UpsertWriterImpl<RE>
{
let write_record = WriteRecord::for_upsert(
Arc::clone(&self.table_info),
- Arc::new(self.get_physical_path(row)?),
+ Arc::new(get_physical_path(
+ &self.table_path,
+ self.partition_field_getter.as_ref(),
+ row,
+ )?),
self.table_info.schema_id,
key,
bucket_key,
@@ -384,14 +367,18 @@ impl<RE: RowEncoder> UpsertWriter for
UpsertWriterImpl<RE> {
///
/// # Returns
/// Ok(DeleteResult) when completed normally
- async fn delete<R: InternalRow>(&mut self, row: &R) ->
Result<DeleteResult> {
+ pub async fn delete<R: InternalRow>(&mut self, row: &R) ->
Result<DeleteResult> {
self.check_field_count(row)?;
let (key, bucket_key) = self.get_keys(row)?;
let write_record = WriteRecord::for_upsert(
Arc::clone(&self.table_info),
- Arc::new(self.get_physical_path(row)?),
+ Arc::new(get_physical_path(
+ &self.table_path,
+ self.partition_field_getter.as_ref(),
+ row,
+ )?),
self.table_info.schema_id,
key,
bucket_key,
@@ -537,3 +524,13 @@ mod tests {
));
}
}
+
+/// The result of upserting a record
+/// Currently this is an empty struct to allow for compatible evolution in the
future
+#[derive(Default)]
+pub struct UpsertResult;
+
+/// The result of deleting a record
+/// Currently this is an empty struct to allow for compatible evolution in the
future
+#[derive(Default)]
+pub struct DeleteResult;
diff --git a/crates/fluss/src/client/table/writer.rs
b/crates/fluss/src/client/table/writer.rs
deleted file mode 100644
index ec26ec6..0000000
--- a/crates/fluss/src/client/table/writer.rs
+++ /dev/null
@@ -1,46 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::row::{GenericRow, InternalRow};
-
-use crate::error::Result;
-
-#[allow(dead_code, async_fn_in_trait)]
-pub trait TableWriter {
- async fn flush(&self) -> Result<()>;
-}
-
-#[allow(dead_code)]
-pub trait AppendWriter: TableWriter {
- async fn append(&self, row: GenericRow) -> Result<()>;
-}
-
-#[allow(dead_code, async_fn_in_trait)]
-pub trait UpsertWriter: TableWriter {
- async fn upsert<R: InternalRow>(&mut self, row: &R) ->
Result<UpsertResult>;
- async fn delete<R: InternalRow>(&mut self, row: &R) ->
Result<DeleteResult>;
-}
-
-/// The result of upserting a record
-/// Currently this is an empty struct to allow for compatible evolution in the
future
-#[derive(Default)]
-pub struct UpsertResult;
-
-/// The result of deleting a record
-/// Currently this is an empty struct to allow for compatible evolution in the
future
-#[derive(Default)]
-pub struct DeleteResult;
diff --git a/crates/fluss/src/client/write/accumulator.rs
b/crates/fluss/src/client/write/accumulator.rs
index 2a45517..5eae868 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -599,14 +599,10 @@ mod tests {
let physical_table_path =
Arc::new(PhysicalTablePath::of(Arc::new(table_path.clone())));
let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1));
let cluster = Arc::new(build_cluster(&table_path, 1, 1));
- let record = WriteRecord::for_append(
- table_info,
- physical_table_path,
- 1,
- GenericRow {
- values: vec![Datum::Int32(1)],
- },
- );
+ let row = GenericRow {
+ values: vec![Datum::Int32(1)],
+ };
+ let record = WriteRecord::for_append(table_info, physical_table_path,
1, &row);
accumulator.append(&record, 0, &cluster, false).await?;
diff --git a/crates/fluss/src/client/write/batch.rs
b/crates/fluss/src/client/write/batch.rs
index da30c8a..c765473 100644
--- a/crates/fluss/src/client/write/batch.rs
+++ b/crates/fluss/src/client/write/batch.rs
@@ -426,7 +426,7 @@ mod tests {
Arc::clone(&table_info),
Arc::clone(&physical_table_path),
1,
- row,
+ &row,
);
batch.try_append(&record).unwrap();
}
diff --git a/crates/fluss/src/client/write/mod.rs
b/crates/fluss/src/client/write/mod.rs
index 868b582..25a0db6 100644
--- a/crates/fluss/src/client/write/mod.rs
+++ b/crates/fluss/src/client/write/mod.rs
@@ -22,7 +22,7 @@ use crate::client::broadcast::{self as client_broadcast,
BatchWriteResult, Broad
use crate::error::Error;
use crate::metadata::{PhysicalTablePath, TableInfo};
-use crate::row::GenericRow;
+use crate::row::InternalRow;
pub use accumulator::*;
use arrow::array::RecordBatch;
use bytes::Bytes;
@@ -64,7 +64,7 @@ pub enum Record<'a> {
}
pub enum LogWriteRecord<'a> {
- Generic(GenericRow<'a>),
+ InternalRow(&'a dyn InternalRow),
RecordBatch(Arc<RecordBatch>),
}
@@ -112,11 +112,11 @@ impl<'a> WriteRecord<'a> {
table_info: Arc<TableInfo>,
physical_table_path: Arc<PhysicalTablePath>,
schema_id: i32,
- row: GenericRow<'a>,
+ row: &'a dyn InternalRow,
) -> Self {
Self {
table_info,
- record: Record::Log(LogWriteRecord::Generic(row)),
+ record: Record::Log(LogWriteRecord::InternalRow(row)),
physical_table_path,
bucket_key: None,
schema_id,
diff --git a/crates/fluss/src/client/write/sender.rs
b/crates/fluss/src/client/write/sender.rs
index 6a7dad0..f336d0c 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -103,8 +103,7 @@ impl Sender {
if api_error.code ==
FlussError::PartitionNotExists.code() =>
{
warn!(
- "Partition does not exist during metadata update,
continuing: {}",
- api_error
+ "Partition does not exist during metadata update,
continuing: {api_error}"
);
}
_ => return Err(e),
@@ -664,14 +663,10 @@ mod tests {
) -> Result<(ReadyWriteBatch, crate::client::ResultHandle)> {
let table_info =
Arc::new(build_table_info(table_path.as_ref().clone(), 1, 1));
let physical_table_path = Arc::new(PhysicalTablePath::of(table_path));
- let record = WriteRecord::for_append(
- table_info,
- physical_table_path,
- 1,
- GenericRow {
- values: vec![Datum::Int32(1)],
- },
- );
+ let row = GenericRow {
+ values: vec![Datum::Int32(1)],
+ };
+ let record = WriteRecord::for_append(table_info, physical_table_path,
1, &row);
let result = accumulator.append(&record, 0, &cluster, false).await?;
let result_handle = result.result_handle.expect("result handle");
let server = cluster.get_tablet_server(1).expect("server");
diff --git a/crates/fluss/src/io/storage.rs b/crates/fluss/src/io/storage.rs
index a370861..a573517 100644
--- a/crates/fluss/src/io/storage.rs
+++ b/crates/fluss/src/io/storage.rs
@@ -19,6 +19,8 @@ use crate::error;
use crate::error::Result;
use crate::io::FileIOBuilder;
use opendal::{Operator, Scheme};
+#[cfg(any(feature = "storage-s3", feature = "storage-oss"))]
+use std::collections::HashMap;
/// The storage carries all supported storage services in fluss
#[derive(Debug)]
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 726106b..b798896 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -20,7 +20,8 @@ use crate::compression::ArrowCompressionInfo;
use crate::error::{Error, Result};
use crate::metadata::{DataType, RowType};
use crate::record::{ChangeType, ScanRecord};
-use crate::row::{ColumnarRow, GenericRow};
+use crate::row::field_getter::FieldGetter;
+use crate::row::{ColumnarRow, InternalRow};
use arrow::array::{
ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder,
Decimal128Builder,
Float32Builder, Float64Builder, Int8Builder, Int16Builder, Int32Builder,
Int64Builder,
@@ -166,7 +167,7 @@ pub struct MemoryLogRecordsArrowBuilder {
pub trait ArrowRecordBatchInnerBuilder: Send + Sync {
fn build_arrow_record_batch(&mut self) -> Result<Arc<RecordBatch>>;
- fn append(&mut self, row: &GenericRow) -> Result<bool>;
+ fn append(&mut self, row: &dyn InternalRow) -> Result<bool>;
fn append_batch(&mut self, record_batch: Arc<RecordBatch>) -> Result<bool>;
@@ -191,7 +192,7 @@ impl ArrowRecordBatchInnerBuilder for
PrebuiltRecordBatchBuilder {
Ok(self.arrow_record_batch.as_ref().unwrap().clone())
}
- fn append(&mut self, _row: &GenericRow) -> Result<bool> {
+ fn append(&mut self, _row: &dyn InternalRow) -> Result<bool> {
// append one single row is not supported, return false directly
Ok(false)
}
@@ -229,6 +230,7 @@ impl ArrowRecordBatchInnerBuilder for
PrebuiltRecordBatchBuilder {
pub struct RowAppendRecordBatchBuilder {
table_schema: SchemaRef,
arrow_column_builders: Vec<Box<dyn ArrayBuilder>>,
+ field_getters: Box<[FieldGetter]>,
records_count: i32,
}
@@ -240,9 +242,11 @@ impl RowAppendRecordBatchBuilder {
.iter()
.map(|field| Self::create_builder(field.data_type()))
.collect();
+ let field_getters = FieldGetter::create_field_getters(row_type);
Ok(Self {
table_schema: schema_ref.clone(),
arrow_column_builders: builders?,
+ field_getters,
records_count: 0,
})
}
@@ -346,11 +350,18 @@ impl ArrowRecordBatchInnerBuilder for
RowAppendRecordBatchBuilder {
)?))
}
- fn append(&mut self, row: &GenericRow) -> Result<bool> {
- for (idx, value) in row.values.iter().enumerate() {
+ fn append(&mut self, row: &dyn InternalRow) -> Result<bool> {
+ for (idx, getter) in self.field_getters.iter().enumerate() {
+ let datum = getter.get_field(row);
let field_type = self.table_schema.field(idx).data_type();
- let builder = self.arrow_column_builders.get_mut(idx).unwrap();
- value.append_to(builder.as_mut(), field_type)?;
+ let builder =
+ self.arrow_column_builders
+ .get_mut(idx)
+ .ok_or_else(|| Error::UnexpectedError {
+ message: format!("Column builder at index {idx} not
found."),
+ source: None,
+ })?;
+ datum.append_to(builder, field_type)?;
}
self.records_count += 1;
Ok(true)
@@ -412,7 +423,9 @@ impl MemoryLogRecordsArrowBuilder {
pub fn append(&mut self, record: &WriteRecord) -> Result<bool> {
match &record.record() {
Record::Log(log_write_record) => match log_write_record {
- LogWriteRecord::Generic(row) =>
Ok(self.arrow_record_batch_builder.append(row)?),
+ LogWriteRecord::InternalRow(row) => {
+ Ok(self.arrow_record_batch_builder.append(*row)?)
+ }
LogWriteRecord::RecordBatch(record_batch) => Ok(self
.arrow_record_batch_builder
.append_batch(record_batch.clone())?),
@@ -1715,9 +1728,10 @@ mod tests {
)]);
let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
let decimal =
Decimal::from_big_decimal(BigDecimal::from_str("123.456").unwrap(), 10, 3)?;
- builder.append(&GenericRow {
+ let row = GenericRow {
values: vec![Datum::Decimal(decimal)],
- })?;
+ };
+ builder.append(&row)?;
let batch = builder.build_arrow_record_batch()?;
let array = batch
.column(0)
@@ -1735,9 +1749,10 @@ mod tests {
)]);
let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
let decimal =
Decimal::from_big_decimal(BigDecimal::from_str("123456.78").unwrap(), 10, 2)?;
- let result = builder.append(&GenericRow {
+ let row = GenericRow {
values: vec![Datum::Decimal(decimal)],
- });
+ };
+ let result = builder.append(&row);
assert!(result.is_err());
assert!(
result
@@ -1832,7 +1847,7 @@ mod tests {
let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
// Append rows with various data types
- builder.append(&GenericRow {
+ let row = GenericRow {
values: vec![
Datum::Int32(1),
Datum::Decimal(Decimal::from_big_decimal(
@@ -1851,7 +1866,8 @@ mod tests {
// 1609459200000 ms = 2021-01-01 00:00:00 UTC, with 987654
additional nanoseconds
Datum::TimestampLtz(TimestampLtz::from_millis_nanos(1609459200000, 987654)?),
],
- })?;
+ };
+ builder.append(&row)?;
let batch = builder.build_arrow_record_batch()?;
@@ -1959,15 +1975,19 @@ mod tests {
let mut row = GenericRow::new(2);
row.set_field(0, 1_i32);
row.set_field(1, "alice");
- let record =
- WriteRecord::for_append(Arc::clone(&table_info),
physical_table_path.clone(), 1, row);
+ let record = WriteRecord::for_append(
+ Arc::clone(&table_info),
+ physical_table_path.clone(),
+ 1,
+ &row,
+ );
builder.append(&record)?;
let mut row2 = GenericRow::new(2);
row2.set_field(0, 2_i32);
row2.set_field(1, "bob");
let record2 =
- WriteRecord::for_append(Arc::clone(&table_info),
physical_table_path, 2, row2);
+ WriteRecord::for_append(Arc::clone(&table_info),
physical_table_path, 2, &row2);
builder.append(&record2)?;
let data = builder.build()?;
diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs
index f48075b..50db32b 100644
--- a/crates/fluss/src/row/column.rs
+++ b/crates/fluss/src/row/column.rs
@@ -248,6 +248,24 @@ impl InternalRow for ColumnarRow {
.value(self.row_id)
}
+ fn get_char(&self, pos: usize, _length: usize) -> &str {
+ self.record_batch
+ .column(pos)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .expect("Expected String array for char type")
+ .value(self.row_id)
+ }
+
+ fn get_string(&self, pos: usize) -> &str {
+ self.record_batch
+ .column(pos)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .expect("Expected String array.")
+ .value(self.row_id)
+ }
+
fn get_decimal(&self, pos: usize, precision: usize, scale: usize) ->
crate::row::Decimal {
use arrow::datatypes::DataType;
@@ -327,24 +345,6 @@ impl InternalRow for ColumnarRow {
)
}
- fn get_char(&self, pos: usize, _length: usize) -> &str {
- self.record_batch
- .column(pos)
- .as_any()
- .downcast_ref::<StringArray>()
- .expect("Expected String array for char type")
- .value(self.row_id)
- }
-
- fn get_string(&self, pos: usize) -> &str {
- self.record_batch
- .column(pos)
- .as_any()
- .downcast_ref::<StringArray>()
- .expect("Expected String array.")
- .value(self.row_id)
- }
-
fn get_binary(&self, pos: usize, _length: usize) -> &[u8] {
self.record_batch
.column(pos)
diff --git a/crates/fluss/src/row/compacted/compacted_row.rs
b/crates/fluss/src/row/compacted/compacted_row.rs
index 35d684d..2322207 100644
--- a/crates/fluss/src/row/compacted/compacted_row.rs
+++ b/crates/fluss/src/row/compacted/compacted_row.rs
@@ -76,7 +76,6 @@ impl<'a> CompactedRow<'a> {
}
}
-#[allow(dead_code)]
impl<'a> InternalRow for CompactedRow<'a> {
fn get_field_count(&self) -> usize {
self.arity
@@ -125,14 +124,6 @@ impl<'a> InternalRow for CompactedRow<'a> {
self.decoded_row().get_string(pos)
}
- fn get_binary(&self, pos: usize, length: usize) -> &[u8] {
- self.decoded_row().get_binary(pos, length)
- }
-
- fn get_bytes(&self, pos: usize) -> &[u8] {
- self.decoded_row().get_bytes(pos)
- }
-
fn get_decimal(&self, pos: usize, precision: usize, scale: usize) ->
crate::row::Decimal {
self.decoded_row().get_decimal(pos, precision, scale)
}
@@ -153,6 +144,14 @@ impl<'a> InternalRow for CompactedRow<'a> {
self.decoded_row().get_timestamp_ltz(pos, precision)
}
+ fn get_binary(&self, pos: usize, length: usize) -> &[u8] {
+ self.decoded_row().get_binary(pos, length)
+ }
+
+ fn get_bytes(&self, pos: usize) -> &[u8] {
+ self.decoded_row().get_bytes(pos)
+ }
+
fn as_encoded_bytes(&self, write_format: WriteFormat) -> Option<&[u8]> {
match write_format {
WriteFormat::CompactedKv => Some(self.as_bytes()),
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index f7c8bec..276dcca 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -55,7 +55,7 @@ impl<'a> BinaryRow<'a> {
}
// TODO make functions return Result<?> for better error handling
-pub trait InternalRow {
+pub trait InternalRow: Send + Sync {
/// Returns the number of fields in this row
fn get_field_count(&self) -> usize;
diff --git a/crates/fluss/tests/integration/kv_table.rs
b/crates/fluss/tests/integration/kv_table.rs
index b2263c2..3691d65 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -34,9 +34,10 @@ static SHARED_FLUSS_CLUSTER:
LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>>
mod kv_table_test {
use super::SHARED_FLUSS_CLUSTER;
use crate::integration::fluss_cluster::FlussTestingCluster;
- use crate::integration::utils::{create_table, get_cluster, start_cluster,
stop_cluster};
- use fluss::client::UpsertWriter;
- use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor,
TablePath};
+ use crate::integration::utils::{
+ create_partitions, create_table, get_cluster, start_cluster,
stop_cluster,
+ };
+ use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
use fluss::row::{GenericRow, InternalRow};
use std::sync::Arc;
@@ -467,15 +468,7 @@ mod kv_table_test {
create_table(&admin, &table_path, &table_descriptor).await;
// Create partitions for each region before inserting data
- for region in &["US", "EU", "APAC"] {
- let mut partition_map = std::collections::HashMap::new();
- partition_map.insert("region".to_string(), region.to_string());
- let partition_spec = PartitionSpec::new(partition_map);
- admin
- .create_partition(&table_path, &partition_spec, false)
- .await
- .expect("Failed to create partition");
- }
+ create_partitions(&admin, &table_path, "region", &["US", "EU",
"APAC"]).await;
let connection = cluster.get_fluss_connection().await;
diff --git a/crates/fluss/tests/integration/table.rs
b/crates/fluss/tests/integration/log_table.rs
similarity index 90%
rename from crates/fluss/tests/integration/table.rs
rename to crates/fluss/tests/integration/log_table.rs
index 6a15674..64e6289 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -34,7 +34,9 @@ static SHARED_FLUSS_CLUSTER:
LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>>
mod table_test {
use super::SHARED_FLUSS_CLUSTER;
use crate::integration::fluss_cluster::FlussTestingCluster;
- use crate::integration::utils::{create_table, get_cluster, start_cluster,
stop_cluster};
+ use crate::integration::utils::{
+ create_partitions, create_table, get_cluster, start_cluster,
stop_cluster,
+ };
use arrow::array::record_batch;
use fluss::client::{FlussTable, TableScan};
use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor,
TablePath};
@@ -44,6 +46,8 @@ mod table_test {
use jiff::Timestamp;
use std::collections::HashMap;
use std::sync::Arc;
+ use std::sync::atomic::AtomicUsize;
+ use std::sync::atomic::Ordering;
use std::time::Duration;
fn before_all() {
@@ -91,7 +95,8 @@ mod table_test {
let append_writer = table
.new_append()
.expect("Failed to create append")
- .create_writer();
+ .create_writer()
+ .expect("Failed to create writer");
let batch1 =
record_batch!(("c1", Int32, [1, 2, 3]), ("c2", Utf8, ["a1", "a2",
"a3"])).unwrap();
@@ -217,7 +222,8 @@ mod table_test {
.expect("Failed to get table")
.new_append()
.expect("Failed to create append")
- .create_writer();
+ .create_writer()
+ .expect("Failed to create writer");
let batch = record_batch!(
("id", Int32, [1, 2, 3]),
@@ -314,7 +320,8 @@ mod table_test {
let append_writer = table
.new_append()
.expect("Failed to create append")
- .create_writer();
+ .create_writer()
+ .expect("Failed to create writer");
let batch = record_batch!(
("col_a", Int32, [1, 2, 3]),
@@ -472,7 +479,7 @@ mod table_test {
.is_empty()
);
- let writer = table.new_append().unwrap().create_writer();
+ let writer = table.new_append().unwrap().create_writer().unwrap();
writer
.append_arrow_batch(
record_batch!(("id", Int32, [1, 2]), ("name", Utf8, ["a",
"b"])).unwrap(),
@@ -676,7 +683,8 @@ mod table_test {
let append_writer = table
.new_append()
.expect("Failed to create append")
- .create_writer();
+ .create_writer()
+ .expect("Failed to create writer");
// Test data for all datatypes
let col_tinyint = 127i8;
@@ -749,7 +757,7 @@ mod table_test {
row.set_field(27, col_timestamp_ltz_ns_neg.clone());
append_writer
- .append(row)
+ .append(&row)
.await
.expect("Failed to append row with all datatypes");
@@ -760,7 +768,7 @@ mod table_test {
}
append_writer
- .append(row_with_nulls)
+ .append(&row_with_nulls)
.await
.expect("Failed to append row with nulls");
@@ -967,4 +975,108 @@ mod table_test {
.await
.expect("Failed to drop table");
}
+
+ #[tokio::test]
+ async fn partitioned_table_append() {
+ let cluster = get_fluss_cluster();
+ let connection = cluster.get_fluss_connection().await;
+
+ let admin = connection.get_admin().await.expect("Failed to get admin");
+
+ let table_path = TablePath::new(
+ "fluss".to_string(),
+ "test_partitioned_log_append".to_string(),
+ );
+
+ // Create a partitioned log table
+ let table_descriptor = TableDescriptor::builder()
+ .schema(
+ Schema::builder()
+ .column("id", DataTypes::int())
+ .column("region", DataTypes::string())
+ .column("value", DataTypes::bigint())
+ .build()
+ .expect("Failed to build schema"),
+ )
+ .partitioned_by(vec!["region".to_string()])
+ .build()
+ .expect("Failed to build table");
+
+ create_table(&admin, &table_path, &table_descriptor).await;
+
+ // Create partitions
+ create_partitions(&admin, &table_path, "region", &["US", "EU"]).await;
+
+ // Wait for partitions to be available
+ tokio::time::sleep(Duration::from_secs(2)).await;
+
+ let table = connection
+ .get_table(&table_path)
+ .await
+ .expect("Failed to get table");
+
+ // Create append writer - this should now work for partitioned tables
+ let append_writer = table
+ .new_append()
+ .expect("Failed to create append")
+ .create_writer()
+ .expect("Failed to create writer");
+
+ // Append records with different partitions
+ let test_data = [
+ (1, "US", 100i64),
+ (2, "US", 200i64),
+ (3, "EU", 300i64),
+ (4, "EU", 400i64),
+ ];
+
+ for (id, region, value) in &test_data {
+ let mut row = fluss::row::GenericRow::new(3);
+ row.set_field(0, *id);
+ row.set_field(1, *region);
+ row.set_field(2, *value);
+ append_writer
+ .append(&row)
+ .await
+ .expect("Failed to append row");
+ }
+
+ append_writer.flush().await.expect("Failed to flush");
+
+ // Test append_arrow_batch for partitioned tables
+ // Each batch must contain rows from the same partition
+ let us_batch = record_batch!(
+ ("id", Int32, [5, 6]),
+ ("region", Utf8, ["US", "US"]),
+ ("value", Int64, [500, 600])
+ )
+ .unwrap();
+ append_writer
+ .append_arrow_batch(us_batch)
+ .await
+ .expect("Failed to append US batch");
+
+ let eu_batch = record_batch!(
+ ("id", Int32, [7, 8]),
+ ("region", Utf8, ["EU", "EU"]),
+ ("value", Int64, [700, 800])
+ )
+ .unwrap();
+ append_writer
+ .append_arrow_batch(eu_batch)
+ .await
+ .expect("Failed to append EU batch");
+
+ append_writer
+ .flush()
+ .await
+ .expect("Failed to flush batches");
+
+ admin
+ .drop_table(&table_path, false)
+ .await
+ .expect("Failed to drop table");
+
+ // todo: add scan test in 203
+ }
}
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs
b/crates/fluss/tests/integration/table_remote_scan.rs
index c83da0f..ce0c137 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -40,10 +40,10 @@ mod table_remote_scan_test {
use fluss::row::{GenericRow, InternalRow};
use std::collections::HashMap;
use std::sync::Arc;
+ use std::sync::atomic::AtomicUsize;
use std::thread;
use std::time::Duration;
use uuid::Uuid;
-
fn before_all() {
// Create a new tokio runtime in a separate thread
let cluster_lock = SHARED_FLUSS_CLUSTER.clone();
@@ -141,7 +141,8 @@ mod table_remote_scan_test {
let append_writer = table
.new_append()
.expect("Failed to create append")
- .create_writer();
+ .create_writer()
+ .expect("Failed to create writer");
// append 20 rows, there must be some tiered to remote
let record_count = 20;
@@ -151,7 +152,7 @@ mod table_remote_scan_test {
let v = format!("v{}", i);
row.set_field(1, v.as_str());
append_writer
- .append(row)
+ .append(&row)
.await
.expect("Failed to append row");
}
diff --git a/crates/fluss/tests/integration/utils.rs
b/crates/fluss/tests/integration/utils.rs
index 4d0c349..fd5145a 100644
--- a/crates/fluss/tests/integration/utils.rs
+++ b/crates/fluss/tests/integration/utils.rs
@@ -17,8 +17,9 @@
*/
use crate::integration::fluss_cluster::{FlussTestingCluster,
FlussTestingClusterBuilder};
use fluss::client::FlussAdmin;
-use fluss::metadata::{TableDescriptor, TablePath};
+use fluss::metadata::{PartitionSpec, TableDescriptor, TablePath};
use parking_lot::RwLock;
+use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
@@ -102,3 +103,26 @@ pub fn get_cluster(cluster_lock:
&RwLock<Option<FlussTestingCluster>>) -> Arc<Fl
.clone(),
)
}
+
+/// Creates partitions for a partitioned table.
+///
+/// # Arguments
+/// * `admin` - The FlussAdmin instance
+/// * `table_path` - The table path
+/// * `partition_column` - The partition column name
+/// * `partition_values` - The partition values to create
+pub async fn create_partitions(
+ admin: &FlussAdmin,
+ table_path: &TablePath,
+ partition_column: &str,
+ partition_values: &[&str],
+) {
+ for value in partition_values {
+ let mut partition_map = HashMap::new();
+ partition_map.insert(partition_column.to_string(), value.to_string());
+ admin
+ .create_partition(table_path, &PartitionSpec::new(partition_map),
true)
+ .await
+ .expect("Failed to create partition");
+ }
+}
diff --git a/crates/fluss/tests/test_fluss.rs b/crates/fluss/tests/test_fluss.rs
index f3987e6..a6cc27a 100644
--- a/crates/fluss/tests/test_fluss.rs
+++ b/crates/fluss/tests/test_fluss.rs
@@ -23,7 +23,7 @@ mod integration {
mod admin;
mod fluss_cluster;
mod kv_table;
- mod table;
+ mod log_table;
mod utils;