This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ce374a  chore: fix bug where column prune won't work when compression is enabled (#108)
4ce374a is described below

commit 4ce374a884084438491ec30f59441e965ad0b9ad
Author: Keith Lee <[email protected]>
AuthorDate: Mon Dec 22 09:43:43 2025 +0000

    chore: fix bug where column prune won't work when compression is enabled (#108)
---
 bindings/cpp/examples/example.cpp                  |   1 -
 crates/fluss/src/client/write/accumulator.rs       |   2 +
 crates/fluss/src/client/write/batch.rs             |   4 +
 crates/fluss/src/compression/arrow_compression.rs  | 245 +++++++++++++++++++++
 crates/fluss/src/{lib.rs => compression/mod.rs}    |  22 +-
 crates/fluss/src/lib.rs                            |   1 +
 crates/fluss/src/metadata/table.rs                 |   5 +
 crates/fluss/src/record/arrow.rs                   |  21 +-
 crates/fluss/tests/integration/table.rs            |   1 -
 .../fluss/tests/integration/table_remote_scan.rs   |   1 -
 10 files changed, 278 insertions(+), 25 deletions(-)

diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp
index 04f9ac6..6ff2b9b 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -61,7 +61,6 @@ int main() {
     auto descriptor = fluss::TableDescriptor::NewBuilder()
                           .SetSchema(schema)
                           .SetBucketCount(3)
-                          .SetProperty("table.log.arrow.compression.type", 
"NONE")
                           .SetComment("cpp example table with 3 buckets")
                           .Build();
 
diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs
index e4ca957..215adbe 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -94,6 +94,7 @@ impl RecordAccumulator {
 
         let table_path = &record.table_path;
         let table_info = cluster.get_table(table_path);
+        let arrow_compression_info = 
table_info.get_table_config().get_arrow_compression_info()?;
         let row_type = &cluster.get_table(table_path).row_type;
 
         let schema_id = table_info.schema_id;
@@ -102,6 +103,7 @@ impl RecordAccumulator {
             self.batch_id.fetch_add(1, Ordering::Relaxed),
             table_path.as_ref().clone(),
             schema_id,
+            arrow_compression_info,
             row_type,
             bucket_id,
             current_time_ms(),
diff --git a/crates/fluss/src/client/write/batch.rs b/crates/fluss/src/client/write/batch.rs
index 13b3d36..ba04db4 100644
--- a/crates/fluss/src/client/write/batch.rs
+++ b/crates/fluss/src/client/write/batch.rs
@@ -18,6 +18,7 @@
 use crate::BucketId;
 use crate::client::broadcast::{BatchWriteResult, BroadcastOnce};
 use crate::client::{ResultHandle, WriteRecord};
+use crate::compression::ArrowCompressionInfo;
 use crate::error::Result;
 use crate::metadata::{DataType, TablePath};
 use crate::record::MemoryLogRecordsArrowBuilder;
@@ -132,10 +133,12 @@ pub struct ArrowLogWriteBatch {
 }
 
 impl ArrowLogWriteBatch {
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         batch_id: i64,
         table_path: TablePath,
         schema_id: i32,
+        arrow_compression_info: ArrowCompressionInfo,
         row_type: &DataType,
         bucket_id: BucketId,
         create_ms: i64,
@@ -148,6 +151,7 @@ impl ArrowLogWriteBatch {
                 schema_id,
                 row_type,
                 to_append_record_batch,
+                arrow_compression_info,
             ),
         }
     }
diff --git a/crates/fluss/src/compression/arrow_compression.rs b/crates/fluss/src/compression/arrow_compression.rs
new file mode 100644
index 0000000..32dfadb
--- /dev/null
+++ b/crates/fluss/src/compression/arrow_compression.rs
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{Error, Result};
+use arrow::ipc::CompressionType;
+use std::collections::HashMap;
+
+pub const TABLE_LOG_ARROW_COMPRESSION_ZSTD_LEVEL: &str = 
"table.log.arrow.compression.zstd.level";
+pub const TABLE_LOG_ARROW_COMPRESSION_TYPE: &str = 
"table.log.arrow.compression.type";
+pub const DEFAULT_NON_ZSTD_COMPRESSION_LEVEL: i32 = -1;
+pub const DEFAULT_ZSTD_COMPRESSION_LEVEL: i32 = 3;
+
+#[derive(Clone, Debug, PartialEq)]
+pub enum ArrowCompressionType {
+    None,
+    Lz4Frame,
+    Zstd,
+}
+
+impl ArrowCompressionType {
+    fn from_conf(properties: &HashMap<String, String>) -> Result<Self> {
+        match properties
+            .get(TABLE_LOG_ARROW_COMPRESSION_TYPE)
+            .map(|s| s.as_str())
+        {
+            Some("NONE") => Ok(Self::None),
+            Some("LZ4_FRAME") => Ok(Self::Lz4Frame),
+            Some("ZSTD") => Ok(Self::Zstd),
+            Some(other) => Err(Error::IllegalArgument {
+                message: format!("Unsupported compression type: {other}"),
+            }),
+            None => Ok(Self::Zstd),
+        }
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct ArrowCompressionInfo {
+    pub compression_type: ArrowCompressionType,
+    pub compression_level: i32,
+}
+
+impl ArrowCompressionInfo {
+    pub fn from_conf(properties: &HashMap<String, String>) -> Result<Self> {
+        let compression_type = ArrowCompressionType::from_conf(properties)?;
+
+        if compression_type != ArrowCompressionType::Zstd {
+            return Ok(Self {
+                compression_type,
+                compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+            });
+        }
+
+        match properties
+            .get(TABLE_LOG_ARROW_COMPRESSION_ZSTD_LEVEL)
+            .map(|s| s.as_str().parse::<i32>())
+        {
+            Some(Ok(level)) if !(1..=22).contains(&level) => 
Err(Error::IllegalArgument {
+                message: format!(
+                    "Invalid ZSTD compression level: {}. Expected a value 
between 1 and 22.",
+                    level
+                ),
+            }),
+            Some(Err(e)) => Err(Error::IllegalArgument {
+                message: format!(
+                    "Invalid ZSTD compression level. Expected a value between 
1 and 22. {}",
+                    e
+                ),
+            }),
+
+            Some(Ok(level)) => Ok(Self {
+                compression_type,
+                compression_level: level,
+            }),
+            None => Ok(Self {
+                compression_type,
+                compression_level: DEFAULT_ZSTD_COMPRESSION_LEVEL,
+            }),
+        }
+    }
+
+    #[cfg(test)]
+    fn new(compression_type: ArrowCompressionType, compression_level: i32) -> 
ArrowCompressionInfo {
+        Self {
+            compression_type,
+            compression_level,
+        }
+    }
+
+    pub fn get_compression_type(&self) -> Option<CompressionType> {
+        match self.compression_type {
+            ArrowCompressionType::Zstd => Some(CompressionType::ZSTD),
+            ArrowCompressionType::Lz4Frame => Some(CompressionType::LZ4_FRAME),
+            ArrowCompressionType::None => None,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::collections::HashMap;
+
+    #[test]
+    fn test_from_conf() {
+        assert_eq!(
+            ArrowCompressionType::from_conf(&HashMap::new()).unwrap(),
+            ArrowCompressionType::Zstd
+        );
+
+        assert_eq!(
+            ArrowCompressionType::from_conf(&mk_map(&[(
+                "table.log.arrow.compression.type",
+                "NONE"
+            )]))
+            .unwrap(),
+            ArrowCompressionType::None
+        );
+
+        assert_eq!(
+            ArrowCompressionType::from_conf(&mk_map(&[(
+                "table.log.arrow.compression.type",
+                "LZ4_FRAME"
+            )]))
+            .unwrap(),
+            ArrowCompressionType::Lz4Frame
+        );
+
+        assert_eq!(
+            ArrowCompressionType::from_conf(&mk_map(&[(
+                "table.log.arrow.compression.type",
+                "ZSTD"
+            )]))
+            .unwrap(),
+            ArrowCompressionType::Zstd
+        );
+    }
+
+    #[test]
+    fn test_from_conf_invalid_compression_type() {
+        let props = mk_map(&[("table.log.arrow.compression.type", "FOO")]);
+
+        assert!(
+            ArrowCompressionInfo::from_conf(&props)
+                .unwrap_err()
+                .to_string()
+                .contains(
+                    "Fluss hitting illegal argument error Unsupported 
compression type: FOO."
+                )
+        );
+    }
+
+    #[test]
+    fn test_from_conf_zstd_compression_level() {
+        let compression_info = ArrowCompressionInfo::from_conf(&mk_map(&[(
+            "table.log.arrow.compression.type",
+            "ZSTD",
+        )]));
+        assert_eq!(compression_info.unwrap().compression_level, 3);
+        let compression_info = ArrowCompressionInfo::from_conf(&mk_map(&[
+            ("table.log.arrow.compression.type", "ZSTD"),
+            ("table.log.arrow.compression.zstd.level", "1"),
+        ]));
+        assert_eq!(compression_info.unwrap().compression_level, 1);
+    }
+
+    #[test]
+    fn test_from_conf_compression_level_out_of_range() {
+        let props = mk_map(&[
+            ("table.log.arrow.compression.type", "ZSTD"),
+            ("table.log.arrow.compression.zstd.level", "0"),
+        ]);
+
+        assert!(
+            ArrowCompressionInfo::from_conf(&props)
+                .unwrap_err()
+                .to_string()
+                .contains("Expected a value between 1 and 22.")
+        );
+
+        let props = mk_map(&[
+            ("table.log.arrow.compression.type", "ZSTD"),
+            ("table.log.arrow.compression.zstd.level", "23"),
+        ]);
+
+        assert!(
+            ArrowCompressionInfo::from_conf(&props)
+                .unwrap_err()
+                .to_string()
+                .contains("Expected a value between 1 and 22.")
+        );
+    }
+
+    #[test]
+    fn test_from_conf_compression_level_parse_error() {
+        let props = mk_map(&[
+            ("table.log.arrow.compression.type", "ZSTD"),
+            ("table.log.arrow.compression.zstd.level", "not-a-number"),
+        ]);
+
+        assert!(
+            ArrowCompressionInfo::from_conf(&props)
+                .unwrap_err()
+                .to_string()
+                .contains("Expected a value between 1 and 22.")
+        );
+    }
+
+    #[test]
+    fn get_compression_type_maps_correctly() {
+        assert_eq!(
+            ArrowCompressionInfo::new(ArrowCompressionType::None, 
-1).get_compression_type(),
+            None
+        );
+        assert_eq!(
+            ArrowCompressionInfo::new(ArrowCompressionType::Lz4Frame, 
-1).get_compression_type(),
+            Some(CompressionType::LZ4_FRAME)
+        );
+        assert_eq!(
+            ArrowCompressionInfo::new(ArrowCompressionType::Zstd, 
-1).get_compression_type(),
+            Some(CompressionType::ZSTD)
+        );
+    }
+
+    fn mk_map(pairs: &[(&str, &str)]) -> HashMap<String, String> {
+        pairs
+            .iter()
+            .map(|(k, v)| (k.to_string(), v.to_string()))
+            .collect()
+    }
+}
diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/compression/mod.rs
similarity index 73%
copy from crates/fluss/src/lib.rs
copy to crates/fluss/src/compression/mod.rs
index 366edfc..2b86dba 100644
--- a/crates/fluss/src/lib.rs
+++ b/crates/fluss/src/compression/mod.rs
@@ -15,24 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-pub mod client;
-pub mod metadata;
-pub mod record;
-pub mod row;
-pub mod rpc;
+mod arrow_compression;
 
-mod cluster;
-
-pub mod config;
-pub mod error;
-
-pub mod io;
-mod util;
-
-pub type TableId = u64;
-pub type PartitionId = u64;
-pub type BucketId = i32;
-
-pub mod proto {
-    include!(concat!(env!("OUT_DIR"), "/proto.rs"));
-}
+pub use arrow_compression::*;
diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/lib.rs
index 366edfc..25978ce 100644
--- a/crates/fluss/src/lib.rs
+++ b/crates/fluss/src/lib.rs
@@ -26,6 +26,7 @@ mod cluster;
 pub mod config;
 pub mod error;
 
+mod compression;
 pub mod io;
 mod util;
 
diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs
index 770c4f2..4f6c04b 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::compression::ArrowCompressionInfo;
 use crate::error::Error::InvalidTableError;
 use crate::error::{Error, Result};
 use crate::metadata::datatype::{DataField, DataType, RowType};
@@ -721,6 +722,10 @@ impl TableConfig {
     pub fn from_properties(properties: HashMap<String, String>) -> Self {
         TableConfig { properties }
     }
+
+    pub fn get_arrow_compression_info(&self) -> Result<ArrowCompressionInfo> {
+        ArrowCompressionInfo::from_conf(&self.properties)
+    }
 }
 
 impl TableInfo {
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 0a803ae..5a5115e 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use crate::client::{Record, WriteRecord};
+use crate::compression::ArrowCompressionInfo;
 use crate::error::Result;
 use crate::metadata::DataType;
 use crate::record::{ChangeType, ScanRecord};
@@ -47,6 +48,7 @@ use std::{
     sync::Arc,
 };
 
+use arrow::ipc::writer::IpcWriteOptions;
 /// const for record batch
 pub const BASE_OFFSET_LENGTH: usize = 8;
 pub const LENGTH_LENGTH: usize = 4;
@@ -104,6 +106,7 @@ pub struct MemoryLogRecordsArrowBuilder {
     batch_sequence: i32,
     arrow_record_batch_builder: Box<dyn ArrowRecordBatchInnerBuilder>,
     is_closed: bool,
+    arrow_compression_info: ArrowCompressionInfo,
 }
 
 pub trait ArrowRecordBatchInnerBuilder: Send + Sync {
@@ -244,7 +247,12 @@ impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder {
 }
 
 impl MemoryLogRecordsArrowBuilder {
-    pub fn new(schema_id: i32, row_type: &DataType, to_append_record_batch: 
bool) -> Self {
+    pub fn new(
+        schema_id: i32,
+        row_type: &DataType,
+        to_append_record_batch: bool,
+        arrow_compression_info: ArrowCompressionInfo,
+    ) -> Self {
         let arrow_batch_builder: Box<dyn ArrowRecordBatchInnerBuilder> = {
             if to_append_record_batch {
                 Box::new(PrebuiltRecordBatchBuilder::default())
@@ -260,6 +268,7 @@ impl MemoryLogRecordsArrowBuilder {
             batch_sequence: NO_BATCH_SEQUENCE,
             is_closed: false,
             arrow_record_batch_builder: arrow_batch_builder,
+            arrow_compression_info,
         }
     }
 
@@ -289,7 +298,15 @@ impl MemoryLogRecordsArrowBuilder {
         // serialize arrow batch
         let mut arrow_batch_bytes = vec![];
         let table_schema = self.arrow_record_batch_builder.schema();
-        let mut writer = StreamWriter::try_new(&mut arrow_batch_bytes, 
&table_schema)?;
+        let compression_type = 
self.arrow_compression_info.get_compression_type();
+        let write_option =
+            IpcWriteOptions::try_with_compression(IpcWriteOptions::default(), 
compression_type);
+        let mut writer = StreamWriter::try_new_with_options(
+            &mut arrow_batch_bytes,
+            &table_schema,
+            write_option?,
+        )?;
+
         // get header len
         let header = writer.get_ref().len();
         let record_batch = 
self.arrow_record_batch_builder.build_arrow_record_batch()?;
diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs
index 9eec98e..3f7946e 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -103,7 +103,6 @@ mod table_test {
                     .build()
                     .expect("Failed to build schema"),
             )
-            .property("table.log.arrow.compression.type", "NONE")
             .build()
             .expect("Failed to build table");
 
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs
index bdbced9..43c89b5 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -142,7 +142,6 @@ mod table_remote_scan_test {
                     .build()
                     .expect("Failed to build schema"),
             )
-            .property("table.log.arrow.compression.type", "NONE")
             .build()
             .expect("Failed to build table");
 

Reply via email to