atharvalade commented on code in PR #2711:
URL: https://github.com/apache/iggy/pull/2711#discussion_r2818215127


##########
examples/java/src/main/java/org/apache/iggy/examples/async/AsyncProducer.java:
##########
@@ -29,183 +30,239 @@
 import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * AsyncProducer demonstrates how to use the async client to send messages to 
Apache Iggy.
- * This producer sends messages asynchronously and handles responses using 
CompletableFuture.
+ * Async Producer Example - High Throughput
+ *
+ * <p>Demonstrates high-throughput message production using the async 
(non-blocking) Iggy client.
+ *
+ * <h2>WHEN TO USE THE ASYNC CLIENT:</h2>
+ * <ul>
+ *   <li>You need &gt; 5000 msg/sec throughput</li>
+ *   <li>Your application is already async/reactive (Spring WebFlux, Vert.x, 
etc.)</li>
+ *   <li>You want to pipeline multiple requests over a single connection</li>
+ *   <li>You're building a service that handles many concurrent streams</li>
+ * </ul>
+ *
+ * <h2>KEY DIFFERENCES FROM BLOCKING CLIENT:</h2>
+ * <ul>
+ *   <li>All methods return CompletableFuture instead of direct values</li>
+ *   <li>Built on Netty's non-blocking I/O (no thread-per-request)</li>
+ *   <li>Can pipeline multiple send operations without waiting for each to 
complete</li>
+ *   <li>Single connection handles all concurrent requests via event loop 
multiplexing</li>
+ * </ul>
+ *
+ * <h2>PERFORMANCE CHARACTERISTICS:</h2>
+ * <ul>
+ *   <li>Throughput: Higher (pipelines requests, no thread contention)</li>
+ *   <li>Latency per request: Similar to blocking</li>
+ *   <li>Thread usage: Minimal (Netty event loop threads)</li>
+ *   <li>Code complexity: Higher (futures, callbacks)</li>
+ * </ul>
+ *
+ * <p>This example shows:
+ * <ul>
+ *   <li>Async client setup with CompletableFuture chaining</li>
+ *   <li>Pipelined message sending (fire multiple sends without blocking)</li>
+ *   <li>Error handling with exceptionally()</li>
+ *   <li>Performance measurement</li>
+ *   <li>Proper async shutdown</li>
+ * </ul>
+ *
+ * <p>Run with: {@code ./gradlew runAsyncProducer}
  */
-public class AsyncProducer {
+public final class AsyncProducer {
     private static final Logger log = 
LoggerFactory.getLogger(AsyncProducer.class);
 
-    private static final String HOST = "127.0.0.1";
-    private static final int PORT = 8090;
+    // Configuration
+    private static final String IGGY_HOST = "localhost";
+    private static final int IGGY_PORT = 8090;
     private static final String USERNAME = "iggy";
     private static final String PASSWORD = "iggy";
+    private static final String STREAM_NAME = "async-example-stream";
+    private static final String TOPIC_NAME = "async-example-topic";
+    private static final int PARTITION_COUNT = 3;
 
-    private static final String STREAM_NAME = "async-test";
-    private static final String TOPIC_NAME = "events";
-    private static final long PARTITION_ID = 0L;
+    // High-throughput configuration
+    private static final int MESSAGE_BATCH_SIZE = 500; // Larger batches for 
async
+    private static final int TOTAL_BATCHES = 20;
+    private static final int MAX_IN_FLIGHT = 5; // Pipeline up to 5 concurrent 
sends
 
-    private static final int MESSAGE_COUNT = 100;
-    private static final int MESSAGE_SIZE = 256;
+    private AsyncProducer() {
+        // Utility class
+    }
 
-    private final AsyncIggyTcpClient client;
-    private final AtomicInteger successCount = new AtomicInteger(0);
-    private final AtomicInteger errorCount = new AtomicInteger(0);
+    public static void main(String[] args) {
+        AsyncIggyTcpClient client = null;
 
-    public AsyncProducer() {
-        this.client = new AsyncIggyTcpClient(HOST, PORT);
-    }
+        try {
+            log.info("=== Async Producer Example (High Throughput) ===");
 
-    public CompletableFuture<Void> start() {
-        log.info("Starting AsyncProducer...");
+            // 1. Build, connect, and login - all chained with 
CompletableFuture
+            log.info("Connecting to Iggy server at {}:{}...", IGGY_HOST, 
IGGY_PORT);
 
-        return client.connect()
-                .thenCompose(v -> {
-                    log.info("Connected to Iggy server at {}:{}", HOST, PORT);
-                    return client.users().login(USERNAME, PASSWORD);
-                })
-                .thenCompose(v -> {
-                    log.info("Logged in successfully as user: {}", USERNAME);
-                    return setupStreamAndTopic();
-                })
-                .thenCompose(v -> {
-                    log.info("Stream and topic setup complete");
-                    return sendMessages();
-                })
-                .thenRun(() -> {
-                    log.info("All messages sent. Success: {}, Errors: {}", 
successCount.get(), errorCount.get());
-                })
-                .exceptionally(ex -> {
-                    log.error("Error in producer flow", ex);
-                    return null;
-                });
+            // ASYNC PATTERN: Use join() only at the end to block until client 
is ready.
+            // In a real async app (e.g. Spring WebFlux), you'd chain 
everything without join().
+            client = Iggy.tcpClientBuilder()
+                    .async()
+                    .host(IGGY_HOST)
+                    .port(IGGY_PORT)
+                    .credentials(USERNAME, PASSWORD)
+                    .buildAndLogin()
+                    .join(); // Block here to wait for connection
+
+            log.info("Connected successfully");
+
+            // 2. Setup stream and topic
+            AsyncIggyTcpClient finalClient = client;
+            setupStreamAndTopic(finalClient).join();
+
+            // 3. Send messages with pipelining
+            sendMessagesAsync(finalClient).join();
+
+            log.info("=== Producer completed successfully ===");
+
+        } catch (RuntimeException e) {
+            log.error("Producer failed", e);
+            System.exit(1);
+        } finally {
+            // Always close the client
+            if (client != null) {
+                try {
+                    client.close().join();
+                    log.info("Client closed");
+                } catch (RuntimeException e) {
+                    log.error("Error closing client", e);
+                }
+            }
+        }
     }
 
-    private CompletableFuture<Void> setupStreamAndTopic() {
-        log.info("Checking stream: {}", STREAM_NAME);
+    private static CompletableFuture<Void> 
setupStreamAndTopic(AsyncIggyTcpClient client) {
+        // ASYNC CHAINING PATTERN:
+        // Each operation returns CompletableFuture. We chain them with 
thenCompose().
+        // Errors propagate down the chain and can be handled with 
exceptionally().
 
         return client.streams()
                 .getStream(StreamId.of(STREAM_NAME))
-                .thenCompose(stream -> {
-                    if (stream.isEmpty()) {
-                        log.info("Creating stream: {}", STREAM_NAME);
-                        return client.streams()
-                                .createStream(STREAM_NAME)
-                                .thenAccept(created -> log.info("Stream 
created: {}", created.name()));
-                    } else {
-                        log.info("Stream exists: {}", STREAM_NAME);
-                        return CompletableFuture.completedFuture(null);
-                    }
+                .thenApply(stream -> {
+                    log.info("Stream '{}' already exists", STREAM_NAME);

Review Comment:
   Rewrote `setupStreamAndTopic` to properly check `Optional.isPresent()` 
before acting, and simplified the producer to use the builder pattern 
consistent with `AsyncConsumer`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to