This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9e60fcc87f2 KAFKA-18181 Refactor ShareConsumerTest (#18105)
9e60fcc87f2 is described below

commit 9e60fcc87f2f74561a6ece97ed0774a8e42455a2
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Sat Dec 14 09:39:44 2024 +0800

    KAFKA-18181 Refactor ShareConsumerTest (#18105)
    
    Reviewers: Apoorv Mittal <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../java/kafka/test/api/ShareConsumerTest.java     | 404 +++++++--------------
 1 file changed, 127 insertions(+), 277 deletions(-)

diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
index d31ca18e8b7..89b357cf4c3 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -77,16 +77,14 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -96,7 +94,6 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 
 @Timeout(1200)
 @Tag("integration")
@@ -1003,7 +1000,8 @@ public class ShareConsumerTest {
     @Flaky("KAFKA-18033")
     @ParameterizedTest(name = "{displayName}.persister={0}")
     @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testMultipleConsumersInGroupConcurrentConsumption(String 
persister) {
+    public void testMultipleConsumersInGroupConcurrentConsumption(String 
persister)
+            throws InterruptedException, ExecutionException, TimeoutException {
         AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
 
         int consumerCount = 4;
@@ -1013,51 +1011,37 @@ public class ShareConsumerTest {
         String groupId = "group1";
         alterShareAutoOffsetReset(groupId, "earliest");
 
-        ExecutorService producerExecutorService = 
Executors.newFixedThreadPool(producerCount);
-        ExecutorService consumerExecutorService = 
Executors.newFixedThreadPool(consumerCount);
-
+        List<CompletableFuture<Void>> producerFutures = new ArrayList<>();
         for (int i = 0; i < producerCount; i++) {
-            producerExecutorService.submit(() -> 
produceMessages(messagesPerProducer));
+            producerFutures.add(CompletableFuture.runAsync(() -> 
produceMessages(messagesPerProducer)));
         }
 
-        ConcurrentLinkedQueue<CompletableFuture<Integer>> futures = new 
ConcurrentLinkedQueue<>();
         int maxBytes = 100000;
+        List<CompletableFuture<Integer>> consumerFutures = new ArrayList<>();
         for (int i = 0; i < consumerCount; i++) {
             final int consumerNumber = i + 1;
-            consumerExecutorService.submit(() -> {
-                CompletableFuture<Integer> future = new CompletableFuture<>();
-                futures.add(future);
-                consumeMessages(totalMessagesConsumed, producerCount * 
messagesPerProducer, groupId, consumerNumber, 30, true, future, maxBytes);
-            });
+            consumerFutures.add(CompletableFuture.supplyAsync(() ->
+                    consumeMessages(totalMessagesConsumed,
+                            producerCount * messagesPerProducer, groupId, 
consumerNumber,
+                            30, true, maxBytes)));
         }
 
-        producerExecutorService.shutdown();
-        consumerExecutorService.shutdown();
+        
CompletableFuture.allOf(producerFutures.toArray(CompletableFuture[]::new)).get(60,
 TimeUnit.SECONDS);
+        
CompletableFuture.allOf(consumerFutures.toArray(CompletableFuture[]::new)).get(60,
 TimeUnit.SECONDS);
 
-        try {
-            assertTrue(producerExecutorService.awaitTermination(60, 
TimeUnit.SECONDS)); // Wait for all producer threads to complete
-            assertTrue(consumerExecutorService.awaitTermination(60, 
TimeUnit.SECONDS)); // Wait for all consumer threads to complete
-            int totalResult = 0;
-            for (CompletableFuture<Integer> future : futures) {
-                totalResult += future.get();
-            }
-            assertEquals(producerCount * messagesPerProducer, 
totalMessagesConsumed.get());
-            assertEquals(producerCount * messagesPerProducer, totalResult);
-        } catch (Exception e) {
-            fail("Exception occurred : " + e.getMessage());
-        }
+        int totalResult = 
consumerFutures.stream().mapToInt(CompletableFuture::join).sum();
+        assertEquals(producerCount * messagesPerProducer, totalResult);
     }
 
     @ParameterizedTest(name = "{displayName}.persister={0}")
     @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    @SuppressWarnings("NPathComplexity")
-    public void 
testMultipleConsumersInMultipleGroupsConcurrentConsumption(String persister) {
+    public void 
testMultipleConsumersInMultipleGroupsConcurrentConsumption(String persister)
+            throws ExecutionException, InterruptedException, TimeoutException {
         AtomicInteger totalMessagesConsumedGroup1 = new AtomicInteger(0);
         AtomicInteger totalMessagesConsumedGroup2 = new AtomicInteger(0);
         AtomicInteger totalMessagesConsumedGroup3 = new AtomicInteger(0);
 
         int producerCount = 4;
-        int consumerCount = 2;
         int messagesPerProducer = 2000;
         final int totalMessagesSent = producerCount * messagesPerProducer;
 
@@ -1069,100 +1053,47 @@ public class ShareConsumerTest {
         alterShareAutoOffsetReset(groupId2, "earliest");
         alterShareAutoOffsetReset(groupId3, "earliest");
 
-        ExecutorService producerExecutorService = 
Executors.newFixedThreadPool(producerCount);
-        ExecutorService shareGroupExecutorService1 = 
Executors.newFixedThreadPool(consumerCount);
-        ExecutorService shareGroupExecutorService2 = 
Executors.newFixedThreadPool(consumerCount);
-        ExecutorService shareGroupExecutorService3 = 
Executors.newFixedThreadPool(consumerCount);
-
-        CountDownLatch startSignal = new CountDownLatch(producerCount);
-
-        ConcurrentLinkedQueue<CompletableFuture<Integer>> producerFutures = 
new ConcurrentLinkedQueue<>();
-
+        List<CompletableFuture<Integer>> producerFutures = new ArrayList<>();
         for (int i = 0; i < producerCount; i++) {
-            producerExecutorService.submit(() -> {
-                CompletableFuture<Integer> future = 
produceMessages(messagesPerProducer);
-                producerFutures.add(future);
-                startSignal.countDown();
-            });
+            producerFutures.add(CompletableFuture.supplyAsync(() -> 
produceMessages(messagesPerProducer)));
         }
-
-        ConcurrentLinkedQueue<CompletableFuture<Integer>> futures1 = new 
ConcurrentLinkedQueue<>();
-        ConcurrentLinkedQueue<CompletableFuture<Integer>> futures2 = new 
ConcurrentLinkedQueue<>();
-        ConcurrentLinkedQueue<CompletableFuture<Integer>> futures3 = new 
ConcurrentLinkedQueue<>();
-
         // Wait for the producers to run
-        try {
-            boolean signalled = startSignal.await(15, TimeUnit.SECONDS);
-            assertTrue(signalled);
-        } catch (InterruptedException e) {
-            fail("Exception awaiting start signal");
-        }
+        assertDoesNotThrow(() -> 
CompletableFuture.allOf(producerFutures.toArray(CompletableFuture[]::new))
+                .get(15, TimeUnit.SECONDS), "Exception awaiting 
produceMessages");
+        int actualMessageSent = 
producerFutures.stream().mapToInt(CompletableFuture::join).sum();
 
-        int maxBytes = 100000;
+        List<CompletableFuture<Integer>> consumeMessagesFutures1 = new 
ArrayList<>();
+        List<CompletableFuture<Integer>> consumeMessagesFutures2 = new 
ArrayList<>();
+        List<CompletableFuture<Integer>> consumeMessagesFutures3 = new 
ArrayList<>();
 
-        for (int i = 0; i < consumerCount; i++) {
+        int maxBytes = 100000;
+        for (int i = 0; i < 2; i++) {
             final int consumerNumber = i + 1;
-            shareGroupExecutorService1.submit(() -> {
-                CompletableFuture<Integer> future = new CompletableFuture<>();
-                futures1.add(future);
-                consumeMessages(totalMessagesConsumedGroup1, 
totalMessagesSent, "group1", consumerNumber, 100, true, future, maxBytes);
-            });
-            shareGroupExecutorService2.submit(() -> {
-                CompletableFuture<Integer> future = new CompletableFuture<>();
-                futures2.add(future);
-                consumeMessages(totalMessagesConsumedGroup2, 
totalMessagesSent, "group2", consumerNumber, 100, true, future, maxBytes);
-            });
-            shareGroupExecutorService3.submit(() -> {
-                CompletableFuture<Integer> future = new CompletableFuture<>();
-                futures3.add(future);
-                consumeMessages(totalMessagesConsumedGroup3, 
totalMessagesSent, "group3", consumerNumber, 100, true, future, maxBytes);
-            });
-        }
-        producerExecutorService.shutdown();
-        shareGroupExecutorService1.shutdown();
-        shareGroupExecutorService2.shutdown();
-        shareGroupExecutorService3.shutdown();
-        try {
-            shareGroupExecutorService1.awaitTermination(120, 
TimeUnit.SECONDS); // Wait for all consumer threads for group 1 to complete
-            shareGroupExecutorService2.awaitTermination(120, 
TimeUnit.SECONDS); // Wait for all consumer threads for group 2 to complete
-            shareGroupExecutorService3.awaitTermination(120, 
TimeUnit.SECONDS); // Wait for all consumer threads for group 3 to complete
-
-            int totalResult1 = 0;
-            for (CompletableFuture<Integer> future : futures1) {
-                totalResult1 += future.get();
-            }
+            consumeMessagesFutures1.add(CompletableFuture.supplyAsync(() ->
+                    consumeMessages(totalMessagesConsumedGroup1, 
totalMessagesSent,
+                            "group1", consumerNumber, 100, true, maxBytes)));
 
-            int totalResult2 = 0;
-            for (CompletableFuture<Integer> future : futures2) {
-                totalResult2 += future.get();
-            }
+            consumeMessagesFutures2.add(CompletableFuture.supplyAsync(() ->
+                    consumeMessages(totalMessagesConsumedGroup2, 
totalMessagesSent,
+                            "group2", consumerNumber, 100, true, maxBytes)));
 
-            int totalResult3 = 0;
-            for (CompletableFuture<Integer> future : futures3) {
-                totalResult3 += future.get();
-            }
+            consumeMessagesFutures3.add(CompletableFuture.supplyAsync(() ->
+                    consumeMessages(totalMessagesConsumedGroup3, 
totalMessagesSent,
+                            "group3", consumerNumber, 100, true, maxBytes)));
+        }
 
-            assertEquals(totalMessagesSent, totalMessagesConsumedGroup1.get());
-            assertEquals(totalMessagesSent, totalMessagesConsumedGroup2.get());
-            assertEquals(totalMessagesSent, totalMessagesConsumedGroup3.get());
-            assertEquals(totalMessagesSent, totalResult1);
-            assertEquals(totalMessagesSent, totalResult2);
-            assertEquals(totalMessagesSent, totalResult3);
+        CompletableFuture.allOf(Stream.of(consumeMessagesFutures1.stream(), 
consumeMessagesFutures2.stream(),
+                        
consumeMessagesFutures3.stream()).flatMap(Function.identity()).toArray(CompletableFuture[]::new))
+                .get(120, TimeUnit.SECONDS);
 
-            int actualMessagesSent = 0;
-            try {
-                producerExecutorService.awaitTermination(60, 
TimeUnit.SECONDS); // Wait for all producer threads to complete
+        int totalResult1 = 
consumeMessagesFutures1.stream().mapToInt(CompletableFuture::join).sum();
+        int totalResult2 = 
consumeMessagesFutures2.stream().mapToInt(CompletableFuture::join).sum();
+        int totalResult3 = 
consumeMessagesFutures3.stream().mapToInt(CompletableFuture::join).sum();
 
-                for (CompletableFuture<Integer> future : producerFutures) {
-                    actualMessagesSent += future.get();
-                }
-            } catch (Exception e) {
-                fail("Exception occurred : " + e.getMessage());
-            }
-            assertEquals(totalMessagesSent, actualMessagesSent);
-        } catch (Exception e) {
-            fail("Exception occurred : " + e.getMessage());
-        }
+        assertEquals(totalMessagesSent, totalResult1);
+        assertEquals(totalMessagesSent, totalResult2);
+        assertEquals(totalMessagesSent, totalResult3);
+        assertEquals(totalMessagesSent, actualMessageSent);
     }
 
     @ParameterizedTest(name = "{displayName}.persister={0}")
@@ -1214,7 +1145,8 @@ public class ShareConsumerTest {
 
     @ParameterizedTest(name = "{displayName}.persister={0}")
     @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void 
testMultipleConsumersInGroupFailureConcurrentConsumption(String persister) {
+    public void 
testMultipleConsumersInGroupFailureConcurrentConsumption(String persister)
+            throws InterruptedException, ExecutionException, TimeoutException {
         AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
 
         int consumerCount = 4;
@@ -1225,58 +1157,35 @@ public class ShareConsumerTest {
 
         alterShareAutoOffsetReset(groupId, "earliest");
 
-        ExecutorService consumerExecutorService = 
Executors.newFixedThreadPool(consumerCount);
-        ExecutorService producerExecutorService = 
Executors.newFixedThreadPool(producerCount);
-
+        List<CompletableFuture<Void>> produceMessageFutures = new 
ArrayList<>();
         for (int i = 0; i < producerCount; i++) {
-            Runnable task = () -> produceMessages(messagesPerProducer);
-            producerExecutorService.submit(task);
+            produceMessageFutures.add(CompletableFuture.runAsync(() -> 
produceMessages(messagesPerProducer)));
         }
 
-        ConcurrentLinkedQueue<CompletableFuture<Integer>> futuresSuccess = new 
ConcurrentLinkedQueue<>();
-
-        CountDownLatch startSignal = new CountDownLatch(1);
-
         int maxBytes = 1000000;
 
-        consumerExecutorService.submit(() -> {
-            // The "failing" consumer polls but immediately closes, which 
releases the records for the other consumers
-            CompletableFuture<Integer> future = new CompletableFuture<>();
-            AtomicInteger failedMessagesConsumed = new AtomicInteger(0);
-            consumeMessages(failedMessagesConsumed, producerCount * 
messagesPerProducer, groupId, 0, 1, false, future);
-            startSignal.countDown();
-        });
+        // The "failing" consumer polls but immediately closes, which releases 
the records for the other consumers
+        CompletableFuture<Integer> failedMessagesConsumedFuture = 
CompletableFuture.supplyAsync(
+                () -> consumeMessages(new AtomicInteger(0), producerCount * 
messagesPerProducer, groupId,
+                        0, 1, false));
 
         // Wait for the failed consumer to run
-        try {
-            boolean signalled = startSignal.await(15, TimeUnit.SECONDS);
-            assertTrue(signalled);
-        } catch (InterruptedException e) {
-            fail("Exception awaiting start signal");
-        }
+        assertDoesNotThrow(() -> failedMessagesConsumedFuture.get(15, 
TimeUnit.SECONDS),
+                "Exception awaiting consumeMessages");
 
+        List<CompletableFuture<Integer>> consumeMessagesFutures = new 
ArrayList<>();
         for (int i = 0; i < consumerCount; i++) {
             final int consumerNumber = i + 1;
-            consumerExecutorService.submit(() -> {
-                CompletableFuture<Integer> future = new CompletableFuture<>();
-                futuresSuccess.add(future);
-                consumeMessages(totalMessagesConsumed, producerCount * 
messagesPerProducer, groupId, consumerNumber, 40, true, future, maxBytes);
-            });
-        }
-        producerExecutorService.shutdown();
-        consumerExecutorService.shutdown();
-        try {
-            producerExecutorService.awaitTermination(60, TimeUnit.SECONDS); // 
Wait for all producer threads to complete
-            consumerExecutorService.awaitTermination(60, TimeUnit.SECONDS); // 
Wait for all consumer threads to complete
-            int totalSuccessResult = 0;
-            for (CompletableFuture<Integer> future : futuresSuccess) {
-                totalSuccessResult += future.get();
-            }
-            assertEquals(producerCount * messagesPerProducer, 
totalMessagesConsumed.get());
-            assertEquals(producerCount * messagesPerProducer, 
totalSuccessResult);
-        } catch (Exception e) {
-            fail("Exception occurred : " + e.getMessage());
+            consumeMessagesFutures.add(CompletableFuture.supplyAsync(
+                    () -> consumeMessages(totalMessagesConsumed, producerCount 
* messagesPerProducer,
+                            groupId, consumerNumber, 40, true, maxBytes)));
         }
+
+        
CompletableFuture.allOf(produceMessageFutures.toArray(CompletableFuture[]::new)).get(60,
 TimeUnit.SECONDS);
+        
CompletableFuture.allOf(consumeMessagesFutures.toArray(CompletableFuture[]::new)).get(60,
 TimeUnit.SECONDS);
+
+        int totalSuccessResult = 
consumeMessagesFutures.stream().mapToInt(CompletableFuture::join).sum();
+        assertEquals(producerCount * messagesPerProducer, totalSuccessResult);
     }
 
     @Flaky("KAFKA-18025")
@@ -1498,7 +1407,7 @@ public class ShareConsumerTest {
                 Thread.interrupted();
             }
 
-            assertDoesNotThrow(() -> shareConsumer.poll(Duration.ZERO));
+            assertDoesNotThrow(() -> shareConsumer.poll(Duration.ZERO), 
"Failed to consume records");
         }
     }
 
@@ -1627,63 +1536,34 @@ public class ShareConsumerTest {
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), 0, null, "key".getBytes(), "value".getBytes());
 
             // We write 10 records to the topic, so they would be written from 
offsets 0-9 on the topic.
-            try {
-                for (int i = 0; i < 10; i++) {
-                    producer.send(record).get();
-                }
-            } catch (Exception e) {
-                fail("Failed to send records: " + e);
+            for (int i = 0; i < 10; i++) {
+                assertDoesNotThrow(() -> producer.send(record).get(), "Failed 
to send records");
             }
 
             // We delete records before offset 5, so the LSO should move to 5.
             adminClient.deleteRecords(Collections.singletonMap(tp, 
RecordsToDelete.beforeOffset(5L)));
 
-            AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
-            CompletableFuture<Integer> future = new CompletableFuture<>();
-            consumeMessages(totalMessagesConsumed, 5, groupId, 1, 10, true, 
future);
+            int messageCount = consumeMessages(new AtomicInteger(0), 5, 
groupId, 1, 10, true);
             // The records returned belong to offsets 5-9.
-            assertEquals(5, totalMessagesConsumed.get());
-            try {
-                assertEquals(5, future.get());
-            } catch (Exception e) {
-                fail("Exception occurred : " + e.getMessage());
-            }
+            assertEquals(5, messageCount);
 
             // We write 5 records to the topic, so they would be written from 
offsets 10-14 on the topic.
-            try {
-                for (int i = 0; i < 5; i++) {
-                    producer.send(record).get();
-                }
-            } catch (Exception e) {
-                fail("Failed to send records: " + e);
+            for (int i = 0; i < 5; i++) {
+                assertDoesNotThrow(() -> producer.send(record).get(), "Failed 
to send records");
             }
 
             // We delete records before offset 14, so the LSO should move to 
14.
             adminClient.deleteRecords(Collections.singletonMap(tp, 
RecordsToDelete.beforeOffset(14L)));
 
-            totalMessagesConsumed = new AtomicInteger(0);
-            future = new CompletableFuture<>();
-            consumeMessages(totalMessagesConsumed, 1, groupId, 1, 10, true, 
future);
+            int consumeMessagesCount = consumeMessages(new AtomicInteger(0), 
1, groupId, 1, 10, true);
             // The record returned belong to offset 14.
-            assertEquals(1, totalMessagesConsumed.get());
-            try {
-                assertEquals(1, future.get());
-            } catch (Exception e) {
-                fail("Exception occurred : " + e.getMessage());
-            }
+            assertEquals(1, consumeMessagesCount);
 
             // We delete records before offset 15, so the LSO should move to 
15 and now no records should be returned.
             adminClient.deleteRecords(Collections.singletonMap(tp, 
RecordsToDelete.beforeOffset(15L)));
 
-            totalMessagesConsumed = new AtomicInteger(0);
-            future = new CompletableFuture<>();
-            consumeMessages(totalMessagesConsumed, 0, groupId, 1, 5, true, 
future);
-            assertEquals(0, totalMessagesConsumed.get());
-            try {
-                assertEquals(0, future.get());
-            } catch (Exception e) {
-                fail("Exception occurred : " + e.getMessage());
-            }
+            messageCount = consumeMessages(new AtomicInteger(0), 0, groupId, 
1, 5, true);
+            assertEquals(0, messageCount);
         }
     }
 
@@ -1740,7 +1620,7 @@ public class ShareConsumerTest {
 
     @ParameterizedTest(name = "{displayName}.persister={0}")
     @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testShareAutoOffsetResetEarliestAfterLsoMovement(String 
persister) throws Exception {
+    public void testShareAutoOffsetResetEarliestAfterLsoMovement(String 
persister) {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
              KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer())) {
@@ -1749,23 +1629,16 @@ public class ShareConsumerTest {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             // We write 10 records to the topic, so they would be written from 
offsets 0-9 on the topic.
-            try {
-                for (int i = 0; i < 10; i++) {
-                    producer.send(record).get();
-                }
-            } catch (Exception e) {
-                fail("Failed to send records: " + e);
+            for (int i = 0; i < 10; i++) {
+                assertDoesNotThrow(() -> producer.send(record).get(), "Failed 
to send records");
             }
 
             // We delete records before offset 5, so the LSO should move to 5.
             adminClient.deleteRecords(Collections.singletonMap(tp, 
RecordsToDelete.beforeOffset(5L)));
 
-            AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
-            CompletableFuture<Integer> future = new CompletableFuture<>();
-            consumeMessages(totalMessagesConsumed, 5, "group1", 1, 10, true, 
future);
+            int consumedMessageCount = consumeMessages(new AtomicInteger(0), 
5, "group1", 1, 10, true);
             // The records returned belong to offsets 5-9.
-            assertEquals(5, totalMessagesConsumed.get());
-            assertEquals(5, future.get());
+            assertEquals(5, consumedMessageCount);
         }
     }
 
@@ -1809,27 +1682,13 @@ public class ShareConsumerTest {
         }
     }
 
-    private CompletableFuture<Integer> produceMessages(int messageCount) {
-        CompletableFuture<Integer> future = new CompletableFuture<>();
-        Future<?>[] recordFutures = new Future<?>[messageCount];
-        int messagesSent = 0;
+    private int produceMessages(int messageCount) {
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer())) {
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
-            for (int i = 0; i < messageCount; i++) {
-                recordFutures[i] = producer.send(record);
-            }
-            for (int i = 0; i < messageCount; i++) {
-                try {
-                    recordFutures[i].get();
-                    messagesSent++;
-                } catch (Exception e) {
-                    fail("Failed to send record: " + e);
-                }
-            }
-        } finally {
-            future.complete(messagesSent);
+            IntStream.range(0, messageCount).forEach(__ -> 
producer.send(record));
+            producer.flush();
         }
-        return future;
+        return messageCount;
     }
 
     private void produceMessagesWithTimestamp(int messageCount, long 
startingTimestamp) {
@@ -1843,47 +1702,47 @@ public class ShareConsumerTest {
         }
     }
 
-    private void consumeMessages(AtomicInteger totalMessagesConsumed,
+    private int consumeMessages(AtomicInteger totalMessagesConsumed,
                                  int totalMessages,
                                  String groupId,
                                  int consumerNumber,
                                  int maxPolls,
-                                 boolean commit,
-                                 CompletableFuture<Integer> future) {
-        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
groupId)) {
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
-            consumeMessages(shareConsumer, totalMessagesConsumed, 
totalMessages, consumerNumber, maxPolls, commit, future);
-        } catch (Exception e) {
-            fail("Consumer " + consumerNumber + " failed with exception: " + 
e);
-        }
+                                 boolean commit) {
+        return assertDoesNotThrow(() -> {
+            try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(
+                    new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
groupId)) {
+                shareConsumer.subscribe(Collections.singleton(tp.topic()));
+                return consumeMessages(shareConsumer, totalMessagesConsumed, 
totalMessages, consumerNumber, maxPolls, commit);
+            }
+        }, "Consumer " + consumerNumber + " failed with exception");
     }
 
-    private void consumeMessages(AtomicInteger totalMessagesConsumed,
+    private int consumeMessages(AtomicInteger totalMessagesConsumed,
                                  int totalMessages,
                                  String groupId,
                                  int consumerNumber,
                                  int maxPolls,
                                  boolean commit,
-                                 CompletableFuture<Integer> future,
                                  int maxFetchBytes) {
-        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
groupId, Map.of(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
maxFetchBytes))) {
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
-            consumeMessages(shareConsumer, totalMessagesConsumed, 
totalMessages, consumerNumber, maxPolls, commit, future);
-        } catch (Exception e) {
-            fail("Consumer " + consumerNumber + " failed with exception: " + 
e);
-        }
+        return assertDoesNotThrow(() -> {
+            try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(
+                    new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
groupId,
+                    Map.of(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
maxFetchBytes))) {
+                shareConsumer.subscribe(Collections.singleton(tp.topic()));
+                return consumeMessages(shareConsumer, totalMessagesConsumed, 
totalMessages, consumerNumber, maxPolls, commit);
+            }
+        }, "Consumer " + consumerNumber + " failed with exception");
     }
 
-    private void consumeMessages(KafkaShareConsumer<byte[], byte[]> consumer,
+    private int consumeMessages(KafkaShareConsumer<byte[], byte[]> consumer,
                                  AtomicInteger totalMessagesConsumed,
                                  int totalMessages,
                                  int consumerNumber,
                                  int maxPolls,
-                                 boolean commit,
-                                 CompletableFuture<Integer> future) {
-        int messagesConsumed = 0;
-        int retries = 0;
-        try {
+                                 boolean commit) {
+        return assertDoesNotThrow(() -> {
+            int messagesConsumed = 0;
+            int retries = 0;
             if (totalMessages > 0) {
                 while (totalMessagesConsumed.get() < totalMessages && retries 
< maxPolls) {
                     ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(2000));
@@ -1904,11 +1763,8 @@ public class ShareConsumerTest {
                 // Complete acknowledgement of the records
                 consumer.commitSync(Duration.ofMillis(10000));
             }
-        } catch (Exception e) {
-            fail("Consumer " + consumerNumber + " failed with exception: " + 
e);
-        } finally {
-            future.complete(messagesConsumed);
-        }
+            return messagesConsumed;
+        }, "Consumer " + consumerNumber + " failed with exception");
     }
 
     private <K, V> List<ConsumerRecord<K, V>> 
consumeRecords(KafkaShareConsumer<K, V> consumer,
@@ -1919,29 +1775,27 @@ public class ShareConsumerTest {
             ConsumerRecords<K, V> records = 
consumer.poll(Duration.ofMillis(100));
             records.forEach(accumulatedRecords::add);
             long currentTimeMs = System.currentTimeMillis();
-            if (currentTimeMs - startTimeMs > 60000) {
-                fail("Timed out before consuming expected records.");
-            }
+            assertFalse(currentTimeMs - startTimeMs > 60000, "Timed out before 
consuming expected records.");
         }
         return accumulatedRecords;
     }
 
     private void createTopic(String topicName) {
         Properties props = cluster.clientProperties();
-        try (Admin admin = Admin.create(props)) {
-            admin.createTopics(Collections.singleton(new NewTopic(topicName, 
1, (short) 1))).all().get();
-        } catch (Exception e) {
-            fail("Failed to create topic");
-        }
+        assertDoesNotThrow(() -> {
+            try (Admin admin = Admin.create(props)) {
+                admin.createTopics(Collections.singleton(new 
NewTopic(topicName, 1, (short) 1))).all().get();
+            }
+        }, "Failed to create topic");
     }
 
     private void deleteTopic(String topicName) {
         Properties props = cluster.clientProperties();
-        try (Admin admin = Admin.create(props)) {
-            admin.deleteTopics(Collections.singleton(topicName)).all().get();
-        } catch (Exception e) {
-            fail("Failed to create topic");
-        }
+        assertDoesNotThrow(() -> {
+            try (Admin admin = Admin.create(props)) {
+                
admin.deleteTopics(Collections.singleton(topicName)).all().get();
+            }
+        }, "Failed to delete topic");
     }
 
     private Admin createAdminClient() {
@@ -1981,7 +1835,7 @@ public class ShareConsumerTest {
         return new KafkaShareConsumer<>(props, keyDeserializer, 
valueDeserializer);
     }
 
-    private void warmup() throws InterruptedException, ExecutionException, 
TimeoutException {
+    private void warmup() throws InterruptedException {
         createTopic(warmupTp.topic());
         TestUtils.waitForCondition(() ->
                 
!cluster.brokers().get(0).metadataCache().getAliveBrokerNodes(new 
ListenerName("EXTERNAL")).isEmpty(),
@@ -2007,12 +1861,8 @@ public class ShareConsumerTest {
         alterEntries.put(configResource, List.of(new AlterConfigOp(new 
ConfigEntry(
             GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, newValue), 
AlterConfigOp.OpType.SET)));
         AlterConfigsOptions alterOptions = new AlterConfigsOptions();
-        try {
-            adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
+        assertDoesNotThrow(() -> 
adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
                 .all()
-                .get(60, TimeUnit.SECONDS);
-        } catch (Exception e) {
-            fail("Exception was thrown: ", e);
-        }
+                .get(60, TimeUnit.SECONDS), "Failed to alter configs");
     }
 }

Reply via email to