This is an automated email from the ASF dual-hosted git repository.

hubcio pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 239d7ee0b feat(cpp): add messaging FFI functions for C++ SDK (#3046)
239d7ee0b is described below

commit 239d7ee0bd2721c572d38b06669efca21e0c2f6b
Author: xin <[email protected]>
AuthorDate: Mon May 11 21:54:34 2026 +0900

    feat(cpp): add messaging FFI functions for C++ SDK (#3046)
---
 foreign/cpp/Cargo.toml                      |    5 +-
 foreign/cpp/build.rs                        |    1 +
 foreign/cpp/src/client.rs                   |  234 ++++-
 foreign/cpp/src/identifier.rs               |    4 +
 foreign/cpp/src/lib.rs                      |   78 ++
 foreign/cpp/src/messages.rs                 |   85 ++
 foreign/cpp/src/stream.rs                   |   14 +
 foreign/cpp/tests/client/low_level_e2e.cpp  |    1 +
 foreign/cpp/tests/common/test_helpers.hpp   |   17 +
 foreign/cpp/tests/message/low_level_e2e.cpp | 1249 +++++++++++++++++++++++++++
 foreign/cpp/tests/message/unit_tests.cpp    |  116 +++
 foreign/cpp/tests/stream/low_level_e2e.cpp  |  170 ++++
 12 files changed, 1970 insertions(+), 4 deletions(-)

diff --git a/foreign/cpp/Cargo.toml b/foreign/cpp/Cargo.toml
index 477cb0b74..a5642f771 100644
--- a/foreign/cpp/Cargo.toml
+++ b/foreign/cpp/Cargo.toml
@@ -27,10 +27,13 @@ ignored = ["cxx-build"]
 crate-type = ["staticlib"]
 
 [dependencies]
+bytes = "1.11.1"
 cxx = "1.0.194"
 iggy = { path = "../../core/sdk" }
 iggy_common = { path = "../../core/common" }
-tokio = { version = "1.49.0", features = ["rt-multi-thread"] }
+# Explicitly enable the runtime + I/O drivers required by 
`Runtime::enable_all()` in lib.rs.
+# Listing the features here insulates this crate from upstream SDK feature 
changes.
+tokio = { version = "1.49.0", features = ["rt-multi-thread", "macros", "time", 
"net", "io-util"] }
 
 [build-dependencies]
 cxx-build = "1.0.194"
diff --git a/foreign/cpp/build.rs b/foreign/cpp/build.rs
index 476a9d796..d2cbf0094 100644
--- a/foreign/cpp/build.rs
+++ b/foreign/cpp/build.rs
@@ -24,6 +24,7 @@ fn main() {
     println!("cargo:rerun-if-changed=src/consumer_group.rs");
     println!("cargo:rerun-if-changed=src/identifier.rs");
     println!("cargo:rerun-if-changed=src/lib.rs");
+    println!("cargo:rerun-if-changed=src/messages.rs");
     println!("cargo:rerun-if-changed=src/stream.rs");
     println!("cargo:rerun-if-changed=src/topic.rs");
 }
diff --git a/foreign/cpp/src/client.rs b/foreign/cpp/src/client.rs
index 341e0c1a0..d4500937f 100644
--- a/foreign/cpp/src/client.rs
+++ b/foreign/cpp/src/client.rs
@@ -17,18 +17,40 @@
 
 use crate::{RUNTIME, ffi};
 use iggy::prelude::{
-    Client as IggyConnectionClient, CompressionAlgorithm as 
RustCompressionAlgorithm,
+    Client as IggyConnectionClient, CompressionAlgorithm as 
RustCompressionAlgorithm, Consumer,
     ConsumerGroupClient, Identifier as RustIdentifier, IggyClient as 
RustIggyClient,
     IggyClientBuilder as RustIggyClientBuilder, IggyError, IggyExpiry as 
RustIggyExpiry,
-    MaxTopicSize as RustMaxTopicSize, PartitionClient, StreamClient, 
TopicClient, UserClient,
+    IggyMessage, IggyTimestamp, MaxTopicSize as RustMaxTopicSize, 
MessageClient, PartitionClient,
+    Partitioning, PollingStrategy, StreamClient, TopicClient, UserClient,
 };
 use std::str::FromStr;
 use std::sync::Arc;
 
+/// Sentinel value passed from C++ to mean "no partition specified" — the 
server picks the
+/// partition based on the consumer/strategy. Cxx FFI does not support 
`Option<u32>`, so we
+/// reserve `u32::MAX` as the sentinel for `partition_id`.
+const ANY_PARTITION_ID: u32 = u32::MAX;
+
 pub struct Client {
     pub inner: Arc<RustIggyClient>,
 }
 
+/// Creates a new client connection and returns a raw pointer to the 
underlying [`Client`].
+///
+/// # Ownership
+///
+/// The returned `*mut Client` is owned by the caller (the C++ side). The 
caller is responsible
+/// for calling [`delete_connection`] exactly once to release the resources. 
Failing to do so
+/// leaks the underlying tokio runtime resources and the open network 
connection.
+///
+/// # Safety
+///
+/// - Passing the pointer to [`delete_connection`] more than once is undefined 
behaviour
+///   (double-free).
+/// - Using the pointer after [`delete_connection`] has been called is 
undefined behaviour
+///   (use-after-free).
+/// - This function does not provide synchronisation. The pointer must not be 
used concurrently
+///   from multiple threads unless the caller serialises access externally.
 pub fn new_connection(connection_string: String) -> Result<*mut Client, 
String> {
     let connection_str = connection_string.as_str();
     let client = match connection_str {
@@ -73,6 +95,17 @@ impl Client {
         })
     }
 
+    pub fn get_streams(&self) -> Result<Vec<ffi::Stream>, String> {
+        RUNTIME.block_on(async {
+            let streams = self
+                .inner
+                .get_streams()
+                .await
+                .map_err(|error| format!("Could not get streams: {error}"))?;
+            Ok(streams.into_iter().map(ffi::Stream::from).collect())
+        })
+    }
+
     pub fn create_stream(&self, stream_name: String) -> Result<(), String> {
         RUNTIME.block_on(async {
             self.inner
@@ -127,6 +160,139 @@ impl Client {
     //     })
     // }
 
+    #[allow(clippy::too_many_arguments)]
+    pub fn send_messages(
+        &self,
+        stream_id: ffi::Identifier,
+        topic_id: ffi::Identifier,
+        partitioning_kind: String,
+        partitioning_value: Vec<u8>,
+        messages: Vec<ffi::IggyMessageToSend>,
+    ) -> Result<(), String> {
+        let rust_stream_id = RustIdentifier::try_from(stream_id)
+            .map_err(|error| format!("Could not send messages: {error}"))?;
+        let rust_topic_id = RustIdentifier::try_from(topic_id)
+            .map_err(|error| format!("Could not send messages: {error}"))?;
+
+        let partitioning = match partitioning_kind.as_str() {
+            "balanced" => Partitioning::balanced(),
+            "partition_id" => {
+                if partitioning_value.len() != 4 {
+                    return Err(format!(
+                        "Could not send messages: partition_id requires 
exactly 4 bytes, got {}",
+                        partitioning_value.len()
+                    ));
+                }
+                let id =
+                    
u32::from_le_bytes(partitioning_value.as_slice().try_into().map_err(|_| {
+                        "Could not send messages: invalid partition_id 
value".to_string()
+                    })?);
+                Partitioning::partition_id(id)
+            }
+            "messages_key" => {
+                if partitioning_value.is_empty() {
+                    return Err(
+                        "Could not send messages: messages_key requires a 
non-empty value"
+                            .to_string(),
+                    );
+                }
+                
Partitioning::messages_key(&partitioning_value).map_err(|error| {
+                    format!("Could not send messages: invalid messages key: 
{error}")
+                })?
+            }
+            _ => {
+                return Err(format!(
+                    "Could not send messages: invalid partitioning kind: 
{partitioning_kind}"
+                ));
+            }
+        };
+
+        let mut iggy_messages: Vec<IggyMessage> = messages
+            .into_iter()
+            .map(IggyMessage::try_from)
+            .collect::<Result<Vec<_>, _>>()?;
+
+        RUNTIME.block_on(async {
+            self.inner
+                .send_messages(
+                    &rust_stream_id,
+                    &rust_topic_id,
+                    &partitioning,
+                    &mut iggy_messages,
+                )
+                .await
+                .map_err(|error| format!("Could not send messages: {error}"))?;
+            Ok(())
+        })
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    pub fn poll_messages(
+        &self,
+        stream_id: ffi::Identifier,
+        topic_id: ffi::Identifier,
+        partition_id: u32,
+        consumer_kind: String,
+        consumer_id: ffi::Identifier,
+        polling_strategy_kind: String,
+        polling_strategy_value: u64,
+        count: u32,
+        auto_commit: bool,
+    ) -> Result<ffi::PolledMessages, String> {
+        let rust_stream_id = RustIdentifier::try_from(stream_id)
+            .map_err(|error| format!("Could not poll messages: {error}"))?;
+        let rust_topic_id = RustIdentifier::try_from(topic_id)
+            .map_err(|error| format!("Could not poll messages: {error}"))?;
+        let rust_consumer_id = RustIdentifier::try_from(consumer_id)
+            .map_err(|error| format!("Could not poll messages: {error}"))?;
+
+        let consumer = match consumer_kind.as_str() {
+            "consumer" => Consumer::new(rust_consumer_id),
+            "consumer_group" => Consumer::group(rust_consumer_id),
+            _ => {
+                return Err(format!(
+                    "Could not poll messages: invalid consumer kind: 
{consumer_kind}"
+                ));
+            }
+        };
+
+        let strategy = match polling_strategy_kind.as_str() {
+            "offset" => PollingStrategy::offset(polling_strategy_value),
+            "timestamp" => 
PollingStrategy::timestamp(IggyTimestamp::from(polling_strategy_value)),
+            "first" => PollingStrategy::first(),
+            "last" => PollingStrategy::last(),
+            "next" => PollingStrategy::next(),
+            _ => {
+                return Err(format!(
+                    "Could not poll messages: invalid polling strategy: 
{polling_strategy_kind}"
+                ));
+            }
+        };
+
+        let opt_partition = if partition_id == ANY_PARTITION_ID {
+            None
+        } else {
+            Some(partition_id)
+        };
+
+        RUNTIME.block_on(async {
+            let polled = self
+                .inner
+                .poll_messages(
+                    &rust_stream_id,
+                    &rust_topic_id,
+                    opt_partition,
+                    &consumer,
+                    &strategy,
+                    count,
+                    auto_commit,
+                )
+                .await
+                .map_err(|error| format!("Could not poll messages: {error}"))?;
+            Ok(ffi::PolledMessages::from(polled))
+        })
+    }
+
     #[allow(clippy::too_many_arguments)]
     pub fn create_topic(
         &self,
@@ -378,6 +544,66 @@ impl Client {
             Ok(())
         })
     }
+
+    pub fn join_consumer_group(
+        &self,
+        stream_id: ffi::Identifier,
+        topic_id: ffi::Identifier,
+        group_id: ffi::Identifier,
+    ) -> Result<(), String> {
+        let rust_stream_id = 
RustIdentifier::try_from(stream_id).map_err(|error| {
+            format!("Could not join consumer group: invalid stream identifier: 
{error}")
+        })?;
+        let rust_topic_id = RustIdentifier::try_from(topic_id).map_err(|error| 
{
+            format!("Could not join consumer group: invalid topic identifier: 
{error}")
+        })?;
+        let rust_group_id = RustIdentifier::try_from(group_id).map_err(|error| 
{
+            format!("Could not join consumer group: invalid group identifier: 
{error}")
+        })?;
+
+        RUNTIME.block_on(async {
+            self.inner
+                .join_consumer_group(&rust_stream_id, &rust_topic_id, 
&rust_group_id)
+                .await
+                .map_err(|error| {
+                    format!(
+                        "Could not join consumer group '{}' for topic '{}' on 
stream '{}': {error}",
+                        rust_group_id, rust_topic_id, rust_stream_id
+                    )
+                })?;
+            Ok(())
+        })
+    }
+
+    pub fn leave_consumer_group(
+        &self,
+        stream_id: ffi::Identifier,
+        topic_id: ffi::Identifier,
+        group_id: ffi::Identifier,
+    ) -> Result<(), String> {
+        let rust_stream_id = 
RustIdentifier::try_from(stream_id).map_err(|error| {
+            format!("Could not leave consumer group: invalid stream 
identifier: {error}")
+        })?;
+        let rust_topic_id = RustIdentifier::try_from(topic_id).map_err(|error| 
{
+            format!("Could not leave consumer group: invalid topic identifier: 
{error}")
+        })?;
+        let rust_group_id = RustIdentifier::try_from(group_id).map_err(|error| 
{
+            format!("Could not leave consumer group: invalid group identifier: 
{error}")
+        })?;
+
+        RUNTIME.block_on(async {
+            self.inner
+                .leave_consumer_group(&rust_stream_id, &rust_topic_id, 
&rust_group_id)
+                .await
+                .map_err(|error| {
+                    format!(
+                        "Could not leave consumer group '{}' for topic '{}' on 
stream '{}': {error}",
+                        rust_group_id, rust_topic_id, rust_stream_id
+                    )
+                })?;
+            Ok(())
+        })
+    }
 }
 
 pub unsafe fn delete_connection(client: *mut Client) -> Result<(), String> {
@@ -385,7 +611,9 @@ pub unsafe fn delete_connection(client: *mut Client) -> 
Result<(), String> {
         return Ok(());
     }
 
-    // TODO(slbotbm): Address comment from @hubcio: if logout_user will fail 
you will have a leak, this will be tagged by e.g. valgrind if someone will test 
iggy rigorously
+    // `Box::from_raw` below runs unconditionally, so the client is always 
released regardless
+    // of `logout_result`. The result is only used to surface a logout error 
to the caller — there
+    // is no leak path here.
     let logout_result = RUNTIME.block_on(async { unsafe { &*client 
}.inner.logout_user().await });
 
     unsafe {
diff --git a/foreign/cpp/src/identifier.rs b/foreign/cpp/src/identifier.rs
index 02c9d8a48..93877458d 100644
--- a/foreign/cpp/src/identifier.rs
+++ b/foreign/cpp/src/identifier.rs
@@ -62,6 +62,10 @@ impl TryFrom<ffi::Identifier> for RustIdentifier {
     }
 }
 
+// Rust 1.95 added the `wrong_self_convention` lint for `from_*` methods that 
take `&mut self`.
+// These methods initialize the FFI `Identifier` struct in place from C++ — 
keeping the names
+// preserves the C++ ABI used by every test and downstream binding.
+#[allow(clippy::wrong_self_convention)]
 impl ffi::Identifier {
     pub fn from_string(&mut self, id: String) -> Result<(), String> {
         *self = RustIdentifier::named(&id)
diff --git a/foreign/cpp/src/lib.rs b/foreign/cpp/src/lib.rs
index 4d47a5b94..7f87d5765 100644
--- a/foreign/cpp/src/lib.rs
+++ b/foreign/cpp/src/lib.rs
@@ -17,10 +17,12 @@
 mod client;
 mod consumer_group;
 mod identifier;
+mod messages;
 mod stream;
 mod topic;
 
 use client::{Client, delete_connection, new_connection};
+use messages::make_message;
 use std::sync::LazyLock;
 
 static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
@@ -51,6 +53,43 @@ mod ffi {
         partitions_count: u32,
     }
 
+    struct Stream {
+        id: u32,
+        created_at: u64,
+        name: String,
+        size_bytes: u64,
+        messages_count: u64,
+        topics_count: u32,
+    }
+
+    struct IggyMessageToSend {
+        id_lo: u64,
+        id_hi: u64,
+        payload: Vec<u8>,
+        user_headers: Vec<u8>,
+    }
+
+    struct IggyMessagePolled {
+        checksum: u64,
+        id_lo: u64,
+        id_hi: u64,
+        offset: u64,
+        timestamp: u64,
+        origin_timestamp: u64,
+        user_headers_length: u32,
+        payload_length: u32,
+        reserved: u64,
+        payload: Vec<u8>,
+        user_headers: Vec<u8>,
+    }
+
+    struct PolledMessages {
+        partition_id: u32,
+        current_offset: u64,
+        count: u32,
+        messages: Vec<IggyMessagePolled>,
+    }
+
     struct StreamDetails {
         id: u32,
         created_at: u64,
@@ -83,6 +122,7 @@ mod ffi {
         fn login_user(self: &Client, username: String, password: String) -> 
Result<()>;
         fn connect(self: &Client) -> Result<()>;
         fn create_stream(self: &Client, stream_name: String) -> Result<()>;
+        fn get_streams(self: &Client) -> Result<Vec<Stream>>;
         fn get_stream(self: &Client, stream_id: Identifier) -> 
Result<StreamDetails>;
         fn delete_stream(self: &Client, stream_id: Identifier) -> Result<()>;
         // fn purge_stream(&self, stream_id: Identifier) -> Result<()>;
@@ -129,6 +169,44 @@ mod ffi {
             topic_id: Identifier,
             group_id: Identifier,
         ) -> Result<()>;
+        fn join_consumer_group(
+            self: &Client,
+            stream_id: Identifier,
+            topic_id: Identifier,
+            group_id: Identifier,
+        ) -> Result<()>;
+        fn leave_consumer_group(
+            self: &Client,
+            stream_id: Identifier,
+            topic_id: Identifier,
+            group_id: Identifier,
+        ) -> Result<()>;
+
+        #[allow(clippy::too_many_arguments)]
+        fn poll_messages(
+            self: &Client,
+            stream_id: Identifier,
+            topic_id: Identifier,
+            partition_id: u32,
+            consumer_kind: String,
+            consumer_id: Identifier,
+            polling_strategy_kind: String,
+            polling_strategy_value: u64,
+            count: u32,
+            auto_commit: bool,
+        ) -> Result<PolledMessages>;
+
+        fn make_message(payload: Vec<u8>) -> IggyMessageToSend;
+
+        #[allow(clippy::too_many_arguments)]
+        fn send_messages(
+            self: &Client,
+            stream_id: Identifier,
+            topic_id: Identifier,
+            partitioning_kind: String,
+            partitioning_value: Vec<u8>,
+            messages: Vec<IggyMessageToSend>,
+        ) -> Result<()>;
 
         unsafe fn delete_connection(client: *mut Client) -> Result<()>;
 
diff --git a/foreign/cpp/src/messages.rs b/foreign/cpp/src/messages.rs
new file mode 100644
index 000000000..2a36c62f3
--- /dev/null
+++ b/foreign/cpp/src/messages.rs
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ffi;
+use bytes::Bytes;
+use iggy::prelude::{IggyMessage as RustIggyMessage, PolledMessages as 
RustPolledMessages};
+
+pub fn make_message(payload: Vec<u8>) -> ffi::IggyMessageToSend {
+    ffi::IggyMessageToSend {
+        id_lo: 0,
+        id_hi: 0,
+        payload,
+        user_headers: Vec::new(),
+    }
+}
+
+impl From<RustIggyMessage> for ffi::IggyMessagePolled {
+    fn from(m: RustIggyMessage) -> Self {
+        let id_bytes = m.header.id.to_le_bytes();
+        let id_lo = u64::from_le_bytes(id_bytes[0..8].try_into().unwrap());
+        let id_hi = u64::from_le_bytes(id_bytes[8..16].try_into().unwrap());
+        ffi::IggyMessagePolled {
+            checksum: m.header.checksum,
+            id_lo,
+            id_hi,
+            offset: m.header.offset,
+            timestamp: m.header.timestamp,
+            origin_timestamp: m.header.origin_timestamp,
+            user_headers_length: m.header.user_headers_length,
+            payload_length: m.header.payload_length,
+            reserved: m.header.reserved,
+            payload: m.payload.to_vec(),
+            user_headers: m.user_headers.map(|h| 
h.to_vec()).unwrap_or_default(),
+        }
+    }
+}
+
+impl TryFrom<ffi::IggyMessageToSend> for RustIggyMessage {
+    type Error = String;
+
+    fn try_from(m: ffi::IggyMessageToSend) -> Result<Self, Self::Error> {
+        if !m.user_headers.is_empty() {
+            return Err(
+                "Could not convert message: user_headers are not yet supported 
in the C++ SDK"
+                    .to_string(),
+            );
+        }
+        let id = ((m.id_hi as u128) << 64) | (m.id_lo as u128);
+        let payload = Bytes::from(m.payload);
+        RustIggyMessage::builder()
+            .id(id)
+            .payload(payload)
+            .build()
+            .map_err(|error| format!("Could not convert message: {error}"))
+    }
+}
+
+impl From<RustPolledMessages> for ffi::PolledMessages {
+    fn from(p: RustPolledMessages) -> Self {
+        ffi::PolledMessages {
+            partition_id: p.partition_id,
+            current_offset: p.current_offset,
+            count: p.count,
+            messages: p
+                .messages
+                .into_iter()
+                .map(ffi::IggyMessagePolled::from)
+                .collect(),
+        }
+    }
+}
diff --git a/foreign/cpp/src/stream.rs b/foreign/cpp/src/stream.rs
index 5c1f6e9d7..b6e0ae22b 100644
--- a/foreign/cpp/src/stream.rs
+++ b/foreign/cpp/src/stream.rs
@@ -16,8 +16,22 @@
 // under the License.
 
 use crate::ffi;
+use iggy::prelude::Stream as RustStream;
 use iggy::prelude::StreamDetails as RustStreamDetails;
 
+impl From<RustStream> for ffi::Stream {
+    fn from(s: RustStream) -> Self {
+        ffi::Stream {
+            id: s.id,
+            created_at: s.created_at.as_micros(),
+            name: s.name,
+            size_bytes: s.size.as_bytes_u64(),
+            messages_count: s.messages_count,
+            topics_count: s.topics_count,
+        }
+    }
+}
+
 impl From<RustStreamDetails> for ffi::StreamDetails {
     fn from(stream: RustStreamDetails) -> Self {
         ffi::StreamDetails {
diff --git a/foreign/cpp/tests/client/low_level_e2e.cpp 
b/foreign/cpp/tests/client/low_level_e2e.cpp
index 26b82b82f..7306f0c95 100644
--- a/foreign/cpp/tests/client/low_level_e2e.cpp
+++ b/foreign/cpp/tests/client/low_level_e2e.cpp
@@ -16,6 +16,7 @@
 // under the License.
 
 // TODO(slbotbm): create fixture for setup/teardown.
+// TODO(slbotbm): Add tests for join_consumer_group() and 
leave_consumer_group()
 
 #include <string>
 
diff --git a/foreign/cpp/tests/common/test_helpers.hpp 
b/foreign/cpp/tests/common/test_helpers.hpp
index 5457c09d7..15851f08c 100644
--- a/foreign/cpp/tests/common/test_helpers.hpp
+++ b/foreign/cpp/tests/common/test_helpers.hpp
@@ -42,3 +42,20 @@ inline iggy::ffi::Client *login_to_server() {
     client->login_user("iggy", "iggy");
     return client;
 }
+
+inline rust::Vec<std::uint8_t> to_payload(const std::string &s) {
+    rust::Vec<std::uint8_t> v;
+    for (const char c : s) {
+        v.push_back(static_cast<std::uint8_t>(c));
+    }
+    return v;
+}
+
+inline rust::Vec<std::uint8_t> partition_id_bytes(std::uint32_t id) {
+    rust::Vec<std::uint8_t> v;
+    v.push_back(static_cast<std::uint8_t>(id & 0xFF));
+    v.push_back(static_cast<std::uint8_t>((id >> 8) & 0xFF));
+    v.push_back(static_cast<std::uint8_t>((id >> 16) & 0xFF));
+    v.push_back(static_cast<std::uint8_t>((id >> 24) & 0xFF));
+    return v;
+}
diff --git a/foreign/cpp/tests/message/low_level_e2e.cpp 
b/foreign/cpp/tests/message/low_level_e2e.cpp
new file mode 100644
index 000000000..4290afadf
--- /dev/null
+++ b/foreign/cpp/tests/message/low_level_e2e.cpp
@@ -0,0 +1,1249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <chrono>
+#include <cstdint>
+#include <string>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include "lib.rs.h"
+#include "tests/common/test_helpers.hpp"
+
+TEST(LowLevelE2E_Message, SendAndPollMessagesRoundTrip) {
+    RecordProperty("description", "Sends 10 messages and polls them back, 
verifying count, offsets, and payloads.");
+    const std::string stream_name = "cpp-msg-roundtrip";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    for (std::uint32_t i = 0; i < 10; i++) {
+        auto msg = iggy::ffi::make_message(to_payload("test message " + 
std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0),
+                                          "partition_id", 
partition_id_bytes(0), std::move(messages)));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.partition_id, 0u) << "Polled partition_id mismatches the 
partition we sent to";
+    ASSERT_EQ(polled.count, 10u);
+    ASSERT_EQ(polled.messages.size(), 10u);
+    for (std::uint32_t i = 0; i < 10; i++) {
+        ASSERT_EQ(polled.messages[i].offset, static_cast<std::uint64_t>(i));
+        std::string expected = "test message " + std::to_string(i);
+        std::string actual(polled.messages[i].payload.begin(), 
polled.messages[i].payload.end());
+        ASSERT_EQ(actual, expected) << "Payload mismatch at offset " << i;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesVerifyMessageIds) {
+    RecordProperty("description", "Verifies that polled message IDs match the 
sent IDs.");
+    const std::string stream_name = "cpp-msg-verify-ids";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    auto msg  = iggy::ffi::make_message(to_payload("id-test-message"));
+    msg.id_lo = 42;
+    msg.id_hi = 0;
+    messages.push_back(std::move(msg));
+
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.messages.size(), 1u);
+    ASSERT_EQ(polled.messages[0].id_lo, 42u);
+    ASSERT_EQ(polled.messages[0].id_hi, 0u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesFromEmptyPartition) {
+    RecordProperty("description", "Verifies polling from an empty partition 
returns zero messages.");
+    const std::string stream_name = "cpp-msg-empty-poll";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.count, 0u);
+    ASSERT_EQ(polled.messages.size(), 0u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesBeforeLoginThrows) {
+    RecordProperty("description", "Verifies send_messages throws when not 
authenticated.");
+    iggy::ffi::Client *client = nullptr;
+    ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); });
+    ASSERT_NE(client, nullptr);
+    ASSERT_NO_THROW(client->connect());
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    auto msg = iggy::ffi::make_message(to_payload("should-fail"));
+    messages.push_back(std::move(msg));
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(1), 
make_numeric_identifier(1), "partition_id",
+                                       partition_id_bytes(0), 
std::move(messages)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidStreamId) {
+    RecordProperty("description", "Throws when sending messages with an 
invalid stream identifier.");
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    auto msg = iggy::ffi::make_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    iggy::ffi::Identifier invalid_id;
+    invalid_id.kind   = "invalid";
+    invalid_id.length = 0;
+
+    ASSERT_THROW(client->send_messages(invalid_id, make_numeric_identifier(1), 
"partition_id", partition_id_bytes(0),
+                                       std::move(messages)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesToNonExistentStream) {
+    RecordProperty("description", "Throws when sending messages to a 
non-existent stream.");
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    auto msg = iggy::ffi::make_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    
ASSERT_THROW(client->send_messages(make_string_identifier("nonexistent-stream-12345"),
 make_numeric_identifier(0),
+                                       "partition_id", partition_id_bytes(0), 
std::move(messages)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidPartitioningKind) {
+    RecordProperty("description", "Throws when sending messages with an 
invalid partitioning kind.");
+    const std::string stream_name = "cpp-msg-invalid-part-kind";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    auto msg = iggy::ffi::make_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "invalid_kind",
+                                       partition_id_bytes(0), 
std::move(messages)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidPartitioningValue) {
+    RecordProperty("description", "Throws when sending messages with 
insufficient partitioning value bytes.");
+    const std::string stream_name = "cpp-msg-invalid-part-val";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    auto msg = iggy::ffi::make_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    rust::Vec<std::uint8_t> short_bytes;
+    short_bytes.push_back(0x00);
+    short_bytes.push_back(0x01);
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                                       std::move(short_bytes), 
std::move(messages)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesToSpecificPartitionVerified) {
+    RecordProperty("description",
+                   "Verifies messages sent to a specific partition are only 
retrievable from that partition.");
+    const std::string stream_name = "cpp-msg-specific-part";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 3, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        auto msg = iggy::ffi::make_message(to_payload("partition-test-" + 
std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled_part0 = 
client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0,
+                                              "consumer", 
make_numeric_identifier(1), "offset", 0, 100, false);
+    ASSERT_EQ(polled_part0.partition_id, 0u);
+    ASSERT_EQ(polled_part0.count, 5u);
+
+    auto polled_part1 = 
client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 1,
+                                              "consumer", 
make_numeric_identifier(1), "offset", 0, 100, false);
+    ASSERT_EQ(polled_part1.partition_id, 1u);
+    ASSERT_EQ(polled_part1.count, 0u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendEmptyMessageVectorThrows) {
+    RecordProperty("description", "Throws when sending an empty message 
vector.");
+    const std::string stream_name = "cpp-msg-empty-vec";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> empty_messages;
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                                       partition_id_bytes(0), 
std::move(empty_messages)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessageWithEmptyPayloadThrows) {
+    RecordProperty("description", "Throws when sending a message with an empty 
payload.");
+    const std::string stream_name = "cpp-msg-empty-payload";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    rust::Vec<std::uint8_t> empty_payload;
+    auto msg = iggy::ffi::make_message(std::move(empty_payload));
+    messages.push_back(std::move(msg));
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                                       partition_id_bytes(0), 
std::move(messages)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessageWithOversizedPayloadThrows) {
+    RecordProperty("description", "Throws when sending a message exceeding 
maximum payload size.");
+    const std::string stream_name = "cpp-msg-oversized";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Build a payload one byte over the SDK's max payload size (64 MB). 
cxx::Vec exposes no
+    // public reserve API, so the loop relies on amortised geometric growth.
+    constexpr std::uint32_t kOversizedPayloadBytes = 64'000'001u;
+    rust::Vec<std::uint8_t> oversized_payload;
+    for (std::uint32_t i = 0; i < kOversizedPayloadBytes; i++) {
+        oversized_payload.push_back(0x41);
+    }
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    auto msg = iggy::ffi::make_message(std::move(oversized_payload));
+    messages.push_back(std::move(msg));
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                                       partition_id_bytes(0), 
std::move(messages)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesPreservesOrder) {
+    RecordProperty("description", "Verifies messages are stored and retrieved 
in the order they were sent.");
+    const std::string stream_name = "cpp-msg-order";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    for (std::uint32_t i = 0; i < 50; i++) {
+        auto msg = iggy::ffi::make_message(to_payload("order-" + 
std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.count, 50u);
+    for (std::uint32_t i = 0; i < 50; i++) {
+        ASSERT_EQ(polled.messages[i].offset, static_cast<std::uint64_t>(i));
+        std::string expected = "order-" + std::to_string(i);
+        std::string actual(polled.messages[i].payload.begin(), 
polled.messages[i].payload.end());
+        EXPECT_EQ(actual, expected) << "Payload mismatch at offset " << i;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithDuplicateIds) {
+    RecordProperty("description", "Verifies sending multiple messages with the 
same ID succeeds.");
+    const std::string stream_name = "cpp-msg-dup-ids";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    for (std::uint32_t i = 0; i < 3; i++) {
+        auto msg  = iggy::ffi::make_message(to_payload("dup-id-msg-" + 
std::to_string(i)));
+        msg.id_lo = 99;
+        msg.id_hi = 0;
+        messages.push_back(std::move(msg));
+    }
+
+    ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0),
+                                          "partition_id", 
partition_id_bytes(0), std::move(messages)));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.count, 3u);
+    for (std::size_t i = 0; i < polled.messages.size(); i++) {
+        EXPECT_EQ(polled.messages[i].id_lo, 99u);
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithVariousPayloads) {
+    RecordProperty("description",
+                   "Verifies various payload types including null bytes, 
UTF-8, and binary data are preserved.");
+    const std::string stream_name = "cpp-msg-various-payloads";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<std::uint8_t> payload_null;
+    payload_null.push_back(0x00);
+    payload_null.push_back(0x01);
+    payload_null.push_back(0x00);
+    payload_null.push_back(0xFF);
+
+    rust::Vec<std::uint8_t> payload_binary;
+    payload_binary.push_back(0xDE);
+    payload_binary.push_back(0xAD);
+    payload_binary.push_back(0xBE);
+    payload_binary.push_back(0xEF);
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+
+    auto msg0 = iggy::ffi::make_message(to_payload("simple ascii"));
+    messages.push_back(std::move(msg0));
+
+    auto msg1 = iggy::ffi::make_message(std::move(payload_null));
+    messages.push_back(std::move(msg1));
+
+    auto msg2 = iggy::ffi::make_message(to_payload("héllo wörld"));
+    messages.push_back(std::move(msg2));
+
+    auto msg3 = iggy::ffi::make_message(std::move(payload_binary));
+    messages.push_back(std::move(msg3));
+
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.count, 4u);
+
+    std::string ascii_actual(polled.messages[0].payload.begin(), 
polled.messages[0].payload.end());
+    EXPECT_EQ(ascii_actual, "simple ascii");
+
+    ASSERT_EQ(polled.messages[1].payload.size(), 4u);
+    EXPECT_EQ(polled.messages[1].payload[0], 0x00);
+    EXPECT_EQ(polled.messages[1].payload[1], 0x01);
+    EXPECT_EQ(polled.messages[1].payload[2], 0x00);
+    EXPECT_EQ(polled.messages[1].payload[3], 0xFF);
+
+    std::string utf8_actual(polled.messages[2].payload.begin(), 
polled.messages[2].payload.end());
+    EXPECT_EQ(utf8_actual, "héllo wörld");
+
+    ASSERT_EQ(polled.messages[3].payload.size(), 4u);
+    EXPECT_EQ(polled.messages[3].payload[0], 0xDE);
+    EXPECT_EQ(polled.messages[3].payload[1], 0xAD);
+    EXPECT_EQ(polled.messages[3].payload[2], 0xBE);
+    EXPECT_EQ(polled.messages[3].payload[3], 0xEF);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesBeforeLoginThrows) {
+    RecordProperty("description", "Throws when polling messages before 
authentication.");
+    iggy::ffi::Client *client = nullptr;
+    ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); });
+    ASSERT_NE(client, nullptr);
+    ASSERT_NO_THROW(client->connect());
+
+    ASSERT_THROW(client->poll_messages(make_numeric_identifier(1), 
make_numeric_identifier(0), 0, "consumer",
+                                       make_numeric_identifier(1), "offset", 
0, 10, false),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesWithInvalidStreamIdThrows) {
+    RecordProperty("description", "Throws when polling messages with an 
invalid stream identifier.");
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    iggy::ffi::Identifier invalid_id;
+    invalid_id.kind   = "invalid";
+    invalid_id.length = 0;
+
+    ASSERT_THROW(client->poll_messages(invalid_id, make_numeric_identifier(0), 
0, "consumer",
+                                       make_numeric_identifier(1), "offset", 
0, 10, false),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesFromNonExistentStreamThrows) {
+    RecordProperty("description", "Throws when polling messages from a 
non-existent stream.");
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    
ASSERT_THROW(client->poll_messages(make_string_identifier("nonexistent-stream-poll"),
 make_numeric_identifier(0), 0,
+                                       "consumer", make_numeric_identifier(1), 
"offset", 0, 10, false),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesWithInvalidConsumerKindThrows) {
+    RecordProperty("description", "Throws when polling messages with an 
invalid consumer kind.");
+    const std::string stream_name = "cpp-msg-invalid-consumer";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "invalid",
+                                       make_numeric_identifier(1), "offset", 
0, 10, false),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesWithInvalidStrategyKindThrows) {
+    RecordProperty("description", "Throws when polling messages with an 
invalid polling strategy kind.");
+    const std::string stream_name = "cpp-msg-invalid-strategy";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                       make_numeric_identifier(1), "invalid", 
0, 10, false),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesCountLessThanAvailable) {
+    RecordProperty("description", "Returns only the requested count when fewer 
messages are requested than available.");
+    const std::string stream_name = "cpp-msg-count-less";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    for (std::uint32_t i = 0; i < 10; i++) {
+        auto msg = iggy::ffi::make_message(to_payload("msg-" + 
std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 5, false);
+
+    ASSERT_EQ(polled.count, 5u);
+    ASSERT_EQ(polled.messages.size(), 5u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesWithLargeOffset) {
+    RecordProperty("description", "Returns zero messages when polling with an 
offset beyond available messages.");
+    const std::string stream_name = "cpp-msg-large-offset";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        auto msg = iggy::ffi::make_message(to_payload("msg-" + 
std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
999999, 100, false);
+
+    ASSERT_EQ(polled.count, 0u);
+    ASSERT_EQ(polled.messages.size(), 0u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesFirstStrategy) {
+    RecordProperty("description", "Verifies first polling strategy returns 
messages from the beginning.");
+    const std::string stream_name = "cpp-msg-first-strategy";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    for (std::uint32_t i = 0; i < 10; i++) {
+        auto msg = iggy::ffi::make_message(to_payload("msg-" + 
std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "first", 
0, 3, false);
+
+    ASSERT_EQ(polled.count, 3u);
+    ASSERT_EQ(polled.messages.size(), 3u);
+    EXPECT_EQ(polled.messages[0].offset, 0u);
+    for (std::uint32_t i = 0; i < 3; i++) {
+        EXPECT_EQ(polled.messages[i].offset, static_cast<std::uint64_t>(i));
+        std::string expected = "msg-" + std::to_string(i);
+        std::string actual(polled.messages[i].payload.begin(), 
polled.messages[i].payload.end());
+        EXPECT_EQ(actual, expected) << "Payload mismatch at offset " << i;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesLastStrategy) {
+    RecordProperty("description", "Verifies last polling strategy returns 
messages from the end.");
+    const std::string stream_name = "cpp-msg-last-strategy";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    for (std::uint32_t i = 0; i < 10; i++) {
+        auto msg = iggy::ffi::make_message(to_payload("msg-" + 
std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "last", 0, 
3, false);
+
+    ASSERT_EQ(polled.count, 3u);
+    ASSERT_EQ(polled.messages.size(), 3u);
+    EXPECT_EQ(polled.messages[0].offset, 7u);
+    EXPECT_EQ(polled.messages[2].offset, 9u);
+    for (std::uint32_t i = 0; i < 3; i++) {
+        std::string expected = "msg-" + std::to_string(7 + i);
+        std::string actual(polled.messages[i].payload.begin(), 
polled.messages[i].payload.end());
+        EXPECT_EQ(actual, expected) << "Payload mismatch at index " << i;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesNextStrategyNoAutoCommit) {
+    RecordProperty("description",
+                   "Verifies next strategy without auto-commit returns the 
same messages on repeated calls.");
+    const std::string stream_name = "cpp-msg-next-no-commit";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        auto msg = iggy::ffi::make_message(to_payload("msg-" + 
std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled1 = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                         make_numeric_identifier(1), "next", 
0, 100, false);
+    ASSERT_EQ(polled1.count, 5u);
+
+    auto polled2 = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                         make_numeric_identifier(1), "next", 
0, 100, false);
+    ASSERT_EQ(polled2.count, 5u);
+    for (std::uint32_t i = 0; i < 5; i++) {
+        EXPECT_EQ(polled1.messages[i].offset, static_cast<std::uint64_t>(i));
+        std::string expected = "msg-" + std::to_string(i);
+        std::string actual(polled1.messages[i].payload.begin(), 
polled1.messages[i].payload.end());
+        EXPECT_EQ(actual, expected) << "polled1 payload mismatch at index " << 
i;
+    }
+    for (std::uint32_t i = 0; i < 5; i++) {
+        EXPECT_EQ(polled2.messages[i].offset, static_cast<std::uint64_t>(i));
+        std::string expected = "msg-" + std::to_string(i);
+        std::string actual(polled2.messages[i].payload.begin(), 
polled2.messages[i].payload.end());
+        EXPECT_EQ(actual, expected) << "polled2 payload mismatch at index " << 
i;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesNextStrategyAutoCommit) {
+    RecordProperty("description", "Verifies next strategy with auto-commit 
advances the offset on subsequent polls.");
+    const std::string stream_name = "cpp-msg-next-auto-commit";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    for (std::uint32_t i = 0; i < 10; i++) {
+        auto msg = iggy::ffi::make_message(to_payload("msg-" + 
std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled1 = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                         make_numeric_identifier(1), "next", 
0, 5, true);
+    ASSERT_EQ(polled1.count, 5u);
+    EXPECT_EQ(polled1.messages[0].offset, 0u);
+    EXPECT_EQ(polled1.messages[4].offset, 4u);
+
+    auto polled2 = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                         make_numeric_identifier(1), "next", 
0, 5, true);
+    ASSERT_EQ(polled2.count, 5u);
+    EXPECT_EQ(polled2.messages[0].offset, 5u);
+    EXPECT_EQ(polled2.messages[4].offset, 9u);
+    for (std::uint32_t i = 0; i < 5; i++) {
+        std::string expected1 = "msg-" + std::to_string(i);
+        std::string actual1(polled1.messages[i].payload.begin(), 
polled1.messages[i].payload.end());
+        EXPECT_EQ(actual1, expected1) << "polled1 payload mismatch at index " 
<< i;
+    }
+    for (std::uint32_t i = 0; i < 5; i++) {
+        std::string expected2 = "msg-" + std::to_string(5 + i);
+        std::string actual2(polled2.messages[i].payload.begin(), 
polled2.messages[i].payload.end());
+        EXPECT_EQ(actual2, expected2) << "polled2 payload mismatch at index " 
<< i;
+    }
+
+    auto polled3 = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                         make_numeric_identifier(1), "next", 
0, 5, true);
+    ASSERT_EQ(polled3.count, 0u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesConsumerIdIndependence) {
+    RecordProperty("description", "Verifies different consumer IDs maintain 
independent offsets.");
+    const std::string stream_name = "cpp-msg-consumer-indep";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        auto msg = iggy::ffi::make_message(to_payload("msg-" + 
std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled_c1 = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0,
+                                           "consumer", 
make_numeric_identifier(1), "next", 0, 3, true);
+    ASSERT_EQ(polled_c1.count, 3u);
+
+    auto polled_c2 = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0,
+                                           "consumer", 
make_numeric_identifier(2), "next", 0, 5, true);
+    ASSERT_EQ(polled_c2.count, 5u);
+
+    auto polled_c1_again = 
client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0,
+                                                 "consumer", 
make_numeric_identifier(1), "next", 0, 5, true);
+    ASSERT_EQ(polled_c1_again.count, 2u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesMultipleSendsThenPollOrder) {
+    RecordProperty("description", "Verifies message ordering is preserved 
across multiple send batches.");
+    const std::string stream_name = "cpp-msg-multi-batch-order";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> batch1;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        auto msg = iggy::ffi::make_message(to_payload("batch1-" + 
std::to_string(i)));
+        batch1.push_back(std::move(msg));
+    }
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(batch1));
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> batch2;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        auto msg = iggy::ffi::make_message(to_payload("batch2-" + 
std::to_string(i)));
+        batch2.push_back(std::move(msg));
+    }
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(batch2));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.count, 10u);
+    for (std::uint32_t i = 0; i < 10; i++) {
+        EXPECT_EQ(polled.messages[i].offset, static_cast<std::uint64_t>(i)) << 
"Offset mismatch at index " << i;
+    }
+    for (std::uint32_t i = 0; i < 5; i++) {
+        std::string expected = "batch1-" + std::to_string(i);
+        std::string actual(polled.messages[i].payload.begin(), 
polled.messages[i].payload.end());
+        EXPECT_EQ(actual, expected) << "batch1 payload mismatch at index " << 
i;
+    }
+    for (std::uint32_t i = 0; i < 5; i++) {
+        std::string expected = "batch2-" + std::to_string(i);
+        std::string actual(polled.messages[5 + i].payload.begin(), 
polled.messages[5 + i].payload.end());
+        EXPECT_EQ(actual, expected) << "batch2 payload mismatch at index " << 
i;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesMultipleCustomIds) {
+    RecordProperty("description", "Verifies multiple messages with distinct 
custom IDs are all preserved.");
+    const std::string stream_name = "cpp-msg-multi-custom-ids";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    const std::uint64_t id_values[] = {100, 200, 300, 400, 500};
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        auto msg  = iggy::ffi::make_message(to_payload("msg-" + 
std::to_string(i)));
+        msg.id_lo = id_values[i];
+        msg.id_hi = 0;
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.count, 5u);
+    for (std::uint32_t i = 0; i < 5; i++) {
+        EXPECT_EQ(polled.messages[i].id_lo, id_values[i]) << "ID mismatch at 
index " << i;
+        EXPECT_EQ(polled.messages[i].id_hi, 0u);
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesAfterStreamDeletedThrows) {
+    RecordProperty("description", "Throws when polling messages after the 
stream has been deleted.");
+    const std::string stream_name = "cpp-msg-deleted-stream";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    auto msg = iggy::ffi::make_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    std::uint32_t saved_stream_id = stream.id;
+    client->delete_stream(make_numeric_identifier(saved_stream_id));
+
+    
ASSERT_THROW(client->poll_messages(make_numeric_identifier(saved_stream_id), 
make_numeric_identifier(0), 0,
+                                       "consumer", make_numeric_identifier(1), 
"offset", 0, 10, false),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesWithInvalidPartitionIdThrows) {
+    RecordProperty("description", "Throws when polling with a non-existent 
partition ID.");
+    const std::string stream_name = "cpp-msg-invalid-partition";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 9999, "consumer",
+                                       make_numeric_identifier(1), "offset", 
0, 10, false),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesWithCountZeroThrows) {
+    RecordProperty("description", "Throws when polling with count=0.");
+    const std::string stream_name = "cpp-msg-count-zero";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                       make_numeric_identifier(1), "offset", 
0, 0, false),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesWithoutSpecifyingPartition) {
+    RecordProperty("description",
+                   "Verifies polling with partition_id=u32::MAX defaults to 
partition 0 and returns messages.");
+    const std::string stream_name = "cpp-msg-no-partition";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        auto msg = iggy::ffi::make_message(to_payload("msg-" + 
std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), UINT32_MAX,
+                                        "consumer", 
make_numeric_identifier(1), "offset", 0, 100, false);
+
+    // The Rust side maps UINT32_MAX to None, so the server picks a partition. 
With a single
+    // partition topic that should always be partition 0.
+    ASSERT_EQ(polled.partition_id, 0u) << "u32::MAX sentinel did not map to 
None — partition_id sentinel regression?";
+    ASSERT_EQ(polled.count, 5u);
+    ASSERT_EQ(polled.messages.size(), 5u);
+    for (std::uint32_t i = 0; i < 5; i++) {
+        std::string expected = "msg-" + std::to_string(i);
+        std::string actual(polled.messages[i].payload.begin(), 
polled.messages[i].payload.end());
+        EXPECT_EQ(actual, expected) << "Payload mismatch at index " << i;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesTimestampStrategy) {
+    RecordProperty("description",
+                   "Verifies timestamp polling strategy returns messages with 
timestamp >= the specified value.");
+    const std::string stream_name = "cpp-msg-timestamp-strategy";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> batch1;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        auto msg = iggy::ffi::make_message(to_payload("batch1-" + 
std::to_string(i)));
+        batch1.push_back(std::move(msg));
+    }
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(batch1));
+
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> batch2;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        auto msg = iggy::ffi::make_message(to_payload("batch2-" + 
std::to_string(i)));
+        batch2.push_back(std::move(msg));
+    }
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(batch2));
+
+    auto all = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                     make_numeric_identifier(1), "offset", 0, 
100, false);
+    ASSERT_EQ(all.count, 10u);
+
+    // IggyTimestamp::now() is microsecond-resolution and we slept 100ms 
between batches; a gap
+    // smaller than half that window means the test has degraded into a 
tautology on busy CI.
+    constexpr std::uint64_t kMinTimestampGapMicros = 50'000;
+    std::uint64_t batch1_timestamp                 = all.messages[0].timestamp;
+    std::uint64_t batch2_timestamp                 = all.messages[5].timestamp;
+    ASSERT_GT(batch2_timestamp, batch1_timestamp);
+    ASSERT_GE(batch2_timestamp - batch1_timestamp, kMinTimestampGapMicros)
+        << "Timestamp gap collapsed (" << (batch2_timestamp - batch1_timestamp)
+        << "us) — test no longer exercises timestamp filtering";
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(2), 
"timestamp", batch2_timestamp, 100, false);
+
+    ASSERT_GE(polled.count, 5u);
+    // The server contract is `timestamp >= polling_strategy_value`. If a 
batch1 message lands on
+    // exactly the same microsecond as batch2's first message, the count can 
legitimately exceed 5,
+    // so verify by prefix rather than indexing each message against 
`batch2-N`.
+    for (std::size_t i = 0; i < polled.messages.size(); i++) {
+        EXPECT_GE(polled.messages[i].timestamp, batch2_timestamp)
+            << "Message at index " << i << " has earlier timestamp";
+        std::string actual(polled.messages[i].payload.begin(), 
polled.messages[i].payload.end());
+        EXPECT_TRUE(actual.rfind("batch1-", 0) == 0 || actual.rfind("batch2-", 
0) == 0)
+            << "Polled message at index " << i << " has unexpected payload: " 
<< actual;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesMonotonicOffsets) {
+    RecordProperty("description",
+                   "Verifies offsets are monotonically increasing and 
continuous across multiple polls.");
+    const std::string stream_name = "cpp-msg-monotonic-offsets";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    for (std::uint32_t i = 0; i < 20; i++) {
+        auto msg = iggy::ffi::make_message(to_payload("mono-" + 
std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    std::uint64_t expected_offset = 0;
+    for (int chunk = 0; chunk < 4; chunk++) {
+        auto polled =
+            client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                  make_numeric_identifier(1), "offset", 
expected_offset, 5, false);
+
+        ASSERT_EQ(polled.count, 5u) << "Chunk " << chunk;
+        ASSERT_EQ(polled.messages.size(), 5u) << "Chunk " << chunk;
+
+        for (std::size_t i = 0; i < polled.messages.size(); i++) {
+            EXPECT_EQ(polled.messages[i].offset, expected_offset) << "Chunk " 
<< chunk << " index " << i;
+            expected_offset++;
+        }
+    }
+
+    ASSERT_EQ(expected_offset, 20u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesLargeBatch) {
+    RecordProperty("description", "Verifies sending a large batch of 1000 
messages succeeds and all are retrievable.");
+    const std::string stream_name = "cpp-msg-large-batch";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    for (std::uint32_t i = 0; i < 1000; i++) {
+        auto msg = iggy::ffi::make_message(to_payload("batch-msg-" + 
std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0),
+                                          "partition_id", 
partition_id_bytes(0), std::move(messages)));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 1000, false);
+
+    ASSERT_EQ(polled.count, 1000u);
+    ASSERT_EQ(polled.messages.size(), 1000u);
+    EXPECT_EQ(polled.messages[0].offset, 0u);
+    EXPECT_EQ(polled.messages[999].offset, 999u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidTopicIdThrows) {
+    RecordProperty("description", "Throws when sending messages with an 
invalid topic identifier.");
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    auto msg = iggy::ffi::make_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    iggy::ffi::Identifier invalid_id;
+    invalid_id.kind   = "invalid";
+    invalid_id.length = 0;
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(1), invalid_id, 
"partition_id", partition_id_bytes(0),
+                                       std::move(messages)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesWithInvalidTopicIdThrows) {
+    RecordProperty("description", "Throws when polling messages with an 
invalid topic identifier.");
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    iggy::ffi::Identifier invalid_id;
+    invalid_id.kind   = "invalid";
+    invalid_id.length = 0;
+
+    ASSERT_THROW(client->poll_messages(make_numeric_identifier(1), invalid_id, 
0, "consumer",
+                                       make_numeric_identifier(1), "offset", 
0, 10, false),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesWithInvalidConsumerIdThrows) {
+    RecordProperty("description", "Throws when polling messages with an 
invalid consumer identifier.");
+    const std::string stream_name = "cpp-msg-invalid-consumer-id";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    iggy::ffi::Identifier invalid_id;
+    invalid_id.kind   = "invalid";
+    invalid_id.length = 0;
+
+    ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                       invalid_id, "offset", 0, 10, false),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, ConsumerGroupCreateJoinAndPollMessages) {
+    RecordProperty("description",
+                   "Creates a consumer group, joins it, sends messages, and 
polls them using consumer_group kind.");
+    const std::string stream_name = "cpp-msg-consumer-group";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    auto group =
+        client->create_consumer_group(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "test-group");
+    ASSERT_EQ(group.members_count, 0u);
+
+    
ASSERT_NO_THROW(client->join_consumer_group(make_numeric_identifier(stream.id), 
make_numeric_identifier(0),
+                                                
make_numeric_identifier(group.id)));
+
+    auto group_after_join = 
client->get_consumer_group(make_numeric_identifier(stream.id), 
make_numeric_identifier(0),
+                                                       
make_numeric_identifier(group.id));
+    ASSERT_EQ(group_after_join.members_count, 1u);
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    for (std::uint32_t i = 0; i < 10; i++) {
+        auto msg = iggy::ffi::make_message(to_payload("cg-msg-" + 
std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0,
+                                        "consumer_group", 
make_numeric_identifier(group.id), "offset", 0, 100, false);
+
+    ASSERT_EQ(polled.count, 10u);
+    ASSERT_EQ(polled.messages.size(), 10u);
+    for (std::uint32_t i = 0; i < 10; i++) {
+        std::string expected = "cg-msg-" + std::to_string(i);
+        std::string actual(polled.messages[i].payload.begin(), 
polled.messages[i].payload.end());
+        EXPECT_EQ(actual, expected) << "Payload mismatch at offset " << i;
+    }
+
+    
ASSERT_NO_THROW(client->leave_consumer_group(make_numeric_identifier(stream.id),
 make_numeric_identifier(0),
+                                                 
make_numeric_identifier(group.id)));
+
+    auto group_after_leave = 
client->get_consumer_group(make_numeric_identifier(stream.id), 
make_numeric_identifier(0),
+                                                        
make_numeric_identifier(group.id));
+    ASSERT_EQ(group_after_leave.members_count, 0u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
diff --git a/foreign/cpp/tests/message/unit_tests.cpp 
b/foreign/cpp/tests/message/unit_tests.cpp
new file mode 100644
index 000000000..6a2768c46
--- /dev/null
+++ b/foreign/cpp/tests/message/unit_tests.cpp
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "lib.rs.h"
+
+TEST(MessageTest, MakeMessageSetsPayload) {
+    RecordProperty("description", "Verifies make_message stores payload bytes 
correctly.");
+    rust::Vec<std::uint8_t> payload;
+    const std::string text = "hello world";
+    for (const char c : text) {
+        payload.push_back(static_cast<std::uint8_t>(c));
+    }
+
+    auto msg = iggy::ffi::make_message(std::move(payload));
+
+    ASSERT_EQ(msg.payload.size(), text.size());
+    for (std::size_t i = 0; i < text.size(); i++) {
+        EXPECT_EQ(msg.payload[i], static_cast<std::uint8_t>(text[i]));
+    }
+}
+
+TEST(MessageTest, MakeMessageZerosIdAndHeaders) {
+    RecordProperty("description", "Verifies make_message initializes id and 
user_headers to zero/empty.");
+    rust::Vec<std::uint8_t> payload;
+    payload.push_back(0x42);
+
+    auto msg = iggy::ffi::make_message(std::move(payload));
+
+    EXPECT_EQ(msg.id_lo, 0u);
+    EXPECT_EQ(msg.id_hi, 0u);
+    EXPECT_TRUE(msg.user_headers.empty());
+}
+
+TEST(MessageTest, MakeMessageWithEmptyPayload) {
+    RecordProperty("description", "Verifies make_message accepts an empty 
payload.");
+    rust::Vec<std::uint8_t> empty_payload;
+
+    auto msg = iggy::ffi::make_message(std::move(empty_payload));
+
+    ASSERT_EQ(msg.payload.size(), 0u);
+}
+
+TEST(MessageTest, MakeMessageWithSingleByte) {
+    RecordProperty("description", "Verifies make_message works with a 
single-byte payload.");
+    rust::Vec<std::uint8_t> payload;
+    payload.push_back(0xFF);
+
+    auto msg = iggy::ffi::make_message(std::move(payload));
+
+    ASSERT_EQ(msg.payload.size(), 1u);
+    EXPECT_EQ(msg.payload[0], 0xFF);
+}
+
+TEST(MessageTest, MakeMessageWithNullBytes) {
+    RecordProperty("description", "Verifies make_message preserves null bytes 
in payload.");
+    rust::Vec<std::uint8_t> payload;
+    payload.push_back(0x00);
+    payload.push_back(0x01);
+    payload.push_back(0x00);
+
+    auto msg = iggy::ffi::make_message(std::move(payload));
+
+    ASSERT_EQ(msg.payload.size(), 3u);
+    EXPECT_EQ(msg.payload[0], 0x00);
+    EXPECT_EQ(msg.payload[1], 0x01);
+    EXPECT_EQ(msg.payload[2], 0x00);
+}
+
+TEST(MessageTest, MakeMessageThenSetCustomId) {
+    RecordProperty("description", "Verifies custom ID can be set after 
make_message without affecting payload.");
+    rust::Vec<std::uint8_t> payload;
+    payload.push_back(0x42);
+    auto msg = iggy::ffi::make_message(std::move(payload));
+
+    msg.id_lo = 100;
+    msg.id_hi = 200;
+
+    EXPECT_EQ(msg.id_lo, 100u);
+    EXPECT_EQ(msg.id_hi, 200u);
+    ASSERT_EQ(msg.payload.size(), 1u);
+    EXPECT_EQ(msg.payload[0], 0x42);
+}
+
+TEST(MessageTest, MakeMessageWithLargePayload) {
+    RecordProperty("description", "Verifies make_message handles a larger 
payload correctly.");
+    rust::Vec<std::uint8_t> payload;
+    for (std::uint32_t i = 0; i < 10000; i++) {
+        payload.push_back(static_cast<std::uint8_t>(i % 256));
+    }
+
+    auto msg = iggy::ffi::make_message(std::move(payload));
+
+    ASSERT_EQ(msg.payload.size(), 10000u);
+    EXPECT_EQ(msg.payload[0], 0u);
+    EXPECT_EQ(msg.payload[255], 255u);
+    EXPECT_EQ(msg.payload[256], 0u);
+}
diff --git a/foreign/cpp/tests/stream/low_level_e2e.cpp 
b/foreign/cpp/tests/stream/low_level_e2e.cpp
index 43d0496de..f723571fa 100644
--- a/foreign/cpp/tests/stream/low_level_e2e.cpp
+++ b/foreign/cpp/tests/stream/low_level_e2e.cpp
@@ -19,6 +19,7 @@
 
 #include <cstdint>
 #include <string>
+#include <vector>
 
 #include <gtest/gtest.h>
 
@@ -293,3 +294,172 @@ TEST(LowLevelE2E_Stream, 
GetStreamByNumericIdentifierReturnsStreamDetails) {
     ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
     client = nullptr;
 }
+
+TEST(LowLevelE2E_Stream, GetStreamsReturnsEmptyAfterCleanup) {
+    RecordProperty("description", "Verifies get_streams returns empty vector 
after cleaning up all streams.");
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    auto streams = client->get_streams();
+    for (const auto &s : streams) {
+        client->delete_stream(make_numeric_identifier(s.id));
+    }
+
+    streams = client->get_streams();
+    ASSERT_EQ(streams.size(), 0);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Stream, GetStreamsReturnsStreamAfterCreation) {
+    RecordProperty("description", "Verifies created stream appears in 
get_streams result.");
+    const std::string stream_name = "cpp-stream-get-streams";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto streams = client->get_streams();
+    ASSERT_GE(streams.size(), 1);
+
+    bool found = false;
+    for (const auto &s : streams) {
+        if (std::string(s.name) == stream_name) {
+            found = true;
+            EXPECT_GT(s.created_at, static_cast<std::uint64_t>(0));
+            EXPECT_EQ(s.size_bytes, static_cast<std::uint64_t>(0));
+            EXPECT_EQ(s.messages_count, static_cast<std::uint64_t>(0));
+            EXPECT_EQ(s.topics_count, 0u);
+            break;
+        }
+    }
+    ASSERT_TRUE(found) << "Stream '" << stream_name << "' not found in 
get_streams result";
+
+    client->delete_stream(make_string_identifier(stream_name));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Stream, GetStreamsFieldsVerification) {
+    RecordProperty("description",
+                   "Verifies get_streams returns correct field values after 
creating stream with topic and messages.");
+    const std::string stream_name = "cpp-stream-fields-verify";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        auto msg = iggy::ffi::make_message(to_payload("field-verify-message-" 
+ std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto streams = client->get_streams();
+    ASSERT_GE(streams.size(), 1u);
+
+    bool found = false;
+    for (const auto &s : streams) {
+        if (std::string(s.name) == stream_name) {
+            found = true;
+            EXPECT_EQ(s.topics_count, 1u);
+            EXPECT_EQ(s.messages_count, 5u);
+            break;
+        }
+    }
+    ASSERT_TRUE(found) << "Stream '" << stream_name << "' not found in 
get_streams result";
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Stream, GetStreamsBeforeLoginThrows) {
+    RecordProperty("description", "Throws when get_streams is called before 
authentication.");
+    iggy::ffi::Client *client = nullptr;
+    ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); });
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_THROW(client->get_streams(), std::exception);
+    ASSERT_NO_THROW(client->connect());
+    ASSERT_THROW(client->get_streams(), std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+TEST(LowLevelE2E_Stream, GetStreamsConsistentWithGetStream) {
+    RecordProperty("description", "Verifies get_streams result is consistent 
with get_stream for the same stream.");
+    const std::string stream_name = "cpp-stream-consistency";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+
+    std::string list_name;
+    std::uint32_t list_id           = 0;
+    std::uint32_t list_topics_count = 0;
+    std::uint64_t list_created_at   = 0;
+    std::uint64_t list_size_bytes   = 0;
+    auto streams                    = client->get_streams();
+    for (const auto &s : streams) {
+        if (std::string(s.name) == stream_name) {
+            list_name         = std::string(s.name);
+            list_id           = s.id;
+            list_topics_count = s.topics_count;
+            list_created_at   = s.created_at;
+            list_size_bytes   = s.size_bytes;
+            break;
+        }
+    }
+    ASSERT_FALSE(list_name.empty()) << "Stream '" << stream_name << "' not 
found in get_streams result";
+
+    auto single        = 
client->get_stream(make_string_identifier(stream_name));
+    auto single_name   = std::string(single.name);
+    auto single_topics = single.topics_count;
+
+    EXPECT_EQ(list_name, single_name);
+    EXPECT_EQ(list_id, single.id);
+    EXPECT_EQ(list_topics_count, single_topics);
+    EXPECT_EQ(list_created_at, single.created_at);
+    EXPECT_EQ(list_size_bytes, single.size_bytes);
+
+    client->delete_stream(make_string_identifier(stream_name));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+TEST(LowLevelE2E_Stream, GetStreamsRepeatedCallsReturnSameResult) {
+    RecordProperty("description", "Verifies repeated get_streams calls return 
consistent results.");
+    const std::string stream_name = "cpp-stream-repeated";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+
+    auto streams1 = client->get_streams();
+    auto streams2 = client->get_streams();
+    auto streams3 = client->get_streams();
+
+    ASSERT_EQ(streams1.size(), streams2.size());
+    ASSERT_EQ(streams2.size(), streams3.size());
+
+    auto contains_stream = [&](const rust::Vec<iggy::ffi::Stream> &vec) {
+        for (const auto &s : vec) {
+            if (std::string(s.name) == stream_name) {
+                return true;
+            }
+        }
+        return false;
+    };
+
+    ASSERT_TRUE(contains_stream(streams1)) << "Stream not found in first call";
+    ASSERT_TRUE(contains_stream(streams2)) << "Stream not found in second 
call";
+    ASSERT_TRUE(contains_stream(streams3)) << "Stream not found in third call";
+
+    client->delete_stream(make_string_identifier(stream_name));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}


Reply via email to