This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ddfcc333f8d KAFKA-16226 Add test for concurrently updatingMetadata and
fetching snapshot/cluster (#15385)
ddfcc333f8d is described below
commit ddfcc333f8d1c2f4ab0a88abf6d664153d6e82fd
Author: Mayank Shekhar Narula <[email protected]>
AuthorDate: Mon Feb 26 23:57:11 2024 +0000
KAFKA-16226 Add test for concurrently updatingMetadata and fetching
snapshot/cluster (#15385)
Add test for concurrently updatingMetadata and fetching snapshot/cluster
Reviewers: Jason Gustafson <[email protected]>
Co-authored-by: Zhifeng Chen <[email protected]>
---
.../org/apache/kafka/clients/MetadataTest.java | 106 +++++++++++++++++++++
1 file changed, 106 insertions(+)
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 600fc23ecb9..e6db9685eb5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -16,6 +16,11 @@
*/
package org.apache.kafka.clients;
+import java.util.OptionalInt;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.Node;
@@ -1260,6 +1265,107 @@ public class MetadataTest {
Mockito.reset(mockListener);
}
+    /**
+     * Test that concurrently updating Metadata, and fetching the corresponding MetadataSnapshot & Cluster
+     * work as expected, i.e. snapshot & cluster contain the relevant updates.
+     *
+     * <p>Half of the worker tasks push a metadata update with more nodes, more partitions and a higher
+     * leader epoch than the initial state; the other half wait until at least one update has been applied
+     * and then fetch the snapshot and cluster. Every worker counts down the completion latch in a
+     * {@code finally} block and records the first failure it hits, so a broken worker fails the test
+     * promptly with its real cause instead of surfacing as a 5 minute timeout. The executor is always
+     * torn down, even when an assertion fails.
+     *
+     * @throws InterruptedException if the test thread is interrupted while waiting on the workers
+     */
+    @Test
+    public void testConcurrentUpdateAndFetchForSnapshotAndCluster() throws InterruptedException {
+        Time time = new MockTime();
+        metadata = new Metadata(refreshBackoffMs, refreshBackoffMaxMs, metadataExpireMs, new LogContext(),
+            new ClusterResourceListeners());
+
+        // Setup metadata with 10 nodes, 2 topics, topic1 & 2, both to be retained in the update.
+        // Both will have leader-epoch 100.
+        int oldNodeCount = 10;
+        String topic1 = "test_topic1";
+        String topic2 = "test_topic2";
+        TopicPartition topic1Part0 = new TopicPartition(topic1, 0);
+        Map<String, Integer> topicPartitionCounts = new HashMap<>();
+        int oldPartitionCount = 1;
+        topicPartitionCounts.put(topic1, oldPartitionCount);
+        topicPartitionCounts.put(topic2, oldPartitionCount);
+        Map<String, Uuid> topicIds = new HashMap<>();
+        topicIds.put(topic1, Uuid.randomUuid());
+        topicIds.put(topic2, Uuid.randomUuid());
+        int oldLeaderEpoch = 100;
+        MetadataResponse metadataResponse =
+            RequestTestUtils.metadataUpdateWithIds("cluster", oldNodeCount, Collections.emptyMap(),
+                topicPartitionCounts, _tp -> oldLeaderEpoch, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        MetadataSnapshot snapshot = metadata.fetchMetadataSnapshot();
+        Cluster cluster = metadata.fetch();
+        // Validate metadata snapshot & cluster are setup as expected.
+        assertEquals(cluster, snapshot.cluster());
+        assertEquals(oldNodeCount, snapshot.cluster().nodes().size());
+        assertEquals(oldPartitionCount, snapshot.cluster().partitionCountForTopic(topic1));
+        assertEquals(oldPartitionCount, snapshot.cluster().partitionCountForTopic(topic2));
+        assertEquals(OptionalInt.of(oldLeaderEpoch), snapshot.leaderEpochFor(topic1Part0));
+
+        // Setup 6 threads, where 3 are updating metadata & 3 are reading snapshot/cluster.
+        // Metadata will be updated with higher # of nodes, partition-counts, leader-epoch.
+        int numThreads = 6;
+        ExecutorService service = Executors.newFixedThreadPool(numThreads);
+        try {
+            CountDownLatch allThreadsDoneLatch = new CountDownLatch(numThreads);
+            CountDownLatch atleastMetadataUpdatedOnceLatch = new CountDownLatch(1);
+            AtomicReference<MetadataSnapshot> newSnapshot = new AtomicReference<>();
+            AtomicReference<Cluster> newCluster = new AtomicReference<>();
+            // First failure raised inside any worker; rethrown on the test thread below.
+            AtomicReference<Throwable> firstWorkerFailure = new AtomicReference<>();
+            for (int i = 0; i < numThreads; i++) {
+                final int id = i + 1;
+                service.execute(() -> {
+                    try {
+                        if (id % 2 == 0) { // Thread to update metadata.
+                            String clusterId = "clusterId";
+                            int nNodes = oldNodeCount + id;
+                            Map<String, Integer> newTopicPartitionCounts = new HashMap<>();
+                            newTopicPartitionCounts.put(topic1, oldPartitionCount + id);
+                            newTopicPartitionCounts.put(topic2, oldPartitionCount + id);
+                            MetadataResponse newMetadataResponse =
+                                RequestTestUtils.metadataUpdateWithIds(clusterId, nNodes, Collections.emptyMap(),
+                                    newTopicPartitionCounts, _tp -> oldLeaderEpoch + id, topicIds);
+                            metadata.updateWithCurrentRequestVersion(newMetadataResponse, true, time.milliseconds());
+                            atleastMetadataUpdatedOnceLatch.countDown();
+                        } else { // Thread to read metadata snapshot, once it's updated.
+                            assertTrue(atleastMetadataUpdatedOnceLatch.await(5, TimeUnit.MINUTES),
+                                "Test had to wait more than 5 minutes, something went wrong.");
+                            newSnapshot.set(metadata.fetchMetadataSnapshot());
+                            newCluster.set(metadata.fetch());
+                        }
+                    } catch (InterruptedException e) {
+                        // Restore the interrupt flag for the pool thread and report the interruption.
+                        Thread.currentThread().interrupt();
+                        firstWorkerFailure.compareAndSet(null, e);
+                    } catch (Throwable t) {
+                        // Assertion failures are Errors, so catch Throwable to avoid losing them on the pool thread.
+                        firstWorkerFailure.compareAndSet(null, t);
+                    } finally {
+                        // Always signal completion so the test thread never waits on a dead worker.
+                        allThreadsDoneLatch.countDown();
+                    }
+                });
+            }
+            assertTrue(allThreadsDoneLatch.await(5, TimeUnit.MINUTES),
+                "Test had to wait more than 5 minutes, something went wrong.");
+            Throwable workerFailure = firstWorkerFailure.get();
+            if (workerFailure != null) {
+                throw new AssertionError("A worker thread failed: " + workerFailure, workerFailure);
+            }
+
+            // Validate new snapshot is up to date, and has higher partition counts, nodes & leader epoch than earlier.
+            {
+                int newNodeCount = newSnapshot.get().cluster().nodes().size();
+                assertTrue(oldNodeCount < newNodeCount, "Unexpected value " + newNodeCount);
+                int newPartitionCountTopic1 = newSnapshot.get().cluster().partitionCountForTopic(topic1);
+                assertTrue(oldPartitionCount < newPartitionCountTopic1, "Unexpected value " + newPartitionCountTopic1);
+                int newPartitionCountTopic2 = newSnapshot.get().cluster().partitionCountForTopic(topic2);
+                assertTrue(oldPartitionCount < newPartitionCountTopic2, "Unexpected value " + newPartitionCountTopic2);
+                int newLeaderEpoch = newSnapshot.get().leaderEpochFor(topic1Part0).getAsInt();
+                assertTrue(oldLeaderEpoch < newLeaderEpoch, "Unexpected value " + newLeaderEpoch);
+            }
+
+            // Validate new cluster is up to date, and has higher partition counts, nodes than earlier.
+            {
+                int newNodeCount = newCluster.get().nodes().size();
+                assertTrue(oldNodeCount < newNodeCount, "Unexpected value " + newNodeCount);
+                int newPartitionCountTopic1 = newCluster.get().partitionCountForTopic(topic1);
+                assertTrue(oldPartitionCount < newPartitionCountTopic1, "Unexpected value " + newPartitionCountTopic1);
+                int newPartitionCountTopic2 = newCluster.get().partitionCountForTopic(topic2);
+                assertTrue(oldPartitionCount < newPartitionCountTopic2, "Unexpected value " + newPartitionCountTopic2);
+            }
+
+            service.shutdown();
+            // Executor service should shut down quickly, as all tasks are finished at this point.
+            assertTrue(service.awaitTermination(60, TimeUnit.SECONDS));
+        } finally {
+            // No-op after a clean shutdown; on a failed assertion it stops the pool threads from leaking.
+            service.shutdownNow();
+        }
+    }
+
/**
* For testUpdatePartially, validates that updatedMetadata is matching
expected part1Metadata, part2Metadata, interalPartMetadata, nodes & more.
*/