This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 52afc3c12 feat(rust): add dedicated TCP/TLS producer and consumer
examples (#2820)
52afc3c12 is described below
commit 52afc3c1297e5e95b10b12a3d37ee011693c88c1
Author: Atharva Lade <[email protected]>
AuthorDate: Mon Mar 9 15:12:39 2026 -0500
feat(rust): add dedicated TCP/TLS producer and consumer examples (#2820)
---
examples/rust/Cargo.toml | 8 ++
examples/rust/README.md | 22 ++++
examples/rust/src/shared/args.rs | 2 +
examples/rust/src/tcp-tls/consumer/main.rs | 130 ++++++++++++++++++++++
examples/rust/src/tcp-tls/producer/main.rs | 170 +++++++++++++++++++++++++++++
scripts/run-rust-examples-from-readme.sh | 82 +++++++++++++-
6 files changed, 412 insertions(+), 2 deletions(-)
diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml
index 513979249..970d67e97 100644
--- a/examples/rust/Cargo.toml
+++ b/examples/rust/Cargo.toml
@@ -125,3 +125,11 @@ path = "src/stream-builder/stream-producer-config/main.rs"
[[example]]
name = "sink-data-producer"
path = "src/sink-data-producer/main.rs"
+
+[[example]]
+name = "tcp-tls-producer"
+path = "src/tcp-tls/producer/main.rs"
+
+[[example]]
+name = "tcp-tls-consumer"
+path = "src/tcp-tls/consumer/main.rs"
diff --git a/examples/rust/README.md b/examples/rust/README.md
index 4a546f73d..c5e61b01e 100644
--- a/examples/rust/README.md
+++ b/examples/rust/README.md
@@ -185,6 +185,28 @@ cargo run --example stream-consumer-config
These examples document all available configuration options including
partitioning strategies, retry policies, batching, AutoCommit strategies,
polling strategies, and retry mechanisms.
+## Security Examples
+
+### TCP/TLS
+
+Demonstrates secure TLS-encrypted TCP connections using custom CA certificates:
+
+```bash
+cargo run --example tcp-tls-producer
+cargo run --example tcp-tls-consumer
+```
+
+These examples require a TLS-enabled Iggy server. Start the server with:
+
+```bash
+IGGY_TCP_TLS_ENABLED=true \
+IGGY_TCP_TLS_CERT_FILE=core/certs/iggy_cert.pem \
+IGGY_TCP_TLS_KEY_FILE=core/certs/iggy_key.pem \
+cargo r --bin iggy-server
+```
+
+Uses `IggyClientBuilder` with TLS options (`with_tls_enabled`, `with_tls_domain`, `with_tls_ca_file`) to establish TLS-encrypted TCP connections with CA certificate verification.
+
## Example Structure
All examples can be executed directly from the repository. Follow these steps:
diff --git a/examples/rust/src/shared/args.rs b/examples/rust/src/shared/args.rs
index aa04c3b16..4b5fa2171 100644
--- a/examples/rust/src/shared/args.rs
+++ b/examples/rust/src/shared/args.rs
@@ -295,6 +295,7 @@ impl Args {
"multi-tenant-producer" | "multi-tenant-consumer" =>
"tenant-stream",
"new-sdk-producer" | "new-sdk-consumer" => "new-sdk-stream",
"sink-data-producer" => "sink-stream",
+ "tcp-tls-producer" | "tcp-tls-consumer" => "tls-stream",
"stream-basic" => "stream-basic",
"stream-producer" | "stream-consumer" => "stream-example",
"stream-producer-config" | "stream-consumer-config" =>
"stream-config",
@@ -313,6 +314,7 @@ impl Args {
"multi-tenant-producer" | "multi-tenant-consumer" => "events",
"new-sdk-producer" | "new-sdk-consumer" => "new-sdk-topic",
"sink-data-producer" => "users",
+ "tcp-tls-producer" | "tcp-tls-consumer" => "tls-topic",
"stream-basic" => "stream-topic",
"stream-producer" | "stream-consumer" => "stream-topic",
"stream-producer-config" | "stream-consumer-config" =>
"config-topic",
diff --git a/examples/rust/src/tcp-tls/consumer/main.rs b/examples/rust/src/tcp-tls/consumer/main.rs
new file mode 100644
index 000000000..0431cb82d
--- /dev/null
+++ b/examples/rust/src/tcp-tls/consumer/main.rs
@@ -0,0 +1,130 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// TCP/TLS Consumer Example
+//
+// Demonstrates how to consume messages from an Iggy server over a TLS-encrypted
+// TCP connection using custom certificates from core/certs/.
+//
+// Prerequisites:
+// Start the Iggy server with TLS enabled:
+// IGGY_TCP_TLS_ENABLED=true \
+// IGGY_TCP_TLS_CERT_FILE=core/certs/iggy_cert.pem \
+// IGGY_TCP_TLS_KEY_FILE=core/certs/iggy_key.pem \
+// cargo r --bin iggy-server
+//
+// Run this example (from repo root):
+// cargo run --example tcp-tls-consumer -p iggy_examples
+
+use iggy::prelude::*;
+use std::error::Error;
+use std::str::FromStr;
+use tokio::time::sleep;
+use tracing::info;
+use tracing_subscriber::layer::SubscriberExt;
+use tracing_subscriber::util::SubscriberInitExt;
+use tracing_subscriber::{EnvFilter, Registry};
+
+const STREAM_NAME: &str = "tls-stream"; // stream the consumer polls from
+const TOPIC_NAME: &str = "tls-topic"; // topic (inside STREAM_NAME) the consumer polls from
+const PARTITION_ID: u32 = 0; // partition passed to every poll request
+const BATCHES_LIMIT: u32 = 5; // number of non-empty batches to consume before exiting
+
+/// Entry point of the TLS consumer example.
+///
+/// Initialises tracing, builds a TCP client with TLS enabled, connects, logs in
+/// with the default root credentials and then consumes messages.
+///
+/// # Errors
+/// Returns an error if the client cannot be built, connected or logged in, or
+/// if consuming messages fails.
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+    // Use the filter supplied through the environment when there is one and
+    // fall back to INFO otherwise; the fallback is only built when needed.
+    Registry::default()
+        .with(tracing_subscriber::fmt::layer())
+        .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("INFO")))
+        .init();
+
+    // Build a TCP client with TLS enabled.
+    //   with_tls_enabled(true) activates TLS on the TCP transport
+    //   with_tls_domain(...)   sets the expected server hostname for certificate verification
+    //   with_tls_ca_file(...)  points to the CA certificate used to verify the server cert
+    // The CA path is relative, so the example is meant to be run from the repo root.
+    let client = IggyClientBuilder::new()
+        .with_tcp()
+        .with_server_address("127.0.0.1:8090".to_string())
+        .with_tls_enabled(true)
+        .with_tls_domain("localhost".to_string())
+        .with_tls_ca_file("core/certs/iggy_ca_cert.pem".to_string())
+        .build()?;
+
+    client.connect().await?;
+    client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await?;
+    info!("Connected and logged in over TLS.");
+
+    consume_messages(&client).await
+}
+
+/// Polls `STREAM_NAME`/`TOPIC_NAME`/`PARTITION_ID` by offset until
+/// `BATCHES_LIMIT` non-empty batches have been handled.
+///
+/// After every poll (empty or not) the task sleeps for 500ms before polling
+/// again. The next offset is tracked locally by adding the number of messages
+/// received in each batch.
+///
+/// # Errors
+/// Returns an error if an identifier cannot be built, a poll request fails or
+/// a message payload is not valid UTF-8.
+async fn consume_messages(client: &dyn Client) -> Result<(), Box<dyn Error>> {
+    let interval = IggyDuration::from_str("500ms")?;
+    info!(
+        "Messages will be consumed from stream: {}, topic: {}, partition: {} with interval {}.",
+        STREAM_NAME,
+        TOPIC_NAME,
+        PARTITION_ID,
+        interval.as_human_time_string()
+    );
+
+    // The identifiers never change, so build them once instead of on every
+    // poll, and propagate a failure rather than panicking.
+    let stream_id = Identifier::named(STREAM_NAME)?;
+    let topic_id = Identifier::named(TOPIC_NAME)?;
+    let consumer = Consumer::default();
+    let messages_per_batch = 10;
+    let mut offset = 0;
+    let mut consumed_batches = 0;
+    loop {
+        if consumed_batches == BATCHES_LIMIT {
+            info!("Consumed {consumed_batches} batches of messages, exiting.");
+            return Ok(());
+        }
+
+        let polled_messages = client
+            .poll_messages(
+                &stream_id,
+                &topic_id,
+                Some(PARTITION_ID),
+                &consumer,
+                &PollingStrategy::offset(offset),
+                messages_per_batch,
+                // NOTE(review): presumably the auto-commit flag; the offset is
+                // tracked locally above instead - confirm against the SDK.
+                false,
+            )
+            .await?;
+
+        if polled_messages.messages.is_empty() {
+            // Empty polls do not count towards BATCHES_LIMIT.
+            info!("No messages found.");
+        } else {
+            offset += polled_messages.messages.len() as u64;
+            for message in polled_messages.messages {
+                handle_message(&message)?;
+            }
+            consumed_batches += 1;
+        }
+        sleep(interval.get_duration()).await;
+    }
+}
+
+/// Logs one message: its offset and its payload decoded as UTF-8 text.
+///
+/// # Errors
+/// Returns an error when the payload is not valid UTF-8.
+fn handle_message(message: &IggyMessage) -> Result<(), Box<dyn Error>> {
+    let text = std::str::from_utf8(&message.payload)?;
+    let offset = message.header.offset;
+    info!("Handling message at offset: {}, payload: {}...", offset, text);
+    Ok(())
+}
diff --git a/examples/rust/src/tcp-tls/producer/main.rs b/examples/rust/src/tcp-tls/producer/main.rs
new file mode 100644
index 000000000..c96e05126
--- /dev/null
+++ b/examples/rust/src/tcp-tls/producer/main.rs
@@ -0,0 +1,170 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// TCP/TLS Producer Example
+//
+// Demonstrates how to produce messages to an Iggy server over a TLS-encrypted
+// TCP connection using custom certificates from core/certs/.
+//
+// Prerequisites:
+// Start the Iggy server with TLS enabled:
+// IGGY_TCP_TLS_ENABLED=true \
+// IGGY_TCP_TLS_CERT_FILE=core/certs/iggy_cert.pem \
+// IGGY_TCP_TLS_KEY_FILE=core/certs/iggy_key.pem \
+// cargo r --bin iggy-server
+//
+// Run this example (from repo root):
+// cargo run --example tcp-tls-producer -p iggy_examples
+
+use iggy::prelude::*;
+use std::error::Error;
+use std::str::FromStr;
+use tracing::{info, warn};
+use tracing_subscriber::layer::SubscriberExt;
+use tracing_subscriber::util::SubscriberInitExt;
+use tracing_subscriber::{EnvFilter, Registry};
+
+const STREAM_NAME: &str = "tls-stream"; // stream that is created (or reused) and written to
+const TOPIC_NAME: &str = "tls-topic"; // topic inside STREAM_NAME that receives the messages
+const PARTITION_ID: u32 = 0; // partition every batch is sent to
+const BATCHES_LIMIT: u32 = 5; // number of batches to send before exiting
+
+/// Entry point of the TLS producer example.
+///
+/// Initialises tracing, builds a TCP client with TLS enabled, connects, logs in
+/// with the default root credentials, makes sure the stream and topic exist and
+/// then sends messages.
+///
+/// # Errors
+/// Returns an error if the client cannot be built, connected or logged in, or
+/// if sending messages fails.
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+    // Use the filter supplied through the environment when there is one and
+    // fall back to INFO otherwise; the fallback is only built when needed.
+    Registry::default()
+        .with(tracing_subscriber::fmt::layer())
+        .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("INFO")))
+        .init();
+
+    // Build a TCP client with TLS enabled.
+    //   with_tls_enabled(true) activates TLS on the TCP transport
+    //   with_tls_domain(...)   sets the expected server hostname for certificate verification
+    //   with_tls_ca_file(...)  points to the CA certificate used to verify the server cert
+    // The CA path is relative, so the example is meant to be run from the repo root.
+    let client = IggyClientBuilder::new()
+        .with_tcp()
+        .with_server_address("127.0.0.1:8090".to_string())
+        .with_tls_enabled(true)
+        .with_tls_domain("localhost".to_string())
+        .with_tls_ca_file("core/certs/iggy_ca_cert.pem".to_string())
+        .build()?;
+
+    client.connect().await?;
+    client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await?;
+    info!("Connected and logged in over TLS.");
+
+    let (stream_id, topic_id) = init_system(&client).await;
+    produce_messages(&client, stream_id, topic_id).await
+}
+
+/// Makes sure the example stream and topic exist and returns `(stream.id, topic.id)`.
+///
+/// Creation is attempted first. When it fails, the actual error is logged (it is
+/// not necessarily "already exists") and the resource is looked up by name.
+///
+/// # Panics
+/// Panics if a resource could neither be created nor fetched.
+async fn init_system(client: &IggyClient) -> (u32, u32) {
+    // Built once and reused for every lookup below.
+    let stream_name =
+        Identifier::named(STREAM_NAME).expect("STREAM_NAME must be a valid identifier");
+    let topic_name = Identifier::named(TOPIC_NAME).expect("TOPIC_NAME must be a valid identifier");
+
+    let stream = match client.create_stream(STREAM_NAME).await {
+        Ok(stream) => {
+            info!("Stream was created.");
+            stream
+        }
+        Err(error) => {
+            // Usually the stream already exists, but report the real cause so
+            // that other failures are not mislabelled.
+            warn!("Stream was not created ({error}), trying to use the existing one.");
+            client
+                .get_stream(&stream_name)
+                .await
+                .expect("Failed to get stream")
+                .expect("Stream could be neither created nor found")
+        }
+    };
+
+    let topic = match client
+        .create_topic(
+            &stream_name,
+            TOPIC_NAME,
+            1,
+            CompressionAlgorithm::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+    {
+        Ok(topic) => {
+            info!("Topic was created.");
+            topic
+        }
+        Err(error) => {
+            warn!("Topic was not created ({error}), trying to use the existing one.");
+            client
+                .get_topic(&stream_name, &topic_name)
+                .await
+                .expect("Failed to get topic")
+                .expect("Topic could be neither created nor found")
+        }
+    };
+
+    (stream.id, topic.id)
+}
+
+/// Sends `BATCHES_LIMIT` batches of 10 text messages to
+/// `STREAM_NAME`/`TOPIC_NAME`/`PARTITION_ID`, one batch per 500ms interval tick.
+///
+/// `stream_id` and `topic_id` are only used in the start-up log line; the
+/// requests address the stream and topic by name. Payloads are
+/// `message-<n>` with `n` counting up from 1 across all batches.
+///
+/// # Errors
+/// Returns an error if an identifier or message cannot be built or a send
+/// request fails.
+async fn produce_messages(
+    client: &dyn Client,
+    stream_id: u32,
+    topic_id: u32,
+) -> Result<(), Box<dyn Error>> {
+    let duration = IggyDuration::from_str("500ms")?;
+    let mut interval = tokio::time::interval(duration.get_duration());
+    info!(
+        "Messages will be sent to stream: {} ({}), topic: {} ({}), partition: {} with interval {}.",
+        STREAM_NAME,
+        stream_id,
+        TOPIC_NAME,
+        topic_id,
+        PARTITION_ID,
+        duration.as_human_time_string()
+    );
+
+    // The identifiers never change, so build them once instead of per batch,
+    // and propagate a failure rather than panicking.
+    let stream = Identifier::named(STREAM_NAME)?;
+    let topic = Identifier::named(TOPIC_NAME)?;
+    let partitioning = Partitioning::partition_id(PARTITION_ID);
+    let messages_per_batch = 10;
+    let mut current_id = 0;
+    let mut sent_batches = 0;
+    loop {
+        if sent_batches == BATCHES_LIMIT {
+            info!("Sent {sent_batches} batches of messages, exiting.");
+            return Ok(());
+        }
+
+        interval.tick().await;
+        // The batch size is known up front, so allocate once.
+        let mut messages = Vec::with_capacity(messages_per_batch);
+        for _ in 0..messages_per_batch {
+            current_id += 1;
+            let payload = format!("message-{current_id}");
+            messages.push(IggyMessage::from_str(&payload)?);
+        }
+        client
+            .send_messages(&stream, &topic, &partitioning, &mut messages)
+            .await?;
+        sent_batches += 1;
+        info!("Sent {messages_per_batch} message(s).");
+    }
+}
diff --git a/scripts/run-rust-examples-from-readme.sh b/scripts/run-rust-examples-from-readme.sh
index 1ac95c356..97c4ac9eb 100755
--- a/scripts/run-rust-examples-from-readme.sh
+++ b/scripts/run-rust-examples-from-readme.sh
@@ -177,13 +177,91 @@ for readme_file in README.md examples/rust/README.md; do
# Add a small delay between examples to avoid potential race conditions
sleep 2
- done < <(grep -E "^cargo run --example" "${readme_file}")
+ done < <(grep -E "^cargo run --example" "${readme_file}" | grep -v "tcp-tls")
done
-# Terminate server
+# Terminate non-TLS server
kill -TERM "$(cat ${PID_FILE})"
test -e ${PID_FILE} && rm ${PID_FILE}
+# Run TLS examples if non-TLS examples passed.
+# The tcp-tls commands are excluded from the main loop because they need a
+# server started with TLS enabled; this section starts such a server, runs
+# only the tcp-tls commands from the examples README and stops the server again.
+if [ "${exit_code}" -eq 0 ]; then
+    TLS_README="examples/rust/README.md"
+    # Only do the extra server start when the README actually lists TLS examples
+    if [ -f "${TLS_README}" ] && grep -qE "^cargo run --example.*tcp-tls" "${TLS_README}"; then
+        echo ""
+        echo "=== Running TLS examples ==="
+        echo ""
+
+        # Clean up for fresh TLS start
+        test -d local_data && rm -fr local_data
+        test -e ${LOG_FILE} && rm ${LOG_FILE}
+
+        # Start TLS server in the background and remember its PID
+        echo "Starting TLS server from ${SERVER_BIN}..."
+        IGGY_ROOT_USERNAME=iggy IGGY_ROOT_PASSWORD=iggy \
+        IGGY_TCP_TLS_ENABLED=true \
+        IGGY_TCP_TLS_CERT_FILE=core/certs/iggy_cert.pem \
+        IGGY_TCP_TLS_KEY_FILE=core/certs/iggy_key.pem \
+        ${SERVER_BIN} &>${LOG_FILE} &
+        echo $! >${PID_FILE}
+
+        # Wait for TLS server to start (it logs "has started" once ready)
+        SERVER_START_TIME=0
+        while ! grep -q "has started" ${LOG_FILE}; do
+            if [ ${SERVER_START_TIME} -gt ${TIMEOUT} ]; then
+                echo "TLS server did not start within ${TIMEOUT} seconds."
+                ps fx
+                cat ${LOG_FILE}
+                exit_code=1
+                break
+            fi
+            echo "Waiting for TLS Iggy server to start... ${SERVER_START_TIME}"
+            sleep 1
+            ((SERVER_START_TIME += 1))
+        done
+
+        # Skip the examples when the server failed to come up
+        if [ "${exit_code}" -eq 0 ]; then
+            while IFS= read -r command; do
+                # Remove backticks and comments from command
+                command=$(echo "${command}" | tr -d '`' | sed 's/^#.*//')
+                # Skip empty lines
+                if [ -z "${command}" ]; then
+                    continue
+                fi
+                # Add target flag if specified
+                if [ -n "${TARGET}" ]; then
+                    command="${command//cargo run /cargo run --target ${TARGET} }"
+                fi
+
+                echo -e "\e[33mChecking TLS example command:\e[0m ${command}"
+                echo ""
+
+                # Temporarily disable errexit so a failing example is reported
+                # below instead of aborting the script before cleanup
+                set +e
+                eval "${command}"
+                exit_code=$?
+                set -e
+
+                # Stop at first failure
+                if [ ${exit_code} -ne 0 ]; then
+                    echo ""
+                    echo -e "\e[31mTLS example command failed:\e[0m ${command}"
+                    echo ""
+                    break
+                fi
+                # Add a small delay between examples to avoid potential race conditions
+                sleep 2
+
+            done < <(grep -E "^cargo run --example.*tcp-tls" "${TLS_README}")
+        fi
+
+        # Terminate TLS server (best effort: it may already have exited)
+        if [ -e ${PID_FILE} ]; then
+            kill -TERM "$(cat ${PID_FILE})" 2>/dev/null || true
+            rm -f ${PID_FILE}
+        fi
+    fi
+fi
+
# If everything is ok remove log and pid files otherwise cat server log
if [ "${exit_code}" -eq 0 ]; then
echo "Test passed"