This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 4ce374a chore: fix bug where column prune won't work when compression is enabled (#108)
4ce374a is described below
commit 4ce374a884084438491ec30f59441e965ad0b9ad
Author: Keith Lee <[email protected]>
AuthorDate: Mon Dec 22 09:43:43 2025 +0000
chore: fix bug where column prune won't work when compression is enabled (#108)
---
bindings/cpp/examples/example.cpp | 1 -
crates/fluss/src/client/write/accumulator.rs | 2 +
crates/fluss/src/client/write/batch.rs | 4 +
crates/fluss/src/compression/arrow_compression.rs | 245 +++++++++++++++++++++
crates/fluss/src/{lib.rs => compression/mod.rs} | 22 +-
crates/fluss/src/lib.rs | 1 +
crates/fluss/src/metadata/table.rs | 5 +
crates/fluss/src/record/arrow.rs | 21 +-
crates/fluss/tests/integration/table.rs | 1 -
.../fluss/tests/integration/table_remote_scan.rs | 1 -
10 files changed, 278 insertions(+), 25 deletions(-)
diff --git a/bindings/cpp/examples/example.cpp
b/bindings/cpp/examples/example.cpp
index 04f9ac6..6ff2b9b 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -61,7 +61,6 @@ int main() {
auto descriptor = fluss::TableDescriptor::NewBuilder()
.SetSchema(schema)
.SetBucketCount(3)
- .SetProperty("table.log.arrow.compression.type", "NONE")
.SetComment("cpp example table with 3 buckets")
.Build();
diff --git a/crates/fluss/src/client/write/accumulator.rs
b/crates/fluss/src/client/write/accumulator.rs
index e4ca957..215adbe 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -94,6 +94,7 @@ impl RecordAccumulator {
let table_path = &record.table_path;
let table_info = cluster.get_table(table_path);
+ let arrow_compression_info = table_info.get_table_config().get_arrow_compression_info()?;
let row_type = &cluster.get_table(table_path).row_type;
let schema_id = table_info.schema_id;
@@ -102,6 +103,7 @@ impl RecordAccumulator {
self.batch_id.fetch_add(1, Ordering::Relaxed),
table_path.as_ref().clone(),
schema_id,
+ arrow_compression_info,
row_type,
bucket_id,
current_time_ms(),
diff --git a/crates/fluss/src/client/write/batch.rs
b/crates/fluss/src/client/write/batch.rs
index 13b3d36..ba04db4 100644
--- a/crates/fluss/src/client/write/batch.rs
+++ b/crates/fluss/src/client/write/batch.rs
@@ -18,6 +18,7 @@
use crate::BucketId;
use crate::client::broadcast::{BatchWriteResult, BroadcastOnce};
use crate::client::{ResultHandle, WriteRecord};
+use crate::compression::ArrowCompressionInfo;
use crate::error::Result;
use crate::metadata::{DataType, TablePath};
use crate::record::MemoryLogRecordsArrowBuilder;
@@ -132,10 +133,12 @@ pub struct ArrowLogWriteBatch {
}
impl ArrowLogWriteBatch {
+ #[allow(clippy::too_many_arguments)]
pub fn new(
batch_id: i64,
table_path: TablePath,
schema_id: i32,
+ arrow_compression_info: ArrowCompressionInfo,
row_type: &DataType,
bucket_id: BucketId,
create_ms: i64,
@@ -148,6 +151,7 @@ impl ArrowLogWriteBatch {
schema_id,
row_type,
to_append_record_batch,
+ arrow_compression_info,
),
}
}
diff --git a/crates/fluss/src/compression/arrow_compression.rs
b/crates/fluss/src/compression/arrow_compression.rs
new file mode 100644
index 0000000..32dfadb
--- /dev/null
+++ b/crates/fluss/src/compression/arrow_compression.rs
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{Error, Result};
+use arrow::ipc::CompressionType;
+use std::collections::HashMap;
+
+pub const TABLE_LOG_ARROW_COMPRESSION_ZSTD_LEVEL: &str = "table.log.arrow.compression.zstd.level";
+pub const TABLE_LOG_ARROW_COMPRESSION_TYPE: &str = "table.log.arrow.compression.type";
+pub const DEFAULT_NON_ZSTD_COMPRESSION_LEVEL: i32 = -1;
+pub const DEFAULT_ZSTD_COMPRESSION_LEVEL: i32 = 3;
+
+#[derive(Clone, Debug, PartialEq)]
+pub enum ArrowCompressionType {
+ None,
+ Lz4Frame,
+ Zstd,
+}
+
+impl ArrowCompressionType {
+ fn from_conf(properties: &HashMap<String, String>) -> Result<Self> {
+ match properties
+ .get(TABLE_LOG_ARROW_COMPRESSION_TYPE)
+ .map(|s| s.as_str())
+ {
+ Some("NONE") => Ok(Self::None),
+ Some("LZ4_FRAME") => Ok(Self::Lz4Frame),
+ Some("ZSTD") => Ok(Self::Zstd),
+ Some(other) => Err(Error::IllegalArgument {
+ message: format!("Unsupported compression type: {other}"),
+ }),
+ None => Ok(Self::Zstd),
+ }
+ }
+}
+
+#[derive(Clone, Debug)]
+pub struct ArrowCompressionInfo {
+ pub compression_type: ArrowCompressionType,
+ pub compression_level: i32,
+}
+
+impl ArrowCompressionInfo {
+ pub fn from_conf(properties: &HashMap<String, String>) -> Result<Self> {
+ let compression_type = ArrowCompressionType::from_conf(properties)?;
+
+ if compression_type != ArrowCompressionType::Zstd {
+ return Ok(Self {
+ compression_type,
+ compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+ });
+ }
+
+ match properties
+ .get(TABLE_LOG_ARROW_COMPRESSION_ZSTD_LEVEL)
+ .map(|s| s.as_str().parse::<i32>())
+ {
+ Some(Ok(level)) if !(1..=22).contains(&level) => Err(Error::IllegalArgument {
+ message: format!(
+ "Invalid ZSTD compression level: {}. Expected a value between 1 and 22.",
+ level
+ ),
+ }),
+ Some(Err(e)) => Err(Error::IllegalArgument {
+ message: format!(
+ "Invalid ZSTD compression level. Expected a value between 1 and 22. {}",
+ e
+ ),
+ }),
+
+ Some(Ok(level)) => Ok(Self {
+ compression_type,
+ compression_level: level,
+ }),
+ None => Ok(Self {
+ compression_type,
+ compression_level: DEFAULT_ZSTD_COMPRESSION_LEVEL,
+ }),
+ }
+ }
+
+ #[cfg(test)]
+ fn new(compression_type: ArrowCompressionType, compression_level: i32) -> ArrowCompressionInfo {
+ Self {
+ compression_type,
+ compression_level,
+ }
+ }
+
+ pub fn get_compression_type(&self) -> Option<CompressionType> {
+ match self.compression_type {
+ ArrowCompressionType::Zstd => Some(CompressionType::ZSTD),
+ ArrowCompressionType::Lz4Frame => Some(CompressionType::LZ4_FRAME),
+ ArrowCompressionType::None => None,
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::collections::HashMap;
+
+ #[test]
+ fn test_from_conf() {
+ assert_eq!(
+ ArrowCompressionType::from_conf(&HashMap::new()).unwrap(),
+ ArrowCompressionType::Zstd
+ );
+
+ assert_eq!(
+ ArrowCompressionType::from_conf(&mk_map(&[(
+ "table.log.arrow.compression.type",
+ "NONE"
+ )]))
+ .unwrap(),
+ ArrowCompressionType::None
+ );
+
+ assert_eq!(
+ ArrowCompressionType::from_conf(&mk_map(&[(
+ "table.log.arrow.compression.type",
+ "LZ4_FRAME"
+ )]))
+ .unwrap(),
+ ArrowCompressionType::Lz4Frame
+ );
+
+ assert_eq!(
+ ArrowCompressionType::from_conf(&mk_map(&[(
+ "table.log.arrow.compression.type",
+ "ZSTD"
+ )]))
+ .unwrap(),
+ ArrowCompressionType::Zstd
+ );
+ }
+
+ #[test]
+ fn test_from_conf_invalid_compression_type() {
+ let props = mk_map(&[("table.log.arrow.compression.type", "FOO")]);
+
+ assert!(
+ ArrowCompressionInfo::from_conf(&props)
+ .unwrap_err()
+ .to_string()
+ .contains(
+ "Fluss hitting illegal argument error Unsupported compression type: FOO."
+ )
+ );
+ }
+
+ #[test]
+ fn test_from_conf_zstd_compression_level() {
+ let compression_info = ArrowCompressionInfo::from_conf(&mk_map(&[(
+ "table.log.arrow.compression.type",
+ "ZSTD",
+ )]));
+ assert_eq!(compression_info.unwrap().compression_level, 3);
+ let compression_info = ArrowCompressionInfo::from_conf(&mk_map(&[
+ ("table.log.arrow.compression.type", "ZSTD"),
+ ("table.log.arrow.compression.zstd.level", "1"),
+ ]));
+ assert_eq!(compression_info.unwrap().compression_level, 1);
+ }
+
+ #[test]
+ fn test_from_conf_compression_level_out_of_range() {
+ let props = mk_map(&[
+ ("table.log.arrow.compression.type", "ZSTD"),
+ ("table.log.arrow.compression.zstd.level", "0"),
+ ]);
+
+ assert!(
+ ArrowCompressionInfo::from_conf(&props)
+ .unwrap_err()
+ .to_string()
+ .contains("Expected a value between 1 and 22.")
+ );
+
+ let props = mk_map(&[
+ ("table.log.arrow.compression.type", "ZSTD"),
+ ("table.log.arrow.compression.zstd.level", "23"),
+ ]);
+
+ assert!(
+ ArrowCompressionInfo::from_conf(&props)
+ .unwrap_err()
+ .to_string()
+ .contains("Expected a value between 1 and 22.")
+ );
+ }
+
+ #[test]
+ fn test_from_conf_compression_level_parse_error() {
+ let props = mk_map(&[
+ ("table.log.arrow.compression.type", "ZSTD"),
+ ("table.log.arrow.compression.zstd.level", "not-a-number"),
+ ]);
+
+ assert!(
+ ArrowCompressionInfo::from_conf(&props)
+ .unwrap_err()
+ .to_string()
+ .contains("Expected a value between 1 and 22.")
+ );
+ }
+
+ #[test]
+ fn get_compression_type_maps_correctly() {
+ assert_eq!(
+ ArrowCompressionInfo::new(ArrowCompressionType::None, -1).get_compression_type(),
+ None
+ );
+ assert_eq!(
+ ArrowCompressionInfo::new(ArrowCompressionType::Lz4Frame, -1).get_compression_type(),
+ Some(CompressionType::LZ4_FRAME)
+ );
+ assert_eq!(
+ ArrowCompressionInfo::new(ArrowCompressionType::Zstd, -1).get_compression_type(),
+ Some(CompressionType::ZSTD)
+ );
+ }
+
+ fn mk_map(pairs: &[(&str, &str)]) -> HashMap<String, String> {
+ pairs
+ .iter()
+ .map(|(k, v)| (k.to_string(), v.to_string()))
+ .collect()
+ }
+}
diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/compression/mod.rs
similarity index 73%
copy from crates/fluss/src/lib.rs
copy to crates/fluss/src/compression/mod.rs
index 366edfc..2b86dba 100644
--- a/crates/fluss/src/lib.rs
+++ b/crates/fluss/src/compression/mod.rs
@@ -15,24 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-pub mod client;
-pub mod metadata;
-pub mod record;
-pub mod row;
-pub mod rpc;
+mod arrow_compression;
-mod cluster;
-
-pub mod config;
-pub mod error;
-
-pub mod io;
-mod util;
-
-pub type TableId = u64;
-pub type PartitionId = u64;
-pub type BucketId = i32;
-
-pub mod proto {
- include!(concat!(env!("OUT_DIR"), "/proto.rs"));
-}
+pub use arrow_compression::*;
diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/lib.rs
index 366edfc..25978ce 100644
--- a/crates/fluss/src/lib.rs
+++ b/crates/fluss/src/lib.rs
@@ -26,6 +26,7 @@ mod cluster;
pub mod config;
pub mod error;
+mod compression;
pub mod io;
mod util;
diff --git a/crates/fluss/src/metadata/table.rs
b/crates/fluss/src/metadata/table.rs
index 770c4f2..4f6c04b 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use crate::compression::ArrowCompressionInfo;
use crate::error::Error::InvalidTableError;
use crate::error::{Error, Result};
use crate::metadata::datatype::{DataField, DataType, RowType};
@@ -721,6 +722,10 @@ impl TableConfig {
pub fn from_properties(properties: HashMap<String, String>) -> Self {
TableConfig { properties }
}
+
+ pub fn get_arrow_compression_info(&self) -> Result<ArrowCompressionInfo> {
+ ArrowCompressionInfo::from_conf(&self.properties)
+ }
}
impl TableInfo {
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 0a803ae..5a5115e 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -16,6 +16,7 @@
// under the License.
use crate::client::{Record, WriteRecord};
+use crate::compression::ArrowCompressionInfo;
use crate::error::Result;
use crate::metadata::DataType;
use crate::record::{ChangeType, ScanRecord};
@@ -47,6 +48,7 @@ use std::{
sync::Arc,
};
+use arrow::ipc::writer::IpcWriteOptions;
/// const for record batch
pub const BASE_OFFSET_LENGTH: usize = 8;
pub const LENGTH_LENGTH: usize = 4;
@@ -104,6 +106,7 @@ pub struct MemoryLogRecordsArrowBuilder {
batch_sequence: i32,
arrow_record_batch_builder: Box<dyn ArrowRecordBatchInnerBuilder>,
is_closed: bool,
+ arrow_compression_info: ArrowCompressionInfo,
}
pub trait ArrowRecordBatchInnerBuilder: Send + Sync {
@@ -244,7 +247,12 @@ impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder {
}
impl MemoryLogRecordsArrowBuilder {
- pub fn new(schema_id: i32, row_type: &DataType, to_append_record_batch: bool) -> Self {
+ pub fn new(
+ schema_id: i32,
+ row_type: &DataType,
+ to_append_record_batch: bool,
+ arrow_compression_info: ArrowCompressionInfo,
+ ) -> Self {
let arrow_batch_builder: Box<dyn ArrowRecordBatchInnerBuilder> = {
if to_append_record_batch {
Box::new(PrebuiltRecordBatchBuilder::default())
@@ -260,6 +268,7 @@ impl MemoryLogRecordsArrowBuilder {
batch_sequence: NO_BATCH_SEQUENCE,
is_closed: false,
arrow_record_batch_builder: arrow_batch_builder,
+ arrow_compression_info,
}
}
@@ -289,7 +298,15 @@ impl MemoryLogRecordsArrowBuilder {
// serialize arrow batch
let mut arrow_batch_bytes = vec![];
let table_schema = self.arrow_record_batch_builder.schema();
- let mut writer = StreamWriter::try_new(&mut arrow_batch_bytes, &table_schema)?;
+ let compression_type = self.arrow_compression_info.get_compression_type();
+ let write_option =
+ IpcWriteOptions::try_with_compression(IpcWriteOptions::default(), compression_type);
+ let mut writer = StreamWriter::try_new_with_options(
+ &mut arrow_batch_bytes,
+ &table_schema,
+ write_option?,
+ )?;
+
// get header len
let header = writer.get_ref().len();
let record_batch = self.arrow_record_batch_builder.build_arrow_record_batch()?;
diff --git a/crates/fluss/tests/integration/table.rs
b/crates/fluss/tests/integration/table.rs
index 9eec98e..3f7946e 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -103,7 +103,6 @@ mod table_test {
.build()
.expect("Failed to build schema"),
)
- .property("table.log.arrow.compression.type", "NONE")
.build()
.expect("Failed to build table");
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs
b/crates/fluss/tests/integration/table_remote_scan.rs
index bdbced9..43c89b5 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -142,7 +142,6 @@ mod table_remote_scan_test {
.build()
.expect("Failed to build schema"),
)
- .property("table.log.arrow.compression.type", "NONE")
.build()
.expect("Failed to build table");