mmodzelewski commented on code in PR #2213:
URL: https://github.com/apache/iggy/pull/2213#discussion_r2399988128


##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncBytesDeserializer.java:
##########


Review Comment:
   Serializers and deserializers are independent of the execution model. Both
blocking and async clients should use the same mappers, so this should be
refactored.
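
A rough sketch of the idea, not the actual SDK code: the names `StreamNameMapper`, `BlockingEcho` and `AsyncEcho` are hypothetical, and the byte layout is made up for illustration rather than taken from the Iggy wire format. The mapper is a set of pure functions, and the two clients differ only in how they invoke it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

// Hypothetical shared mapper: pure functions over bytes, no I/O and no threading.
final class StreamNameMapper {
    private StreamNameMapper() {}

    // Illustrative layout only (an assumption): 1-byte length followed by UTF-8 bytes.
    static byte[] toBytes(String name) {
        byte[] utf8 = name.getBytes(StandardCharsets.UTF_8);
        if (utf8.length > 255) {
            throw new IllegalArgumentException("name too long");
        }
        ByteBuffer buffer = ByteBuffer.allocate(1 + utf8.length);
        buffer.put((byte) utf8.length).put(utf8);
        return buffer.array();
    }

    static String fromBytes(byte[] bytes) {
        int length = bytes[0] & 0xFF;
        return new String(bytes, 1, length, StandardCharsets.UTF_8);
    }
}

// Hypothetical blocking path: calls the mapper on the caller's thread.
final class BlockingEcho {
    String roundTrip(String name) {
        return StreamNameMapper.fromBytes(StreamNameMapper.toBytes(name));
    }
}

// Hypothetical async path: same mapper, only the execution model changes.
final class AsyncEcho {
    CompletableFuture<String> roundTrip(String name) {
        return CompletableFuture
            .supplyAsync(() -> StreamNameMapper.toBytes(name))
            .thenApply(StreamNameMapper::fromBytes);
    }
}
```

With this shape there is no need for a separate `AsyncBytesDeserializer`; the async client would reuse whatever mappers the blocking client already has.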



##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.client.async.tcp;
+
+import io.netty.buffer.Unpooled;
+import org.apache.iggy.client.async.AsyncConsumerGroupsClient;
+import org.apache.iggy.client.async.AsyncMessagesClient;
+import org.apache.iggy.client.async.AsyncStreamsClient;
+import org.apache.iggy.client.async.AsyncTopicsClient;
+import org.apache.iggy.client.blocking.tcp.CommandCode;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Async TCP client for Apache Iggy using Netty.
+ * This is a true async implementation with non-blocking I/O.
+ */
+public class AsyncIggyTcpClient {
+
+    private final String host;
+    private final int port;
+    private AsyncTcpConnection connection;
+    private AsyncMessagesClient messagesClient;
+    private AsyncConsumerGroupsClient consumerGroupsClient;
+    private AsyncStreamsClient streamsClient;
+    private AsyncTopicsClient topicsClient;
+
+    public AsyncIggyTcpClient(String host, int port) {
+        this.host = host;
+        this.port = port;
+    }
+
+    /**
+     * Connects to the Iggy server asynchronously.
+     */
+    public CompletableFuture<Void> connect() {
+        connection = new AsyncTcpConnection(host, port);
+        return connection.connect()
+            .thenRun(() -> {
+                messagesClient = new AsyncMessagesTcpClient(connection);
+                consumerGroupsClient = new AsyncConsumerGroupsTcpClient(connection);
+                streamsClient = new AsyncStreamsTcpClient(connection);
+                topicsClient = new AsyncTopicsTcpClient(connection);
+            });
+    }
+
+    /**
+     * Logs in to the server asynchronously.
+     */
+    public CompletableFuture<Void> login(String username, String password) {

Review Comment:
   Please align with the blocking client and move login under the Users client.
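
Roughly what I have in mind, as a sketch under assumptions: `AsyncUsersClientSketch`, `InMemoryUsersClient` and `AsyncClientFacadeSketch` are hypothetical names, the `users()` accessor simply mirrors the `streams()` accessor used in the tests, and the in-memory credential check stands in for the real network call.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical async users client that owns authentication.
interface AsyncUsersClientSketch {
    CompletableFuture<Void> login(String username, String password);
}

// Stand-in implementation (an assumption): checks credentials in memory instead of sending a command.
final class InMemoryUsersClient implements AsyncUsersClientSketch {
    @Override
    public CompletableFuture<Void> login(String username, String password) {
        if ("iggy".equals(username) && "iggy".equals(password)) {
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.failedFuture(new IllegalArgumentException("invalid credentials"));
    }
}

// Hypothetical top-level client: exposes users() instead of its own login(...).
final class AsyncClientFacadeSketch {
    private final AsyncUsersClientSketch usersClient = new InMemoryUsersClient();

    AsyncUsersClientSketch users() {
        return usersClient;
    }
}
```

Call sites would then read `client.users().login(username, password)` rather than `client.login(username, password)`.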



##########
foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncClientIntegrationTest.java:
##########
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.client.async;
+
+import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
+import org.apache.iggy.consumergroup.Consumer;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.Partitioning;
+import org.apache.iggy.message.PollingStrategy;
+import org.apache.iggy.topic.CompressionAlgorithm;
+import org.junit.jupiter.api.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Integration test for the complete async client flow.
+ * Tests connection, authentication, stream/topic management, and message operations.
+ */
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class AsyncClientIntegrationTest {
+    private static final Logger logger = LoggerFactory.getLogger(AsyncClientIntegrationTest.class);
+
+    private static final String HOST = "127.0.0.1";
+    private static final int PORT = 8090;
+    private static final String USERNAME = "iggy";
+    private static final String PASSWORD = "iggy";
+
+    private static final String TEST_STREAM = "async-test-stream-" + UUID.randomUUID();
+    private static final String TEST_TOPIC = "async-test-topic";
+    private static final long PARTITION_ID = 1L;
+
+    private static AsyncIggyTcpClient client;
+
+    @BeforeAll
+    public static void setup() throws Exception {
+        logger.info("Setting up async client for integration tests");
+        client = new AsyncIggyTcpClient(HOST, PORT);
+
+        // Connect and login
+        client.connect()
+            .thenCompose(v -> {
+                logger.info("Connected to Iggy server");
+                return client.login(USERNAME, PASSWORD);
+            })
+            .get(5, TimeUnit.SECONDS);
+
+        logger.info("Successfully logged in as: {}", USERNAME);
+    }
+
+    @AfterAll
+    public static void tearDown() throws Exception {
+        logger.info("Cleaning up test resources");
+
+        try {
+            // Clean up test stream if it exists
+            client.streams().deleteStreamAsync(StreamId.of(TEST_STREAM))
+                .get(5, TimeUnit.SECONDS);
+            logger.info("Deleted test stream: {}", TEST_STREAM);
+        } catch (Exception e) {
+            // Stream may not exist, which is fine
+            logger.debug("Stream cleanup failed (may not exist): {}", e.getMessage());
+        }
+
+        // Close the client
+        if (client != null) {
+            client.close().get(5, TimeUnit.SECONDS);
+            logger.info("Closed async client");
+        }
+    }
+
+    @Test
+    @Order(1)
+    public void testCreateStream() throws Exception {
+        logger.info("Testing stream creation");
+
+        var streamDetails = client.streams()
+            .createStreamAsync(Optional.empty(), TEST_STREAM)
+            .get(5, TimeUnit.SECONDS);
+
+        assertNotNull(streamDetails);
+        assertEquals(TEST_STREAM, streamDetails.name());
+        logger.info("Successfully created stream: {}", streamDetails.name());
+    }
+
+    @Test
+    @Order(2)
+    public void testGetStream() throws Exception {
+        logger.info("Testing stream retrieval");
+
+        var streamOpt = client.streams()
+            .getStreamAsync(StreamId.of(TEST_STREAM))
+            .get(5, TimeUnit.SECONDS);
+
+        assertTrue(streamOpt.isPresent());

Review Comment:
   Please align with the existing convention and use AssertJ assertions instead.



##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncConsumerGroupsTcpClient.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.client.async.tcp;
+
+import io.netty.buffer.Unpooled;
+import org.apache.iggy.client.async.AsyncConsumerGroupsClient;
+import org.apache.iggy.client.blocking.tcp.CommandCode;
+import org.apache.iggy.identifier.ConsumerId;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Async TCP implementation of consumer groups client.
+ */
+public class AsyncConsumerGroupsTcpClient implements AsyncConsumerGroupsClient {
+
+    private final AsyncTcpConnection connection;
+
+    public AsyncConsumerGroupsTcpClient(AsyncTcpConnection connection) {
+        this.connection = connection;
+    }
+
+    @Override
+    public CompletableFuture<Void> joinConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId groupId) {
+        var payload = Unpooled.buffer();
+
+        // Serialize stream ID
+        payload.writeBytes(AsyncBytesSerializer.toBytes(streamId));
+
+        // Serialize topic ID
+        payload.writeBytes(AsyncBytesSerializer.toBytes(topicId));
+
+        // Serialize consumer group ID
+        payload.writeBytes(AsyncBytesSerializer.toBytes(groupId));
+
+        System.out.println("Joining consumer group - Stream: " + streamId + ", Topic: " + topicId + ", Group: " + groupId);

Review Comment:
   Please change all `System.out` prints to use a Logger with the proper logging level.
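
A minimal sketch of the shape, under assumptions: `JoinGroupLoggingSketch` is a hypothetical class, and it uses the JDK's built-in `System.Logger` only so the snippet has no dependencies. It is not a claim about which logging API the SDK should settle on.

```java
import java.lang.System.Logger;
import java.lang.System.Logger.Level;

// Hypothetical stand-in for the logging done when joining a consumer group.
final class JoinGroupLoggingSketch {

    // JDK platform logger, chosen here only to keep the sketch dependency-free (an assumption).
    private static final Logger LOGGER = System.getLogger(JoinGroupLoggingSketch.class.getName());

    private JoinGroupLoggingSketch() {}

    // Logs at DEBUG with placeholders; formatting is left to the logging backend
    // instead of building the string eagerly with concatenation.
    static void logJoin(Object streamId, Object topicId, Object groupId) {
        LOGGER.log(Level.DEBUG,
            "Joining consumer group - Stream: {0}, Topic: {1}, Group: {2}",
            streamId, topicId, groupId);
    }

    // Exposed so callers can check which logger the messages go to.
    static String loggerName() {
        return LOGGER.getName();
    }
}
```

In the SDK itself this would presumably be the SLF4J `Logger` that `AsyncClientIntegrationTest` already uses, with `{}` placeholders and a `debug` or `trace` level for this kind of per-call detail.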



##########
foreign/java/BUILD_AND_TEST.md:
##########
@@ -0,0 +1,320 @@
+# Apache Iggy Java SDK - Build and Test Guide
+
+## Overview
+
+This document provides comprehensive instructions for building and testing the
+Apache Iggy Java SDK, including the new async client implementation with fixed
+polling functionality.
+
+## Prerequisites
+
+### Required Software
+
+- **Java**: JDK 17 or higher
+- **Gradle**: Version 8.3+ (included via wrapper)
+- **Iggy Server**: Running instance on localhost:8090
+- **Git**: For version control
+
+### Starting the Iggy Server
+
+```bash
+# Navigate to Iggy server directory
+cd /path/to/iggy
+
+# Start server with default credentials
+cargo run --bin iggy-server -- --with-default-root-credentials
+```
+
+The server should be running on `127.0.0.1:8090` with default credentials:
+
+- Username: `iggy`
+- Password: `iggy`
+
+## Project Structure
+
+```bash
+iggy/foreign/java/
+├── java-sdk/           # Main SDK implementation
+│   ├── src/
+│   │   ├── main/      # Source code
+│   │   │   └── java/org/apache/iggy/
+│   │   │       ├── client/
+│   │   │       │   ├── async/        # Async client implementation
+│   │   │       │   └── blocking/     # Blocking client implementation
+│   │   │       └── ...
+│   │   └── test/      # Test code
+│   │       └── java/org/apache/iggy/client/async/
+│   │           └── AsyncPollMessageTest.java  # test for polling fn
+│   └── build.gradle
+├── examples/          # Example applications
+└── gradlew           # Gradle wrapper
+```
+
+## Building the Project
+
+### 1. Clean Build
+
+```bash
+cd iggy/foreign/java/java-sdk
+
+# Clean previous builds
+../gradlew clean
+
+# Build without running tests
+../gradlew build -x test
+
+# Or build with tests (skip checkstyle)
+../gradlew build -x checkstyleMain -x checkstyleTest
+```
+
+### 2. Compile Only
+
+```bash
+# Compile main source code
+../gradlew compileJava
+
+# Compile test code
+../gradlew compileTestJava
+```
+
+## Running Tests
+
+### 1. Run All Tests
+
+```bash
+# Run all tests (skip checkstyle)
+../gradlew test -x checkstyleMain -x checkstyleTest
+```
+
+### 2. Run Specific Test Class
+
+```bash
+# Run AsyncPollMessageTest specifically
+../gradlew test --tests AsyncPollMessageTest -x checkstyleMain -x checkstyleTest
+
+# Run with detailed output
+../gradlew test --tests AsyncPollMessageTest --info -x checkstyleMain -x checkstyleTest
+```
+
+### 3. Run Test Suite by Package
+
+```bash
+# Run all async client tests
+../gradlew test --tests "org.apache.iggy.client.async.*" -x checkstyleMain -x checkstyleTest
+```
+
+### 4. View Test Results
+
+Test reports are generated at:
+
+```bash
+java-sdk/build/reports/tests/test/index.html
+```
+
+Open this file in a browser to view detailed test results.
+
+## Key Test: AsyncPollMessageTest

Review Comment:
   While some parts of this document might be useful, such as the information
about how to build and test, I don't think it should include information about
the main tests, test coverage, or fixes applied. That will quickly go out of
sync with the actual code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.