This is an automated email from the ASF dual-hosted git repository.

maciej pushed a commit to branch java-improvements
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 82338804a3197f2883c28d945d7d79ee824ff481
Author: Maciej Modzelewski <[email protected]>
AuthorDate: Mon Mar 16 22:37:15 2026 +0100

    refactor(java): consolidate async tests into blocking base suite and harden 
client lifecycle
    
    Async integration tests duplicated CRUD coverage already exercised
    by the blocking test suite (which wraps the same async code path).
    This led to slow CI and made it unclear where to add new test
    cases.
    
    Removed AsyncPollMessageTest and UsersTcpClientTest entirely,
    slimmed AsyncClientIntegrationTest and AsyncConsumerGroupsTest
    to keep only async-specific scenarios (concurrency, chaining,
    error propagation). Moved missing coverage (polling strategies,
    content round-trip, user updates, password changes) into the
    blocking base tests where both TCP and HTTP transports benefit.
    
    Also hardened client builders: buildAndLogin() now closes the
    client on failure instead of leaking connections. TCP connection
    allows PING and GET_STATS without authentication, unblocking
    the BDD ping step. Added debug-level version logging on connect.
---
 .../org/apache/iggy/bdd/BasicMessagingSteps.java   |   2 +-
 .../iggy/client/async/tcp/AsyncIggyTcpClient.java  |   6 +
 .../async/tcp/AsyncIggyTcpClientBuilder.java       |   9 +-
 .../iggy/client/async/tcp/AsyncTcpConnection.java  |   9 +-
 .../iggy/client/blocking/http/IggyHttpClient.java  |   6 +
 .../blocking/http/IggyHttpClientBuilder.java       |  14 +-
 .../client/blocking/http/MessagesHttpClient.java   |   4 +-
 .../client/blocking/tcp/IggyTcpClientBuilder.java  |  11 +-
 .../client/async/AsyncClientIntegrationTest.java   | 732 +++++----------------
 .../iggy/client/async/AsyncConsumerGroupsTest.java | 343 +++-------
 .../iggy/client/async/AsyncPollMessageTest.java    | 333 ----------
 .../iggy/client/async/tcp/UsersTcpClientTest.java  | 239 -------
 .../blocking/ConsumerGroupsClientBaseTest.java     |   2 +-
 .../client/blocking/MessagesClientBaseTest.java    |  82 +++
 .../iggy/client/blocking/UsersClientBaseTest.java  |  64 +-
 15 files changed, 445 insertions(+), 1411 deletions(-)

diff --git 
a/bdd/java/src/test/java/org/apache/iggy/bdd/BasicMessagingSteps.java 
b/bdd/java/src/test/java/org/apache/iggy/bdd/BasicMessagingSteps.java
index d6998b396..0ad53dad0 100644
--- a/bdd/java/src/test/java/org/apache/iggy/bdd/BasicMessagingSteps.java
+++ b/bdd/java/src/test/java/org/apache/iggy/bdd/BasicMessagingSteps.java
@@ -58,7 +58,7 @@ public class BasicMessagingSteps {
                 
IggyTcpClient.builder().host(hostPort.host).port(hostPort.port).build();
 
         client.connect();
-        // client.system().ping(); //TODO: uncomment when ping does not 
require auth
+        client.system().ping();
         context.client = client;
     }
 
diff --git 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
index cdd2c6716..81633ff49 100644
--- 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
+++ 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
@@ -19,6 +19,7 @@
 
 package org.apache.iggy.client.async.tcp;
 
+import org.apache.iggy.IggyVersion;
 import org.apache.iggy.client.async.ConsumerGroupsClient;
 import org.apache.iggy.client.async.ConsumerOffsetsClient;
 import org.apache.iggy.client.async.MessagesClient;
@@ -33,6 +34,8 @@ import org.apache.iggy.config.RetryPolicy;
 import org.apache.iggy.exception.IggyMissingCredentialsException;
 import org.apache.iggy.exception.IggyNotConnectedException;
 import org.apache.iggy.user.IdentityInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.time.Duration;
