This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9e60fcc87f2 KAFKA-18181 Refactor ShareConsumerTest (#18105)
9e60fcc87f2 is described below
commit 9e60fcc87f2f74561a6ece97ed0774a8e42455a2
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Sat Dec 14 09:39:44 2024 +0800
KAFKA-18181 Refactor ShareConsumerTest (#18105)
Reviewers: Apoorv Mittal <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../java/kafka/test/api/ShareConsumerTest.java | 404 +++++++--------------
1 file changed, 127 insertions(+), 277 deletions(-)
diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
index d31ca18e8b7..89b357cf4c3 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -77,16 +77,14 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -96,7 +94,6 @@ import static
org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
@Timeout(1200)
@Tag("integration")
@@ -1003,7 +1000,8 @@ public class ShareConsumerTest {
@Flaky("KAFKA-18033")
@ParameterizedTest(name = "{displayName}.persister={0}")
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
- public void testMultipleConsumersInGroupConcurrentConsumption(String
persister) {
+ public void testMultipleConsumersInGroupConcurrentConsumption(String
persister)
+ throws InterruptedException, ExecutionException, TimeoutException {
AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
int consumerCount = 4;
@@ -1013,51 +1011,37 @@ public class ShareConsumerTest {
String groupId = "group1";
alterShareAutoOffsetReset(groupId, "earliest");
- ExecutorService producerExecutorService =
Executors.newFixedThreadPool(producerCount);
- ExecutorService consumerExecutorService =
Executors.newFixedThreadPool(consumerCount);
-
+ List<CompletableFuture<Void>> producerFutures = new ArrayList<>();
for (int i = 0; i < producerCount; i++) {
- producerExecutorService.submit(() ->
produceMessages(messagesPerProducer));
+ producerFutures.add(CompletableFuture.runAsync(() ->
produceMessages(messagesPerProducer)));
}
- ConcurrentLinkedQueue<CompletableFuture<Integer>> futures = new
ConcurrentLinkedQueue<>();
int maxBytes = 100000;
+ List<CompletableFuture<Integer>> consumerFutures = new ArrayList<>();
for (int i = 0; i < consumerCount; i++) {
final int consumerNumber = i + 1;
- consumerExecutorService.submit(() -> {
- CompletableFuture<Integer> future = new CompletableFuture<>();
- futures.add(future);
- consumeMessages(totalMessagesConsumed, producerCount *
messagesPerProducer, groupId, consumerNumber, 30, true, future, maxBytes);
- });
+ consumerFutures.add(CompletableFuture.supplyAsync(() ->
+ consumeMessages(totalMessagesConsumed,
+ producerCount * messagesPerProducer, groupId,
consumerNumber,
+ 30, true, maxBytes)));
}
- producerExecutorService.shutdown();
- consumerExecutorService.shutdown();
+
CompletableFuture.allOf(producerFutures.toArray(CompletableFuture[]::new)).get(60,
TimeUnit.SECONDS);
+
CompletableFuture.allOf(consumerFutures.toArray(CompletableFuture[]::new)).get(60,
TimeUnit.SECONDS);
- try {
- assertTrue(producerExecutorService.awaitTermination(60,
TimeUnit.SECONDS)); // Wait for all producer threads to complete
- assertTrue(consumerExecutorService.awaitTermination(60,
TimeUnit.SECONDS)); // Wait for all consumer threads to complete
- int totalResult = 0;
- for (CompletableFuture<Integer> future : futures) {
- totalResult += future.get();
- }
- assertEquals(producerCount * messagesPerProducer,
totalMessagesConsumed.get());
- assertEquals(producerCount * messagesPerProducer, totalResult);
- } catch (Exception e) {
- fail("Exception occurred : " + e.getMessage());
- }
+ int totalResult =
consumerFutures.stream().mapToInt(CompletableFuture::join).sum();
+ assertEquals(producerCount * messagesPerProducer, totalResult);
}
@ParameterizedTest(name = "{displayName}.persister={0}")
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
- @SuppressWarnings("NPathComplexity")
- public void
testMultipleConsumersInMultipleGroupsConcurrentConsumption(String persister) {
+ public void
testMultipleConsumersInMultipleGroupsConcurrentConsumption(String persister)
+ throws ExecutionException, InterruptedException, TimeoutException {
AtomicInteger totalMessagesConsumedGroup1 = new AtomicInteger(0);
AtomicInteger totalMessagesConsumedGroup2 = new AtomicInteger(0);
AtomicInteger totalMessagesConsumedGroup3 = new AtomicInteger(0);
int producerCount = 4;
- int consumerCount = 2;
int messagesPerProducer = 2000;
final int totalMessagesSent = producerCount * messagesPerProducer;
@@ -1069,100 +1053,47 @@ public class ShareConsumerTest {
alterShareAutoOffsetReset(groupId2, "earliest");
alterShareAutoOffsetReset(groupId3, "earliest");
- ExecutorService producerExecutorService =
Executors.newFixedThreadPool(producerCount);
- ExecutorService shareGroupExecutorService1 =
Executors.newFixedThreadPool(consumerCount);
- ExecutorService shareGroupExecutorService2 =
Executors.newFixedThreadPool(consumerCount);
- ExecutorService shareGroupExecutorService3 =
Executors.newFixedThreadPool(consumerCount);
-
- CountDownLatch startSignal = new CountDownLatch(producerCount);
-
- ConcurrentLinkedQueue<CompletableFuture<Integer>> producerFutures =
new ConcurrentLinkedQueue<>();
-
+ List<CompletableFuture<Integer>> producerFutures = new ArrayList<>();
for (int i = 0; i < producerCount; i++) {
- producerExecutorService.submit(() -> {
- CompletableFuture<Integer> future =
produceMessages(messagesPerProducer);
- producerFutures.add(future);
- startSignal.countDown();
- });
+ producerFutures.add(CompletableFuture.supplyAsync(() ->
produceMessages(messagesPerProducer)));
}
-
- ConcurrentLinkedQueue<CompletableFuture<Integer>> futures1 = new
ConcurrentLinkedQueue<>();
- ConcurrentLinkedQueue<CompletableFuture<Integer>> futures2 = new
ConcurrentLinkedQueue<>();
- ConcurrentLinkedQueue<CompletableFuture<Integer>> futures3 = new
ConcurrentLinkedQueue<>();
-
// Wait for the producers to run
- try {
- boolean signalled = startSignal.await(15, TimeUnit.SECONDS);
- assertTrue(signalled);
- } catch (InterruptedException e) {
- fail("Exception awaiting start signal");
- }
+ assertDoesNotThrow(() ->
CompletableFuture.allOf(producerFutures.toArray(CompletableFuture[]::new))
+ .get(15, TimeUnit.SECONDS), "Exception awaiting
produceMessages");
+ int actualMessageSent =
producerFutures.stream().mapToInt(CompletableFuture::join).sum();
- int maxBytes = 100000;
+ List<CompletableFuture<Integer>> consumeMessagesFutures1 = new
ArrayList<>();
+ List<CompletableFuture<Integer>> consumeMessagesFutures2 = new
ArrayList<>();
+ List<CompletableFuture<Integer>> consumeMessagesFutures3 = new
ArrayList<>();
- for (int i = 0; i < consumerCount; i++) {
+ int maxBytes = 100000;
+ for (int i = 0; i < 2; i++) {
final int consumerNumber = i + 1;
- shareGroupExecutorService1.submit(() -> {
- CompletableFuture<Integer> future = new CompletableFuture<>();
- futures1.add(future);
- consumeMessages(totalMessagesConsumedGroup1,
totalMessagesSent, "group1", consumerNumber, 100, true, future, maxBytes);
- });
- shareGroupExecutorService2.submit(() -> {
- CompletableFuture<Integer> future = new CompletableFuture<>();
- futures2.add(future);
- consumeMessages(totalMessagesConsumedGroup2,
totalMessagesSent, "group2", consumerNumber, 100, true, future, maxBytes);
- });
- shareGroupExecutorService3.submit(() -> {
- CompletableFuture<Integer> future = new CompletableFuture<>();
- futures3.add(future);
- consumeMessages(totalMessagesConsumedGroup3,
totalMessagesSent, "group3", consumerNumber, 100, true, future, maxBytes);
- });
- }
- producerExecutorService.shutdown();
- shareGroupExecutorService1.shutdown();
- shareGroupExecutorService2.shutdown();
- shareGroupExecutorService3.shutdown();
- try {
- shareGroupExecutorService1.awaitTermination(120,
TimeUnit.SECONDS); // Wait for all consumer threads for group 1 to complete
- shareGroupExecutorService2.awaitTermination(120,
TimeUnit.SECONDS); // Wait for all consumer threads for group 2 to complete
- shareGroupExecutorService3.awaitTermination(120,
TimeUnit.SECONDS); // Wait for all consumer threads for group 3 to complete
-
- int totalResult1 = 0;
- for (CompletableFuture<Integer> future : futures1) {
- totalResult1 += future.get();
- }
+ consumeMessagesFutures1.add(CompletableFuture.supplyAsync(() ->
+ consumeMessages(totalMessagesConsumedGroup1,
totalMessagesSent,
+ "group1", consumerNumber, 100, true, maxBytes)));
- int totalResult2 = 0;
- for (CompletableFuture<Integer> future : futures2) {
- totalResult2 += future.get();
- }
+ consumeMessagesFutures2.add(CompletableFuture.supplyAsync(() ->
+ consumeMessages(totalMessagesConsumedGroup2,
totalMessagesSent,
+ "group2", consumerNumber, 100, true, maxBytes)));
- int totalResult3 = 0;
- for (CompletableFuture<Integer> future : futures3) {
- totalResult3 += future.get();
- }
+ consumeMessagesFutures3.add(CompletableFuture.supplyAsync(() ->
+ consumeMessages(totalMessagesConsumedGroup3,
totalMessagesSent,
+ "group3", consumerNumber, 100, true, maxBytes)));
+ }
- assertEquals(totalMessagesSent, totalMessagesConsumedGroup1.get());
- assertEquals(totalMessagesSent, totalMessagesConsumedGroup2.get());
- assertEquals(totalMessagesSent, totalMessagesConsumedGroup3.get());
- assertEquals(totalMessagesSent, totalResult1);
- assertEquals(totalMessagesSent, totalResult2);
- assertEquals(totalMessagesSent, totalResult3);
+ CompletableFuture.allOf(Stream.of(consumeMessagesFutures1.stream(),
consumeMessagesFutures2.stream(),
+
consumeMessagesFutures3.stream()).flatMap(Function.identity()).toArray(CompletableFuture[]::new))
+ .get(120, TimeUnit.SECONDS);
- int actualMessagesSent = 0;
- try {
- producerExecutorService.awaitTermination(60,
TimeUnit.SECONDS); // Wait for all producer threads to complete
+ int totalResult1 =
consumeMessagesFutures1.stream().mapToInt(CompletableFuture::join).sum();
+ int totalResult2 =
consumeMessagesFutures2.stream().mapToInt(CompletableFuture::join).sum();
+ int totalResult3 =
consumeMessagesFutures3.stream().mapToInt(CompletableFuture::join).sum();
- for (CompletableFuture<Integer> future : producerFutures) {
- actualMessagesSent += future.get();
- }
- } catch (Exception e) {
- fail("Exception occurred : " + e.getMessage());
- }
- assertEquals(totalMessagesSent, actualMessagesSent);
- } catch (Exception e) {
- fail("Exception occurred : " + e.getMessage());
- }
+ assertEquals(totalMessagesSent, totalResult1);
+ assertEquals(totalMessagesSent, totalResult2);
+ assertEquals(totalMessagesSent, totalResult3);
+ assertEquals(totalMessagesSent, actualMessageSent);
}
@ParameterizedTest(name = "{displayName}.persister={0}")
@@ -1214,7 +1145,8 @@ public class ShareConsumerTest {
@ParameterizedTest(name = "{displayName}.persister={0}")
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
- public void
testMultipleConsumersInGroupFailureConcurrentConsumption(String persister) {
+ public void
testMultipleConsumersInGroupFailureConcurrentConsumption(String persister)
+ throws InterruptedException, ExecutionException, TimeoutException {
AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
int consumerCount = 4;
@@ -1225,58 +1157,35 @@ public class ShareConsumerTest {
alterShareAutoOffsetReset(groupId, "earliest");
- ExecutorService consumerExecutorService =
Executors.newFixedThreadPool(consumerCount);
- ExecutorService producerExecutorService =
Executors.newFixedThreadPool(producerCount);
-
+ List<CompletableFuture<Void>> produceMessageFutures = new
ArrayList<>();
for (int i = 0; i < producerCount; i++) {
- Runnable task = () -> produceMessages(messagesPerProducer);
- producerExecutorService.submit(task);
+ produceMessageFutures.add(CompletableFuture.runAsync(() ->
produceMessages(messagesPerProducer)));
}
- ConcurrentLinkedQueue<CompletableFuture<Integer>> futuresSuccess = new
ConcurrentLinkedQueue<>();
-
- CountDownLatch startSignal = new CountDownLatch(1);
-
int maxBytes = 1000000;
- consumerExecutorService.submit(() -> {
- // The "failing" consumer polls but immediately closes, which
releases the records for the other consumers
- CompletableFuture<Integer> future = new CompletableFuture<>();
- AtomicInteger failedMessagesConsumed = new AtomicInteger(0);
- consumeMessages(failedMessagesConsumed, producerCount *
messagesPerProducer, groupId, 0, 1, false, future);
- startSignal.countDown();
- });
+ // The "failing" consumer polls but immediately closes, which releases
the records for the other consumers
+ CompletableFuture<Integer> failedMessagesConsumedFuture =
CompletableFuture.supplyAsync(
+ () -> consumeMessages(new AtomicInteger(0), producerCount *
messagesPerProducer, groupId,
+ 0, 1, false));
// Wait for the failed consumer to run
- try {
- boolean signalled = startSignal.await(15, TimeUnit.SECONDS);
- assertTrue(signalled);
- } catch (InterruptedException e) {
- fail("Exception awaiting start signal");
- }
+ assertDoesNotThrow(() -> failedMessagesConsumedFuture.get(15,
TimeUnit.SECONDS),
+ "Exception awaiting consumeMessages");
+ List<CompletableFuture<Integer>> consumeMessagesFutures = new
ArrayList<>();
for (int i = 0; i < consumerCount; i++) {
final int consumerNumber = i + 1;
- consumerExecutorService.submit(() -> {
- CompletableFuture<Integer> future = new CompletableFuture<>();
- futuresSuccess.add(future);
- consumeMessages(totalMessagesConsumed, producerCount *
messagesPerProducer, groupId, consumerNumber, 40, true, future, maxBytes);
- });
- }
- producerExecutorService.shutdown();
- consumerExecutorService.shutdown();
- try {
- producerExecutorService.awaitTermination(60, TimeUnit.SECONDS); //
Wait for all producer threads to complete
- consumerExecutorService.awaitTermination(60, TimeUnit.SECONDS); //
Wait for all consumer threads to complete
- int totalSuccessResult = 0;
- for (CompletableFuture<Integer> future : futuresSuccess) {
- totalSuccessResult += future.get();
- }
- assertEquals(producerCount * messagesPerProducer,
totalMessagesConsumed.get());
- assertEquals(producerCount * messagesPerProducer,
totalSuccessResult);
- } catch (Exception e) {
- fail("Exception occurred : " + e.getMessage());
+ consumeMessagesFutures.add(CompletableFuture.supplyAsync(
+ () -> consumeMessages(totalMessagesConsumed, producerCount
* messagesPerProducer,
+ groupId, consumerNumber, 40, true, maxBytes)));
}
+
+
CompletableFuture.allOf(produceMessageFutures.toArray(CompletableFuture[]::new)).get(60,
TimeUnit.SECONDS);
+
CompletableFuture.allOf(consumeMessagesFutures.toArray(CompletableFuture[]::new)).get(60,
TimeUnit.SECONDS);
+
+ int totalSuccessResult =
consumeMessagesFutures.stream().mapToInt(CompletableFuture::join).sum();
+ assertEquals(producerCount * messagesPerProducer, totalSuccessResult);
}
@Flaky("KAFKA-18025")
@@ -1498,7 +1407,7 @@ public class ShareConsumerTest {
Thread.interrupted();
}
- assertDoesNotThrow(() -> shareConsumer.poll(Duration.ZERO));
+ assertDoesNotThrow(() -> shareConsumer.poll(Duration.ZERO),
"Failed to consume records");
}
}
@@ -1627,63 +1536,34 @@ public class ShareConsumerTest {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), 0, null, "key".getBytes(), "value".getBytes());
// We write 10 records to the topic, so they would be written from
offsets 0-9 on the topic.
- try {
- for (int i = 0; i < 10; i++) {
- producer.send(record).get();
- }
- } catch (Exception e) {
- fail("Failed to send records: " + e);
+ for (int i = 0; i < 10; i++) {
+ assertDoesNotThrow(() -> producer.send(record).get(), "Failed
to send records");
}
// We delete records before offset 5, so the LSO should move to 5.
adminClient.deleteRecords(Collections.singletonMap(tp,
RecordsToDelete.beforeOffset(5L)));
- AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
- CompletableFuture<Integer> future = new CompletableFuture<>();
- consumeMessages(totalMessagesConsumed, 5, groupId, 1, 10, true,
future);
+ int messageCount = consumeMessages(new AtomicInteger(0), 5,
groupId, 1, 10, true);
// The records returned belong to offsets 5-9.
- assertEquals(5, totalMessagesConsumed.get());
- try {
- assertEquals(5, future.get());
- } catch (Exception e) {
- fail("Exception occurred : " + e.getMessage());
- }
+ assertEquals(5, messageCount);
// We write 5 records to the topic, so they would be written from
offsets 10-14 on the topic.
- try {
- for (int i = 0; i < 5; i++) {
- producer.send(record).get();
- }
- } catch (Exception e) {
- fail("Failed to send records: " + e);
+ for (int i = 0; i < 5; i++) {
+ assertDoesNotThrow(() -> producer.send(record).get(), "Failed
to send records");
}
// We delete records before offset 14, so the LSO should move to
14.
adminClient.deleteRecords(Collections.singletonMap(tp,
RecordsToDelete.beforeOffset(14L)));
- totalMessagesConsumed = new AtomicInteger(0);
- future = new CompletableFuture<>();
- consumeMessages(totalMessagesConsumed, 1, groupId, 1, 10, true,
future);
+ int consumeMessagesCount = consumeMessages(new AtomicInteger(0),
1, groupId, 1, 10, true);
// The record returned belong to offset 14.
- assertEquals(1, totalMessagesConsumed.get());
- try {
- assertEquals(1, future.get());
- } catch (Exception e) {
- fail("Exception occurred : " + e.getMessage());
- }
+ assertEquals(1, consumeMessagesCount);
// We delete records before offset 15, so the LSO should move to
15 and now no records should be returned.
adminClient.deleteRecords(Collections.singletonMap(tp,
RecordsToDelete.beforeOffset(15L)));
- totalMessagesConsumed = new AtomicInteger(0);
- future = new CompletableFuture<>();
- consumeMessages(totalMessagesConsumed, 0, groupId, 1, 5, true,
future);
- assertEquals(0, totalMessagesConsumed.get());
- try {
- assertEquals(0, future.get());
- } catch (Exception e) {
- fail("Exception occurred : " + e.getMessage());
- }
+ messageCount = consumeMessages(new AtomicInteger(0), 0, groupId,
1, 5, true);
+ assertEquals(0, messageCount);
}
}
@@ -1740,7 +1620,7 @@ public class ShareConsumerTest {
@ParameterizedTest(name = "{displayName}.persister={0}")
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
- public void testShareAutoOffsetResetEarliestAfterLsoMovement(String
persister) throws Exception {
+ public void testShareAutoOffsetResetEarliestAfterLsoMovement(String
persister) {
alterShareAutoOffsetReset("group1", "earliest");
try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer())) {
@@ -1749,23 +1629,16 @@ public class ShareConsumerTest {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
// We write 10 records to the topic, so they would be written from
offsets 0-9 on the topic.
- try {
- for (int i = 0; i < 10; i++) {
- producer.send(record).get();
- }
- } catch (Exception e) {
- fail("Failed to send records: " + e);
+ for (int i = 0; i < 10; i++) {
+ assertDoesNotThrow(() -> producer.send(record).get(), "Failed
to send records");
}
// We delete records before offset 5, so the LSO should move to 5.
adminClient.deleteRecords(Collections.singletonMap(tp,
RecordsToDelete.beforeOffset(5L)));
- AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
- CompletableFuture<Integer> future = new CompletableFuture<>();
- consumeMessages(totalMessagesConsumed, 5, "group1", 1, 10, true,
future);
+ int consumedMessageCount = consumeMessages(new AtomicInteger(0),
5, "group1", 1, 10, true);
// The records returned belong to offsets 5-9.
- assertEquals(5, totalMessagesConsumed.get());
- assertEquals(5, future.get());
+ assertEquals(5, consumedMessageCount);
}
}
@@ -1809,27 +1682,13 @@ public class ShareConsumerTest {
}
}
- private CompletableFuture<Integer> produceMessages(int messageCount) {
- CompletableFuture<Integer> future = new CompletableFuture<>();
- Future<?>[] recordFutures = new Future<?>[messageCount];
- int messagesSent = 0;
+ private int produceMessages(int messageCount) {
try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer())) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
- for (int i = 0; i < messageCount; i++) {
- recordFutures[i] = producer.send(record);
- }
- for (int i = 0; i < messageCount; i++) {
- try {
- recordFutures[i].get();
- messagesSent++;
- } catch (Exception e) {
- fail("Failed to send record: " + e);
- }
- }
- } finally {
- future.complete(messagesSent);
+ IntStream.range(0, messageCount).forEach(__ ->
producer.send(record));
+ producer.flush();
}
- return future;
+ return messageCount;
}
private void produceMessagesWithTimestamp(int messageCount, long
startingTimestamp) {
@@ -1843,47 +1702,47 @@ public class ShareConsumerTest {
}
}
- private void consumeMessages(AtomicInteger totalMessagesConsumed,
+ private int consumeMessages(AtomicInteger totalMessagesConsumed,
int totalMessages,
String groupId,
int consumerNumber,
int maxPolls,
- boolean commit,
- CompletableFuture<Integer> future) {
- try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
groupId)) {
- shareConsumer.subscribe(Collections.singleton(tp.topic()));
- consumeMessages(shareConsumer, totalMessagesConsumed,
totalMessages, consumerNumber, maxPolls, commit, future);
- } catch (Exception e) {
- fail("Consumer " + consumerNumber + " failed with exception: " +
e);
- }
+ boolean commit) {
+ return assertDoesNotThrow(() -> {
+ try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(
+ new ByteArrayDeserializer(), new ByteArrayDeserializer(),
groupId)) {
+ shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ return consumeMessages(shareConsumer, totalMessagesConsumed,
totalMessages, consumerNumber, maxPolls, commit);
+ }
+ }, "Consumer " + consumerNumber + " failed with exception");
}
- private void consumeMessages(AtomicInteger totalMessagesConsumed,
+ private int consumeMessages(AtomicInteger totalMessagesConsumed,
int totalMessages,
String groupId,
int consumerNumber,
int maxPolls,
boolean commit,
- CompletableFuture<Integer> future,
int maxFetchBytes) {
- try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
groupId, Map.of(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
maxFetchBytes))) {
- shareConsumer.subscribe(Collections.singleton(tp.topic()));
- consumeMessages(shareConsumer, totalMessagesConsumed,
totalMessages, consumerNumber, maxPolls, commit, future);
- } catch (Exception e) {
- fail("Consumer " + consumerNumber + " failed with exception: " +
e);
- }
+ return assertDoesNotThrow(() -> {
+ try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(
+ new ByteArrayDeserializer(), new ByteArrayDeserializer(),
groupId,
+ Map.of(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
maxFetchBytes))) {
+ shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ return consumeMessages(shareConsumer, totalMessagesConsumed,
totalMessages, consumerNumber, maxPolls, commit);
+ }
+ }, "Consumer " + consumerNumber + " failed with exception");
}
- private void consumeMessages(KafkaShareConsumer<byte[], byte[]> consumer,
+ private int consumeMessages(KafkaShareConsumer<byte[], byte[]> consumer,
AtomicInteger totalMessagesConsumed,
int totalMessages,
int consumerNumber,
int maxPolls,
- boolean commit,
- CompletableFuture<Integer> future) {
- int messagesConsumed = 0;
- int retries = 0;
- try {
+ boolean commit) {
+ return assertDoesNotThrow(() -> {
+ int messagesConsumed = 0;
+ int retries = 0;
if (totalMessages > 0) {
while (totalMessagesConsumed.get() < totalMessages && retries
< maxPolls) {
ConsumerRecords<byte[], byte[]> records =
consumer.poll(Duration.ofMillis(2000));
@@ -1904,11 +1763,8 @@ public class ShareConsumerTest {
// Complete acknowledgement of the records
consumer.commitSync(Duration.ofMillis(10000));
}
- } catch (Exception e) {
- fail("Consumer " + consumerNumber + " failed with exception: " +
e);
- } finally {
- future.complete(messagesConsumed);
- }
+ return messagesConsumed;
+ }, "Consumer " + consumerNumber + " failed with exception");
}
private <K, V> List<ConsumerRecord<K, V>>
consumeRecords(KafkaShareConsumer<K, V> consumer,
@@ -1919,29 +1775,27 @@ public class ShareConsumerTest {
ConsumerRecords<K, V> records =
consumer.poll(Duration.ofMillis(100));
records.forEach(accumulatedRecords::add);
long currentTimeMs = System.currentTimeMillis();
- if (currentTimeMs - startTimeMs > 60000) {
- fail("Timed out before consuming expected records.");
- }
+ assertFalse(currentTimeMs - startTimeMs > 60000, "Timed out before
consuming expected records.");
}
return accumulatedRecords;
}
private void createTopic(String topicName) {
Properties props = cluster.clientProperties();
- try (Admin admin = Admin.create(props)) {
- admin.createTopics(Collections.singleton(new NewTopic(topicName,
1, (short) 1))).all().get();
- } catch (Exception e) {
- fail("Failed to create topic");
- }
+ assertDoesNotThrow(() -> {
+ try (Admin admin = Admin.create(props)) {
+ admin.createTopics(Collections.singleton(new
NewTopic(topicName, 1, (short) 1))).all().get();
+ }
+ }, "Failed to create topic");
}
private void deleteTopic(String topicName) {
Properties props = cluster.clientProperties();
- try (Admin admin = Admin.create(props)) {
- admin.deleteTopics(Collections.singleton(topicName)).all().get();
- } catch (Exception e) {
- fail("Failed to create topic");
- }
+ assertDoesNotThrow(() -> {
+ try (Admin admin = Admin.create(props)) {
+
admin.deleteTopics(Collections.singleton(topicName)).all().get();
+ }
+ }, "Failed to delete topic");
}
private Admin createAdminClient() {
@@ -1981,7 +1835,7 @@ public class ShareConsumerTest {
return new KafkaShareConsumer<>(props, keyDeserializer,
valueDeserializer);
}
- private void warmup() throws InterruptedException, ExecutionException,
TimeoutException {
+ private void warmup() throws InterruptedException {
createTopic(warmupTp.topic());
TestUtils.waitForCondition(() ->
!cluster.brokers().get(0).metadataCache().getAliveBrokerNodes(new
ListenerName("EXTERNAL")).isEmpty(),
@@ -2007,12 +1861,8 @@ public class ShareConsumerTest {
alterEntries.put(configResource, List.of(new AlterConfigOp(new
ConfigEntry(
GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, newValue),
AlterConfigOp.OpType.SET)));
AlterConfigsOptions alterOptions = new AlterConfigsOptions();
- try {
- adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
+ assertDoesNotThrow(() ->
adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
.all()
- .get(60, TimeUnit.SECONDS);
- } catch (Exception e) {
- fail("Exception was thrown: ", e);
- }
+ .get(60, TimeUnit.SECONDS), "Failed to alter configs");
}
}