This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 01afba8fdbd MINOR: Refactor ShareConsumerTest to use
ClusterTestExtensions. (#18656)
01afba8fdbd is described below
commit 01afba8fdbd2d24adbc6ab64bca361ed53f870ac
Author: Sushant Mahajan <[email protected]>
AuthorDate: Thu Jan 23 22:05:33 2025 +0530
MINOR: Refactor ShareConsumerTest to use ClusterTestExtensions. (#18656)
Reviewers: ShivsundarR <[email protected]>, Apoorv Mittal
<[email protected]>, Andrew Schofield <[email protected]>
---
.../java/kafka/test/api/ShareConsumerTest.java | 582 +++++++++++----------
.../kafka/common/test/api/ClusterInstance.java | 16 +
2 files changed, 336 insertions(+), 262 deletions(-)
diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
index 237fe34bbe7..c7e62d0eb70 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -26,12 +26,12 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.KafkaShareConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.consumer.ShareConsumer;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
@@ -48,22 +48,21 @@ import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.test.KafkaClusterTestKit;
-import org.apache.kafka.common.test.TestKitNodes;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.common.test.api.Flaky;
+import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.test.TestUtils;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
import java.time.Duration;
import java.util.ArrayList;
@@ -101,67 +100,64 @@ import static org.junit.jupiter.api.Assertions.fail;
@Timeout(1200)
@Tag("integration")
+@ExtendWith(ClusterTestExtensions.class)
+@ClusterTestDefaults(
+ serverProperties = {
+ @ClusterConfigProperty(key = "auto.create.topics.enable", value =
"false"),
+ @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols",
value = "classic,consumer,share"),
+ @ClusterConfigProperty(key = "group.share.enable", value = "true"),
+ @ClusterConfigProperty(key = "group.share.partition.max.record.locks",
value = "10000"),
+ @ClusterConfigProperty(key = "group.share.record.lock.duration.ms",
value = "15000"),
+ @ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1"),
+ @ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr",
value = "1"),
+ @ClusterConfigProperty(key =
"share.coordinator.state.topic.num.partitions", value = "3"),
+ @ClusterConfigProperty(key =
"share.coordinator.state.topic.replication.factor", value = "1"),
+ @ClusterConfigProperty(key = "transaction.state.log.min.isr", value =
"1"),
+ @ClusterConfigProperty(key =
"transaction.state.log.replication.factor", value = "1"),
+ @ClusterConfigProperty(key = "unstable.api.versions.enable", value =
"true")
+ },
+ types = {Type.KRAFT}
+)
public class ShareConsumerTest {
- private KafkaClusterTestKit cluster;
+ private final ClusterInstance cluster;
private final TopicPartition tp = new TopicPartition("topic", 0);
private final TopicPartition tp2 = new TopicPartition("topic2", 0);
private final TopicPartition warmupTp = new TopicPartition("warmup", 0);
private List<TopicPartition> sgsTopicPartitions;
- private Admin adminClient;
-
- @BeforeEach
- public void createCluster(TestInfo testInfo) throws Exception {
- cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder()
- .setNumBrokerNodes(1)
- .setNumControllerNodes(1)
- .build())
- .setConfigProp("auto.create.topics.enable", "false")
- .setConfigProp("group.coordinator.rebalance.protocols",
"classic,consumer,share")
- .setConfigProp("group.share.enable", "true")
- .setConfigProp("group.share.partition.max.record.locks", "10000")
- .setConfigProp("group.share.record.lock.duration.ms", "15000")
- .setConfigProp("offsets.topic.replication.factor", "1")
- .setConfigProp("share.coordinator.state.topic.min.isr", "1")
- .setConfigProp("share.coordinator.state.topic.num.partitions", "3")
- .setConfigProp("share.coordinator.state.topic.replication.factor",
"1")
- .setConfigProp("transaction.state.log.min.isr", "1")
- .setConfigProp("transaction.state.log.replication.factor", "1")
- .setConfigProp("unstable.api.versions.enable", "true")
- .build();
- cluster.format();
- cluster.startup();
- cluster.waitForActiveController();
- cluster.waitForReadyBrokers();
- createTopic("topic");
- createTopic("topic2");
- adminClient = createAdminClient();
- sgsTopicPartitions = IntStream.range(0, 3)
- .mapToObj(part -> new
TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, part))
- .toList();
- warmup();
- }
-
- @AfterEach
- public void destroyCluster() throws Exception {
- adminClient.close();
- cluster.close();
- }
-
- @Test
+ public ShareConsumerTest(ClusterInstance cluster) {
+ this.cluster = cluster;
+ }
+
+ private void setup() {
+ try {
+ this.cluster.waitForReadyBrokers();
+ createTopic("topic");
+ createTopic("topic2");
+ sgsTopicPartitions = IntStream.range(0, 3)
+ .mapToObj(part -> new
TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, part))
+ .toList();
+ this.warmup();
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ @ClusterTest
public void testPollNoSubscribeFails() {
- try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ setup();
+ try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
assertEquals(Collections.emptySet(), shareConsumer.subscription());
// "Consumer is not subscribed to any topics."
assertThrows(IllegalStateException.class, () ->
shareConsumer.poll(Duration.ofMillis(500)));
}
}
- @Test
+ @ClusterTest
public void testSubscribeAndPollNoRecords() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
Set<String> subscription = Collections.singleton(tp.topic());
shareConsumer.subscribe(subscription);
assertEquals(subscription, shareConsumer.subscription());
@@ -171,10 +167,11 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testSubscribePollUnsubscribe() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
Set<String> subscription = Collections.singleton(tp.topic());
shareConsumer.subscribe(subscription);
assertEquals(subscription, shareConsumer.subscription());
@@ -186,10 +183,11 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testSubscribePollSubscribe() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
Set<String> subscription = Collections.singleton(tp.topic());
shareConsumer.subscribe(subscription);
assertEquals(subscription, shareConsumer.subscription());
@@ -203,10 +201,11 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testSubscribeUnsubscribePollFails() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
Set<String> subscription = Collections.singleton(tp.topic());
shareConsumer.subscribe(subscription);
assertEquals(subscription, shareConsumer.subscription());
@@ -220,10 +219,11 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testSubscribeSubscribeEmptyPollFails() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
Set<String> subscription = Collections.singleton(tp.topic());
shareConsumer.subscribe(subscription);
assertEquals(subscription, shareConsumer.subscription());
@@ -237,11 +237,12 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testSubscriptionAndPoll() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
@@ -253,11 +254,12 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testSubscriptionAndPollMultiple() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
@@ -275,11 +277,12 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testAcknowledgementSentOnSubscriptionChange() throws
ExecutionException, InterruptedException {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
Map<TopicPartition, Set<Long>> partitionOffsetsMap = new
HashMap<>();
Map<TopicPartition, Exception> partitionExceptionMap = new
HashMap<>();
@@ -311,11 +314,12 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement()
throws Exception {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
Map<TopicPartition, Set<Long>> partitionOffsetsMap = new
HashMap<>();
Map<TopicPartition, Exception> partitionExceptionMap = new
HashMap<>();
@@ -341,11 +345,12 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testAcknowledgementCommitCallbackOnClose() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
Map<TopicPartition, Set<Long>> partitionOffsetsMap = new
HashMap<>();
Map<TopicPartition, Exception> partitionExceptionMap = new
HashMap<>();
@@ -372,11 +377,12 @@ public class ShareConsumerTest {
}
@Flaky("KAFKA-18033")
- @Test
+ @ClusterTest
public void testAcknowledgementCommitCallbackInvalidRecordStateException()
throws Exception {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
Map<TopicPartition, Set<Long>> partitionOffsetsMap = new
HashMap<>();
Map<TopicPartition, Exception> partitionExceptionMap = new
HashMap<>();
@@ -426,11 +432,12 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testHeaders() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
int numRecords = 1;
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
@@ -454,8 +461,16 @@ public class ShareConsumerTest {
private void testHeadersSerializeDeserialize(Serializer<byte[]>
serializer, Deserializer<byte[]> deserializer) {
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), serializer);
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(deserializer, new ByteArrayDeserializer(), "group1")) {
+ Map<String, Object> producerConfig = Map.of(
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
serializer.getClass().getName()
+ );
+
+ Map<String, Object> consumerConfig = Map.of(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
deserializer.getClass().getName()
+ );
+
+ try (Producer<byte[], byte[]> producer =
createProducer(producerConfig);
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1", consumerConfig)) {
int numRecords = 1;
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
@@ -469,20 +484,22 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testHeadersSerializerDeserializer() {
+ setup();
testHeadersSerializeDeserialize(new BaseConsumerTest.SerializerImpl(),
new BaseConsumerTest.DeserializerImpl());
verifyShareGroupStateTopicRecordsProduced();
}
- @Test
+ @ClusterTest
public void testMaxPollRecords() {
+ setup();
int numRecords = 10000;
int maxPollRecords = 2;
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
- "group1",
Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
String.valueOf(maxPollRecords)))) {
+ try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1",
+ Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
String.valueOf(maxPollRecords)))) {
long startingTimestamp = System.currentTimeMillis();
produceMessagesWithTimestamp(numRecords, startingTimestamp);
@@ -508,12 +525,13 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testControlRecordsSkipped() throws Exception {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> transactionalProducer =
createProducer(new ByteArraySerializer(), new ByteArraySerializer(), "T1");
- KafkaProducer<byte[], byte[]> nonTransactionalProducer =
createProducer(new ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
+ Producer<byte[], byte[]> nonTransactionalProducer =
createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
@@ -553,11 +571,12 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testExplicitAcknowledgeSuccess() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
@@ -574,11 +593,12 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testExplicitAcknowledgeCommitSuccess() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
@@ -597,12 +617,13 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testExplicitAcknowledgementCommitAsync() throws
InterruptedException {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
- KafkaShareConsumer<byte[], byte[]> shareConsumer2 =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer("group1");
+ ShareConsumer<byte[], byte[]> shareConsumer2 =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> record1 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
ProducerRecord<byte[], byte[]> record2 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
@@ -653,11 +674,12 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testExplicitAcknowledgementCommitAsyncPartialBatch() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> record1 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
ProducerRecord<byte[], byte[]> record2 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
@@ -716,11 +738,12 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testExplicitAcknowledgeReleasePollAccept() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
@@ -739,11 +762,12 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testExplicitAcknowledgeReleaseAccept() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
@@ -759,11 +783,12 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testExplicitAcknowledgeReleaseClose() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
@@ -777,11 +802,12 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testExplicitAcknowledgeThrowsNotInBatch() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
@@ -799,11 +825,12 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testImplicitAcknowledgeFailsExplicit() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
@@ -820,11 +847,12 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testImplicitAcknowledgeCommitSync() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
@@ -843,11 +871,12 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testImplicitAcknowledgementCommitAsync() throws
InterruptedException {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> record1 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
ProducerRecord<byte[], byte[]> record2 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
@@ -882,14 +911,19 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testFetchRecordLargerThanMaxPartitionFetchBytes() throws
Exception {
+ setup();
int maxPartitionFetchBytes = 10000;
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
- "group1",
Collections.singletonMap(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
String.valueOf(maxPartitionFetchBytes)))) {
+ try (
+ Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+
Collections.singletonMap(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
String.valueOf(maxPartitionFetchBytes))
+ )
+ ) {
ProducerRecord<byte[], byte[]> smallRecord = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
ProducerRecord<byte[], byte[]> bigRecord = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), new
byte[maxPartitionFetchBytes]);
@@ -904,13 +938,14 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testMultipleConsumersWithDifferentGroupIds() throws
InterruptedException {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
alterShareAutoOffsetReset("group2", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
- KafkaShareConsumer<byte[], byte[]> shareConsumer2 =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group2")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer("group1");
+ ShareConsumer<byte[], byte[]> shareConsumer2 =
createShareConsumer("group2")) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
@@ -955,12 +990,13 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testMultipleConsumersInGroupSequentialConsumption() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
- KafkaShareConsumer<byte[], byte[]> shareConsumer2 =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer("group1");
+ ShareConsumer<byte[], byte[]> shareConsumer2 =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
shareConsumer1.subscribe(Collections.singleton(tp.topic()));
@@ -992,9 +1028,10 @@ public class ShareConsumerTest {
}
@Flaky("KAFKA-18033")
- @Test
+ @ClusterTest
public void testMultipleConsumersInGroupConcurrentConsumption()
throws InterruptedException, ExecutionException, TimeoutException {
+ setup();
AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
int consumerCount = 4;
@@ -1026,9 +1063,10 @@ public class ShareConsumerTest {
assertEquals(producerCount * messagesPerProducer, totalResult);
}
- @Test
+ @ClusterTest
public void testMultipleConsumersInMultipleGroupsConcurrentConsumption()
throws ExecutionException, InterruptedException, TimeoutException {
+ setup();
AtomicInteger totalMessagesConsumedGroup1 = new AtomicInteger(0);
AtomicInteger totalMessagesConsumedGroup2 = new AtomicInteger(0);
AtomicInteger totalMessagesConsumedGroup3 = new AtomicInteger(0);
@@ -1089,12 +1127,13 @@ public class ShareConsumerTest {
verifyShareGroupStateTopicRecordsProduced();
}
- @Test
+ @ClusterTest
public void testConsumerCloseInGroupSequential() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
- KafkaShareConsumer<byte[], byte[]> shareConsumer2 =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer("group1");
+ ShareConsumer<byte[], byte[]> shareConsumer2 =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
shareConsumer1.subscribe(Collections.singleton(tp.topic()));
@@ -1136,9 +1175,10 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testMultipleConsumersInGroupFailureConcurrentConsumption()
throws InterruptedException, ExecutionException, TimeoutException {
+ setup();
AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
int consumerCount = 4;
@@ -1181,11 +1221,12 @@ public class ShareConsumerTest {
verifyShareGroupStateTopicRecordsProduced();
}
- @Test
+ @ClusterTest
public void testAcquisitionLockTimeoutOnConsumer() throws
InterruptedException {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> producerRecord1 = new
ProducerRecord<>(tp.topic(), tp.partition(), null,
"key_1".getBytes(), "value_1".getBytes());
@@ -1242,14 +1283,15 @@ public class ShareConsumerTest {
}
/**
- * Test to verify that the acknowledgement commit callback cannot invoke
methods of KafkaShareConsumer.
+ * Test to verify that the acknowledgement commit callback cannot invoke
methods of ShareConsumer.
* The exception thrown is verified in {@link
TestableAcknowledgementCommitCallbackWithShareConsumer}
*/
- @Test
+ @ClusterTest
public void
testAcknowledgementCommitCallbackCallsShareConsumerDisallowed() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
@@ -1258,7 +1300,7 @@ public class ShareConsumerTest {
shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgementCommitCallbackWithShareConsumer<>(shareConsumer));
shareConsumer.subscribe(Collections.singleton(tp.topic()));
- // The acknowledgment commit callback will try to call a method of
KafkaShareConsumer
+ // The acknowledgment commit callback will try to call a method of
ShareConsumer
shareConsumer.poll(Duration.ofMillis(5000));
// The second poll sends the acknowledgements implicitly.
// The acknowledgement commit callback will be called and the
exception is thrown.
@@ -1269,15 +1311,15 @@ public class ShareConsumerTest {
}
private class TestableAcknowledgementCommitCallbackWithShareConsumer<K, V>
implements AcknowledgementCommitCallback {
- private final KafkaShareConsumer<K, V> shareConsumer;
+ private final ShareConsumer<K, V> shareConsumer;
-
TestableAcknowledgementCommitCallbackWithShareConsumer(KafkaShareConsumer<K, V>
shareConsumer) {
+
TestableAcknowledgementCommitCallbackWithShareConsumer(ShareConsumer<K, V>
shareConsumer) {
this.shareConsumer = shareConsumer;
}
@Override
public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap,
Exception exception) {
- // Accessing methods of KafkaShareConsumer should throw an
exception.
+ // Accessing methods of ShareConsumer should throw an exception.
assertThrows(IllegalStateException.class, shareConsumer::close);
assertThrows(IllegalStateException.class, () ->
shareConsumer.subscribe(Collections.singleton(tp.topic())));
assertThrows(IllegalStateException.class, () ->
shareConsumer.poll(Duration.ofMillis(5000)));
@@ -1285,20 +1327,21 @@ public class ShareConsumerTest {
}
/**
- * Test to verify that the acknowledgement commit callback can invoke
KafkaShareConsumer.wakeup() and it
+ * Test to verify that the acknowledgement commit callback can invoke
ShareConsumer.wakeup() and it
* wakes up the enclosing poll.
*/
- @Test
+ @ClusterTest
public void testAcknowledgementCommitCallbackCallsShareConsumerWakeup()
throws InterruptedException {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
producer.flush();
- // The acknowledgment commit callback will try to call a method of
KafkaShareConsumer
+ // The acknowledgment commit callback will try to call a method of
ShareConsumer
shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgementCommitCallbackWakeup<>(shareConsumer));
shareConsumer.subscribe(Collections.singleton(tp.topic()));
@@ -1324,9 +1367,9 @@ public class ShareConsumerTest {
}
private static class TestableAcknowledgementCommitCallbackWakeup<K, V>
implements AcknowledgementCommitCallback {
- private final KafkaShareConsumer<K, V> shareConsumer;
+ private final ShareConsumer<K, V> shareConsumer;
- TestableAcknowledgementCommitCallbackWakeup(KafkaShareConsumer<K, V>
shareConsumer) {
+ TestableAcknowledgementCommitCallbackWakeup(ShareConsumer<K, V>
shareConsumer) {
this.shareConsumer = shareConsumer;
}
@@ -1340,11 +1383,12 @@ public class ShareConsumerTest {
* Test to verify that the acknowledgement commit callback can throw an
exception, and it is propagated
* to the caller of poll().
*/
- @Test
+ @ClusterTest
public void testAcknowledgementCommitCallbackThrowsException() throws
InterruptedException {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
@@ -1377,13 +1421,14 @@ public class ShareConsumerTest {
}
/**
- * Test to verify that calling Thread.interrupt() before
KafkaShareConsumer.poll(Duration)
+ * Test to verify that calling Thread.interrupt() before
ShareConsumer.poll(Duration)
* causes it to throw InterruptException
*/
- @Test
+ @ClusterTest
public void testPollThrowsInterruptExceptionIfInterrupted() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
shareConsumer.subscribe(Collections.singleton(tp.topic()));
@@ -1404,10 +1449,11 @@ public class ShareConsumerTest {
* Test to verify that InvalidTopicException is thrown if the consumer
subscribes
* to an invalid topic.
*/
- @Test
+ @ClusterTest
public void testSubscribeOnInvalidTopicThrowsInvalidTopicException() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
shareConsumer.subscribe(Collections.singleton("topic abc"));
@@ -1421,11 +1467,12 @@ public class ShareConsumerTest {
* Test to ensure that a wakeup when records are buffered doesn't prevent
the records
* being returned on the next poll.
*/
- @Test
+ @ClusterTest
public void testWakeupWithFetchedRecordsAvailable() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
@@ -1442,11 +1489,12 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testSubscriptionFollowedByTopicCreation() throws
InterruptedException {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
String topic = "foo";
shareConsumer.subscribe(Collections.singleton(topic));
@@ -1471,16 +1519,17 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testSubscriptionAndPollFollowedByTopicDeletion() throws
InterruptedException, ExecutionException {
+ setup();
String topic1 = "bar";
String topic2 = "baz";
createTopic(topic1);
createTopic(topic2);
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> recordTopic1 = new
ProducerRecord<>(topic1, 0, null, "key".getBytes(), "value".getBytes());
ProducerRecord<byte[], byte[]> recordTopic2 = new
ProducerRecord<>(topic2, 0, null, "key".getBytes(), "value".getBytes());
@@ -1513,12 +1562,16 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testLsoMovementByRecordsDeletion() {
+ setup();
String groupId = "group1";
alterShareAutoOffsetReset(groupId, "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer())) {
+ try (
+ Producer<byte[], byte[]> producer = createProducer();
+ Admin adminClient = createAdminClient()
+ ) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), 0, null, "key".getBytes(), "value".getBytes());
@@ -1555,10 +1608,11 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testShareAutoOffsetResetDefaultValue() {
- try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
- KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer())) {
+ setup();
+ try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1");
+ Producer<byte[], byte[]> producer = createProducer()) {
shareConsumer.subscribe(Collections.singleton(tp.topic()));
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
@@ -1581,11 +1635,12 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testShareAutoOffsetResetEarliest() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
- KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer())) {
+ try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1");
+ Producer<byte[], byte[]> producer = createProducer()) {
shareConsumer.subscribe(Collections.singleton(tp.topic()));
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
@@ -1606,11 +1661,15 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testShareAutoOffsetResetEarliestAfterLsoMovement() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
- try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
- KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer())) {
+ try (
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1");
+ Producer<byte[], byte[]> producer = createProducer();
+ Admin adminClient = createAdminClient()
+ ) {
shareConsumer.subscribe(Collections.singleton(tp.topic()));
@@ -1630,13 +1689,14 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue() {
+ setup();
alterShareAutoOffsetReset("group1", "earliest");
alterShareAutoOffsetReset("group2", "latest");
- try (KafkaShareConsumer<byte[], byte[]> shareConsumerEarliest =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
- KafkaShareConsumer<byte[], byte[]> shareConsumerLatest =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group2");
- KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer())) {
+ try (ShareConsumer<byte[], byte[]> shareConsumerEarliest =
createShareConsumer("group1");
+ ShareConsumer<byte[], byte[]> shareConsumerLatest =
createShareConsumer("group2");
+ Producer<byte[], byte[]> producer = createProducer()) {
shareConsumerEarliest.subscribe(Collections.singleton(tp.topic()));
@@ -1670,13 +1730,14 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testShareAutoOffsetResetByDuration() throws Exception {
+ setup();
// Set auto offset reset to 1 hour before current time
alterShareAutoOffsetReset("group1", "by_duration:PT1H");
- try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
- KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer())) {
+ try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1");
+ Producer<byte[], byte[]> producer = createProducer()) {
long currentTime = System.currentTimeMillis();
long twoHoursAgo = currentTime - TimeUnit.HOURS.toMillis(2);
@@ -1711,8 +1772,8 @@ public class ShareConsumerTest {
// Set the auto offset reset to 3 hours before current time
// so the consumer should consume all messages (3 records)
alterShareAutoOffsetReset("group2", "by_duration:PT3H");
- try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group2");
- KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer())) {
+ try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group2");
+ Producer<byte[], byte[]> producer = createProducer()) {
shareConsumer.subscribe(Collections.singleton(tp.topic()));
List<ConsumerRecord<byte[], byte[]>> records =
consumeRecords(shareConsumer, 3);
@@ -1721,29 +1782,32 @@ public class ShareConsumerTest {
}
}
- @Test
+ @ClusterTest
public void testShareAutoOffsetResetByDurationInvalidFormat() throws
Exception {
+ setup();
// Test invalid duration format
ConfigResource configResource = new
ConfigResource(ConfigResource.Type.GROUP, "group1");
Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new
HashMap<>();
// Test invalid duration format
- alterEntries.put(configResource, List.of(new AlterConfigOp(new
ConfigEntry(
- GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:1h"),
AlterConfigOp.OpType.SET)));
- ExecutionException e1 = assertThrows(ExecutionException.class, () ->
- adminClient.incrementalAlterConfigs(alterEntries).all().get());
- assertInstanceOf(InvalidConfigurationException.class, e1.getCause());
+ try (Admin adminClient = createAdminClient()) {
+ alterEntries.put(configResource, List.of(new AlterConfigOp(new
ConfigEntry(
+ GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:1h"),
AlterConfigOp.OpType.SET)));
+ ExecutionException e1 = assertThrows(ExecutionException.class, ()
->
+ adminClient.incrementalAlterConfigs(alterEntries).all().get());
+ assertInstanceOf(InvalidConfigurationException.class,
e1.getCause());
- // Test negative duration
- alterEntries.put(configResource, List.of(new AlterConfigOp(new
ConfigEntry(
- GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:-PT1H"),
AlterConfigOp.OpType.SET)));
- ExecutionException e2 = assertThrows(ExecutionException.class, () ->
- adminClient.incrementalAlterConfigs(alterEntries).all().get());
- assertInstanceOf(InvalidConfigurationException.class, e2.getCause());
+ // Test negative duration
+ alterEntries.put(configResource, List.of(new AlterConfigOp(new
ConfigEntry(
+ GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG,
"by_duration:-PT1H"), AlterConfigOp.OpType.SET)));
+ ExecutionException e2 = assertThrows(ExecutionException.class, ()
->
+ adminClient.incrementalAlterConfigs(alterEntries).all().get());
+ assertInstanceOf(InvalidConfigurationException.class,
e2.getCause());
+ }
}
private int produceMessages(int messageCount) {
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer())) {
+ try (Producer<byte[], byte[]> producer = createProducer()) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
IntStream.range(0, messageCount).forEach(__ ->
producer.send(record));
producer.flush();
@@ -1752,7 +1816,7 @@ public class ShareConsumerTest {
}
private void produceMessagesWithTimestamp(int messageCount, long
startingTimestamp) {
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer())) {
+ try (Producer<byte[], byte[]> producer = createProducer()) {
for (int i = 0; i < messageCount; i++) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), startingTimestamp + i,
("key " + i).getBytes(), ("value " + i).getBytes());
@@ -1769,8 +1833,8 @@ public class ShareConsumerTest {
int maxPolls,
boolean commit) {
return assertDoesNotThrow(() -> {
- try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(
- new ByteArrayDeserializer(), new ByteArrayDeserializer(),
groupId)) {
+ try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(
+ groupId)) {
shareConsumer.subscribe(Collections.singleton(tp.topic()));
return consumeMessages(shareConsumer, totalMessagesConsumed,
totalMessages, consumerNumber, maxPolls, commit);
}
@@ -1785,8 +1849,8 @@ public class ShareConsumerTest {
boolean commit,
int maxFetchBytes) {
return assertDoesNotThrow(() -> {
- try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(
- new ByteArrayDeserializer(), new ByteArrayDeserializer(),
groupId,
+ try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(
+ groupId,
Map.of(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
maxFetchBytes))) {
shareConsumer.subscribe(Collections.singleton(tp.topic()));
return consumeMessages(shareConsumer, totalMessagesConsumed,
totalMessages, consumerNumber, maxPolls, commit);
@@ -1794,7 +1858,7 @@ public class ShareConsumerTest {
}, "Consumer " + consumerNumber + " failed with exception");
}
- private int consumeMessages(KafkaShareConsumer<byte[], byte[]> consumer,
+ private int consumeMessages(ShareConsumer<byte[], byte[]> consumer,
AtomicInteger totalMessagesConsumed,
int totalMessages,
int consumerNumber,
@@ -1827,7 +1891,7 @@ public class ShareConsumerTest {
}, "Consumer " + consumerNumber + " failed with exception");
}
- private <K, V> List<ConsumerRecord<K, V>>
consumeRecords(KafkaShareConsumer<K, V> consumer,
+ private <K, V> List<ConsumerRecord<K, V>> consumeRecords(ShareConsumer<K,
V> consumer,
int numRecords) {
ArrayList<ConsumerRecord<K, V>> accumulatedRecords = new ArrayList<>();
long startTimeMs = System.currentTimeMillis();
@@ -1841,58 +1905,55 @@ public class ShareConsumerTest {
}
private void createTopic(String topicName) {
- Properties props = cluster.clientProperties();
assertDoesNotThrow(() -> {
- try (Admin admin = Admin.create(props)) {
+ try (Admin admin = createAdminClient()) {
admin.createTopics(Collections.singleton(new
NewTopic(topicName, 1, (short) 1))).all().get();
}
}, "Failed to create topic");
}
private void deleteTopic(String topicName) {
- Properties props = cluster.clientProperties();
assertDoesNotThrow(() -> {
- try (Admin admin = Admin.create(props)) {
+ try (Admin admin = createAdminClient()) {
admin.deleteTopics(Collections.singleton(topicName)).all().get();
}
}, "Failed to delete topic");
}
private Admin createAdminClient() {
- Properties props = cluster.clientProperties();
- return Admin.create(props);
+ return cluster.admin();
}
- private <K, V> KafkaProducer<K, V> createProducer(Serializer<K>
keySerializer,
- Serializer<V>
valueSerializer) {
- Properties props = cluster.clientProperties();
- return new KafkaProducer<>(props, keySerializer, valueSerializer);
+ private <K, V> Producer<K, V> createProducer() {
+ return createProducer(Map.of());
}
- private <K, V> KafkaProducer<K, V> createProducer(Serializer<K>
keySerializer,
- Serializer<V>
valueSerializer,
- String transactionalId) {
- Properties props = cluster.clientProperties();
- props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
- return new KafkaProducer<>(props, keySerializer, valueSerializer);
+ private <K, V> Producer<K, V> createProducer(Map<String, Object> config) {
+ return cluster.producer(config);
}
- private <K, V> KafkaShareConsumer<K, V>
createShareConsumer(Deserializer<K> keyDeserializer,
-
Deserializer<V> valueDeserializer,
- String
groupId) {
- Properties props = cluster.clientProperties();
- props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- return new KafkaShareConsumer<>(props, keyDeserializer,
valueDeserializer);
+ private <K, V> Producer<K, V> createProducer(String transactionalId) {
+ return createProducer(
+ Map.of(
+ ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId
+ )
+ );
}
- private <K, V> KafkaShareConsumer<K, V>
createShareConsumer(Deserializer<K> keyDeserializer,
-
Deserializer<V> valueDeserializer,
- String groupId,
- Map<?, ?>
additionalProperties) {
- Properties props = cluster.clientProperties();
- props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ private <K, V> ShareConsumer<K, V> createShareConsumer(String groupId) {
+ return createShareConsumer(groupId, Map.of());
+ }
+
+ private <K, V> ShareConsumer<K, V> createShareConsumer(
+ String groupId,
+ Map<?, ?> additionalProperties
+ ) {
+ Properties props = new Properties();
props.putAll(additionalProperties);
- return new KafkaShareConsumer<>(props, keyDeserializer,
valueDeserializer);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ Map<String, Object> conf = new HashMap<>();
+ props.forEach((k, v) -> conf.put((String) k, v));
+ return cluster.shareConsumer(conf);
}
private void warmup() throws InterruptedException {
@@ -1901,8 +1962,8 @@ public class ShareConsumerTest {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(warmupTp.topic(), warmupTp.partition(), null,
"key".getBytes(), "value".getBytes());
Set<String> subscription = Collections.singleton(warmupTp.topic());
alterShareAutoOffsetReset("warmupgroup1", "earliest");
- try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"warmupgroup1")) {
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("warmupgroup1")) {
producer.send(record);
producer.flush();
@@ -1921,12 +1982,7 @@ public class ShareConsumerTest {
private void verifyShareGroupStateTopicRecordsProduced() {
try {
- Map<String, Object> consumerConfigs = new HashMap<>();
- consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
cluster.bootstrapServers());
- consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
-
consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
-
- try (KafkaConsumer<byte[], byte[]> consumer = new
KafkaConsumer<>(consumerConfigs)) {
+ try (Consumer<byte[], byte[]> consumer = cluster.consumer()) {
consumer.assign(sgsTopicPartitions);
consumer.seekToBeginning(sgsTopicPartitions);
Set<ConsumerRecord<byte[], byte[]>> records = new HashSet<>();
@@ -1953,8 +2009,10 @@ public class ShareConsumerTest {
alterEntries.put(configResource, List.of(new AlterConfigOp(new
ConfigEntry(
GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, newValue),
AlterConfigOp.OpType.SET)));
AlterConfigsOptions alterOptions = new AlterConfigsOptions();
- assertDoesNotThrow(() ->
adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
+ try (Admin adminClient = createAdminClient()) {
+ assertDoesNotThrow(() ->
adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
.all()
.get(60, TimeUnit.SECONDS), "Failed to alter configs");
+ }
}
}
diff --git
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
index 1c8551bf9e6..eb97c2c92a1 100644
---
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
+++
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
@@ -32,6 +32,8 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.KafkaShareConsumer;
+import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -188,6 +190,20 @@ public interface ClusterInstance {
return consumer(Map.of());
}
+ default <K, V> ShareConsumer<K, V> shareConsumer() {
+ return shareConsumer(Map.of());
+ }
+
+ default <K, V> ShareConsumer<K, V> shareConsumer(Map<String, Object>
configs) {
+ Map<String, Object> props = new HashMap<>(configs);
+ props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
+ props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
+ props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" +
TestUtils.randomString(5));
+ props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers());
+ return new KafkaShareConsumer<>(setClientSaslConfig(props));
+ }
+
default Admin admin(Map<String, Object> configs, boolean
usingBootstrapControllers) {
Map<String, Object> props = new HashMap<>(configs);
if (usingBootstrapControllers) {