@@ -91,6 +94,8 @@ import java.util.concurrent.CompletableFuture;
  */
 public class AsyncIggyTcpClient {
 
+    private static final Logger log = 
LoggerFactory.getLogger(AsyncIggyTcpClient.class);
+
     private final String host;
     private final int port;
     private final Optional<String> username;
@@ -173,6 +178,7 @@ public class AsyncIggyTcpClient {
         TCPConnectionPoolConfig poolConfig = poolConfigBuilder.build();
         connection = new AsyncTcpConnection(host, port, enableTls, 
tlsCertificate, poolConfig);
         return connection.connect().thenRun(() -> {
+            log.debug("Connected to {}:{} | {}", host, port, 
IggyVersion.getInstance());
             messagesClient = new MessagesTcpClient(connection);
             consumerGroupsClient = new ConsumerGroupsTcpClient(connection);
             consumerOffsetsClient = new ConsumerOffsetsTcpClient(connection);
diff --git 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java
 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java
index 88264103a..433e72499 100644
--- 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java
+++ 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java
@@ -259,6 +259,13 @@ public final class AsyncIggyTcpClientBuilder {
                     "Credentials must be provided to use buildAndLogin(). Use 
credentials(username, password).");
         }
         AsyncIggyTcpClient client = build();
-        return client.connect().thenCompose(v -> client.login()).thenApply(v 
-> client);
+        return client.connect()
+                .thenCompose(v -> client.login())
+                .thenApply(v -> client)
+                .whenComplete((result, ex) -> {
+                    if (ex != null) {
+                        client.close();
+                    }
+                });
     }
 }
diff --git 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
index 20b49f0f5..e8901042d 100644
--- 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
+++ 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
@@ -193,16 +193,19 @@ public class AsyncTcpConnection {
             }
 
             Channel channel = f.getNow();
-            boolean isLoginOp = (commandCode == 
CommandCode.User.LOGIN.getValue()
+            boolean isLoginCommand = (commandCode == 
CommandCode.User.LOGIN.getValue()
                     || commandCode == 
CommandCode.PersonalAccessToken.LOGIN.getValue());
+            boolean requiresAuth = !isLoginCommand
+                    && commandCode != CommandCode.System.PING.getValue()
+                    && commandCode != CommandCode.System.GET_STATS.getValue();
 
             responseFuture.handle((res, ex) -> {
-                handlePostResponse(channel, commandCode, isLoginOp, ex);
+                handlePostResponse(channel, commandCode, isLoginCommand, ex);
                 return null;
             });
 
             CompletableFuture<Void> authStep;
-            if (isLoginOp) {
+            if (!requiresAuth) {
                 authStep = CompletableFuture.completedFuture(null);
             } else if (!authenticated) {
                 payload.release();
diff --git 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/http/IggyHttpClient.java
 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/http/IggyHttpClient.java
index 360f691c8..bb532394f 100644
--- 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/http/IggyHttpClient.java
+++ 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/http/IggyHttpClient.java
@@ -19,6 +19,7 @@
 
 package org.apache.iggy.client.blocking.http;
 
+import org.apache.iggy.IggyVersion;
 import org.apache.iggy.client.blocking.ConsumerGroupsClient;
 import org.apache.iggy.client.blocking.ConsumerOffsetsClient;
 import org.apache.iggy.client.blocking.IggyBaseClient;
@@ -30,6 +31,8 @@ import org.apache.iggy.client.blocking.SystemClient;
 import org.apache.iggy.client.blocking.TopicsClient;
 import org.apache.iggy.client.blocking.UsersClient;
 import org.apache.iggy.exception.IggyMissingCredentialsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.File;
@@ -41,6 +44,8 @@ public class IggyHttpClient implements IggyBaseClient, 
Closeable {
 
     static final int DEFAULT_HTTP_PORT = 3000;
 
+    private static final Logger log = 
LoggerFactory.getLogger(IggyHttpClient.class);
+
     private final InternalHttpClient internalHttpClient;
     private final SystemHttpClient systemClient;
     private final StreamsHttpClient streamsClient;
@@ -81,6 +86,7 @@ public class IggyHttpClient implements IggyBaseClient, 
Closeable {
         consumerOffsetsClient = new 
ConsumerOffsetsHttpClient(internalHttpClient);
         messagesClient = new MessagesHttpClient(internalHttpClient);
         personalAccessTokensHttpClient = new 
PersonalAccessTokensHttpClient(internalHttpClient);
+        log.debug("Initialized HTTP client for {} | {}", url, 
IggyVersion.getInstance());
     }
 
     /**
diff --git 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/http/IggyHttpClientBuilder.java
 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/http/IggyHttpClientBuilder.java
index 28c000113..4118658aa 100644
--- 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/http/IggyHttpClientBuilder.java
+++ 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/http/IggyHttpClientBuilder.java
@@ -24,6 +24,7 @@ import org.apache.iggy.exception.IggyInvalidArgumentException;
 import org.apache.iggy.exception.IggyMissingCredentialsException;
 
 import java.io.File;
+import java.io.IOException;
 import java.time.Duration;
 
 /**
@@ -231,7 +232,16 @@ public final class IggyHttpClientBuilder {
                     "Credentials must be provided to use buildAndLogin(). Use 
credentials(username, password).");
         }
         IggyHttpClient client = build();
-        client.login();
-        return client;
+        try {
+            client.login();
+            return client;
+        } catch (RuntimeException e) {
+            try {
+                client.close();
+            } catch (IOException closeEx) {
+                e.addSuppressed(closeEx);
+            }
+            throw e;
+        }
     }
 }
diff --git 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/http/MessagesHttpClient.java
 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/http/MessagesHttpClient.java
index e8db08d02..3d2ce2797 100644
--- 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/http/MessagesHttpClient.java
+++ 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/http/MessagesHttpClient.java
@@ -55,8 +55,8 @@ class MessagesHttpClient implements MessagesClient {
                 partitionId
                         .map(id -> new BasicNameValuePair("partition_id", 
id.toString()))
                         .orElse(null),
-                new BasicNameValuePair("strategy_kind", 
strategy.kind().name()),
-                new BasicNameValuePair("strategy_value", 
strategy.value().toString()),
+                new BasicNameValuePair("kind", 
strategy.kind().name().toLowerCase()),
+                new BasicNameValuePair("value", strategy.value().toString()),
                 new BasicNameValuePair("count", count.toString()),
                 new BasicNameValuePair("auto_commit", 
Boolean.toString(autoCommit)));
         return httpClient.execute(request, PolledMessages.class);
diff --git 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClientBuilder.java
 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClientBuilder.java
index ab8df4b9b..e72b99231 100644
--- 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClientBuilder.java
+++ 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClientBuilder.java
@@ -217,8 +217,13 @@ public final class IggyTcpClientBuilder {
                     "Credentials must be provided to use buildAndLogin(). Use 
credentials(username, password).");
         }
         IggyTcpClient client = build();
-        client.connect();
-        client.login();
-        return client;
+        try {
+            client.connect();
+            client.login();
+            return client;
+        } catch (RuntimeException e) {
+            client.close();
+            throw e;
+        }
     }
 }
diff --git 
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncClientIntegrationTest.java
 
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncClientIntegrationTest.java
index 87b315266..fbe1045d1 100644
--- 
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncClientIntegrationTest.java
+++ 
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncClientIntegrationTest.java
@@ -22,20 +22,15 @@ package org.apache.iggy.client.async;
 import org.apache.iggy.client.BaseIntegrationTest;
 import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
 import org.apache.iggy.consumergroup.Consumer;
-import org.apache.iggy.identifier.ConsumerId;
 import org.apache.iggy.identifier.StreamId;
 import org.apache.iggy.identifier.TopicId;
 import org.apache.iggy.message.Message;
 import org.apache.iggy.message.Partitioning;
 import org.apache.iggy.message.PollingStrategy;
 import org.apache.iggy.topic.CompressionAlgorithm;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.MethodOrderer;
-import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestMethodOrder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,593 +45,212 @@ import java.util.concurrent.TimeUnit;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
- * Integration test for the complete async client flow.
- * Tests connection, authentication, stream/topic management, and message 
operations.
+ * Async-specific integration tests that exercise concurrency and 
CompletableFuture patterns.
+ *
+ * <p>Basic CRUD operations (streams, topics, messages, users, consumer 
groups, partitions,
+ * consumer offsets, personal access tokens, system) are covered by the 
blocking test suite
+ * which exercises the same async code path via the blocking wrapper.
  */
-@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 public class AsyncClientIntegrationTest extends BaseIntegrationTest {
+
     private static final Logger log = 
LoggerFactory.getLogger(AsyncClientIntegrationTest.class);
 
     private static final String USERNAME = "iggy";
     private static final String PASSWORD = "iggy";
-
-    private static final String TEST_STREAM = "async-test-stream-" + 
UUID.randomUUID();
-    private static final String TEST_TOPIC = "async-test-topic";
-    private static final long PARTITION_ID = 1L;
-
-    private static AsyncIggyTcpClient client;
-
-    @BeforeAll
-    public static void setup() throws Exception {
-        log.info("Setting up async client for integration tests");
-        client = new AsyncIggyTcpClient(serverHost(), serverTcpPort());
-
-        // Connect and login
-        client.connect()
-                .thenCompose(v -> {
-                    log.info("Connected to Iggy server");
-                    return client.users().login(USERNAME, PASSWORD);
-                })
-                .get(5, TimeUnit.SECONDS);
-
-        log.info("Successfully logged in as: {}", USERNAME);
+    private static final int TIMEOUT_SECONDS = 5;
+
+    private AsyncIggyTcpClient client;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        client = AsyncIggyTcpClient.builder()
+                .host(serverHost())
+                .port(serverTcpPort())
+                .build();
+        client.connect().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        client.users().login(USERNAME, PASSWORD).get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
     }
 
-    @AfterAll
-    public static void tearDown() throws Exception {
-        log.info("Cleaning up test resources");
-
-        try {
-            // Clean up test stream if it exists
-            client.streams().deleteStream(StreamId.of(TEST_STREAM)).get(5, 
TimeUnit.SECONDS);
-            log.info("Deleted test stream: {}", TEST_STREAM);
-        } catch (RuntimeException e) {
-            // Stream may not exist, which is fine
-            log.debug("Stream cleanup failed (may not exist): {}", 
e.getMessage());
-        }
-
-        // Close the client
+    @AfterEach
+    void tearDown() throws Exception {
         if (client != null) {
-            client.close().get(5, TimeUnit.SECONDS);
-            log.info("Closed async client");
-        }
-    }
-
-    @Test
-    @Order(1)
-    public void testCreateStream() throws Exception {
-        log.info("Testing stream creation");
-
-        var streamDetails = client.streams().createStream(TEST_STREAM).get(5, 
TimeUnit.SECONDS);
-
-        assertThat(streamDetails).isNotNull();
-        assertThat(streamDetails.name()).isEqualTo(TEST_STREAM);
-        log.info("Successfully created stream: {}", streamDetails.name());
-    }
-
-    @Test
-    @Order(2)
-    public void testGetStream() throws Exception {
-        log.info("Testing stream retrieval");
-
-        var streamOpt = 
client.streams().getStream(StreamId.of(TEST_STREAM)).get(5, TimeUnit.SECONDS);
-
-        assertThat(streamOpt).isPresent();
-        assertThat(streamOpt.get().name()).isEqualTo(TEST_STREAM);
-        log.info("Successfully retrieved stream: {}", streamOpt.get().name());
-    }
-
-    @Test
-    @Order(3)
-    public void testCreateTopic() throws Exception {
-        log.info("Testing topic creation");
-
-        var topicDetails = client.topics()
-                .createTopic(
-                        StreamId.of(TEST_STREAM),
-                        2L, // 2 partitions
-                        CompressionAlgorithm.None,
-                        BigInteger.ZERO,
-                        BigInteger.ZERO,
-                        Optional.empty(),
-                        TEST_TOPIC)
-                .get(5, TimeUnit.SECONDS);
-
-        assertThat(topicDetails).isNotNull();
-        assertThat(topicDetails.name()).isEqualTo(TEST_TOPIC);
-        assertThat(topicDetails.partitionsCount()).isEqualTo(2);
-        log.info(
-                "Successfully created topic: {} with {} partitions",
-                topicDetails.name(),
-                topicDetails.partitionsCount());
-    }
-
-    @Test
-    @Order(4)
-    public void testGetTopic() throws Exception {
-        log.info("Testing topic retrieval");
-
-        var topicOpt = client.topics()
-                .getTopic(StreamId.of(TEST_STREAM), TopicId.of(TEST_TOPIC))
-                .get(5, TimeUnit.SECONDS);
-
-        assertThat(topicOpt).isPresent();
-        assertThat(topicOpt.get().name()).isEqualTo(TEST_TOPIC);
-        log.info("Successfully retrieved topic: {}", topicOpt.get().name());
-    }
-
-    @Test
-    @Order(5)
-    public void testSendMessages() throws Exception {
-        log.info("Testing message sending");
-
-        List<Message> messages = new ArrayList<>();
-        for (int i = 0; i < 10; i++) {
-            String content = String.format("Test message %d - %s", i, 
UUID.randomUUID());
-            messages.add(Message.of(content));
-        }
-
-        // Send messages to partition 1
-        client.messages()
-                .sendMessages(
-                        StreamId.of(TEST_STREAM),
-                        TopicId.of(TEST_TOPIC),
-                        Partitioning.partitionId(PARTITION_ID),
-                        messages)
-                .get(5, TimeUnit.SECONDS);
-
-        log.info("Successfully sent {} messages", messages.size());
-    }
-
-    @Test
-    @Order(6)
-    public void testPollMessages() throws Exception {
-        log.info("Testing message polling");
-
-        // Poll messages from partition 1 - Use valid consumer instead of null
-        var consumer = Consumer.of(12345L); // Create consumer with ID
-        var polledMessages = client.messages()
-                .pollMessages(
-                        StreamId.of(TEST_STREAM),
-                        TopicId.of(TEST_TOPIC),
-                        Optional.of(PARTITION_ID),
-                        consumer, // Use valid consumer instead of null
-                        PollingStrategy.offset(BigInteger.ZERO),
-                        10L,
-                        false)
-                .get(5, TimeUnit.SECONDS);
-
-        assertThat(polledMessages).isNotNull();
-        assertThat(polledMessages.partitionId()).isEqualTo(PARTITION_ID);
-        assertThat(polledMessages.messages()).isNotEmpty();
-        log.info(
-                "Successfully polled {} messages from partition {}",
-                polledMessages.messages().size(),
-                polledMessages.partitionId());
-
-        // Verify message content
-        for (var message : polledMessages.messages()) {
-            String content = new String(message.payload());
-            assertThat(content).startsWith("Test message");
-            log.debug("Polled message: {}", content);
+            client.close().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
         }
     }
 
-    // TODO: Re-enable when server supports null consumer polling
-    // This test fails because it uses null consumer which causes server 
timeout
     @Test
-    @Disabled
-    @Order(7)
-    public void testSendAndPollLargeVolume() throws Exception {
-        log.info("Testing high-volume message operations");
+    void shouldPollMessagesAndVerifyContent() throws Exception {
+        // given
+        String streamName = "poll-content-test-" + UUID.randomUUID();
+        StreamId streamId = StreamId.of(streamName);
+        TopicId topicId = TopicId.of("test-topic");
+        long partitionId = 1L;
 
-        int messageCount = 100;
-        List<CompletableFuture<Void>> sendFutures = new ArrayList<>();
-
-        // Send messages in batches asynchronously
-        for (int batch = 0; batch < 10; batch++) {
-            List<Message> batchMessages = new ArrayList<>();
+        try {
+            client.streams().createStream(streamName).get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+            client.topics()
+                    .createTopic(
+                            streamId,
+                            2L,
+                            CompressionAlgorithm.None,
+                            BigInteger.ZERO,
+                            BigInteger.ZERO,
+                            Optional.empty(),
+                            "test-topic")
+                    .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+            List<Message> messages = new ArrayList<>();
             for (int i = 0; i < 10; i++) {
-                int msgNum = batch * 10 + i;
-                String content = String.format("Batch message %d - %s", 
msgNum, System.currentTimeMillis());
-                batchMessages.add(Message.of(content));
+                messages.add(Message.of(String.format("Test message %d", i)));
             }
+            client.messages()
+                    .sendMessages(streamId, topicId, 
Partitioning.partitionId(partitionId), messages)
+                    .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
-            var future = client.messages()
-                    .sendMessages(
-                            StreamId.of(TEST_STREAM),
-                            TopicId.of(TEST_TOPIC),
-                            Partitioning.partitionId(PARTITION_ID),
-                            batchMessages);
-            sendFutures.add(future);
-        }
-
-        // Wait for all sends to complete
-        CompletableFuture.allOf(sendFutures.toArray(new 
CompletableFuture[0])).get(10, TimeUnit.SECONDS);
-
-        log.info("Successfully sent {} messages in batches", messageCount);
-
-        // Poll all messages
-        var polledMessages = client.messages()
-                .pollMessages(
-                        StreamId.of(TEST_STREAM),
-                        TopicId.of(TEST_TOPIC),
-                        Optional.of(PARTITION_ID),
-                        null,
-                        PollingStrategy.offset(BigInteger.ZERO),
-                        (long) messageCount + 10, // Poll all messages sent
-                        false)
-                .get(5, TimeUnit.SECONDS);
-
-        assertThat(polledMessages).isNotNull();
-        
assertThat(polledMessages.messages().size()).isGreaterThanOrEqualTo(messageCount);
-        log.info("Successfully polled {} messages", 
polledMessages.messages().size());
-    }
-
-    // TODO: This test fails with connection issues after null consumer timeout
-    // The connection gets closed after the previous test's null consumer 
timeout
-    @Test
-    @Disabled
-    @Order(8)
-    public void testUpdateTopic() throws Exception {
-        log.info("Testing topic update");
-
-        // Update topic with new compression algorithm
-        client.topics()
-                .updateTopic(
-                        StreamId.of(TEST_STREAM),
-                        TopicId.of(TEST_TOPIC),
-                        CompressionAlgorithm.Gzip,
-                        BigInteger.valueOf(3600000000L), // 1 hour message 
expiry
-                        BigInteger.valueOf(1073741824L), // 1GB max size
-                        Optional.empty(),
-                        TEST_TOPIC)
-                .get(5, TimeUnit.SECONDS);
-
-        // Verify the update
-        var updatedTopic = client.topics()
-                .getTopic(StreamId.of(TEST_STREAM), TopicId.of(TEST_TOPIC))
-                .get(5, TimeUnit.SECONDS);
-
-        assertThat(updatedTopic).isPresent();
-        
assertThat(updatedTopic.get().compressionAlgorithm()).isEqualTo(CompressionAlgorithm.Gzip);
-        log.info(
-                "Successfully updated topic with compression: {}",
-                updatedTopic.get().compressionAlgorithm());
-    }
-
-    // TODO: This test fails with connection issues after null consumer timeout
-    // The connection gets closed after previous tests' null consumer timeout
-    @Test
-    @Disabled
-    @Order(9)
-    public void testDeleteTopic() throws Exception {
-        log.info("Testing topic deletion");
-
-        // Create a temporary topic to delete
-        String tempTopic = "temp-topic-" + UUID.randomUUID();
-        client.topics()
-                .createTopic(
-                        StreamId.of(TEST_STREAM),
-                        1L,
-                        CompressionAlgorithm.None,
-                        BigInteger.ZERO,
-                        BigInteger.ZERO,
-                        Optional.empty(),
-                        tempTopic)
-                .get(5, TimeUnit.SECONDS);
-
-        // Delete the topic
-        client.topics()
-                .deleteTopic(StreamId.of(TEST_STREAM), TopicId.of(tempTopic))
-                .get(5, TimeUnit.SECONDS);
-
-        // Verify deletion
-        var deletedTopic = client.topics()
-                .getTopic(StreamId.of(TEST_STREAM), TopicId.of(tempTopic))
-                .get(5, TimeUnit.SECONDS);
-
-        assertThat(deletedTopic).isNotPresent();
-        log.info("Successfully deleted topic: {}", tempTopic);
-    }
-
-    @Test
-    @Order(10)
-    void testConcurrentOperations() throws Exception {
-        log.info("Testing concurrent async operations");
-
-        // Create multiple concurrent operations
-        List<CompletableFuture<?>> operations = new ArrayList<>();
-
-        // Send messages concurrently
-        for (int i = 0; i < 5; i++) {
-            final int threadNum = i;
-            var future = CompletableFuture.supplyAsync(() -> {
-                        List<Message> messages = new ArrayList<>();
-                        for (int j = 0; j < 20; j++) {
-                            String content = String.format("Thread %d - 
Message %d", threadNum, j);
-                            messages.add(Message.of(content));
-                        }
-                        return messages;
-                    })
-                    .thenCompose(messages -> client.messages()
-                            .sendMessages(
-                                    StreamId.of(TEST_STREAM),
-                                    TopicId.of(TEST_TOPIC),
-                                    Partitioning.partitionId(PARTITION_ID),
-                                    messages));
-            operations.add(future);
-        }
-
-        // Poll messages concurrently using valid consumer
-        for (int i = 0; i < 3; i++) {
-            final long consumerId = 10000L + i;
-            var future = client.messages()
+            // when
+            var polledMessages = client.messages()
                     .pollMessages(
-                            StreamId.of(TEST_STREAM),
-                            TopicId.of(TEST_TOPIC),
-                            Optional.of(PARTITION_ID),
-                            Consumer.of(consumerId),
-                            PollingStrategy.last(),
+                            streamId,
+                            topicId,
+                            Optional.of(partitionId),
+                            Consumer.of(12345L),
+                            PollingStrategy.offset(BigInteger.ZERO),
                             10L,
-                            false);
-            operations.add(future);
-        }
+                            false)
+                    .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
-        // Wait for all operations to complete
-        CompletableFuture.allOf(operations.toArray(new 
CompletableFuture[0])).get(15, TimeUnit.SECONDS);
-
-        log.info("Successfully completed {} concurrent operations", 
operations.size());
-    }
+            // then
+            assertThat(polledMessages).isNotNull();
+            assertThat(polledMessages.partitionId()).isEqualTo(partitionId);
+            assertThat(polledMessages.messages()).hasSize(10);
 
-    // ===== System client tests =====
-
-    @Test
-    @Order(11)
-    public void testGetStats() throws Exception {
-        log.info("Testing system getStats");
-
-        var stats = client.system().getStats().get(5, TimeUnit.SECONDS);
-
-        assertThat(stats).isNotNull();
-        log.info("Successfully retrieved server stats");
-    }
-
-    @Test
-    @Order(12)
-    public void testGetMe() throws Exception {
-        log.info("Testing system getMe");
-
-        var me = client.system().getMe().get(5, TimeUnit.SECONDS);
-
-        assertThat(me).isNotNull();
-        assertThat(me.clientId()).isGreaterThan(0);
-        log.info("Successfully retrieved current client info, clientId: {}", 
me.clientId());
-    }
-
-    @Test
-    @Order(13)
-    public void testGetClients() throws Exception {
-        log.info("Testing system getClients");
-
-        var clients = client.system().getClients().get(5, TimeUnit.SECONDS);
-
-        assertThat(clients).isNotNull();
-        assertThat(clients).isNotEmpty();
-        log.info("Successfully retrieved {} connected clients", 
clients.size());
-    }
-
-    @Test
-    @Order(14)
-    public void testGetClient() throws Exception {
-        log.info("Testing system getClient");
-
-        var me = client.system().getMe().get(5, TimeUnit.SECONDS);
-        var clientInfo = client.system().getClient(me.clientId()).get(5, 
TimeUnit.SECONDS);
-
-        assertThat(clientInfo).isNotNull();
-        assertThat(clientInfo.clientId()).isEqualTo(me.clientId());
-        log.info("Successfully retrieved client info for clientId: {}", 
clientInfo.clientId());
-    }
-
-    // ===== Consumer groups client tests =====
-
-    @Test
-    @Order(20)
-    public void testCreateAndGetConsumerGroup() throws Exception {
-        log.info("Testing consumer group create and get");
-
-        String groupName = "async-test-group";
-        var group = client.consumerGroups()
-                .createConsumerGroup(StreamId.of(TEST_STREAM), 
TopicId.of(TEST_TOPIC), groupName)
-                .get(5, TimeUnit.SECONDS);
-
-        assertThat(group).isNotNull();
-        assertThat(group.name()).isEqualTo(groupName);
-        log.info("Successfully created consumer group: {}", group.name());
-
-        // Get by ID
-        var retrieved = client.consumerGroups()
-                .getConsumerGroup(StreamId.of(TEST_STREAM), 
TopicId.of(TEST_TOPIC), ConsumerId.of(group.id()))
-                .get(5, TimeUnit.SECONDS);
-
-        assertThat(retrieved).isPresent();
-        assertThat(retrieved.get().id()).isEqualTo(group.id());
-        log.info("Successfully retrieved consumer group by ID: {}", 
group.id());
-    }
-
-    @Test
-    @Order(21)
-    public void testGetConsumerGroups() throws Exception {
-        log.info("Testing consumer groups list");
-
-        var groups = client.consumerGroups()
-                .getConsumerGroups(StreamId.of(TEST_STREAM), 
TopicId.of(TEST_TOPIC))
-                .get(5, TimeUnit.SECONDS);
-
-        assertThat(groups).isNotNull();
-        assertThat(groups).isNotEmpty();
-        log.info("Successfully retrieved {} consumer groups", groups.size());
-    }
-
-    @Test
-    @Order(22)
-    public void testDeleteConsumerGroup() throws Exception {
-        log.info("Testing consumer group deletion");
-
-        String groupName = "async-delete-group";
-        var group = client.consumerGroups()
-                .createConsumerGroup(StreamId.of(TEST_STREAM), 
TopicId.of(TEST_TOPIC), groupName)
-                .get(5, TimeUnit.SECONDS);
-
-        client.consumerGroups()
-                .deleteConsumerGroup(StreamId.of(TEST_STREAM), 
TopicId.of(TEST_TOPIC), ConsumerId.of(group.id()))
-                .get(5, TimeUnit.SECONDS);
-
-        var deleted = client.consumerGroups()
-                .getConsumerGroup(StreamId.of(TEST_STREAM), 
TopicId.of(TEST_TOPIC), ConsumerId.of(group.id()))
-                .get(5, TimeUnit.SECONDS);
-
-        assertThat(deleted).isEmpty();
-        log.info("Successfully deleted consumer group: {}", groupName);
-    }
-
-    // ===== Consumer offsets client tests =====
-
-    @Test
-    @Order(30)
-    public void testStoreAndGetConsumerOffset() throws Exception {
-        log.info("Testing consumer offset store and get");
-
-        // Send message to partition 0 to ensure it is not empty so we can 
store offset 0
-        // Note: storeConsumerOffset with empty partitionId defaults to 
partition 0 on server
-        client.messages()
-                .sendMessages(
-                        StreamId.of(TEST_STREAM),
-                        TopicId.of(TEST_TOPIC),
-                        Partitioning.partitionId(0L),
-                        List.of(Message.of("test")))
-                .get(5, TimeUnit.SECONDS);
-
-        var consumer = new Consumer(Consumer.Kind.Consumer, 
ConsumerId.of(5000L));
-        var offset = BigInteger.valueOf(0);
-
-        client.consumerOffsets()
-                .storeConsumerOffset(
-                        StreamId.of(TEST_STREAM), TopicId.of(TEST_TOPIC), 
Optional.empty(), consumer, offset)
-                .get(5, TimeUnit.SECONDS);
-
-        log.info("Successfully stored consumer offset: {}", offset);
-
-        var retrieved = client.consumerOffsets()
-                .getConsumerOffset(StreamId.of(TEST_STREAM), 
TopicId.of(TEST_TOPIC), Optional.of(0L), consumer)
-                .get(5, TimeUnit.SECONDS);
-
-        assertThat(retrieved).isPresent();
-        log.info("Successfully retrieved consumer offset");
-    }
-
-    // ===== Partitions client tests =====
-
-    @Test
-    @Order(40)
-    public void testCreateAndDeletePartitions() throws Exception {
-        log.info("Testing partition create and delete");
-
-        // Get initial partition count
-        var topicBefore = client.topics()
-                .getTopic(StreamId.of(TEST_STREAM), TopicId.of(TEST_TOPIC))
-                .get(5, TimeUnit.SECONDS);
-        assertThat(topicBefore).isPresent();
-        long initialCount = topicBefore.get().partitionsCount();
-        log.info("Initial partition count: {}", initialCount);
-
-        // Create additional partitions
-        client.partitions()
-                .createPartitions(StreamId.of(TEST_STREAM), 
TopicId.of(TEST_TOPIC), 3L)
-                .get(5, TimeUnit.SECONDS);
-
-        var topicAfterCreate = client.topics()
-                .getTopic(StreamId.of(TEST_STREAM), TopicId.of(TEST_TOPIC))
-                .get(5, TimeUnit.SECONDS);
-        assertThat(topicAfterCreate).isPresent();
-        
assertThat(topicAfterCreate.get().partitionsCount()).isEqualTo(initialCount + 
3);
-        log.info("Partition count after create: {}", 
topicAfterCreate.get().partitionsCount());
-
-        // Delete the added partitions
-        client.partitions()
-                .deletePartitions(StreamId.of(TEST_STREAM), 
TopicId.of(TEST_TOPIC), 3L)
-                .get(5, TimeUnit.SECONDS);
-
-        var topicAfterDelete = client.topics()
-                .getTopic(StreamId.of(TEST_STREAM), TopicId.of(TEST_TOPIC))
-                .get(5, TimeUnit.SECONDS);
-        assertThat(topicAfterDelete).isPresent();
-        
assertThat(topicAfterDelete.get().partitionsCount()).isEqualTo(initialCount);
-        log.info("Partition count after delete: {}", 
topicAfterDelete.get().partitionsCount());
+            for (int i = 0; i < polledMessages.messages().size(); i++) {
+                String content = new 
String(polledMessages.messages().get(i).payload());
+                assertThat(content).isEqualTo(String.format("Test message %d", 
i));
+            }
+        } finally {
+            client.streams().deleteStream(streamId).get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+        }
     }
 
-    // ===== Personal access tokens client tests =====
-
     @Test
-    @Order(50)
-    public void testCreateAndDeletePersonalAccessToken() throws Exception {
-        log.info("Testing personal access token create and delete");
+    void shouldHandleConcurrentSendsAndPolls() throws Exception {
+        // given
+        String streamName = "concurrent-test-" + UUID.randomUUID();
+        StreamId streamId = StreamId.of(streamName);
+        TopicId topicId = TopicId.of("test-topic");
+        long partitionId = 1L;
 
-        String tokenName = "async-test-token";
-        var token = client.personalAccessTokens()
-                .createPersonalAccessToken(tokenName, 
BigInteger.valueOf(50_000))
-                .get(5, TimeUnit.SECONDS);
-
-        assertThat(token).isNotNull();
-        log.info("Successfully created personal access token: {}", tokenName);
-
-        // List tokens
-        var tokens = 
client.personalAccessTokens().getPersonalAccessTokens().get(5, 
TimeUnit.SECONDS);
-
-        assertThat(tokens).isNotNull();
-        assertThat(tokens).anyMatch(t -> t.name().equals(tokenName));
-        log.info("Found {} personal access tokens", tokens.size());
+        try {
+            client.streams().createStream(streamName).get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+            client.topics()
+                    .createTopic(
+                            streamId,
+                            2L,
+                            CompressionAlgorithm.None,
+                            BigInteger.ZERO,
+                            BigInteger.ZERO,
+                            Optional.empty(),
+                            "test-topic")
+                    .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+            // when — send messages concurrently from multiple threads
+            List<CompletableFuture<?>> operations = new ArrayList<>();
+            for (int i = 0; i < 5; i++) {
+                final int threadNum = i;
+                var future = CompletableFuture.supplyAsync(() -> {
+                            List<Message> messages = new ArrayList<>();
+                            for (int j = 0; j < 20; j++) {
+                                messages.add(Message.of(String.format("Thread 
%d - Message %d", threadNum, j)));
+                            }
+                            return messages;
+                        })
+                        .thenCompose(messages -> client.messages()
+                                .sendMessages(streamId, topicId, 
Partitioning.partitionId(partitionId), messages));
+                operations.add(future);
+            }
 
-        // Delete token
-        
client.personalAccessTokens().deletePersonalAccessToken(tokenName).get(5, 
TimeUnit.SECONDS);
+            // poll concurrently while sends are in progress
+            for (int i = 0; i < 3; i++) {
+                final long consumerId = 10000L + i;
+                var future = client.messages()
+                        .pollMessages(
+                                streamId,
+                                topicId,
+                                Optional.of(partitionId),
+                                Consumer.of(consumerId),
+                                PollingStrategy.last(),
+                                10L,
+                                false);
+                operations.add(future);
+            }
 
-        var tokensAfterDelete =
-                client.personalAccessTokens().getPersonalAccessTokens().get(5, 
TimeUnit.SECONDS);
+            // then — all operations should complete without errors
+            CompletableFuture.allOf(operations.toArray(new 
CompletableFuture[0]))
+                    .get(15, TimeUnit.SECONDS);
 
-        assertThat(tokensAfterDelete).noneMatch(t -> 
t.name().equals(tokenName));
-        log.info("Successfully deleted personal access token: {}", tokenName);
+            log.info("Completed {} concurrent operations", operations.size());
+        } finally {
+            client.streams().deleteStream(streamId).get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+        }
     }
 
     @Test
-    @Order(51)
-    public void testLoginWithPersonalAccessToken() throws Exception {
-        log.info("Testing login with personal access token");
-
-        String tokenName = "async-login-token";
-        var token = client.personalAccessTokens()
-                .createPersonalAccessToken(tokenName, 
BigInteger.valueOf(50_000))
-                .get(5, TimeUnit.SECONDS);
-
-        assertThat(token).isNotNull();
-        assertThat(token.token()).isNotEmpty();
-        log.info("Created token for login test");
+    void shouldSendAndPollLargeVolume() throws Exception {
+        // given
+        String streamName = "volume-test-" + UUID.randomUUID();
+        StreamId streamId = StreamId.of(streamName);
+        TopicId topicId = TopicId.of("test-topic");
+        long partitionId = 1L;
+        int messageCount = 100;
 
-        // Login with PAT using a separate client
-        var patClient = new AsyncIggyTcpClient(serverHost(), serverTcpPort());
         try {
-            patClient.connect().get(5, TimeUnit.SECONDS);
-            var identity = patClient
-                    .personalAccessTokens()
-                    .loginWithPersonalAccessToken(token.token())
-                    .get(5, TimeUnit.SECONDS);
+            client.streams().createStream(streamName).get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+            client.topics()
+                    .createTopic(
+                            streamId,
+                            2L,
+                            CompressionAlgorithm.None,
+                            BigInteger.ZERO,
+                            BigInteger.ZERO,
+                            Optional.empty(),
+                            "test-topic")
+                    .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+            // when — send messages in concurrent batches
+            List<CompletableFuture<Void>> sendFutures = new ArrayList<>();
+            for (int batch = 0; batch < 10; batch++) {
+                List<Message> batchMessages = new ArrayList<>();
+                for (int i = 0; i < 10; i++) {
+                    int msgNum = batch * 10 + i;
+                    batchMessages.add(Message.of(String.format("Batch message 
%d", msgNum)));
+                }
+                sendFutures.add(client.messages()
+                        .sendMessages(streamId, topicId, 
Partitioning.partitionId(partitionId), batchMessages));
+            }
+            CompletableFuture.allOf(sendFutures.toArray(new 
CompletableFuture[0]))
+                    .get(10, TimeUnit.SECONDS);
 
-            assertThat(identity).isNotNull();
-            log.info("Successfully logged in with personal access token");
+            // then — poll all messages back
+            var polledMessages = client.messages()
+                    .pollMessages(
+                            streamId,
+                            topicId,
+                            Optional.of(partitionId),
+                            Consumer.of(1L),
+                            PollingStrategy.offset(BigInteger.ZERO),
+                            (long) messageCount + 10,
+                            false)
+                    .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+            assertThat(polledMessages).isNotNull();
+            
assertThat(polledMessages.messages().size()).isGreaterThanOrEqualTo(messageCount);
+            log.info(
+                    "Sent {} messages in batches, polled {} back",
+                    messageCount,
+                    polledMessages.messages().size());
         } finally {
-            patClient.close().get(5, TimeUnit.SECONDS);
-            // Clean up token
-            
client.personalAccessTokens().deletePersonalAccessToken(tokenName).get(5, 
TimeUnit.SECONDS);
+            client.streams().deleteStream(streamId).get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
         }
     }
 }
diff --git 
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncConsumerGroupsTest.java
 
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncConsumerGroupsTest.java
index 46e976912..f1cc13111 100644
--- 
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncConsumerGroupsTest.java
+++ 
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncConsumerGroupsTest.java
@@ -47,11 +47,11 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /**
- * Dedicated async-specific tests for {@link ConsumerGroupsClient} via
- * {@link org.apache.iggy.client.async.tcp.ConsumerGroupsTcpClient}.
+ * Async-specific tests for {@link ConsumerGroupsClient}: error scenarios,
+ * multi-client membership, and CompletableFuture patterns (chaining, 
concurrency).
  *
- * <p>Covers all CRUD operations, join/leave membership, error scenarios, and
- * CompletableFuture-specific patterns (chaining, concurrency, exception 
propagation).
+ * <p>Basic CRUD and join/leave operations are covered by the blocking
+ * {@code ConsumerGroupsClientBaseTest} and {@code 
ConsumerGroupsTcpClientTest}.
  */
 public class AsyncConsumerGroupsTest extends BaseIntegrationTest {
 
@@ -74,10 +74,7 @@ public class AsyncConsumerGroupsTest extends 
BaseIntegrationTest {
         client = new AsyncIggyTcpClient(serverHost(), serverTcpPort());
 
         client.connect()
-                .thenCompose(v -> {
-                    log.info("Connected to Iggy server");
-                    return client.users().login(USERNAME, PASSWORD);
-                })
+                .thenCompose(v -> client.users().login(USERNAME, PASSWORD))
                 .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
         client.streams().createStream(TEST_STREAM).get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
@@ -91,235 +88,79 @@ public class AsyncConsumerGroupsTest extends 
BaseIntegrationTest {
                         Optional.empty(),
                         TEST_TOPIC)
                 .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        log.info("Created stream '{}' and topic '{}'", TEST_STREAM, 
TEST_TOPIC);
     }
 
     @AfterAll
     public static void tearDown() throws Exception {
-        log.info("Cleaning up async consumer groups test resources");
         try {
             client.streams().deleteStream(STREAM_ID).get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
-            log.info("Deleted test stream: {}", TEST_STREAM);
         } catch (RuntimeException e) {
             log.debug("Stream cleanup failed (may not exist): {}", 
e.getMessage());
         }
         if (client != null) {
             client.close().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-            log.info("Closed async client");
-        }
-    }
-
-    // ===== Happy path tests =====
-
-    @Test
-    void shouldCreateConsumerGroupAsync() throws Exception {
-        String groupName = "create-test-" + UUID.randomUUID();
-
-        ConsumerGroupDetails group = client.consumerGroups()
-                .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
-                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        assertThat(group).isNotNull();
-        assertThat(group.id()).isNotNull();
-        assertThat(group.name()).isEqualTo(groupName);
-        assertThat(group.membersCount()).isEqualTo(0);
-        assertThat(group.members()).isEmpty();
-    }
-
-    @Test
-    void shouldGetConsumerGroupByIdAsync() throws Exception {
-        String groupName = "get-by-id-test-" + UUID.randomUUID();
-
-        ConsumerGroupDetails created = client.consumerGroups()
-                .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
-                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        Optional<ConsumerGroupDetails> retrieved = client.consumerGroups()
-                .getConsumerGroup(STREAM_ID, TOPIC_ID, 
ConsumerId.of(created.id()))
-                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        assertThat(retrieved).isPresent();
-        assertThat(retrieved.get().id()).isEqualTo(created.id());
-        assertThat(retrieved.get().name()).isEqualTo(groupName);
-    }
-
-    @Test
-    void shouldGetConsumerGroupByNameAsync() throws Exception {
-        String groupName = "get-by-name-test-" + UUID.randomUUID();
-
-        ConsumerGroupDetails created = client.consumerGroups()
-                .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
-                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        Optional<ConsumerGroupDetails> byId = client.consumerGroups()
-                .getConsumerGroup(STREAM_ID, TOPIC_ID, 
ConsumerId.of(created.id()))
-                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-        Optional<ConsumerGroupDetails> byName = client.consumerGroups()
-                .getConsumerGroup(STREAM_ID, TOPIC_ID, 
ConsumerId.of(groupName))
-                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        assertThat(byName).isPresent();
-        assertThat(byName.get().name()).isEqualTo(groupName);
-        assertThat(byId).isEqualTo(byName);
-    }
-
-    @Test
-    void shouldListAllConsumerGroupsAsync() throws Exception {
-        String streamName = "list-all-stream-" + UUID.randomUUID();
-        String topicName = "list-all-topic";
-        StreamId streamId = StreamId.of(streamName);
-        TopicId topicId = TopicId.of(topicName);
-
-        try {
-            client.streams().createStream(streamName).get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
-            client.topics()
-                    .createTopic(
-                            streamId,
-                            1L,
-                            CompressionAlgorithm.None,
-                            BigInteger.ZERO,
-                            BigInteger.ZERO,
-                            Optional.empty(),
-                            topicName)
-                    .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-            client.consumerGroups()
-                    .createConsumerGroup(streamId, topicId, "group-a")
-                    .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-            client.consumerGroups()
-                    .createConsumerGroup(streamId, topicId, "group-b")
-                    .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-            client.consumerGroups()
-                    .createConsumerGroup(streamId, topicId, "group-c")
-                    .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-            List<ConsumerGroup> groups =
-                    client.consumerGroups().getConsumerGroups(streamId, 
topicId).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-            assertThat(groups).hasSize(3);
-            
assertThat(groups).map(ConsumerGroup::name).containsExactlyInAnyOrder("group-a",
 "group-b", "group-c");
-        } finally {
-            client.streams().deleteStream(streamId).get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
         }
     }
 
-    @Test
-    void shouldDeleteConsumerGroupAsync() throws Exception {
-        String groupName = "delete-test-" + UUID.randomUUID();
-
-        ConsumerGroupDetails created = client.consumerGroups()
-                .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
-                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        client.consumerGroups()
-                .deleteConsumerGroup(STREAM_ID, TOPIC_ID, 
ConsumerId.of(created.id()))
-                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        Optional<ConsumerGroupDetails> deleted = client.consumerGroups()
-                .getConsumerGroup(STREAM_ID, TOPIC_ID, 
ConsumerId.of(created.id()))
-                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        assertThat(deleted).isEmpty();
-    }
+    // ===== Error scenario tests =====
 
     @Test
-    void shouldDeleteConsumerGroupByNameAsync() throws Exception {
-        String groupName = "delete-by-name-test-" + UUID.randomUUID();
-
-        client.consumerGroups()
-                .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
-                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        client.consumerGroups()
-                .deleteConsumerGroup(STREAM_ID, TOPIC_ID, 
ConsumerId.of(groupName))
-                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        Optional<ConsumerGroupDetails> deleted = client.consumerGroups()
-                .getConsumerGroup(STREAM_ID, TOPIC_ID, 
ConsumerId.of(groupName))
+    void shouldReturnEmptyForNonExistentGroup() throws Exception {
+        // when
+        Optional<ConsumerGroupDetails> result = client.consumerGroups()
+                .getConsumerGroup(STREAM_ID, TOPIC_ID, ConsumerId.of(999_999L))
                 .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
-        assertThat(deleted).isEmpty();
+        // then
+        assertThat(result).isEmpty();
     }
 
-    // ===== Join/Leave tests =====
-
     @Test
-    void shouldJoinConsumerGroupAsync() throws Exception {
-        String groupName = "join-test-" + UUID.randomUUID();
-
-        ConsumerGroupDetails created = client.consumerGroups()
-                .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
-                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-        ConsumerId groupId = ConsumerId.of(created.id());
-
-        client.consumerGroups().joinConsumerGroup(STREAM_ID, TOPIC_ID, 
groupId).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        ConsumerGroupDetails group = client.consumerGroups()
-                .getConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
-                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS)
-                .get();
-
-        assertThat(group.membersCount()).isEqualTo(1);
-        assertThat(group.members()).hasSize(1);
-        assertThat(group.members().get(0).partitionsCount()).isGreaterThan(0);
+    void shouldFailToDeleteNonExistentGroup() {
+        // when
+        var future = client.consumerGroups().deleteConsumerGroup(STREAM_ID, 
TOPIC_ID, ConsumerId.of(999_999L));
 
-        client.consumerGroups().leaveConsumerGroup(STREAM_ID, TOPIC_ID, 
groupId).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        // then
+        assertThatThrownBy(() -> future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
+                .isInstanceOf(ExecutionException.class)
+                .cause()
+                .isInstanceOf(IggyResourceNotFoundException.class);
     }
 
     @Test
-    void shouldLeaveConsumerGroupAsync() throws Exception {
-        String groupName = "leave-test-" + UUID.randomUUID();
-
-        ConsumerGroupDetails created = client.consumerGroups()
-                .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
-                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-        ConsumerId groupId = ConsumerId.of(created.id());
-
-        client.consumerGroups().joinConsumerGroup(STREAM_ID, TOPIC_ID, 
groupId).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        client.consumerGroups().leaveConsumerGroup(STREAM_ID, TOPIC_ID, 
groupId).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        ConsumerGroupDetails group = client.consumerGroups()
-                .getConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
-                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS)
-                .get();
+    void shouldFailToJoinNonExistentGroup() {
+        // when
+        var future = client.consumerGroups().joinConsumerGroup(STREAM_ID, 
TOPIC_ID, ConsumerId.of(999_999L));
 
-        assertThat(group.membersCount()).isEqualTo(0);
-        assertThat(group.members()).isEmpty();
+        // then
+        assertThatThrownBy(() -> future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
+                .isInstanceOf(ExecutionException.class)
+                .cause()
+                .isInstanceOf(IggyResourceNotFoundException.class);
     }
 
     @Test
-    void shouldJoinAndLeaveConsumerGroupSequentiallyAsync() throws Exception {
-        String groupName = "sequential-test-" + UUID.randomUUID();
-
+    void shouldFailToLeaveGroupNotJoined() throws Exception {
+        // given
+        String groupName = "leave-not-joined-test-" + UUID.randomUUID();
         ConsumerGroupDetails created = client.consumerGroups()
                 .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
                 .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-        ConsumerId groupId = ConsumerId.of(created.id());
 
-        ConsumerGroupDetails afterLeave = client.consumerGroups()
-                .joinConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
-                .thenCompose(v -> 
client.consumerGroups().getConsumerGroup(STREAM_ID, TOPIC_ID, groupId))
-                .thenCompose(groupOpt -> {
-                    assertThat(groupOpt).isPresent();
-                    assertThat(groupOpt.get().membersCount()).isEqualTo(1);
-                    return 
client.consumerGroups().leaveConsumerGroup(STREAM_ID, TOPIC_ID, groupId);
-                })
-                .thenCompose(v -> 
client.consumerGroups().getConsumerGroup(STREAM_ID, TOPIC_ID, groupId))
-                .thenApply(groupOpt -> {
-                    assertThat(groupOpt).isPresent();
-                    return groupOpt.get();
-                })
-                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        // when
+        var future = client.consumerGroups().leaveConsumerGroup(STREAM_ID, 
TOPIC_ID, ConsumerId.of(created.id()));
 
-        assertThat(afterLeave.membersCount()).isEqualTo(0);
+        // then
+        assertThatThrownBy(() -> future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
+                .isInstanceOf(ExecutionException.class)
+                .cause()
+                .isInstanceOf(IggyResourceNotFoundException.class);
     }
 
     @Test
-    void shouldHandleMultipleClientsJoiningGroupAsync() throws Exception {
+    void shouldHandleMultipleClientsJoiningGroup() throws Exception {
+        // given
         String groupName = "multi-client-test-" + UUID.randomUUID();
-
         ConsumerGroupDetails created = client.consumerGroups()
                 .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
                 .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
@@ -332,6 +173,7 @@ public class AsyncConsumerGroupsTest extends 
BaseIntegrationTest {
                     .thenCompose(v -> secondClient.users().login(USERNAME, 
PASSWORD))
                     .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
+            // when
             client.consumerGroups()
                     .joinConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
                     .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
@@ -340,6 +182,7 @@ public class AsyncConsumerGroupsTest extends 
BaseIntegrationTest {
                     .joinConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
                     .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
+            // then
             ConsumerGroupDetails group = client.consumerGroups()
                     .getConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
                     .get(TIMEOUT_SECONDS, TimeUnit.SECONDS)
@@ -348,6 +191,7 @@ public class AsyncConsumerGroupsTest extends 
BaseIntegrationTest {
             assertThat(group.membersCount()).isEqualTo(2);
             assertThat(group.members()).hasSize(2);
 
+            // cleanup — leave before closing
             client.consumerGroups()
                     .leaveConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
                     .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
@@ -355,84 +199,61 @@ public class AsyncConsumerGroupsTest extends 
BaseIntegrationTest {
                     .consumerGroups()
                     .leaveConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
                     .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-            ConsumerGroupDetails afterLeave = client.consumerGroups()
-                    .getConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
-                    .get(TIMEOUT_SECONDS, TimeUnit.SECONDS)
-                    .get();
-
-            assertThat(afterLeave.membersCount()).isEqualTo(0);
         } finally {
             secondClient.close().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
         }
     }
 
-    // ===== Error scenario tests =====
-
-    @Test
-    void shouldReturnEmptyForNonExistentGroup() throws Exception {
-        Optional<ConsumerGroupDetails> result = client.consumerGroups()
-                .getConsumerGroup(STREAM_ID, TOPIC_ID, ConsumerId.of(999_999L))
-                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        assertThat(result).isEmpty();
-    }
-
-    @Test
-    void shouldFailToDeleteNonExistentGroup() {
-        var future = client.consumerGroups().deleteConsumerGroup(STREAM_ID, 
TOPIC_ID, ConsumerId.of(999_999L));
-
-        assertThatThrownBy(() -> future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
-                .isInstanceOf(ExecutionException.class)
-                .cause()
-                .isInstanceOf(IggyResourceNotFoundException.class);
-    }
-
-    @Test
-    void shouldFailToJoinNonExistentGroup() {
-        var future = client.consumerGroups().joinConsumerGroup(STREAM_ID, 
TOPIC_ID, ConsumerId.of(999_999L));
-
-        assertThatThrownBy(() -> future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
-                .isInstanceOf(ExecutionException.class)
-                .cause()
-                .isInstanceOf(IggyResourceNotFoundException.class);
-    }
-
-    @Test
-    void shouldFailToLeaveGroupNotJoined() throws Exception {
-        String groupName = "leave-not-joined-test-" + UUID.randomUUID();
-
-        ConsumerGroupDetails created = client.consumerGroups()
-                .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
-                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-        var future = client.consumerGroups().leaveConsumerGroup(STREAM_ID, 
TOPIC_ID, ConsumerId.of(created.id()));
-
-        assertThatThrownBy(() -> future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
-                .isInstanceOf(ExecutionException.class)
-                .cause()
-                .isInstanceOf(IggyResourceNotFoundException.class);
-    }
-
-    // ===== CompletableFuture-specific tests =====
-
     @Test
     void shouldChainCreateAndGetWithThenCompose() throws Exception {
+        // given
         String groupName = "chain-test-" + UUID.randomUUID();
 
+        // when
         Optional<ConsumerGroupDetails> result = client.consumerGroups()
                 .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
                 .thenCompose(created ->
                         client.consumerGroups().getConsumerGroup(STREAM_ID, 
TOPIC_ID, ConsumerId.of(created.id())))
                 .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
+        // then
         assertThat(result).isPresent();
         assertThat(result.get().name()).isEqualTo(groupName);
         assertThat(result.get().membersCount()).isEqualTo(0);
     }
 
+    @Test
+    void shouldChainJoinVerifyLeaveVerify() throws Exception {
+        // given
+        String groupName = "sequential-chain-test-" + UUID.randomUUID();
+        ConsumerGroupDetails created = client.consumerGroups()
+                .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
+                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        ConsumerId groupId = ConsumerId.of(created.id());
+
+        // when
+        ConsumerGroupDetails afterLeave = client.consumerGroups()
+                .joinConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
+                .thenCompose(v -> 
client.consumerGroups().getConsumerGroup(STREAM_ID, TOPIC_ID, groupId))
+                .thenCompose(groupOpt -> {
+                    assertThat(groupOpt).isPresent();
+                    assertThat(groupOpt.get().membersCount()).isEqualTo(1);
+                    return 
client.consumerGroups().leaveConsumerGroup(STREAM_ID, TOPIC_ID, groupId);
+                })
+                .thenCompose(v -> 
client.consumerGroups().getConsumerGroup(STREAM_ID, TOPIC_ID, groupId))
+                .thenApply(groupOpt -> {
+                    assertThat(groupOpt).isPresent();
+                    return groupOpt.get();
+                })
+                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+        // then
+        assertThat(afterLeave.membersCount()).isEqualTo(0);
+    }
+
     @Test
     void shouldHandleConcurrentGroupCreations() throws Exception {
+        // given
         String streamName = "concurrent-create-stream-" + UUID.randomUUID();
         String topicName = "concurrent-create-topic";
         StreamId streamId = StreamId.of(streamName);
@@ -451,14 +272,15 @@ public class AsyncConsumerGroupsTest extends 
BaseIntegrationTest {
                             topicName)
                     .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
+            // when
             List<CompletableFuture<ConsumerGroupDetails>> futures = new 
ArrayList<>();
             for (int i = 0; i < 5; i++) {
                 
futures.add(client.consumerGroups().createConsumerGroup(streamId, topicId, 
"concurrent-group-" + i));
             }
-
             CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                     .get(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS);
 
+            // then
             for (CompletableFuture<ConsumerGroupDetails> f : futures) {
                 assertThat(f.isDone()).isTrue();
                 assertThat(f.isCompletedExceptionally()).isFalse();
@@ -466,7 +288,6 @@ public class AsyncConsumerGroupsTest extends 
BaseIntegrationTest {
 
             List<ConsumerGroup> groups =
                     client.consumerGroups().getConsumerGroups(streamId, 
topicId).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
             assertThat(groups).hasSize(5);
             assertThat(groups)
                     .map(ConsumerGroup::name)
@@ -483,8 +304,8 @@ public class AsyncConsumerGroupsTest extends 
BaseIntegrationTest {
 
     @Test
     void shouldHandleConcurrentJoinAndLeaveOperations() throws Exception {
+        // given
         String groupName = "concurrent-join-leave-test-" + UUID.randomUUID();
-
         ConsumerGroupDetails created = client.consumerGroups()
                 .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
                 .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
@@ -502,30 +323,32 @@ public class AsyncConsumerGroupsTest extends 
BaseIntegrationTest {
                     .thenCompose(v -> thirdClient.users().login(USERNAME, 
PASSWORD))
                     .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
+            // when — join concurrently
             CompletableFuture.allOf(
                             
client.consumerGroups().joinConsumerGroup(STREAM_ID, TOPIC_ID, groupId),
                             
secondClient.consumerGroups().joinConsumerGroup(STREAM_ID, TOPIC_ID, groupId),
                             
thirdClient.consumerGroups().joinConsumerGroup(STREAM_ID, TOPIC_ID, groupId))
                     .get(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS);
 
+            // then
             ConsumerGroupDetails afterJoin = client.consumerGroups()
                     .getConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
                     .get(TIMEOUT_SECONDS, TimeUnit.SECONDS)
                     .get();
-
             assertThat(afterJoin.membersCount()).isEqualTo(3);
 
+            // when — leave concurrently
             CompletableFuture.allOf(
                             
client.consumerGroups().leaveConsumerGroup(STREAM_ID, TOPIC_ID, groupId),
                             
secondClient.consumerGroups().leaveConsumerGroup(STREAM_ID, TOPIC_ID, groupId),
                             
thirdClient.consumerGroups().leaveConsumerGroup(STREAM_ID, TOPIC_ID, groupId))
                     .get(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS);
 
+            // then
             ConsumerGroupDetails afterLeave = client.consumerGroups()
                     .getConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
                     .get(TIMEOUT_SECONDS, TimeUnit.SECONDS)
                     .get();
-
             assertThat(afterLeave.membersCount()).isEqualTo(0);
         } finally {
             secondClient.close().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
diff --git 
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncPollMessageTest.java
 
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncPollMessageTest.java
deleted file mode 100644
index 62011dd60..000000000
--- 
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncPollMessageTest.java
+++ /dev/null
@@ -1,333 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iggy.client.async;
-
-import org.apache.iggy.client.BaseIntegrationTest;
-import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
-import org.apache.iggy.consumergroup.Consumer;
-import org.apache.iggy.identifier.StreamId;
-import org.apache.iggy.identifier.TopicId;
-import org.apache.iggy.message.Message;
-import org.apache.iggy.message.Partitioning;
-import org.apache.iggy.message.PollingStrategy;
-import org.apache.iggy.topic.CompressionAlgorithm;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.MethodOrderer;
-import org.junit.jupiter.api.Order;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestMethodOrder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.math.BigInteger;
-import java.util.Optional;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.IntStream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/**
- * Test class specifically for testing poll message functionality with 
different consumer scenarios.
- * <p>
- * Key findings:
- * 1. Polling with NULL consumer causes server to not respond (timeout)
- * 2. Polling with invalid consumer ID returns error 1010
- * 3. Polling with valid consumer group member works correctly
- */
-@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
-public abstract class AsyncPollMessageTest extends BaseIntegrationTest {
-
-    private static final Logger log = 
LoggerFactory.getLogger(AsyncPollMessageTest.class);
-    private static AsyncIggyTcpClient client;
-    private static String testStream;
-    private static final String TEST_TOPIC = "poll-test-topic";
-    private static final String CONSUMER_GROUP_NAME = "test-consumer-group";
-    private static final Long PARTITION_ID = 1L;
-    private static final int MESSAGE_COUNT = 10;
-
-    @BeforeEach
-    void setupEachTest() throws Exception {
-        // Ensure connection is established before each test
-        if (client == null || !isConnected(client)) {
-            log.info("Reconnecting client for test");
-            if (client != null) {
-                try {
-                    client.close().get(1, TimeUnit.SECONDS);
-                } catch (RuntimeException e) {
-                    // Ignore close errors
-                }
-            }
-            client = new AsyncIggyTcpClient(serverHost(), serverTcpPort());
-            client.connect().get(5, TimeUnit.SECONDS);
-            client.users().login("iggy", "iggy").get(5, TimeUnit.SECONDS);
-            log.info("Client reconnected successfully");
-        }
-    }
-
-    private boolean isConnected(AsyncIggyTcpClient client) {
-        // Check if client is connected by attempting a simple operation
-        try {
-            client.streams().getStreams().get(1, TimeUnit.SECONDS);
-            return true;
-        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
-            return false;
-        }
-    }
-
-    @BeforeAll
-    static void setup() throws Exception {
-        log.info("Setting up async client for poll message tests");
-
-        // Initialize client
-        client = new AsyncIggyTcpClient(serverHost(), serverTcpPort());
-        client.connect().get(5, TimeUnit.SECONDS);
-        client.users().login("iggy", "iggy").get(5, TimeUnit.SECONDS);
-        log.info("Successfully connected and logged in");
-
-        // Create unique stream for this test run
-        testStream = "poll-test-stream-" + UUID.randomUUID();
-        var stream = client.streams().createStream(testStream).get(5, 
TimeUnit.SECONDS);
-        log.info("Created test stream: {}", stream.name());
-
-        // Create topic with 2 partitions
-        var topic = client.topics()
-                .createTopic(
-                        StreamId.of(testStream),
-                        2L,
-                        CompressionAlgorithm.None,
-                        BigInteger.ZERO,
-                        BigInteger.ZERO,
-                        Optional.empty(),
-                        TEST_TOPIC)
-                .get(5, TimeUnit.SECONDS);
-        log.info("Created test topic: {} with {} partitions", topic.name(), 
topic.partitionsCount());
-
-        // Send test messages to both partitions
-        for (int partition = 0; partition < 2; partition++) {
-            final int partNum = partition;
-            var messages = IntStream.range(0, MESSAGE_COUNT)
-                    .mapToObj(i -> Message.of(String.format("Message %d for 
partition %d", i, partNum)))
-                    .toList();
-
-            client.messages()
-                    .sendMessages(
-                            StreamId.of(testStream),
-                            TopicId.of(TEST_TOPIC),
-                            Partitioning.partitionId((long) partition),
-                            messages)
-                    .get(5, TimeUnit.SECONDS);
-
-            log.info("Sent {} messages to partition {}", MESSAGE_COUNT, 
partition);
-        }
-    }
-
-    @AfterAll
-    static void cleanup() throws Exception {
-        log.info("Cleaning up test resources");
-
-        // Delete stream (cascades to topics and consumer groups)
-        try {
-            client.streams().deleteStream(StreamId.of(testStream)).get(5, 
TimeUnit.SECONDS);
-            log.info("Deleted test stream: {}", testStream);
-        } catch (RuntimeException e) {
-            log.warn("Failed to delete test stream: {}", e.getMessage());
-        }
-
-        // Close client
-        if (client != null) {
-            client.close().get(5, TimeUnit.SECONDS);
-            log.info("Closed async client");
-        }
-    }
-
-    @Test
-    @Order(2)
-    @DisplayName("Poll with various consumer IDs")
-    void testPollWithVariousConsumerIDs() throws Exception {
-        log.info("TEST 2: Polling with various consumer IDs");
-
-        // Test with a large consumer ID (likely doesn't exist but server 
still accepts)
-        var largeIdConsumer = Consumer.of(99999L);
-
-        try {
-            var polledMessages = client.messages()
-                    .pollMessages(
-                            StreamId.of(testStream),
-                            TopicId.of(TEST_TOPIC),
-                            Optional.of(PARTITION_ID),
-                            largeIdConsumer,
-                            PollingStrategy.offset(BigInteger.ZERO),
-                            10L,
-                            false)
-                    .get(5, TimeUnit.SECONDS);
-
-            // Server accepts any valid consumer ID format
-            assertThat(polledMessages).isNotNull();
-            log.info(
-                    "Server accepted consumer ID 99999 and returned {} 
messages",
-                    polledMessages.messages().size());
-        } catch (ExecutionException e) {
-            log.info("Consumer ID 99999 was rejected: {}", 
e.getCause().getMessage());
-        }
-    }
-
-    @Test
-    @Order(3)
-    @DisplayName("Poll with valid consumer ID")
-    void testPollWithValidConsumer() throws Exception {
-        log.info("TEST 3: Polling with valid consumer");
-
-        // Use a simple consumer ID
-        var consumer = Consumer.of(1L);
-
-        try {
-            var polledMessages = client.messages()
-                    .pollMessages(
-                            StreamId.of(testStream),
-                            TopicId.of(TEST_TOPIC),
-                            Optional.of(PARTITION_ID),
-                            consumer,
-                            PollingStrategy.offset(BigInteger.ZERO),
-                            5L,
-                            false)
-                    .get(5, TimeUnit.SECONDS);
-
-            assertThat(polledMessages).isNotNull();
-            log.info(
-                    "Successfully polled {} messages with consumer ID 1",
-                    polledMessages.messages().size());
-
-            // Log message content
-            polledMessages.messages().forEach(msg -> log.info("  - Message: 
{}", new String(msg.payload())));
-        } catch (ExecutionException e) {
-            log.info(
-                    "Polling with consumer ID 1 failed (expected if consumer 
doesn't exist): {}",
-                    e.getCause().getMessage());
-            // This is expected if the consumer doesn't exist in the system
-        }
-    }
-
-    @Test
-    @Order(4)
-    @DisplayName("Poll with direct partition access")
-    void testPollDirectPartitionAccess() throws Exception {
-        log.info("TEST 4: Direct partition polling");
-
-        try {
-            // Try polling with a session consumer
-            var sessionConsumer = Consumer.of(0L);
-
-            var polledMessages = client.messages()
-                    .pollMessages(
-                            StreamId.of(testStream),
-                            TopicId.of(TEST_TOPIC),
-                            Optional.of(PARTITION_ID),
-                            sessionConsumer,
-                            PollingStrategy.offset(BigInteger.ZERO),
-                            5L,
-                            false)
-                    .get(5, TimeUnit.SECONDS);
-
-            assertThat(polledMessages).isNotNull();
-            log.info(
-                    "Successfully polled {} messages with session consumer",
-                    polledMessages.messages().size());
-
-        } catch (ExecutionException e) {
-            log.info(
-                    "Direct partition access failed (may require consumer 
group): {}",
-                    e.getCause().getMessage());
-            // This is expected behavior if server requires consumer group 
membership
-        }
-    }
-
-    @Test
-    @Order(5)
-    @DisplayName("Poll with different strategies")
-    void testPollWithDifferentStrategies() throws Exception {
-        log.info("TEST 5: Testing different polling strategies");
-
-        var consumer = Consumer.of(1L);
-
-        // Test 1: Poll from beginning
-        log.info("  Testing FIRST strategy");
-        try {
-            var firstMessages = client.messages()
-                    .pollMessages(
-                            StreamId.of(testStream),
-                            TopicId.of(TEST_TOPIC),
-                            Optional.of(1L),
-                            consumer,
-                            PollingStrategy.first(),
-                            3L,
-                            false)
-                    .get(5, TimeUnit.SECONDS);
-            log.info(
-                    "    Polled {} messages from beginning",
-                    firstMessages.messages().size());
-        } catch (RuntimeException e) {
-            log.info("    FIRST strategy failed: {}", e.getMessage());
-        }
-
-        // Test 2: Poll from specific offset
-        log.info("  Testing OFFSET strategy");
-        try {
-            var offsetMessages = client.messages()
-                    .pollMessages(
-                            StreamId.of(testStream),
-                            TopicId.of(TEST_TOPIC),
-                            Optional.of(1L),
-                            consumer,
-                            PollingStrategy.offset(BigInteger.valueOf(5)),
-                            3L,
-                            false)
-                    .get(5, TimeUnit.SECONDS);
-            log.info(
-                    "    Polled {} messages from offset 5",
-                    offsetMessages.messages().size());
-        } catch (RuntimeException e) {
-            log.info("    OFFSET strategy failed: {}", e.getMessage());
-        }
-
-        // Test 3: Poll latest messages
-        log.info("  Testing LAST strategy");
-        try {
-            var lastMessages = client.messages()
-                    .pollMessages(
-                            StreamId.of(testStream),
-                            TopicId.of(TEST_TOPIC),
-                            Optional.of(1L),
-                            consumer,
-                            PollingStrategy.last(),
-                            1L,
-                            false)
-                    .get(5, TimeUnit.SECONDS);
-            log.info("    Polled {} latest messages", 
lastMessages.messages().size());
-        } catch (RuntimeException e) {
-            log.info("    LAST strategy failed: {}", e.getMessage());
-        }
-    }
-}
diff --git 
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/tcp/UsersTcpClientTest.java
 
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/tcp/UsersTcpClientTest.java
deleted file mode 100644
index 19194b9cb..000000000
--- 
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/tcp/UsersTcpClientTest.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iggy.client.async.tcp;
-
-import org.apache.iggy.client.BaseIntegrationTest;
-import org.apache.iggy.identifier.UserId;
-import org.apache.iggy.user.GlobalPermissions;
-import org.apache.iggy.user.IdentityInfo;
-import org.apache.iggy.user.Permissions;
-import org.apache.iggy.user.UserInfo;
-import org.apache.iggy.user.UserStatus;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-class UsersTcpClientTest extends BaseIntegrationTest {
-
-    private static final Logger log = 
LoggerFactory.getLogger(UsersTcpClientTest.class);
-    private static final String USERNAME = "iggy";
-    private static final String PASSWORD = "iggy";
-
-    private static AsyncIggyTcpClient client;
-    private static IdentityInfo loggedInUser;
-
-    @BeforeAll
-    public static void setup() throws Exception {
-        log.info("Setting up async client for integration tests");
-        client = new AsyncIggyTcpClient(serverHost(), serverTcpPort());
-
-        // Connect and login
-        loggedInUser = client.connect()
-                .thenCompose(v -> {
-                    log.info("Connected to Iggy server");
-                    return client.users().login(USERNAME, PASSWORD);
-                })
-                .get(5, TimeUnit.SECONDS);
-
-        log.info("Successfully logged in as: {}", USERNAME);
-    }
-
-    @Test
-    void shouldLogIn() {
-        assertThat(loggedInUser).isNotNull();
-        assertThat(loggedInUser.userId()).isEqualTo(0L);
-    }
-
-    @Test
-    void shouldGetUserWhenUserExists() throws Exception {
-        var userDetails = client.users().getUser(0L).get(5, TimeUnit.SECONDS);
-
-        assertThat(userDetails).isPresent();
-        assertThat(userDetails.get().id()).isEqualTo(0L);
-        assertThat(userDetails.get().username()).isEqualTo(USERNAME);
-    }
-
-    @Test
-    void shouldGetEmptyOptionalWhenUserDoesNotExist() throws Exception {
-        var userDetails = client.users().getUser(123456L).get(5, 
TimeUnit.SECONDS);
-
-        assertThat(userDetails).isNotPresent();
-    }
-
-    @Test
-    void shouldGetUsers() throws Exception {
-        var users = client.users().getUsers().get(5, TimeUnit.SECONDS);
-
-        assertThat(users).isNotEmpty();
-        assertThat(users).hasSize(1);
-        assertThat(users.get(0).username()).isEqualTo(USERNAME);
-    }
-
-    @Test
-    void shouldCreateAndDeleteUsers() throws Exception {
-        var users = client.users().getUsers().get(5, TimeUnit.SECONDS);
-        assertThat(users).hasSize(1);
-
-        var globalPermissions =
-                new GlobalPermissions(true, false, false, false, false, false, 
false, false, false, false);
-        var permissions = Optional.of(new Permissions(globalPermissions, 
Map.of()));
-
-        var foo = client.users()
-                .createUser("foo", "foo", UserStatus.Active, permissions)
-                .get(5, TimeUnit.SECONDS);
-        assertThat(foo).isNotNull();
-        assertThat(foo.permissions()).isPresent();
-        
assertThat(foo.permissions().get().global()).isEqualTo(globalPermissions);
-
-        var bar = client.users()
-                .createUser("bar", "bar", UserStatus.Active, permissions)
-                .get(5, TimeUnit.SECONDS);
-        assertThat(bar).isNotNull();
-        assertThat(bar.permissions()).isPresent();
-        
assertThat(bar.permissions().get().global()).isEqualTo(globalPermissions);
-
-        users = client.users().getUsers().get(5, TimeUnit.SECONDS);
-
-        assertThat(users).hasSize(3);
-        
assertThat(users).map(UserInfo::username).containsExactlyInAnyOrder(USERNAME, 
"foo", "bar");
-
-        client.users().deleteUser(foo.id()).get(5, TimeUnit.SECONDS);
-        users = client.users().getUsers().get(5, TimeUnit.SECONDS);
-        assertThat(users).hasSize(2);
-
-        client.users().deleteUser(UserId.of(bar.id())).get(5, 
TimeUnit.SECONDS);
-        users = client.users().getUsers().get(5, TimeUnit.SECONDS);
-        assertThat(users).hasSize(1);
-    }
-
-    @Test
-    void shouldUpdateUser() throws Exception {
-        var created = client.users()
-                .createUser("test", "test", UserStatus.Active, 
Optional.empty())
-                .get(5, TimeUnit.SECONDS);
-
-        client.users()
-                .updateUser(created.id(), Optional.of("foo"), 
Optional.of(UserStatus.Inactive))
-                .get(5, TimeUnit.SECONDS);
-
-        var user = client.users().getUser(created.id()).get(5, 
TimeUnit.SECONDS);
-        assertThat(user).isPresent();
-        assertThat(user.get().username()).isEqualTo("foo");
-        assertThat(user.get().status()).isEqualTo(UserStatus.Inactive);
-
-        client.users()
-                .updateUser(created.id(), Optional.empty(), 
Optional.of(UserStatus.Active))
-                .get(5, TimeUnit.SECONDS);
-
-        user = client.users().getUser(created.id()).get(5, TimeUnit.SECONDS);
-        assertThat(user).isPresent();
-        assertThat(user.get().username()).isEqualTo("foo");
-        assertThat(user.get().status()).isEqualTo(UserStatus.Active);
-
-        client.users()
-                .updateUser(UserId.of(created.id()), Optional.of("test"), 
Optional.empty())
-                .get(5, TimeUnit.SECONDS);
-
-        user = client.users().getUser(created.id()).get(5, TimeUnit.SECONDS);
-        assertThat(user).isPresent();
-        assertThat(user.get().username()).isEqualTo("test");
-        assertThat(user.get().status()).isEqualTo(UserStatus.Active);
-
-        client.users().deleteUser(created.id()).get(5, TimeUnit.SECONDS);
-
-        var users = client.users().getUsers().get(5, TimeUnit.SECONDS);
-        assertThat(users).hasSize(1);
-    }
-
-    @Test
-    void shouldUpdatePermissions() throws Exception {
-        var created = client.users()
-                .createUser("test", "test", UserStatus.Active, 
Optional.empty())
-                .get(5, TimeUnit.SECONDS);
-
-        var allPermissions = new Permissions(
-                new GlobalPermissions(true, true, true, true, true, true, 
true, true, true, true), Map.of());
-        var noPermissions = new Permissions(
-                new GlobalPermissions(false, false, false, false, false, 
false, false, false, false, false), Map.of());
-
-        client.users()
-                .updatePermissions(created.id(), Optional.of(allPermissions))
-                .get(5, TimeUnit.SECONDS);
-
-        var user = client.users().getUser(created.id()).get(5, 
TimeUnit.SECONDS);
-        assertThat(user).isPresent();
-        assertThat(user.get().permissions()).isPresent();
-        assertThat(user.get().permissions().get()).isEqualTo(allPermissions);
-
-        client.users()
-                .updatePermissions(UserId.of(created.id()), 
Optional.of(noPermissions))
-                .get(5, TimeUnit.SECONDS);
-
-        user = client.users().getUser(created.id()).get(5, TimeUnit.SECONDS);
-        assertThat(user).isPresent();
-        assertThat(user.get().permissions()).isPresent();
-        assertThat(user.get().permissions().get()).isEqualTo(noPermissions);
-
-        client.users().deleteUser(created.id()).get(5, TimeUnit.SECONDS);
-
-        var users = client.users().getUsers().get(5, TimeUnit.SECONDS);
-        assertThat(users).hasSize(1);
-    }
-
-    @Test
-    void shouldChangePassword() throws Exception {
-        var newUser = client.users()
-                .createUser("test", "test", UserStatus.Active, 
Optional.empty())
-                .get(5, TimeUnit.SECONDS);
-        client.users().logout().get(5, TimeUnit.SECONDS);
-
-        var identity = client.users().login("test", "test").get(5, 
TimeUnit.SECONDS);
-        assertThat(identity).isNotNull();
-        assertThat(identity.userId()).isEqualTo(newUser.id());
-
-        client.users().changePassword(identity.userId(), "test", 
"foobar").get(5, TimeUnit.SECONDS);
-        client.users().logout().get(5, TimeUnit.SECONDS);
-        identity = client.users().login("test", "foobar").get(5, 
TimeUnit.SECONDS);
-        assertThat(identity).isNotNull();
-        assertThat(identity.userId()).isEqualTo(newUser.id());
-
-        client.users()
-                .changePassword(UserId.of(identity.userId()), "foobar", 
"barfoo")
-                .get(5, TimeUnit.SECONDS);
-        client.users().logout().get(5, TimeUnit.SECONDS);
-        identity = client.users().login("test", "barfoo").get(5, 
TimeUnit.SECONDS);
-        assertThat(identity).isNotNull();
-        assertThat(identity.userId()).isEqualTo(newUser.id());
-
-        client.users().logout().get(5, TimeUnit.SECONDS);
-        client.users().login(USERNAME, PASSWORD).get(5, TimeUnit.SECONDS);
-        client.users().deleteUser(newUser.id()).get(5, TimeUnit.SECONDS);
-
-        var users = client.users().getUsers().get(5, TimeUnit.SECONDS);
-        assertThat(users).hasSize(1);
-    }
-}
diff --git 
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/ConsumerGroupsClientBaseTest.java
 
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/ConsumerGroupsClientBaseTest.java
index 00e5e1eb2..fc5d85d28 100644
--- 
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/ConsumerGroupsClientBaseTest.java
+++ 
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/ConsumerGroupsClientBaseTest.java
@@ -77,7 +77,7 @@ public abstract class ConsumerGroupsClientBaseTest extends 
IntegrationTest {
         String groupName = "consumes-group-42";
         createConsumerGroup(groupName);
         var consumerGroup = consumerGroupsClient.getConsumerGroup(STREAM_NAME, 
TOPIC_NAME, ConsumerId.of(groupName));
-        assert consumerGroup.isPresent();
+        assertThat(consumerGroup).isPresent();
 
         // when
         consumerGroupsClient.deleteConsumerGroup(STREAM_NAME, TOPIC_NAME, 
ConsumerId.of(groupName));
diff --git 
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/MessagesClientBaseTest.java
 
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/MessagesClientBaseTest.java
index feacaacbf..040e1790b 100644
--- 
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/MessagesClientBaseTest.java
+++ 
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/MessagesClientBaseTest.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.api.Test;
 
 import java.math.BigInteger;
 import java.util.List;
+import java.util.Optional;
 
 import static java.util.Optional.empty;
 import static org.apache.iggy.TestConstants.STREAM_NAME;
@@ -68,6 +69,87 @@ public abstract class MessagesClientBaseTest extends 
IntegrationTest {
         assertThat(polledMessages.messages()).hasSize(1);
     }
 
+    @Test
+    void shouldPollMessagesWithFirstStrategy() {
+        // given
+        setUpStreamAndTopic();
+        messagesClient.sendMessages(
+                STREAM_NAME,
+                TOPIC_NAME,
+                Partitioning.partitionId(0L),
+                List.of(Message.of("first"), Message.of("second"), 
Message.of("third")));
+
+        // when
+        var polledMessages = messagesClient.pollMessages(
+                STREAM_NAME, TOPIC_NAME, Optional.of(0L), Consumer.of(0L), 
PollingStrategy.first(), 10L, false);
+
+        // then
+        assertThat(polledMessages.messages()).hasSize(3);
+        assertThat(new 
String(polledMessages.messages().get(0).payload())).isEqualTo("first");
+    }
+
+    @Test
+    void shouldPollMessagesWithOffsetStrategy() {
+        // given
+        setUpStreamAndTopic();
+        messagesClient.sendMessages(
+                STREAM_NAME,
+                TOPIC_NAME,
+                Partitioning.partitionId(0L),
+                List.of(Message.of("msg-0"), Message.of("msg-1"), 
Message.of("msg-2")));
+
+        // when — poll starting from offset 1 (skip first message)
+        var polledMessages = messagesClient.pollMessages(
+                STREAM_NAME,
+                TOPIC_NAME,
+                Optional.of(0L),
+                Consumer.of(0L),
+                PollingStrategy.offset(BigInteger.ONE),
+                10L,
+                false);
+
+        // then
+        assertThat(polledMessages.messages()).hasSize(2);
+        assertThat(new 
String(polledMessages.messages().get(0).payload())).isEqualTo("msg-1");
+        assertThat(new 
String(polledMessages.messages().get(1).payload())).isEqualTo("msg-2");
+    }
+
+    @Test
+    void shouldPollMessagesWithLastStrategy() {
+        // given
+        setUpStreamAndTopic();
+        messagesClient.sendMessages(
+                STREAM_NAME,
+                TOPIC_NAME,
+                Partitioning.partitionId(0L),
+                List.of(Message.of("msg-0"), Message.of("msg-1"), 
Message.of("msg-2")));
+
+        // when
+        var polledMessages = messagesClient.pollMessages(
+                STREAM_NAME, TOPIC_NAME, Optional.of(0L), Consumer.of(0L), 
PollingStrategy.last(), 1L, false);
+
+        // then
+        assertThat(polledMessages.messages()).hasSize(1);
+        assertThat(new 
String(polledMessages.messages().get(0).payload())).isEqualTo("msg-2");
+    }
+
+    @Test
+    void shouldVerifyMessageContentRoundTrip() {
+        // given
+        setUpStreamAndTopic();
+        String content = "hello from java sdk – special chars: łóżko, 日本語, 
emoji 🎉";
+        messagesClient.sendMessages(
+                STREAM_NAME, TOPIC_NAME, Partitioning.partitionId(0L), 
List.of(Message.of(content)));
+
+        // when
+        var polledMessages = messagesClient.pollMessages(
+                STREAM_NAME, TOPIC_NAME, Optional.of(0L), Consumer.of(0L), 
PollingStrategy.first(), 10L, false);
+
+        // then
+        assertThat(polledMessages.messages()).hasSize(1);
+        assertThat(new 
String(polledMessages.messages().get(0).payload())).isEqualTo(content);
+    }
+
     @Test
     void shouldSendMessageWithBalancedPartitioning() {
         // given
diff --git 
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/UsersClientBaseTest.java
 
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/UsersClientBaseTest.java
index 95f376cb1..78bcea06c 100644
--- 
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/UsersClientBaseTest.java
+++ 
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/UsersClientBaseTest.java
@@ -64,6 +64,8 @@ public abstract class UsersClientBaseTest extends 
IntegrationTest {
 
         // then
         assertThat(user).isPresent();
+        assertThat(user.get().id()).isEqualTo(0L);
+        assertThat(user.get().username()).isEqualTo("iggy");
     }
 
     @Test
@@ -103,6 +105,40 @@ public abstract class UsersClientBaseTest extends 
IntegrationTest {
         assertThat(users).hasSize(1);
     }
 
+    @Test
+    void shouldUpdateUsername() {
+        // given
+        login();
+        UserInfoDetails user = usersClient.createUser("test", "test", 
UserStatus.Active, Optional.empty());
+        trackUser(user.id());
+
+        // when
+        usersClient.updateUser(user.id(), Optional.of("new-name"), 
Optional.empty());
+
+        // then
+        var updatedUser = usersClient.getUser(user.id());
+        assertThat(updatedUser).isPresent();
+        assertThat(updatedUser.get().username()).isEqualTo("new-name");
+        assertThat(updatedUser.get().status()).isEqualTo(UserStatus.Active);
+    }
+
+    @Test
+    void shouldUpdateUsernameAndStatusTogether() {
+        // given
+        login();
+        UserInfoDetails user = usersClient.createUser("test", "test", 
UserStatus.Active, Optional.empty());
+        trackUser(user.id());
+
+        // when
+        usersClient.updateUser(user.id(), Optional.of("renamed"), 
Optional.of(UserStatus.Inactive));
+
+        // then
+        var updatedUser = usersClient.getUser(user.id());
+        assertThat(updatedUser).isPresent();
+        assertThat(updatedUser.get().username()).isEqualTo("renamed");
+        assertThat(updatedUser.get().status()).isEqualTo(UserStatus.Inactive);
+    }
+
     @Test
     void shouldUpdateUserStatus() {
         // given
@@ -145,18 +181,32 @@ public abstract class UsersClientBaseTest extends 
IntegrationTest {
     @Test
     void shouldChangePassword() {
         // given
-        IdentityInfo identity = usersClient.login("iggy", "iggy");
+        login();
+        UserInfoDetails testUser = usersClient.createUser("test", "test", 
UserStatus.Active, Optional.empty());
+        trackUser(testUser.id());
+        usersClient.logout();
 
-        // when
-        usersClient.changePassword(identity.userId(), "iggy", "new-pass");
+        IdentityInfo identity = usersClient.login("test", "test");
+        assertThat(identity).isNotNull();
+        assertThat(identity.userId()).isEqualTo(testUser.id());
+
+        // when — first password change
+        usersClient.changePassword(identity.userId(), "test", "new-pass");
         usersClient.logout();
-        IdentityInfo newLogin = usersClient.login("iggy", "new-pass");
+        IdentityInfo afterFirstChange = usersClient.login("test", "new-pass");
 
         // then
-        assertThat(newLogin).isNotNull();
+        assertThat(afterFirstChange).isNotNull();
+        assertThat(afterFirstChange.userId()).isEqualTo(testUser.id());
+
+        // when — second password change
+        usersClient.changePassword(identity.userId(), "new-pass", 
"final-pass");
+        usersClient.logout();
+        IdentityInfo afterSecondChange = usersClient.login("test", 
"final-pass");
 
-        // restore original password for other tests
-        usersClient.changePassword(identity.userId(), "new-pass", "iggy");
+        // then
+        assertThat(afterSecondChange).isNotNull();
+        assertThat(afterSecondChange.userId()).isEqualTo(testUser.id());
     }
 
     @Test

Reply via email to