This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 632aedcf4ff KAFKA-18632: Multibroker test improvements. (#18718)
632aedcf4ff is described below

commit 632aedcf4ff7e718bf1295fe5132ebaf1d3f92a0
Author: Sushant Mahajan <[email protected]>
AuthorDate: Wed Jan 29 22:33:43 2025 +0530

    KAFKA-18632: Multibroker test improvements. (#18718)
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../ShareCoordinatorMetadataCacheHelperImpl.java   |  57 +++--
 .../java/kafka/test/api/ShareConsumerTest.java     | 274 ++++++++++++++-------
 2 files changed, 213 insertions(+), 118 deletions(-)

diff --git 
a/core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java
 
b/core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java
index 28148eab7ff..40dcac8ca0b 100644
--- 
a/core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java
+++ 
b/core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java
@@ -20,6 +20,7 @@ package kafka.server.metadata;
 import kafka.server.MetadataCache;
 
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.message.MetadataResponseData;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.Errors;
@@ -27,6 +28,9 @@ import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.server.share.SharePartitionKey;
 import 
org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
@@ -41,6 +45,7 @@ public class ShareCoordinatorMetadataCacheHelperImpl 
implements ShareCoordinator
     private final MetadataCache metadataCache;
     private final Function<SharePartitionKey, Integer> keyToPartitionMapper;
     private final ListenerName interBrokerListenerName;
+    private final Logger log = 
LoggerFactory.getLogger(ShareCoordinatorMetadataCacheHelperImpl.class);
 
     public ShareCoordinatorMetadataCacheHelperImpl(
         MetadataCache metadataCache,
@@ -63,35 +68,39 @@ public class ShareCoordinatorMetadataCacheHelperImpl 
implements ShareCoordinator
 
     @Override
     public Node getShareCoordinator(SharePartitionKey key, String 
internalTopicName) {
-        if (metadataCache.contains(internalTopicName)) {
-            Set<String> topicSet = new HashSet<>();
-            topicSet.add(internalTopicName);
-
-            List<MetadataResponseData.MetadataResponseTopic> topicMetadata = 
CollectionConverters.asJava(
-                metadataCache.getTopicMetadata(
-                    CollectionConverters.asScala(topicSet),
-                    interBrokerListenerName,
-                    false,
-                    false
-                )
-            );
+        try {
+            if (metadataCache.contains(internalTopicName)) {
+                Set<String> topicSet = new HashSet<>();
+                topicSet.add(internalTopicName);
 
-            if (topicMetadata == null || topicMetadata.isEmpty() || 
topicMetadata.get(0).errorCode() != Errors.NONE.code()) {
-                return Node.noNode();
-            } else {
-                int partition = keyToPartitionMapper.apply(key);
-                Optional<MetadataResponseData.MetadataResponsePartition> 
response = topicMetadata.get(0).partitions().stream()
-                    .filter(responsePart -> responsePart.partitionIndex() == 
partition
-                        && responsePart.leaderId() != 
MetadataResponse.NO_LEADER_ID)
-                    .findFirst();
+                List<MetadataResponseData.MetadataResponseTopic> topicMetadata 
= CollectionConverters.asJava(
+                    metadataCache.getTopicMetadata(
+                        CollectionConverters.asScala(topicSet),
+                        interBrokerListenerName,
+                        false,
+                        false
+                    )
+                );
 
-                if (response.isPresent()) {
-                    return 
OptionConverters.toJava(metadataCache.getAliveBrokerNode(response.get().leaderId(),
 interBrokerListenerName))
-                        .orElse(Node.noNode());
-                } else {
+                if (topicMetadata == null || topicMetadata.isEmpty() || 
topicMetadata.get(0).errorCode() != Errors.NONE.code()) {
                     return Node.noNode();
+                } else {
+                    int partition = keyToPartitionMapper.apply(key);
+                    Optional<MetadataResponseData.MetadataResponsePartition> 
response = topicMetadata.get(0).partitions().stream()
+                        .filter(responsePart -> responsePart.partitionIndex() 
== partition
+                            && responsePart.leaderId() != 
MetadataResponse.NO_LEADER_ID)
+                        .findFirst();
+
+                    if (response.isPresent()) {
+                        return 
OptionConverters.toJava(metadataCache.getAliveBrokerNode(response.get().leaderId(),
 interBrokerListenerName))
+                            .orElse(Node.noNode());
+                    } else {
+                        return Node.noNode();
+                    }
                 }
             }
+        } catch (CoordinatorNotAvailableException e) {
+            log.warn("Coordinator not available", e);
         }
         return Node.noNode();
     }
diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java 
b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
index 0de4f180444..f5761003e10 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -55,6 +55,8 @@ import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterTest;
@@ -88,6 +90,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -1832,86 +1835,142 @@ public class ShareConsumerTest {
             @ClusterConfigProperty(key = "unstable.api.versions.enable", value 
= "true")
         }
     )
+    @Timeout(90)
     public void testShareConsumerAfterCoordinatorMovement() throws Exception {
         setup();
         String topicName = "multipart";
         String groupId = "multipartGrp";
         Uuid topicId = createTopic(topicName, 3, 3);
         alterShareAutoOffsetReset(groupId, "earliest");
-
-        try (Admin admin = createAdminClient()) {
-            TopicPartition tpMulti = new TopicPartition(topicName, 0);
-
-            // produce some messages
-            try (Producer<byte[], byte[]> producer = createProducer()) {
-                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
-                    tpMulti.topic(),
-                    tpMulti.partition(),
-                    null,
-                    "key".getBytes(),
-                    "value".getBytes()
-                );
-                IntStream.range(0, 10).forEach(__ -> producer.send(record));
-                producer.flush();
-            }
-
-            // consume messages
-            try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(groupId)) {
-                shareConsumer.subscribe(List.of(topicName));
-                ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
-                assertEquals(10, records.count());
+        ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
+
+        TopicPartition tpMulti = new TopicPartition(topicName, 0);
+
+        // produce some messages
+        ClientState prodState = new ClientState();
+        final Set<String> produced = new HashSet<>();
+        service.execute(() -> {
+                int i = 0;
+                try (Producer<String, String> producer = createProducer(Map.of(
+                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName(),
+                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName()
+                ))) {
+                    while (!prodState.done().get()) {
+                        String key = "key-" + (i++);
+                        ProducerRecord<String, String> record = new 
ProducerRecord<>(
+                            tpMulti.topic(),
+                            tpMulti.partition(),
+                            null,
+                            key,
+                            "value"
+                        );
+                        try {
+                            producer.send(record);
+                            producer.flush();
+                            // count only correctly produced records
+                            prodState.count().incrementAndGet();
+                            produced.add(key);
+                        } catch (Exception e) {
+                            // ignore
+                        }
+                    }
+                }
             }
+        );
 
-            // get current share coordinator node
-            SharePartitionKey key = SharePartitionKey.getInstance(groupId, new 
TopicIdPartition(topicId, tpMulti));
-            int shareGroupStateTp = 
Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
-            List<Integer> curShareCoordNodeId = 
admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
-                .partitions().stream()
-                .filter(info -> info.partition() == shareGroupStateTp)
-                .map(info -> info.leader().id())
-                .toList();
-
-            assertEquals(1, curShareCoordNodeId.size());
+        // consume messages - start after small delay
+        ClientState consState = new ClientState();
+        // using map here if we want to debug specific keys
+        Map<String, Integer> consumed = new HashMap<>();
+        service.schedule(() -> {
+                try (ShareConsumer<String, String> shareConsumer = 
createShareConsumer(groupId, Map.of(
+                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName(),
+                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName()
+                ))) {
+                    shareConsumer.subscribe(List.of(topicName));
+                    while (!consState.done().get()) {
+                        ConsumerRecords<String, String> records = 
shareConsumer.poll(Duration.ofMillis(2000L));
+                        consState.count().addAndGet(records.count());
+                        records.forEach(rec -> consumed.compute(rec.key(), (k, 
v) -> v == null ? 1 : v + 1));
+                        if (prodState.done().get() && records.count() == 0) {
+                            consState.done().set(true);
+                        }
+                    }
+                }
+            }, 100L, TimeUnit.MILLISECONDS
+        );
 
-            // shutdown the coordinator
-            KafkaBroker broker = 
cluster.brokers().get(curShareCoordNodeId.get(0));
-            cluster.shutdownBroker(curShareCoordNodeId.get(0));
+        // To be closer to real world scenarios, we will execute after
+        // some time has elapsed since the producer and consumer started
+        // working.
+        service.schedule(() -> {
+                // Get the current node hosting the __share_group_state 
partition
+                // on which tpMulti is hosted. Then shut down this node and 
wait
+                // for it to be gracefully shutdown. Then fetch the 
coordinator again
+                // and verify that it has moved to some other broker.
+                try (Admin admin = createAdminClient()) {
+                    SharePartitionKey key = 
SharePartitionKey.getInstance(groupId, new TopicIdPartition(topicId, tpMulti));
+                    int shareGroupStateTp = 
Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
+                    List<Integer> curShareCoordNodeId = null;
+                    try {
+                        curShareCoordNodeId = 
admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
+                            .partitions().stream()
+                            .filter(info -> info.partition() == 
shareGroupStateTp)
+                            .map(info -> info.leader().id())
+                            .toList();
+                    } catch (Exception e) {
+                        fail(e);
+                    }
+                    assertEquals(1, curShareCoordNodeId.size());
+
+                    // shutdown the coordinator
+                    KafkaBroker broker = 
cluster.brokers().get(curShareCoordNodeId.get(0));
+                    cluster.shutdownBroker(curShareCoordNodeId.get(0));
+
+                    // wait for it to be completely shutdown
+                    broker.awaitShutdown();
+
+                    List<Integer> newShareCoordNodeId = null;
+                    try {
+                        newShareCoordNodeId = 
admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
+                            .partitions().stream()
+                            .filter(info -> info.partition() == 
shareGroupStateTp)
+                            .map(info -> info.leader().id())
+                            .toList();
+                    } catch (Exception e) {
+                        fail(e);
+                    }
 
-            // give some breathing time
-            broker.awaitShutdown();
+                    assertEquals(1, newShareCoordNodeId.size());
+                    assertNotEquals(curShareCoordNodeId.get(0), 
newShareCoordNodeId.get(0));
+                }
+            }, 5L, TimeUnit.SECONDS
+        );
 
-            List<Integer> newShareCoordNodeId = 
admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
-                .partitions().stream()
-                .filter(info -> info.partition() == shareGroupStateTp)
-                .map(info -> info.leader().id())
-                .toList();
+        // stop the producer after some time (but after coordinator shutdown)
+        service.schedule(() -> {
+                prodState.done().set(true);
+            }, 10L, TimeUnit.SECONDS
+        );
 
-            assertEquals(1, newShareCoordNodeId.size());
-            assertNotEquals(curShareCoordNodeId.get(0), 
newShareCoordNodeId.get(0));
+        // wait for both producer and consumer to finish
+        TestUtils.waitForCondition(
+            () -> prodState.done().get() && consState.done().get(),
+            45_000L,
+            500L,
+            () -> "prod/cons not done yet"
+        );
 
-            // again produce to same topic partition
-            try (Producer<byte[], byte[]> producer = createProducer()) {
-                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
-                    tpMulti.topic(),
-                    tpMulti.partition(),
-                    null,
-                    "key".getBytes(),
-                    "value".getBytes()
-                );
-                IntStream.range(0, 10).forEach(__ -> producer.send(record));
-                producer.flush();
-            }
+        // Make sure we consumed all records. Consumed records could be higher
+        // due to re-delivery but that is expected since we are only 
guaranteeing
+        // at least once semantics.
+        assertTrue(prodState.count().get() <= consState.count().get());
+        Set<String> consumedKeys = consumed.keySet();
+        assertTrue(produced.containsAll(consumedKeys) && 
consumedKeys.containsAll(produced));
 
-            // consume messages should only be possible if partition and share 
coord has moved
-            // from shutdown broker since we are only producing to partition 0 
of topic.
-            try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(groupId)) {
-                shareConsumer.subscribe(List.of(topicName));
-                ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
-                assertEquals(20, records.count());
-            }
+        shutdownExecutorService(service);
 
-            verifyShareGroupStateTopicRecordsProduced();
-        }
+        verifyShareGroupStateTopicRecordsProduced();
     }
 
     @ClusterTest(
@@ -1929,6 +1988,8 @@ public class ShareConsumerTest {
             @ClusterConfigProperty(key = "unstable.api.versions.enable", value 
= "true")
         }
     )
+    @Timeout(150)
+    @Flaky("KAFKA-18665")
     public void testComplexShareConsumer() throws Exception {
         setup();
         String topicName = "multipart";
@@ -1936,19 +1997,18 @@ public class ShareConsumerTest {
         createTopic(topicName, 3, 3);
         TopicPartition multiTp = new TopicPartition(topicName, 0);
 
-        ExecutorService executer = Executors.newCachedThreadPool();
+        ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
 
-        AtomicBoolean prodDone = new AtomicBoolean(false);
-        AtomicInteger sentCount = new AtomicInteger(0);
+        ClientState prodState = new ClientState();
 
         // produce messages until we want
-        executer.execute(() -> {
+        service.execute(() -> {
             try (Producer<byte[], byte[]> producer = createProducer()) {
-                while (!prodDone.get()) {
+                while (!prodState.done().get()) {
                     ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(multiTp.topic(), multiTp.partition(), null, "key".getBytes(), 
"value".getBytes());
                     producer.send(record);
                     producer.flush();
-                    sentCount.incrementAndGet();
+                    prodState.count().incrementAndGet();
                 }
             }
         });
@@ -1961,28 +2021,45 @@ public class ShareConsumerTest {
             Map.of()
         );
 
-        executer.execute(complexCons1);
+        service.schedule(
+            complexCons1,
+            100L,
+            TimeUnit.MILLISECONDS
+        );
 
         // let the complex consumer read the messages
-        executer.execute(() -> {
-            try {
-                TimeUnit.SECONDS.sleep(10L);
-                prodDone.set(true);
-            } catch (InterruptedException e) {
-                // ignore
-            }
-        });
+        service.schedule(() -> {
+                prodState.done().set(true);
+            }, 10L, TimeUnit.SECONDS
+        );
 
         // all messages which can be read are read, some would be redelivered
-        TestUtils.waitForCondition(complexCons1::isDone, 30_000L, () -> "did 
not close!");
-        assertTrue(sentCount.get() < complexCons1.recordsRead());
+        TestUtils.waitForCondition(complexCons1::isDone, 45_000L, () -> "did 
not close!");
 
-        executer.shutdown();
-        executer.shutdownNow();
+        assertTrue(prodState.count().get() < complexCons1.recordsRead());
+
+        shutdownExecutorService(service);
 
         verifyShareGroupStateTopicRecordsProduced();
     }
 
+    /**
+     * Util class to encapsulate state for a consumer/producer
+     * being executed by an {@link ExecutorService}.
+     */
+    private static class ClientState {
+        private final AtomicBoolean done = new AtomicBoolean(false);
+        private final AtomicInteger count = new AtomicInteger(0);
+
+        AtomicBoolean done() {
+            return done;
+        }
+
+        AtomicInteger count() {
+            return count;
+        }
+    }
+
     private int produceMessages(int messageCount) {
         try (Producer<byte[], byte[]> producer = createProducer()) {
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
@@ -2217,9 +2294,7 @@ public class ShareConsumerTest {
 
         private final String topicName;
         private final Map<String, Object> configs = new HashMap<>();
-        private final AtomicBoolean isDone = new AtomicBoolean(false);
-        private final AtomicBoolean shouldLoop = new AtomicBoolean(true);
-        private final AtomicInteger readCount = new AtomicInteger(0);
+        private final ClientState state = new ClientState();
         private final Predicate<ConsumerRecords<K, V>> exitCriteria;
         private final BiConsumer<ShareConsumer<K, V>, ConsumerRecord<K, V>> 
processFunc;
 
@@ -2267,31 +2342,42 @@ public class ShareConsumerTest {
         }
 
         void stop() {
-            shouldLoop.set(false);
+            state.done().set(true);
         }
 
         @Override
         public void run() {
             try (ShareConsumer<K, V> consumer = new 
KafkaShareConsumer<>(configs)) {
                 consumer.subscribe(Set.of(this.topicName));
-                while (shouldLoop.get()) {
+                while (!state.done().get()) {
                     ConsumerRecords<K, V> records = 
consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
-                    readCount.addAndGet(records.count());
+                    state.count().addAndGet(records.count());
                     if (exitCriteria.test(records)) {
-                        break;
+                        state.done().set(true);
                     }
                     records.forEach(record -> processFunc.accept(consumer, 
record));
                 }
             }
-            isDone.set(true);
         }
 
         boolean isDone() {
-            return isDone.get();
+            return state.done().get();
         }
 
         int recordsRead() {
-            return readCount.get();
+            return state.count().get();
+        }
+    }
+
+    private void shutdownExecutorService(ExecutorService service) {
+        service.shutdown();
+        try {
+            if (!service.awaitTermination(5L, TimeUnit.SECONDS)) {
+                service.shutdownNow();
+            }
+        } catch (Exception e) {
+            service.shutdownNow();
+            Thread.currentThread().interrupt();
         }
     }
 }

Reply via email to