This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 632aedcf4ff KAFKA-18632: Multibroker test improvements. (#18718)
632aedcf4ff is described below
commit 632aedcf4ff7e718bf1295fe5132ebaf1d3f92a0
Author: Sushant Mahajan <[email protected]>
AuthorDate: Wed Jan 29 22:33:43 2025 +0530
KAFKA-18632: Multibroker test improvements. (#18718)
Reviewers: Andrew Schofield <[email protected]>
---
.../ShareCoordinatorMetadataCacheHelperImpl.java | 57 +++--
.../java/kafka/test/api/ShareConsumerTest.java | 274 ++++++++++++++-------
2 files changed, 213 insertions(+), 118 deletions(-)
diff --git a/core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java b/core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java
index 28148eab7ff..40dcac8ca0b 100644
--- a/core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java
+++ b/core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java
@@ -20,6 +20,7 @@ package kafka.server.metadata;
 import kafka.server.MetadataCache;
 
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.message.MetadataResponseData;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.Errors;
@@ -27,6 +28,9 @@ import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
@@ -41,6 +45,7 @@ public class ShareCoordinatorMetadataCacheHelperImpl implements ShareCoordinatorMetadataCacheHelper {
     private final MetadataCache metadataCache;
     private final Function<SharePartitionKey, Integer> keyToPartitionMapper;
     private final ListenerName interBrokerListenerName;
+    private final Logger log = LoggerFactory.getLogger(ShareCoordinatorMetadataCacheHelperImpl.class);
 
     public ShareCoordinatorMetadataCacheHelperImpl(
         MetadataCache metadataCache,
@@ -63,35 +68,39 @@ public class ShareCoordinatorMetadataCacheHelperImpl implements ShareCoordinatorMetadataCacheHelper {
 
     @Override
     public Node getShareCoordinator(SharePartitionKey key, String internalTopicName) {
-        if (metadataCache.contains(internalTopicName)) {
-            Set<String> topicSet = new HashSet<>();
-            topicSet.add(internalTopicName);
-
-            List<MetadataResponseData.MetadataResponseTopic> topicMetadata = CollectionConverters.asJava(
-                metadataCache.getTopicMetadata(
-                    CollectionConverters.asScala(topicSet),
-                    interBrokerListenerName,
-                    false,
-                    false
-                )
-            );
+        try {
+            if (metadataCache.contains(internalTopicName)) {
+                Set<String> topicSet = new HashSet<>();
+                topicSet.add(internalTopicName);
 
-            if (topicMetadata == null || topicMetadata.isEmpty() || topicMetadata.get(0).errorCode() != Errors.NONE.code()) {
-                return Node.noNode();
-            } else {
-                int partition = keyToPartitionMapper.apply(key);
-                Optional<MetadataResponseData.MetadataResponsePartition> response = topicMetadata.get(0).partitions().stream()
-                    .filter(responsePart -> responsePart.partitionIndex() == partition
-                        && responsePart.leaderId() != MetadataResponse.NO_LEADER_ID)
-                    .findFirst();
+                List<MetadataResponseData.MetadataResponseTopic> topicMetadata = CollectionConverters.asJava(
+                    metadataCache.getTopicMetadata(
+                        CollectionConverters.asScala(topicSet),
+                        interBrokerListenerName,
+                        false,
+                        false
+                    )
+                );
 
-                if (response.isPresent()) {
-                    return OptionConverters.toJava(metadataCache.getAliveBrokerNode(response.get().leaderId(), interBrokerListenerName))
-                        .orElse(Node.noNode());
-                } else {
+                if (topicMetadata == null || topicMetadata.isEmpty() || topicMetadata.get(0).errorCode() != Errors.NONE.code()) {
                     return Node.noNode();
+                } else {
+                    int partition = keyToPartitionMapper.apply(key);
+                    Optional<MetadataResponseData.MetadataResponsePartition> response = topicMetadata.get(0).partitions().stream()
+                        .filter(responsePart -> responsePart.partitionIndex() == partition
+                            && responsePart.leaderId() != MetadataResponse.NO_LEADER_ID)
+                        .findFirst();
+
+                    if (response.isPresent()) {
+                        return OptionConverters.toJava(metadataCache.getAliveBrokerNode(response.get().leaderId(), interBrokerListenerName))
+                            .orElse(Node.noNode());
+                    } else {
+                        return Node.noNode();
+                    }
                 }
             }
+        } catch (CoordinatorNotAvailableException e) {
+            log.warn("Coordinator not available", e);
         }
         return Node.noNode();
     }
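
With this change, getShareCoordinator() degrades to the Node.noNode() sentinel instead of letting CoordinatorNotAvailableException propagate while the coordinator is moving. A minimal caller-side sketch of polling for that sentinel follows; awaitShareCoordinator is a hypothetical helper written for illustration, not part of this patch:

    // Hypothetical helper, for illustration only: polls getShareCoordinator(),
    // treating the Node.noNode() sentinel as "coordinator not known yet".
    static Node awaitShareCoordinator(
        ShareCoordinatorMetadataCacheHelperImpl helper,
        SharePartitionKey key,
        String internalTopicName,
        long timeoutMs
    ) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        Node node = helper.getShareCoordinator(key, internalTopicName);
        while (node.isEmpty() && System.currentTimeMillis() < deadline) {
            Thread.sleep(100L); // back off; the lookup can fail during coordinator movement
            node = helper.getShareCoordinator(key, internalTopicName);
        }
        return node; // still Node.noNode() if no coordinator became available in time
    }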
diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
index 0de4f180444..f5761003e10 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -55,6 +55,8 @@ import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterTest;
@@ -88,6 +90,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -1832,86 +1835,142 @@ public class ShareConsumerTest {
             @ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
         }
     )
+    @Timeout(90)
     public void testShareConsumerAfterCoordinatorMovement() throws Exception {
         setup();
         String topicName = "multipart";
         String groupId = "multipartGrp";
         Uuid topicId = createTopic(topicName, 3, 3);
         alterShareAutoOffsetReset(groupId, "earliest");
-
-        try (Admin admin = createAdminClient()) {
-            TopicPartition tpMulti = new TopicPartition(topicName, 0);
-
-            // produce some messages
-            try (Producer<byte[], byte[]> producer = createProducer()) {
-                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
-                    tpMulti.topic(),
-                    tpMulti.partition(),
-                    null,
-                    "key".getBytes(),
-                    "value".getBytes()
-                );
-                IntStream.range(0, 10).forEach(__ -> producer.send(record));
-                producer.flush();
-            }
-
-            // consume messages
-            try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId)) {
-                shareConsumer.subscribe(List.of(topicName));
-                ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
-                assertEquals(10, records.count());
+        ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
+
+        TopicPartition tpMulti = new TopicPartition(topicName, 0);
+
+        // produce some messages
+        ClientState prodState = new ClientState();
+        final Set<String> produced = new HashSet<>();
+        service.execute(() -> {
+                int i = 0;
+                try (Producer<String, String> producer = createProducer(Map.of(
+                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()
+                ))) {
+                    while (!prodState.done().get()) {
+                        String key = "key-" + (i++);
+                        ProducerRecord<String, String> record = new ProducerRecord<>(
+                            tpMulti.topic(),
+                            tpMulti.partition(),
+                            null,
+                            key,
+                            "value"
+                        );
+                        try {
+                            producer.send(record);
+                            producer.flush();
+                            // count only correctly produced records
+                            prodState.count().incrementAndGet();
+                            produced.add(key);
+                        } catch (Exception e) {
+                            // ignore
+                        }
+                    }
+                }
             }
+        );
 
-            // get current share coordinator node
-            SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(topicId, tpMulti));
-            int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
-            List<Integer> curShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
-                .partitions().stream()
-                .filter(info -> info.partition() == shareGroupStateTp)
-                .map(info -> info.leader().id())
-                .toList();
-
-            assertEquals(1, curShareCoordNodeId.size());
+        // consume messages - start after a small delay
+        ClientState consState = new ClientState();
+        // using a map here in case we want to debug specific keys
+        Map<String, Integer> consumed = new HashMap<>();
+        service.schedule(() -> {
+                try (ShareConsumer<String, String> shareConsumer = createShareConsumer(groupId, Map.of(
+                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
+                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()
+                ))) {
+                    shareConsumer.subscribe(List.of(topicName));
+                    while (!consState.done().get()) {
+                        ConsumerRecords<String, String> records = shareConsumer.poll(Duration.ofMillis(2000L));
+                        consState.count().addAndGet(records.count());
+                        records.forEach(rec -> consumed.compute(rec.key(), (k, v) -> v == null ? 1 : v + 1));
+                        if (prodState.done().get() && records.count() == 0) {
+                            consState.done().set(true);
+                        }
+                    }
+                }
+            }, 100L, TimeUnit.MILLISECONDS
+        );
 
-            // shutdown the coordinator
-            KafkaBroker broker = cluster.brokers().get(curShareCoordNodeId.get(0));
-            cluster.shutdownBroker(curShareCoordNodeId.get(0));
+        // To be closer to real world scenarios, we will execute after
+        // some time has elapsed since the producer and consumer started
+        // working.
+        service.schedule(() -> {
+                // Get the current node hosting the __share_group_state partition
+                // on which tpMulti is hosted. Then shut down this node and wait
+                // for it to be gracefully shut down. Then fetch the coordinator
+                // again and verify that it has moved to some other broker.
+                try (Admin admin = createAdminClient()) {
+                    SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(topicId, tpMulti));
+                    int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
+                    List<Integer> curShareCoordNodeId = null;
+                    try {
+                        curShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
+                            .partitions().stream()
+                            .filter(info -> info.partition() == shareGroupStateTp)
+                            .map(info -> info.leader().id())
+                            .toList();
+                    } catch (Exception e) {
+                        fail(e);
+                    }
+                    assertEquals(1, curShareCoordNodeId.size());
+
+                    // shutdown the coordinator
+                    KafkaBroker broker = cluster.brokers().get(curShareCoordNodeId.get(0));
+                    cluster.shutdownBroker(curShareCoordNodeId.get(0));
+
+                    // wait for it to be completely shut down
+                    broker.awaitShutdown();
+
+                    List<Integer> newShareCoordNodeId = null;
+                    try {
+                        newShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
+                            .partitions().stream()
+                            .filter(info -> info.partition() == shareGroupStateTp)
+                            .map(info -> info.leader().id())
+                            .toList();
+                    } catch (Exception e) {
+                        fail(e);
+                    }
 
-            // give some breathing time
-            broker.awaitShutdown();
+                    assertEquals(1, newShareCoordNodeId.size());
+                    assertNotEquals(curShareCoordNodeId.get(0), newShareCoordNodeId.get(0));
+                }
+            }, 5L, TimeUnit.SECONDS
+        );
 
-            List<Integer> newShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
-                .partitions().stream()
-                .filter(info -> info.partition() == shareGroupStateTp)
-                .map(info -> info.leader().id())
-                .toList();
+        // stop the producer after some time (but after coordinator shutdown)
+        service.schedule(() -> {
+                prodState.done().set(true);
+            }, 10L, TimeUnit.SECONDS
+        );
 
-            assertEquals(1, newShareCoordNodeId.size());
-            assertNotEquals(curShareCoordNodeId.get(0), newShareCoordNodeId.get(0));
+        // wait for both producer and consumer to finish
+        TestUtils.waitForCondition(
+            () -> prodState.done().get() && consState.done().get(),
+            45_000L,
+            500L,
+            () -> "prod/cons not done yet"
+        );
 
-            // again produce to same topic partition
-            try (Producer<byte[], byte[]> producer = createProducer()) {
-                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
-                    tpMulti.topic(),
-                    tpMulti.partition(),
-                    null,
-                    "key".getBytes(),
-                    "value".getBytes()
-                );
-                IntStream.range(0, 10).forEach(__ -> producer.send(record));
-                producer.flush();
-            }
+        // Make sure we consumed all records. The consumed count could be higher
+        // due to re-delivery, but that is expected since we only guarantee
+        // at-least-once semantics.
+        assertTrue(prodState.count().get() <= consState.count().get());
+        Set<String> consumedKeys = consumed.keySet();
+        assertTrue(produced.containsAll(consumedKeys) && consumedKeys.containsAll(produced));
 
-            // consume messages should only be possible if partition and share coord has moved
-            // from shutdown broker since we are only producing to partition 0 of topic.
-            try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId)) {
-                shareConsumer.subscribe(List.of(topicName));
-                ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
-                assertEquals(20, records.count());
-            }
+        shutdownExecutorService(service);
 
-            verifyShareGroupStateTopicRecordsProduced();
-        }
+        verifyShareGroupStateTopicRecordsProduced();
     }
     @ClusterTest(
@@ -1929,6 +1988,8 @@ public class ShareConsumerTest {
             @ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
         }
     )
+    @Timeout(150)
+    @Flaky("KAFKA-18665")
     public void testComplexShareConsumer() throws Exception {
         setup();
         String topicName = "multipart";
@@ -1936,19 +1997,18 @@ public class ShareConsumerTest {
         createTopic(topicName, 3, 3);
         TopicPartition multiTp = new TopicPartition(topicName, 0);
 
-        ExecutorService executer = Executors.newCachedThreadPool();
+        ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
 
-        AtomicBoolean prodDone = new AtomicBoolean(false);
-        AtomicInteger sentCount = new AtomicInteger(0);
+        ClientState prodState = new ClientState();
 
         // produce messages until we want
-        executer.execute(() -> {
+        service.execute(() -> {
             try (Producer<byte[], byte[]> producer = createProducer()) {
-                while (!prodDone.get()) {
+                while (!prodState.done().get()) {
                     ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(multiTp.topic(), multiTp.partition(), null, "key".getBytes(), "value".getBytes());
                     producer.send(record);
                     producer.flush();
-                    sentCount.incrementAndGet();
+                    prodState.count().incrementAndGet();
                 }
             }
         });
@@ -1961,28 +2021,45 @@
             Map.of()
         );
 
-        executer.execute(complexCons1);
+        service.schedule(
+            complexCons1,
+            100L,
+            TimeUnit.MILLISECONDS
+        );
 
         // let the complex consumer read the messages
-        executer.execute(() -> {
-            try {
-                TimeUnit.SECONDS.sleep(10L);
-                prodDone.set(true);
-            } catch (InterruptedException e) {
-                // ignore
-            }
-        });
+        service.schedule(() -> {
+            prodState.done().set(true);
+        }, 10L, TimeUnit.SECONDS
+        );
 
         // all messages which can be read are read, some would be redelivered
-        TestUtils.waitForCondition(complexCons1::isDone, 30_000L, () -> "did not close!");
-        assertTrue(sentCount.get() < complexCons1.recordsRead());
+        TestUtils.waitForCondition(complexCons1::isDone, 45_000L, () -> "did not close!");
 
-        executer.shutdown();
-        executer.shutdownNow();
+        assertTrue(prodState.count().get() < complexCons1.recordsRead());
+
+        shutdownExecutorService(service);
 
         verifyShareGroupStateTopicRecordsProduced();
     }
 
+    /**
+     * Util class to encapsulate state for a consumer/producer
+     * being executed by an {@link ExecutorService}.
+     */
+    private static class ClientState {
+        private final AtomicBoolean done = new AtomicBoolean(false);
+        private final AtomicInteger count = new AtomicInteger(0);
+
+        AtomicBoolean done() {
+            return done;
+        }
+
+        AtomicInteger count() {
+            return count;
+        }
+    }
+
     private int produceMessages(int messageCount) {
         try (Producer<byte[], byte[]> producer = createProducer()) {
             ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
@@ -2217,9 +2294,7 @@
         private final String topicName;
         private final Map<String, Object> configs = new HashMap<>();
-        private final AtomicBoolean isDone = new AtomicBoolean(false);
-        private final AtomicBoolean shouldLoop = new AtomicBoolean(true);
-        private final AtomicInteger readCount = new AtomicInteger(0);
+        private final ClientState state = new ClientState();
 
         private final Predicate<ConsumerRecords<K, V>> exitCriteria;
         private final BiConsumer<ShareConsumer<K, V>, ConsumerRecord<K, V>> processFunc;
@@ -2267,31 +2342,42 @@
         }
 
         void stop() {
-            shouldLoop.set(false);
+            state.done().set(true);
         }
 
         @Override
         public void run() {
             try (ShareConsumer<K, V> consumer = new KafkaShareConsumer<>(configs)) {
                 consumer.subscribe(Set.of(this.topicName));
-                while (shouldLoop.get()) {
+                while (!state.done().get()) {
                     ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
-                    readCount.addAndGet(records.count());
+                    state.count().addAndGet(records.count());
                     if (exitCriteria.test(records)) {
-                        break;
+                        state.done().set(true);
                     }
                     records.forEach(record -> processFunc.accept(consumer, record));
                 }
             }
-            isDone.set(true);
         }
 
         boolean isDone() {
-            return isDone.get();
+            return state.done().get();
        }
 
         int recordsRead() {
-            return readCount.get();
+            return state.count().get();
+        }
+    }
+
+    private void shutdownExecutorService(ExecutorService service) {
+        service.shutdown();
+        try {
+            if (!service.awaitTermination(5L, TimeUnit.SECONDS)) {
+                service.shutdownNow();
+            }
+        } catch (Exception e) {
+            service.shutdownNow();
+            Thread.currentThread().interrupt();
         }
     }
 }
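
As a closing illustration, a minimal sketch of how the new ClientState and shutdownExecutorService(...) helpers fit together; this assumes the method lives inside ShareConsumerTest, where both are defined, and is not part of the patch:

    // Usage sketch under that assumption; clientStateUsageSketch is hypothetical.
    void clientStateUsageSketch() {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
        ClientState state = new ClientState();

        // worker loop: spins until another task flips the shared "done" flag
        service.execute(() -> {
            while (!state.done().get()) {
                state.count().incrementAndGet(); // stand-in for producer.send(...)
            }
        });

        // flip the flag after one second; the delayed task still runs after shutdown()
        // because ScheduledThreadPoolExecutor executes existing delayed tasks by default
        service.schedule(() -> state.done().set(true), 1L, TimeUnit.SECONDS);

        // shutdown(), wait up to 5 seconds for termination, then force shutdownNow()
        shutdownExecutorService(service);
    }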