This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 52afc3c12 feat(rust): add dedicated TCP/TLS producer and consumer examples (#2820)
52afc3c12 is described below

commit 52afc3c1297e5e95b10b12a3d37ee011693c88c1
Author: Atharva Lade <[email protected]>
AuthorDate: Mon Mar 9 15:12:39 2026 -0500

    feat(rust): add dedicated TCP/TLS producer and consumer examples (#2820)
---
 examples/rust/Cargo.toml                   |   8 ++
 examples/rust/README.md                    |  22 ++++
 examples/rust/src/shared/args.rs           |   2 +
 examples/rust/src/tcp-tls/consumer/main.rs | 130 ++++++++++++++++++++++
 examples/rust/src/tcp-tls/producer/main.rs | 170 +++++++++++++++++++++++++++++
 scripts/run-rust-examples-from-readme.sh   |  82 +++++++++++++-
 6 files changed, 412 insertions(+), 2 deletions(-)

diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml
index 513979249..970d67e97 100644
--- a/examples/rust/Cargo.toml
+++ b/examples/rust/Cargo.toml
@@ -125,3 +125,11 @@ path = "src/stream-builder/stream-producer-config/main.rs"
 [[example]]
 name = "sink-data-producer"
 path = "src/sink-data-producer/main.rs"
+
+[[example]]
+name = "tcp-tls-producer"
+path = "src/tcp-tls/producer/main.rs"
+
+[[example]]
+name = "tcp-tls-consumer"
+path = "src/tcp-tls/consumer/main.rs"
diff --git a/examples/rust/README.md b/examples/rust/README.md
index 4a546f73d..c5e61b01e 100644
--- a/examples/rust/README.md
+++ b/examples/rust/README.md
@@ -185,6 +185,28 @@ cargo run --example stream-consumer-config
 
 These examples document all available configuration options including partitioning strategies, retry policies, batching, AutoCommit strategies, polling strategies, and retry mechanisms.
 
+## Security Examples
+
+### TCP/TLS
+
+Demonstrates secure TLS-encrypted TCP connections using custom CA certificates:
+
+```bash
+cargo run --example tcp-tls-producer
+cargo run --example tcp-tls-consumer
+```
+
+These examples require a TLS-enabled Iggy server. Start the server with:
+
+```bash
+IGGY_TCP_TLS_ENABLED=true \
+IGGY_TCP_TLS_CERT_FILE=core/certs/iggy_cert.pem \
+IGGY_TCP_TLS_KEY_FILE=core/certs/iggy_key.pem \
+cargo r --bin iggy-server
+```
+
+Uses `IggyClientBuilder` with TLS options (`with_tls_enabled`, `with_tls_domain`, `with_tls_ca_file`) to establish TLS-encrypted TCP connections with CA certificate verification.
+
 ## Example Structure
 
 All examples can be executed directly from the repository. Follow these steps:
diff --git a/examples/rust/src/shared/args.rs b/examples/rust/src/shared/args.rs
index aa04c3b16..4b5fa2171 100644
--- a/examples/rust/src/shared/args.rs
+++ b/examples/rust/src/shared/args.rs
@@ -295,6 +295,7 @@ impl Args {
             "multi-tenant-producer" | "multi-tenant-consumer" => 
"tenant-stream",
             "new-sdk-producer" | "new-sdk-consumer" => "new-sdk-stream",
             "sink-data-producer" => "sink-stream",
+            "tcp-tls-producer" | "tcp-tls-consumer" => "tls-stream",
             "stream-basic" => "stream-basic",
             "stream-producer" | "stream-consumer" => "stream-example",
             "stream-producer-config" | "stream-consumer-config" => 
"stream-config",
@@ -313,6 +314,7 @@ impl Args {
             "multi-tenant-producer" | "multi-tenant-consumer" => "events",
             "new-sdk-producer" | "new-sdk-consumer" => "new-sdk-topic",
             "sink-data-producer" => "users",
+            "tcp-tls-producer" | "tcp-tls-consumer" => "tls-topic",
             "stream-basic" => "stream-topic",
             "stream-producer" | "stream-consumer" => "stream-topic",
             "stream-producer-config" | "stream-consumer-config" => 
"config-topic",
diff --git a/examples/rust/src/tcp-tls/consumer/main.rs b/examples/rust/src/tcp-tls/consumer/main.rs
new file mode 100644
index 000000000..0431cb82d
--- /dev/null
+++ b/examples/rust/src/tcp-tls/consumer/main.rs
@@ -0,0 +1,130 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// TCP/TLS Consumer Example
+//
+// Demonstrates how to consume messages from an Iggy server over a TLS-encrypted
+// TCP connection using custom certificates from core/certs/.
+//
+// Prerequisites:
+//   Start the Iggy server with TLS enabled:
+//     IGGY_TCP_TLS_ENABLED=true \
+//     IGGY_TCP_TLS_CERT_FILE=core/certs/iggy_cert.pem \
+//     IGGY_TCP_TLS_KEY_FILE=core/certs/iggy_key.pem \
+//     cargo r --bin iggy-server
+//
+// Run this example (from repo root):
+//   cargo run --example tcp-tls-consumer -p iggy_examples
+
+use iggy::prelude::*;
+use std::error::Error;
+use std::str::FromStr;
+use tokio::time::sleep;
+use tracing::info;
+use tracing_subscriber::layer::SubscriberExt;
+use tracing_subscriber::util::SubscriberInitExt;
+use tracing_subscriber::{EnvFilter, Registry};
+
+const STREAM_NAME: &str = "tls-stream";
+const TOPIC_NAME: &str = "tls-topic";
+const PARTITION_ID: u32 = 0;
+const BATCHES_LIMIT: u32 = 5;
+
+/// Entry point of the TCP/TLS consumer example.
+///
+/// Initialises tracing, builds a TLS-enabled TCP client for `127.0.0.1:8090`,
+/// logs in with the default root credentials and then runs [`consume_messages`].
+///
+/// # Errors
+///
+/// Returns any error raised while building the client, connecting, logging in
+/// or consuming messages.
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+    // Take the log filter from the environment when one can be read; otherwise
+    // fall back to INFO. The fallback filter is only built when it is needed.
+    Registry::default()
+        .with(tracing_subscriber::fmt::layer())
+        .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("INFO")))
+        .init();
+
+    // Build a TCP client with TLS enabled.
+    // with_tls_enabled(true)     activates TLS on the TCP transport
+    // with_tls_domain(...)       sets the expected server hostname for certificate verification
+    // with_tls_ca_file(...)      points to the CA certificate used to verify the server cert
+    // The CA path is relative, so the example has to be started from the repository root.
+    let client = IggyClientBuilder::new()
+        .with_tcp()
+        .with_server_address("127.0.0.1:8090".to_string())
+        .with_tls_enabled(true)
+        .with_tls_domain("localhost".to_string())
+        .with_tls_ca_file("core/certs/iggy_ca_cert.pem".to_string())
+        .build()?;
+
+    client.connect().await?;
+    client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await?;
+    info!("Connected and logged in over TLS.");
+
+    consume_messages(&client).await
+}
+
+/// Polls `BATCHES_LIMIT` non-empty batches from partition `PARTITION_ID` of the
+/// example stream and topic, logging every message, and then returns.
+///
+/// Polling starts at offset 0. After each non-empty batch the offset is advanced
+/// by the number of messages received. Empty polls are retried after the polling
+/// interval and do not count towards the batch limit.
+///
+/// # Errors
+///
+/// Returns an error if the interval or the identifiers cannot be built, if a
+/// poll fails, or if a message payload is not valid UTF-8.
+async fn consume_messages(client: &dyn Client) -> Result<(), Box<dyn Error>> {
+    let interval = IggyDuration::from_str("500ms")?;
+    info!(
+        "Messages will be consumed from stream: {}, topic: {}, partition: {} with interval {}.",
+        STREAM_NAME,
+        TOPIC_NAME,
+        PARTITION_ID,
+        interval.as_human_time_string()
+    );
+
+    // The identifiers and the consumer never change, so build them once and
+    // report an invalid name as an error instead of panicking inside the loop.
+    let stream_id = Identifier::named(STREAM_NAME)?;
+    let topic_id = Identifier::named(TOPIC_NAME)?;
+    let consumer = Consumer::default();
+    let messages_per_batch = 10;
+    let mut offset = 0;
+    let mut consumed_batches = 0;
+    loop {
+        if consumed_batches == BATCHES_LIMIT {
+            info!("Consumed {consumed_batches} batches of messages, exiting.");
+            return Ok(());
+        }
+
+        let polled_messages = client
+            .poll_messages(
+                &stream_id,
+                &topic_id,
+                Some(PARTITION_ID),
+                &consumer,
+                &PollingStrategy::offset(offset),
+                messages_per_batch,
+                false,
+            )
+            .await?;
+
+        if polled_messages.messages.is_empty() {
+            info!("No messages found.");
+            sleep(interval.get_duration()).await;
+            continue;
+        }
+
+        // Continue right after the last message of this batch on the next poll.
+        offset += polled_messages.messages.len() as u64;
+        for message in polled_messages.messages {
+            handle_message(&message)?;
+        }
+        consumed_batches += 1;
+        sleep(interval.get_duration()).await;
+    }
+}
+
+/// Logs the offset and the UTF-8 payload of a single polled message.
+///
+/// # Errors
+///
+/// Returns an error when the payload bytes are not valid UTF-8.
+fn handle_message(message: &IggyMessage) -> Result<(), Box<dyn Error>> {
+    let text = std::str::from_utf8(&message.payload)?;
+    let offset = message.header.offset;
+    info!("Handling message at offset: {}, payload: {}...", offset, text);
+    Ok(())
+}
diff --git a/examples/rust/src/tcp-tls/producer/main.rs b/examples/rust/src/tcp-tls/producer/main.rs
new file mode 100644
index 000000000..c96e05126
--- /dev/null
+++ b/examples/rust/src/tcp-tls/producer/main.rs
@@ -0,0 +1,170 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// TCP/TLS Producer Example
+//
+// Demonstrates how to produce messages to an Iggy server over a TLS-encrypted
+// TCP connection using custom certificates from core/certs/.
+//
+// Prerequisites:
+//   Start the Iggy server with TLS enabled:
+//     IGGY_TCP_TLS_ENABLED=true \
+//     IGGY_TCP_TLS_CERT_FILE=core/certs/iggy_cert.pem \
+//     IGGY_TCP_TLS_KEY_FILE=core/certs/iggy_key.pem \
+//     cargo r --bin iggy-server
+//
+// Run this example (from repo root):
+//   cargo run --example tcp-tls-producer -p iggy_examples
+
+use iggy::prelude::*;
+use std::error::Error;
+use std::str::FromStr;
+use tracing::{info, warn};
+use tracing_subscriber::layer::SubscriberExt;
+use tracing_subscriber::util::SubscriberInitExt;
+use tracing_subscriber::{EnvFilter, Registry};
+
+const STREAM_NAME: &str = "tls-stream";
+const TOPIC_NAME: &str = "tls-topic";
+const PARTITION_ID: u32 = 0;
+const BATCHES_LIMIT: u32 = 5;
+
+/// Entry point of the TCP/TLS producer example.
+///
+/// Initialises tracing, builds a TLS-enabled TCP client for `127.0.0.1:8090`,
+/// logs in with the default root credentials, makes sure the example stream and
+/// topic exist via [`init_system`] and then runs [`produce_messages`].
+///
+/// # Errors
+///
+/// Returns any error raised while building the client, connecting, logging in
+/// or sending messages.
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+    // Take the log filter from the environment when one can be read; otherwise
+    // fall back to INFO. The fallback filter is only built when it is needed.
+    Registry::default()
+        .with(tracing_subscriber::fmt::layer())
+        .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("INFO")))
+        .init();
+
+    // Build a TCP client with TLS enabled.
+    // with_tls_enabled(true)     activates TLS on the TCP transport
+    // with_tls_domain(...)       sets the expected server hostname for certificate verification
+    // with_tls_ca_file(...)      points to the CA certificate used to verify the server cert
+    // The CA path is relative, so the example has to be started from the repository root.
+    let client = IggyClientBuilder::new()
+        .with_tcp()
+        .with_server_address("127.0.0.1:8090".to_string())
+        .with_tls_enabled(true)
+        .with_tls_domain("localhost".to_string())
+        .with_tls_ca_file("core/certs/iggy_ca_cert.pem".to_string())
+        .build()?;
+
+    client.connect().await?;
+    client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await?;
+    info!("Connected and logged in over TLS.");
+
+    let (stream_id, topic_id) = init_system(&client).await;
+    produce_messages(&client, stream_id, topic_id).await
+}
+
+/// Makes sure the example stream and topic exist and returns their numeric ids
+/// as `(stream_id, topic_id)`.
+///
+/// Creation is attempted first. When it fails, the underlying error is logged
+/// and the resource is looked up by name instead, so a rerun against a server
+/// that already holds the stream or topic still succeeds.
+///
+/// # Panics
+///
+/// Panics when a resource can neither be created nor fetched, or when the
+/// configured names are not valid identifiers.
+async fn init_system(client: &IggyClient) -> (u32, u32) {
+    let stream_ident =
+        Identifier::named(STREAM_NAME).expect("STREAM_NAME must be a valid identifier");
+    let topic_ident = Identifier::named(TOPIC_NAME).expect("TOPIC_NAME must be a valid identifier");
+
+    let stream = match client.create_stream(STREAM_NAME).await {
+        Ok(stream) => {
+            info!("Stream was created.");
+            stream
+        }
+        Err(error) => {
+            // Creation can fail for reasons other than "already exists", so the
+            // actual cause is reported before falling back to a lookup.
+            warn!("Stream was not created ({error}), trying to use the existing one.");
+            client
+                .get_stream(&stream_ident)
+                .await
+                .expect("Failed to get stream")
+                .expect("Stream was not found")
+        }
+    };
+
+    let topic = match client
+        .create_topic(
+            &stream_ident,
+            TOPIC_NAME,
+            1,
+            CompressionAlgorithm::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+    {
+        Ok(topic) => {
+            info!("Topic was created.");
+            topic
+        }
+        Err(error) => {
+            warn!("Topic was not created ({error}), trying to use the existing one.");
+            client
+                .get_topic(&stream_ident, &topic_ident)
+                .await
+                .expect("Failed to get topic")
+                .expect("Topic was not found")
+        }
+    };
+
+    (stream.id, topic.id)
+}
+
+/// Sends `BATCHES_LIMIT` batches of ten messages each to partition
+/// `PARTITION_ID` of the example stream and topic, one batch per interval tick.
+///
+/// Payloads are the strings `message-1`, `message-2`, ... with a counter that
+/// keeps increasing across batches. `stream_id` and `topic_id` are only used in
+/// the start-up log line; the stream and topic are addressed by name.
+///
+/// # Errors
+///
+/// Returns an error if the interval, an identifier or a message cannot be
+/// built, or if sending a batch fails.
+async fn produce_messages(
+    client: &dyn Client,
+    stream_id: u32,
+    topic_id: u32,
+) -> Result<(), Box<dyn Error>> {
+    let duration = IggyDuration::from_str("500ms")?;
+    let mut interval = tokio::time::interval(duration.get_duration());
+    info!(
+        "Messages will be sent to stream: {} ({}), topic: {} ({}), partition: {} with interval {}.",
+        STREAM_NAME,
+        stream_id,
+        TOPIC_NAME,
+        topic_id,
+        PARTITION_ID,
+        duration.as_human_time_string()
+    );
+
+    // The identifiers and the partitioning never change, so build them once and
+    // report an invalid name as an error instead of panicking inside the loop.
+    let stream_ident = Identifier::named(STREAM_NAME)?;
+    let topic_ident = Identifier::named(TOPIC_NAME)?;
+    let partitioning = Partitioning::partition_id(PARTITION_ID);
+    let messages_per_batch = 10;
+    let mut current_id = 0;
+    let mut sent_batches = 0;
+    loop {
+        if sent_batches == BATCHES_LIMIT {
+            info!("Sent {sent_batches} batches of messages, exiting.");
+            return Ok(());
+        }
+
+        interval.tick().await;
+        // The batch size is known up front, so allocate the vector once.
+        let mut messages = Vec::with_capacity(messages_per_batch);
+        for _ in 0..messages_per_batch {
+            current_id += 1;
+            let payload = format!("message-{current_id}");
+            let message = IggyMessage::from_str(&payload)?;
+            messages.push(message);
+        }
+        client
+            .send_messages(&stream_ident, &topic_ident, &partitioning, &mut messages)
+            .await?;
+        sent_batches += 1;
+        info!("Sent {messages_per_batch} message(s).");
+    }
+}
diff --git a/scripts/run-rust-examples-from-readme.sh b/scripts/run-rust-examples-from-readme.sh
index 1ac95c356..97c4ac9eb 100755
--- a/scripts/run-rust-examples-from-readme.sh
+++ b/scripts/run-rust-examples-from-readme.sh
@@ -177,13 +177,91 @@ for readme_file in README.md examples/rust/README.md; do
         # Add a small delay between examples to avoid potential race conditions
         sleep 2
 
-    done < <(grep -E "^cargo run --example" "${readme_file}")
+    done < <(grep -E "^cargo run --example" "${readme_file}" | grep -v 
"tcp-tls")
 done
 
-# Terminate server
+# Terminate non-TLS server
 kill -TERM "$(cat ${PID_FILE})"
 test -e ${PID_FILE} && rm ${PID_FILE}
 
+# Run TLS examples if non-TLS examples passed.
+# They need a TLS-enabled server, so a separate server instance is started for
+# them once the non-TLS server has been terminated.
+if [ "${exit_code}" -eq 0 ]; then
+    TLS_README="examples/rust/README.md"
+    # Only bother starting a TLS server when the README actually lists TLS examples.
+    if [ -f "${TLS_README}" ] && grep -qE "^cargo run --example.*tcp-tls" "${TLS_README}"; then
+        echo ""
+        echo "=== Running TLS examples ==="
+        echo ""
+
+        # Clean up for fresh TLS start
+        test -d local_data && rm -fr local_data
+        test -e "${LOG_FILE}" && rm "${LOG_FILE}"
+
+        # Start TLS server
+        echo "Starting TLS server from ${SERVER_BIN}..."
+        IGGY_ROOT_USERNAME=iggy IGGY_ROOT_PASSWORD=iggy \
+            IGGY_TCP_TLS_ENABLED=true \
+            IGGY_TCP_TLS_CERT_FILE=core/certs/iggy_cert.pem \
+            IGGY_TCP_TLS_KEY_FILE=core/certs/iggy_key.pem \
+            ${SERVER_BIN} &>"${LOG_FILE}" &
+        echo $! >"${PID_FILE}"
+
+        # Wait for TLS server to start: poll the log for the start-up marker once
+        # per second and give up after TIMEOUT seconds.
+        SERVER_START_TIME=0
+        while ! grep -q "has started" "${LOG_FILE}"; do
+            if [ "${SERVER_START_TIME}" -gt "${TIMEOUT}" ]; then
+                echo "TLS server did not start within ${TIMEOUT} seconds."
+                ps fx
+                cat "${LOG_FILE}"
+                exit_code=1
+                break
+            fi
+            echo "Waiting for TLS Iggy server to start... ${SERVER_START_TIME}"
+            sleep 1
+            ((SERVER_START_TIME += 1))
+        done
+
+        if [ "${exit_code}" -eq 0 ]; then
+            while IFS= read -r command; do
+                # Remove backticks and comments from command
+                command=$(echo "${command}" | tr -d '`' | sed 's/^#.*//')
+                # Skip empty lines
+                if [ -z "${command}" ]; then
+                    continue
+                fi
+                # Add target flag if specified
+                if [ -n "${TARGET}" ]; then
+                    command="${command//cargo run /cargo run --target ${TARGET} }"
+                fi
+
+                echo -e "\e[33mChecking TLS example command:\e[0m ${command}"
+                echo ""
+
+                # Capture the exit status ourselves instead of letting errexit abort.
+                set +e
+                eval "${command}"
+                exit_code=$?
+                set -e
+
+                # Stop at first failure
+                if [ "${exit_code}" -ne 0 ]; then
+                    echo ""
+                    echo -e "\e[31mTLS example command failed:\e[0m ${command}"
+                    echo ""
+                    break
+                fi
+                # Add a small delay between examples to avoid potential race conditions
+                sleep 2
+
+            done < <(grep -E "^cargo run --example.*tcp-tls" "${TLS_README}")
+        fi
+
+        # Terminate TLS server; tolerate a server that has already exited.
+        if [ -e "${PID_FILE}" ]; then
+            kill -TERM "$(cat "${PID_FILE}")" 2>/dev/null || true
+            rm -f "${PID_FILE}"
+        fi
+    fi
+fi
+
 # If everything is ok remove log and pid files otherwise cat server log
 if [ "${exit_code}" -eq 0 ]; then
     echo "Test passed"

Reply via email to