hubcio commented on code in PR #2529:
URL: https://github.com/apache/iggy/pull/2529#discussion_r2653291953
##########
core/sdk/src/clients/consumer.rs:
##########
@@ -980,6 +977,48 @@ impl Stream for IggyConsumer {
message.payload = Bytes::from(payload);
message.header.payload_length =
message.payload.len() as u32;
}
+
+ // maybe decompress
+ if let Ok(Some(algorithm_value)) = message
+
.get_user_header(&HeaderKey::from_str("iggy-compression").unwrap())
+ {
+ let algorithm_name =
algorithm_value.as_str().unwrap();
+ let algorithm =
+
CompressionAlgorithm::from_str(algorithm_name).unwrap();
Review Comment:
Implement `TryFrom<&str>` for `CompressionAlgorithm` (or handle the `Err` returned by the existing `FromStr` impl) and propagate the error; don't `unwrap()`.
##########
Cargo.toml:
##########
@@ -142,9 +144,9 @@ integration = { path = "core/integration" }
keyring = { version = "3.6.3", features = ["sync-secret-service", "vendored"] }
lazy_static = "1.5.0"
log = "0.4.29"
+lz4 = "1.28.1"
Review Comment:
Maybe use lz4_flex instead, since it's a pure-Rust implementation (after you confirm that it
works): we'll avoid linking against other libs, and our binary will be smaller
for musl builds.
Alternatively: https://github.com/bozaro/lz4-rs
##########
core/common/src/types/compression/compression_algorithm.rs:
##########
@@ -198,4 +394,57 @@ mod tests {
let invalid_compression_kind = CompressionAlgorithm::from_code(255);
assert!(invalid_compression_kind.is_err());
}
+
+ #[test]
+ fn test_emtpy_input_compression_decompression() {
Review Comment:
Typo: `emtpy` should be `empty`.
##########
core/common/src/types/compression/compression_algorithm.rs:
##########
@@ -45,24 +49,110 @@ impl FromStr for CompressionAlgorithm {
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"gzip" => Ok(CompressionAlgorithm::Gzip),
+ "zstd" => Ok(CompressionAlgorithm::Zstd),
+ "lz4" => Ok(CompressionAlgorithm::Lz4),
+ "snappy" => Ok(CompressionAlgorithm::Snappy),
"none" => Ok(CompressionAlgorithm::None),
_ => Err(format!("Unknown compression type: {s}")),
}
}
}
impl CompressionAlgorithm {
+ pub fn compress(&self, data: &[u8]) -> Result<Vec<u8>, IggyError> {
+ match self {
+ CompressionAlgorithm::None => Ok(data.to_vec()),
+ CompressionAlgorithm::Gzip => {
+ let mut compressed_data = Vec::new();
+ let mut encoder = GzEncoder::new(&mut compressed_data,
Compression::default());
+ encoder
+ .write_all(data)
+ .map_err(|e| IggyError::CompressionError(e.to_string()))?;
+ encoder
+ .finish()
+ .map_err(|e| IggyError::CompressionError(e.to_string()))?;
+ Ok(compressed_data)
+ }
+ CompressionAlgorithm::Zstd => {
+ let compressed_data = zstd::encode_all(data, 0)
+ .map_err(|e| IggyError::CompressionError(e.to_string()))?;
+ Ok(compressed_data)
+ }
+ CompressionAlgorithm::Lz4 => {
+ let compressed_data = Vec::new();
+ let mut encoder = EncoderBuilder::new()
+ .level(4)
+ .build(compressed_data)
+ .map_err(|e| IggyError::CompressionError(e.to_string()))?;
+ encoder
+ .write_all(data)
+ .map_err(|e| IggyError::CompressionError(e.to_string()))?;
+ let (compressed_data, result) = encoder.finish();
+ result.map_err(|e|
IggyError::CompressionError(e.to_string()))?;
+ Ok(compressed_data)
+ }
+ CompressionAlgorithm::Snappy => {
+ let compressed_data = snap::raw::Encoder::new()
+ .compress_vec(data)
+ .map_err(|e| IggyError::CompressionError(e.to_string()))?;
+ Ok(compressed_data)
+ }
+ }
+ }
+
+ pub fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, IggyError> {
Review Comment:
Please cap the decompressed output at `MAX_PAYLOAD_SIZE`; we don't want to
unpack 10 GB from a couple of KBs :)
##########
core/sdk/src/clients/consumer.rs:
##########
@@ -980,6 +977,48 @@ impl Stream for IggyConsumer {
message.payload = Bytes::from(payload);
message.header.payload_length =
message.payload.len() as u32;
}
+
+ // maybe decompress
+ if let Ok(Some(algorithm_value)) = message
+
.get_user_header(&HeaderKey::from_str("iggy-compression").unwrap())
Review Comment:
Please define `pub const COMPRESSION_HEADER_KEY: &str = "iggy-compression";`
and use it in all places instead of repeating the string literal.
##########
core/integration/tests/sdk/producer/compression.rs:
##########
@@ -0,0 +1,217 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::sdk::producer::{cleanup, init_system};
+use bytes::Bytes;
+use futures::StreamExt;
+use iggy::prelude::*;
+use iggy::{clients::client::IggyClient, prelude::TcpClient};
+use iggy_common::{ClientCompressionConfig, TcpClientConfig};
+use integration::test_server::{TestServer, login_root};
+use serial_test::parallel;
+use std::sync::Arc;
+use std::time::Duration;
+use test_case::test_matrix;
+use tokio::time::sleep;
+
+const STREAM_NAME: &str = "test-stream";
+const TOPIC_NAME: &str = "test-topic";
+
+fn none() -> CompressionAlgorithm {
+ CompressionAlgorithm::None
+}
+fn gzip() -> CompressionAlgorithm {
+ CompressionAlgorithm::Gzip
+}
+fn zstd() -> CompressionAlgorithm {
+ CompressionAlgorithm::Zstd
+}
+fn lz4() -> CompressionAlgorithm {
+ CompressionAlgorithm::Lz4
+}
+fn snappy() -> CompressionAlgorithm {
+ CompressionAlgorithm::Snappy
+}
+
+fn compressible_payload(size: usize) -> (Bytes, String) {
+ let sample = "Test payload for compression.";
+ let n_reps = size.div_ceil(sample.len());
+ let payload_str: String = sample.repeat(n_reps);
+ let payload_str = payload_str[..size].to_string();
+ (Bytes::from(payload_str.clone()), payload_str)
+}
+
+#[test_matrix([none(), gzip(), zstd(), lz4(), snappy()])]
+#[tokio::test]
+#[parallel]
+async fn compression_send_receive_ok(algorithm: CompressionAlgorithm) {
+ // setup
+ let mut test_server = TestServer::default();
+ test_server.start();
+
+ let tcp_client_config = TcpClientConfig {
+ server_address: test_server.get_raw_tcp_addr().unwrap(),
+ ..TcpClientConfig::default()
+ };
+ let client =
ClientWrapper::Tcp(TcpClient::create(Arc::new(tcp_client_config)).unwrap());
+ let client = IggyClient::create(client, None, None);
+
+ client.connect().await.unwrap();
+ assert!(client.ping().await.is_ok(), "Failed to ping server");
+
+ login_root(&client).await;
+ init_system(&client).await;
+
+ client.connect().await.unwrap();
Review Comment:
No need for the second `connect()`; the client is already connected above.
##########
core/sdk/src/clients/consumer.rs:
##########
@@ -980,6 +977,48 @@ impl Stream for IggyConsumer {
message.payload = Bytes::from(payload);
message.header.payload_length =
message.payload.len() as u32;
}
+
+ // maybe decompress
+ if let Ok(Some(algorithm_value)) = message
Review Comment:
Perhaps put this part into `maybe_compress`? I'm not insisting; I'd like to
hear your opinion about it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]