This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f6d3fed9 chore(sdk): Adds Rust example for message compression using 
user_headers (#2613)
7f6d3fed9 is described below

commit 7f6d3fed93de7647da6bb0c417e14b917662a10b
Author: haubur <[email protected]>
AuthorDate: Wed Jan 28 07:45:22 2026 +0100

    chore(sdk): Adds Rust example for message compression using user_headers 
(#2613)
    
    Closes #30
    
    - Consolidates exisiting example `message-headers` into 
`message-headers/message-type`
    - Adds new example to `message-headers/message-compression`
    - Adds a producing example 
`message-headers/message-compression/producer/main.rs`
    - Adds a corresponding consuming 
example`message-headers/message-compression/consumer/main.rs`
    - Adds `codec.rs` to `examples/rust/src/shared` that implements shared 
utilities to compress and decompress messages along with methods to 
transparently resolve user_headers into a codec.
    - Adds README.md with some more details, to keep code examples readable.
---
 Cargo.lock                                         |   1 +
 examples/rust/Cargo.toml                           |  17 +-
 examples/rust/README.md                            |  11 +-
 .../message-headers/message-compression/README.md  | 282 +++++++++++++++++++++
 .../message-compression/consumer/main.rs           |  89 +++++++
 .../message-compression/producer/main.rs           | 101 ++++++++
 .../{ => message-type}/consumer/main.rs            |   0
 .../{ => message-type}/producer/main.rs            |   0
 examples/rust/src/shared/codec.rs                  | 112 ++++++++
 examples/rust/src/shared/mod.rs                    |   1 +
 10 files changed, 608 insertions(+), 6 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index a19aa1bcd..f7eeda3fb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4859,6 +4859,7 @@ dependencies = [
  "clap",
  "futures-util",
  "iggy",
+ "lz4_flex",
  "rand 0.9.2",
  "serde",
  "serde_json",
diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml
index 30992fb10..6fd30e6b5 100644
--- a/examples/rust/Cargo.toml
+++ b/examples/rust/Cargo.toml
@@ -29,6 +29,7 @@ chrono = { workspace = true }
 clap = { workspace = true }
 futures-util = { workspace = true }
 iggy = { workspace = true }
+lz4_flex = "0.12.0"
 rand = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
@@ -62,12 +63,20 @@ name = "message-envelope-producer"
 path = "src/message-envelope/producer/main.rs"
 
 [[example]]
-name = "message-headers-consumer"
-path = "src/message-headers/consumer/main.rs"
+name = "message-headers-type-consumer"
+path = "src/message-headers/message-type/consumer/main.rs"
 
 [[example]]
-name = "message-headers-producer"
-path = "src/message-headers/producer/main.rs"
+name = "message-headers-type-producer"
+path = "src/message-headers/message-type/producer/main.rs"
+
+[[example]]
+name = "message-headers-compression-consumer"
+path = "src/message-headers/message-compression/consumer/main.rs"
+
+[[example]]
+name = "message-headers-compression-producer"
+path = "src/message-headers/message-compression/producer/main.rs"
 
 [[example]]
 name = "multi-tenant-consumer"
diff --git a/examples/rust/README.md b/examples/rust/README.md
index b1de83d06..0f55671cc 100644
--- a/examples/rust/README.md
+++ b/examples/rust/README.md
@@ -97,8 +97,15 @@ Demonstrates fundamental client connection, authentication, 
batch message sendin
 Shows metadata management using custom headers:
 
 ```bash
-cargo run --example message-headers-producer
-cargo run --example message-headers-consumer
+cargo run --example message-headers-type-producer
+cargo run --example message-headers-type-consumer
+```
+
+Shows how user headers can be used for message compression in transit:
+
+```bash
+cargo run --example message-headers-compression-producer
+cargo run --example message-headers-compression-consumer
 ```
 
 Demonstrates using HeaderKey/HeaderValue for message metadata instead of 
payload-based typing, with header-based message routing.
diff --git a/examples/rust/src/message-headers/message-compression/README.md 
b/examples/rust/src/message-headers/message-compression/README.md
new file mode 100644
index 000000000..d1d35257c
--- /dev/null
+++ b/examples/rust/src/message-headers/message-compression/README.md
@@ -0,0 +1,282 @@
+# An Example on Message Compression in Transit
+
+This example illustrates how (with some minor additional implementation) the 
Iggy SDK can be used to compress and decompress messages in transit.
+
+## Running the Example
+
+Details on how to run the examples for the Rust Iggy SDK can be found in the 
parent folder 
[README.md](https://github.com/apache/iggy/tree/master/examples/rust#readme).
+
+Run the following commands
+
+1. Start the server
+
+    ```bash
+    cargo run --bin iggy-server -- --with-default-root-credentials
+    ```
+
+    **NOTE**: In case the server was running before, make sure to run `rm -rf 
local_data/` to delete server state data from prior runs.
+
+2. Run the producer to write compressed messages to the server
+
+    ```bash
+    cargo run --example message-headers-compression-producer
+    ```
+
+3. Run the consumer to read and decompress messages from the server
+
+    ```bash
+    cargo run --example message-headers-compression-consumer
+    ```
+
+## The Codec
+
+The **co**mpression and **dec**compression utilities are implemented in 
`examples/rust/src/shared/codec.rs` and used when sending messages to the 
server and reading them from the server.
+
+First, define a stream and a topic name.
+The producer will first initiate the stream and the topic on that stream and 
then write the example messages to that topic within that stream.
+The consumer will use the names as identifier to read messages from that topic 
on that stream.
+
+```rust
+pub const STREAM_NAME: &str = "compression-stream";
+pub const TOPIC_NAME: &str = "compression-topic";
+```
+
+Additionally, set a constant that defines the number of messages to be send to 
the server via the producer.
+
+```rust
+pub const NUM_MESSAGES: u32 = 1000;
+```
+
+### Spotlight: IggyMessage's
+
+In order to add functionality to compress and decompress messages during 
transit, we need to know what a message actually is and how it is implemented.
+Iggy implements two important types, that we need to know.
+
+* 
[IggyMessage](https://github.com/apache/iggy/blob/e46f294b7af4f86b0d7e26d984205a019a8885f8/core/common/src/types/message/iggy_message.rs#L108)
+* 
[ReceivedMessage](https://github.com/apache/iggy/blob/b26246252502ba6f5d6cad2895e7c468d9f959e4/core/sdk/src/clients/consumer.rs#L905)
+
+A message send to the server needs to be of type `IggyMessage`.
+
+```Rust
+pub struct IggyMessage {
+    /// Message metadata
+    pub header: IggyMessageHeader,
+    /// Message content
+    pub payload: Bytes,
+    /// Optional user-defined headers
+    pub user_headers: Option<Bytes>,
+}
+```
+
+The important bits in context of this example are the *payload* and the 
*user_headers*.
+Payload is of type Bytes and corresponds to the actual message that we want to 
send to the server.
+
+Let's suppose our example is an abstraction over a real world scenario, where 
some application sends it's application logs to the iggy-server. This 
application is therefore the producer.
+We also have a monitoring service, that inspects the logs of our application 
to check for any service disruptions. So this monitoring service needs to read 
the logs from the iggy-server and is therefore the consumer.
+
+Further suppose, the application logs are quite large and repetitive, since 
they follow a structured pattern (as logs usually do).
+It may be a good idea to reduce bandwidth by trading of some idle CPU time to 
compress the logs before sending them to the server.
+We go straight ahead, implement some compression functionalities and send the 
compressed messages to the server.
+If the monitoring service now consumes these messages we have a problem. The 
logs are still compressed.
+Even if we know that the messages are compressed we do not know how to 
decompress them because the algorithm that was used for compression is unknown.
+
+This is where `user_headers` become handy.
+The definition above tells us, that user_headers are (optional) Bytes. But 
thats because finally everything is serialized before sending to the server.
+Looking at the implementation of `IggyMessage` we see that user_headers are a 
serialized `HashMap` with Iggy specific types `HeaderKey` and `HeaderValue`.
+So the user_headers are basically a set of metadata defined by us, the user, 
using a key and a value.
+
+Thus, for the compression scenario the user_headers can be used to signal to a 
consumer that a message was compressed before it was sent to the server.
+The key to highlight message compression in this example is defined as:
+
+```rust
+pub const COMPRESSION_HEADER_KEY: &str = "iggy-compression";
+```
+
+By reading the user_headers and finding the "iggy-compression" key a consumer 
now knows, that the message was compressed. But it's still not transparent how 
it can be decompressed.
+We can use the `HeaderValue` to store the information on how to decompress the 
message, e.g. using a **Codec**.
+
+----
+
+So the HeaderKey is used to indicate that a message was compressed before 
transit and the HeaderValue indicates how it was compressed.
+Using this idea we can implement a Codec that is shared between the consumer 
and producer.
+
+The Codec is an enum listing all the available algorithms to compress and 
decompress.
+
+```rust
+pub enum Codec {
+    None,
+    Lz4,
+}
+```
+
+Further, we implement three methods
+
+* `header_key`: Returns the HeaderKey that defines if a message was compressed 
or not.
+The consumer uses it to look for the "iggy-compression" HeaderKey when 
inspecting the user_headers of a message.
+* `to_header_value`: Generates a HeaderValue from the specific Codec instance.
+* `from_header_value`: Resolves a HeaderValue into a Codec.
+This is used in the consumer. After the "iggy-compression" HeaderKey was found 
in the user_headers we can obtain the HeaderValue, from which
+we obtain the Codec type using this method and thereby gain access to the 
decompress method.
+
+```rust
+impl Codec {
+    pub fn header_key() -> HeaderKey {
+        HeaderKey::new(COMPRESSION_HEADER_KEY)
+            .expect("COMPRESSION_HEADER_KEY is an InvalidHeaderKey.")
+    }
+
+    pub fn to_header_value(&self) -> HeaderValue {
+        HeaderValue::from_str(&self.to_string()).expect("failed generating 
HeaderValue.")
+    }
+
+    pub fn from_header_value(value: &HeaderValue) -> Self {
+        let name = value
+            .as_str()
+            .expect("could not convert HeaderValue into str.");
+        Self::from_str(name).expect("compression algorithm not available.")
+    }
+```
+
+The other two methods implement the compression and decompression logic, which 
is specifc to the actual Codec instance, dependent on the enum's variant.
+The example Codec implements two. *None*, where data is not compressed and 
*Lz4* (using the lz4_flex crate).
+Note, that this can be easily extended to more algorithms.
+It might be reasonable to limit the number of bytes that can be decompressed 
to avoid large memory footprints, or even crashing the consumer.
+The `decompress` method, therefore takes one more byte as defined by the 
`MAX_PAYLOAD_SIZE` which is 
[64MB](https://github.com/apache/iggy/blob/05243138255349a78bd1e086a0d7fb264682f980/core/common/src/types/message/iggy_message.rs#L46).
+If the decoder read the full `MAX_PAYLOAD_SIZE` + 1 bytes, the payload exceeds 
the limit and the program panics.
+Note, that only the Lz4 branch in the match statement applies this logic.
+This is safe, because an `IggyMessage` ensures that the payload does not 
exceed `MAX_PAYLOAD_SIZE`, when using the builder.
+A compressed message that meets the limit of `MAX_PAYLOAD_SIZE`, however, can 
decompress into much more bytes.
+In a productive environment panics would be replaced with informative errors 
that can be properly handled.
+You would most likely want to continue reading messages from the server, even 
if one of them exceeds the limit.
+
+```rust
+impl Codec {
+    pub fn compress(&self, data: &[u8]) -> Vec<u8> {
+        match self {
+            Codec::None => Ok(data.to_vec()),
+            Codec::Lz4 => {
+                let mut compressed_data = Vec::new();
+                let mut encoder = FrameEncoder::new(&mut compressed_data);
+                encoder
+                    .write_all(data)
+                    .expect("Cannot write into buffer using Lz4 compression.");
+                encoder.finish().expect("Cannot finish Lz4 compression.");
+                Ok(compressed_data)
+            }
+        }
+    }
+
+    pub fn decompress(&self, data: &[u8]) -> Vec<u8> {
+        match self {
+            Codec::None => data.to_vec(),
+            Codec::Lz4 => {
+                let decoder = FrameDecoder::new(data);
+                let mut decompressed_data = Vec::new();
+                let bytes_read = decoder
+                    .take(MAX_PAYLOAD_SIZE as u64 + 1)
+                    .read_to_end(&mut decompressed_data)
+                    .expect("Cannot decode payload using Lz4.");
+
+                if bytes_read > MAX_PAYLOAD_SIZE as usize {
+                    panic!("Decompressed message exceeds MAX_PAYLOAD_SIZE!")
+                }
+                decompressed_data
+            }
+        }
+    }
+}
+```
+
+## The producer
+
+The example `/producer/main.rs` sets up a basic client that connects via TCP 
to a running iggy-server.
+Since the plain server on start-up does not have any, it creates a stream and 
a topic to which it writes the compressed messages.
+
+This is how the Codec described above is used to setup the user_headers entry 
to signal message compression.
+
+```rust
+let codec = Codec::Lz4;
+let key = Codec::header_key();
+let value = codec.to_header_value();
+let compression_headers = HashMap::from([(key, value)]);
+```
+
+The builder interface for the IggyMessage type is used to construct a message.
+Using the `user_headers` method sets the user_headers of that `IggyMessage`.
+Note, that this method sets or overwrites the user_headers of that message 
with the provided HashMap.
+Extending an existing header would require using the `user_headers_map` method 
and appending the returned HashMap.
+
+```rust
+let msg = IggyMessage::builder()
+    .payload(compressed_bytes)
+    .user_headers(compression_headers.clone())
+    .build()
+    .expect("IggyMessage should be buildable.");
+```
+
+Once all messages are generated they are send to the server.
+
+## The consumer
+
+The example `/consumer/main.rs` requires, that 
`../message-compression/producer/main.rs` was executed before.
+It sets up a client that connects via TCP to a running iggy-server and reads 
messages from the same stream and topic that
+was used by the producer.
+
+The core piece of the consumer is the while loop which awaits messages from 
the stream and topic.
+Note, the example terminates once the `NUM_MESSAGES` compressed messages were 
consumed.
+
+```rust
+while let Some(message) = consumer.next().await {
+    // message handling
+}
+```
+
+Within that loop we make use of the handle_payload_compression method.
+Every `ReceivedMessage` is processed by that method.
+A `ReceivedMessage` is a type that has an `IggyMessage` and two additional 
fields (which do not concern us here).
+
+```rust
+pub struct ReceivedMessage {
+    pub message: IggyMessage,
+    pub current_offset: u64,
+    pub partition_id: u32,
+}
+
+```
+
+The method decompresses the message payload.
+It first checks if the "iggy-compression" key is present in the `user_headers` 
of the `IggyMessage` that would indicate that the message is compressed.
+If that is not the case, the function returns `Ok(())` right away. In that 
case there is nothing to do.
+For the case where the "iggy-compression" key is present in the user_headers, 
a Codec is setup
+from the algorithm that was used to compress the message. Note, that 
.get_user_header takes the "iggy-compression" key and if it is present
+returns the HeaderValue, which is the algorithm as we have defined above.
+So at that point, codec is Codec::Lz4.
+The next step then is to decompress the payload and update the payload length 
attribute of the `IggyMessage` metadata since it changed.
+In a next and final step we update the user_headers. Since the message is now 
decompressed, the user_headers entry that signals compression should be removed.
+If the compression key-value pair was the only user header the user_headers 
are set to None, otherwise we just remove the compression key-value pair from 
the HashMap.
+
+```rust
+fn handle_payload_compression(msg: &mut ReceivedMessage) -> Result<(), 
IggyError> {
+    if let Ok(Some(algorithm)) = 
msg.message.get_user_header(&Codec::header_key()) {
+        let codec = Codec::from_header_value(&algorithm);
+
+        let decompressed_payload = codec.decompress(&msg.message.payload)?;
+        msg.message.payload = Bytes::from(decompressed_payload);
+        msg.message.header.payload_length = msg.message.payload.len() as u32;
+
+        if let Ok(Some(mut headers_map)) = msg.message.user_headers_map() {
+            headers_map.remove(&Codec::header_key());
+            let headers_bytes = headers_map.to_bytes();
+            msg.message.header.user_headers_length = headers_bytes.len() as 
u32;
+            msg.message.user_headers = if headers_map.is_empty() {
+                None
+            } else {
+                Some(headers_bytes)
+            };
+        }
+    }
+    Ok(())
+}
+```
+
+When executing the program, the consumed and decompressed messages will be 
printed to console.
diff --git 
a/examples/rust/src/message-headers/message-compression/consumer/main.rs 
b/examples/rust/src/message-headers/message-compression/consumer/main.rs
new file mode 100644
index 000000000..32104d754
--- /dev/null
+++ b/examples/rust/src/message-headers/message-compression/consumer/main.rs
@@ -0,0 +1,89 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use bytes::Bytes;
+use futures_util::stream::StreamExt;
+use iggy::prelude::*;
+// The compression and decompression utilities are shared between the producer 
and consumer compression examples.
+// Hence, we import them here.
+use iggy_examples::shared::codec::{Codec, NUM_MESSAGES, STREAM_NAME, 
TOPIC_NAME};
+
+pub const CONSUMER_NAME: &str = "example-consumer";
+
+#[tokio::main]
+async fn main() -> Result<(), IggyError> {
+    // Setup a client to connect to the iggy-server via TCP.
+    let client = IggyClientBuilder::new().with_tcp().build()?;
+    client.connect().await?;
+
+    // Login using default credentials.
+    client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await?;
+
+    // Configure a consumer, to read the compressed messages that were 
previously written to the server (using ../producer/main.rs).
+    let mut consumer = client
+        .consumer(CONSUMER_NAME, STREAM_NAME, TOPIC_NAME, 0)?
+        .build();
+    consumer.init().await?;
+
+    // Prevent the consumer from indefinitely awaiting new messages.
+    // Since the producer wrote 1000 messages to the server, break if all of 
them were read (consumed_messages == NUM_MESSAGES).
+    let mut consumed_messages = 0;
+    while let Some(message) = consumer.next().await {
+        let mut received_message = message.expect("Message was not received 
from server.");
+        handle_payload_compression(&mut received_message)?;
+        consumed_messages += 1;
+        println!(
+            "Message payload was decompressed and reads: {:?}",
+            received_message.message.payload
+        );
+        if consumed_messages == NUM_MESSAGES {
+            return Ok(());
+        }
+    }
+
+    Ok(())
+}
+
+// A helper function to decompress a ReceivedMessage's payload.
+fn handle_payload_compression(msg: &mut ReceivedMessage) -> Result<(), 
IggyError> {
+    // Check if the message payload is compressed by inspecting the 
user-header.
+    if let Ok(Some(algorithm)) = 
msg.message.get_user_header(&Codec::header_key()) {
+        // setup the codec with the compression algorithm defined in the 
user-header (value)
+        let codec = Codec::from_header_value(&algorithm);
+
+        // decompress the payload and update the payload length
+        let decompressed_payload = codec.decompress(&msg.message.payload);
+        msg.message.payload = Bytes::from(decompressed_payload);
+        msg.message.header.payload_length = msg.message.payload.len() as u32;
+
+        // remove the compression header since payload is now decompressed
+        if let Ok(Some(mut headers_map)) = msg.message.user_headers_map() {
+            headers_map.remove(&Codec::header_key());
+            let headers_bytes = headers_map.to_bytes();
+            msg.message.header.user_headers_length = headers_bytes.len() as 
u32;
+            msg.message.user_headers = if headers_map.is_empty() {
+                None
+            } else {
+                Some(headers_bytes)
+            };
+        }
+    }
+    Ok(())
+}
diff --git 
a/examples/rust/src/message-headers/message-compression/producer/main.rs 
b/examples/rust/src/message-headers/message-compression/producer/main.rs
new file mode 100644
index 000000000..5c258b314
--- /dev/null
+++ b/examples/rust/src/message-headers/message-compression/producer/main.rs
@@ -0,0 +1,101 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use bytes::Bytes;
+use iggy::prelude::*;
+use std::collections::HashMap;
+// The compression and decompression utilities are shared between the producer 
and consumer compression examples.
+// Hence, we import them here.
+use iggy_examples::shared::codec::{Codec, NUM_MESSAGES, STREAM_NAME, 
TOPIC_NAME};
+
+#[tokio::main]
+async fn main() -> Result<(), IggyError> {
+    // Setup a client to connect to the iggy-server via TCP.
+    let client = IggyClientBuilder::new().with_tcp().build()?;
+    client.connect().await?;
+
+    // Login using default credentials.
+    client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await?;
+
+    // Create a Stream.
+    client
+        .create_stream(STREAM_NAME)
+        .await
+        .expect("Stream was NOT created! Remove /local_data or start a fresh 
server with the --fresh flag to run this example.");
+
+    // Create a Topic on that Stream.
+    client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            1,                           // Number of partitions.
+            CompressionAlgorithm::None, // NOTE: This configures the 
compression on the server, not the actual messages in transit!
+            None,                       // Replication factor.
+            IggyExpiry::NeverExpire,    // Time until messages expire on the 
server.
+            MaxTopicSize::ServerDefault, // Defined in server/config.toml. 
Defaults to "unlimited".
+        )
+        .await
+        .expect("Topic was NOT created! Start a fresh server to run this 
example.");
+
+    // The Codec from ../compression.rs implements the compression and 
decompression utilities.
+    let codec = Codec::Lz4;
+    // NOTE: This is where the Codec is used to prepare the compression 
user-header for the IggyMessage.
+    let key = Codec::header_key();
+    let value = codec.to_header_value();
+    let compression_headers = HashMap::from([(key, value)]);
+
+    // Generate artificial example messages to send to the server.
+    let mut messages = Vec::new();
+    for i in 0..NUM_MESSAGES {
+        // For illustration purposes a log-like pattern is resembled.
+        let payload = format!(
+            r#"{{"ts": "2000-01-{:02}T{:02}:{:02}:{:02}Z", "level": "info", 
"trace":{}, "command": "command-{}", "status": 200, "latency_ms": {}}}"#,
+            i % 28,
+            i % 24,
+            i % 60,
+            i % 60,
+            i,
+            i % 1000,
+            i % 120
+        );
+        let payload = Bytes::from(payload);
+        let compressed_payload = codec.compress(&payload);
+        let compressed_bytes = Bytes::from(compressed_payload);
+
+        let msg = IggyMessage::builder()
+            .payload(compressed_bytes)
+            // NOTE: This is where the user_headers of IggyMessages are used 
to indicate, that a payload is compressed.
+            .user_headers(compression_headers.clone())
+            .build()
+            .expect("IggyMessage should be buildable.");
+        messages.push(msg);
+    }
+
+    // Send all compressed messages to the server.
+    let producer = client.producer(STREAM_NAME, TOPIC_NAME)?.build();
+    producer
+        .send(messages)
+        .await
+        .expect("Message sending failed.");
+
+    println!("All messages sent to server.");
+
+    Ok(())
+}
diff --git a/examples/rust/src/message-headers/consumer/main.rs 
b/examples/rust/src/message-headers/message-type/consumer/main.rs
similarity index 100%
rename from examples/rust/src/message-headers/consumer/main.rs
rename to examples/rust/src/message-headers/message-type/consumer/main.rs
diff --git a/examples/rust/src/message-headers/producer/main.rs 
b/examples/rust/src/message-headers/message-type/producer/main.rs
similarity index 100%
rename from examples/rust/src/message-headers/producer/main.rs
rename to examples/rust/src/message-headers/message-type/producer/main.rs
diff --git a/examples/rust/src/shared/codec.rs 
b/examples/rust/src/shared/codec.rs
new file mode 100644
index 000000000..30df429be
--- /dev/null
+++ b/examples/rust/src/shared/codec.rs
@@ -0,0 +1,112 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use iggy::prelude::*;
+use lz4_flex::frame::{FrameDecoder, FrameEncoder};
+use std::fmt::{Display, Formatter};
+use std::io::{Read, Write};
+use std::str::FromStr;
+
+pub const STREAM_NAME: &str = "compression-stream";
+pub const TOPIC_NAME: &str = "compression-topic";
+pub const COMPRESSION_HEADER_KEY: &str = "iggy-compression";
+pub const NUM_MESSAGES: u32 = 1000;
+
+// Codec that defines available compression algorithms.
+pub enum Codec {
+    None,
+    Lz4,
+}
+
+impl Display for Codec {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Codec::None => write!(f, "none"),
+            Codec::Lz4 => write!(f, "lz4"),
+        }
+    }
+}
+
+impl FromStr for Codec {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "lz4" => Ok(Codec::Lz4),
+            "none" => Ok(Codec::None),
+            _ => Err(format!("Unknown compression type: {s}")),
+        }
+    }
+}
+
+impl Codec {
+    /// Returns the key to indicate compressed messages as HeaderKey.
+    pub fn header_key() -> HeaderKey {
+        HeaderKey::new(COMPRESSION_HEADER_KEY)
+            .expect("COMPRESSION_HEADER_KEY is an InvalidHeaderKey.")
+    }
+
+    /// Returns the compression algorithm type as HeaderValue.
+    pub fn to_header_value(&self) -> HeaderValue {
+        HeaderValue::from_str(&self.to_string()).expect("failed generating 
HeaderValue.")
+    }
+
+    /// Returns a Codec from a HeaderValue. Used when reading messages from 
the server.
+    pub fn from_header_value(value: &HeaderValue) -> Self {
+        let name = value
+            .as_str()
+            .expect("could not convert HeaderValue into str.");
+        Self::from_str(name).expect("compression algorithm not available.")
+    }
+
+    /// Takes a message payload and compresses it using the algorithm from 
Codec.
+    pub fn compress(&self, data: &[u8]) -> Vec<u8> {
+        match self {
+            Codec::None => data.to_vec(),
+            Codec::Lz4 => {
+                let mut compressed_data = Vec::new();
+                let mut encoder = FrameEncoder::new(&mut compressed_data);
+                encoder
+                    .write_all(data)
+                    .expect("Cannot write into buffer using Lz4 compression.");
+                encoder.finish().expect("Cannot finish Lz4 compression.");
+                compressed_data
+            }
+        }
+    }
+
+    /// Takes a compressed message payload and decompresses it using the 
algorithm from Codec.
+    pub fn decompress(&self, data: &[u8]) -> Vec<u8> {
+        match self {
+            Codec::None => data.to_vec(),
+            Codec::Lz4 => {
+                let decoder = FrameDecoder::new(data);
+                let mut decompressed_data = Vec::new();
+                let bytes_read = decoder
+                    .take(MAX_PAYLOAD_SIZE as u64 + 1)
+                    .read_to_end(&mut decompressed_data)
+                    .expect("Cannot decode payload using Lz4.");
+
+                if bytes_read > MAX_PAYLOAD_SIZE as usize {
+                    panic!("Decompressed message exceeds MAX_PAYLOAD_SIZE!")
+                }
+                decompressed_data
+            }
+        }
+    }
+}
diff --git a/examples/rust/src/shared/mod.rs b/examples/rust/src/shared/mod.rs
index 1fc58a095..d0537c924 100644
--- a/examples/rust/src/shared/mod.rs
+++ b/examples/rust/src/shared/mod.rs
@@ -18,6 +18,7 @@
 
 pub mod args;
 pub mod client;
+pub mod codec;
 pub mod messages;
 pub mod messages_generator;
 pub mod stream;

Reply via email to