This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new be96807ac8e MINOR: Refactor share coord cache helper to share package.
(#18743)
be96807ac8e is described below
commit be96807ac8ee81b586b6b565ea34f369b02805b3
Author: Sushant Mahajan <[email protected]>
AuthorDate: Thu Jan 30 19:03:42 2025 +0530
MINOR: Refactor share coord cache helper to share package. (#18743)
Reviewers: Andrew Schofield <[email protected]>
---
.../ShareCoordinatorMetadataCacheHelperImpl.java | 31 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 2 +-
...hareCoordinatorMetadataCacheHelperImplTest.java | 459 +++++++++++++++++++++
3 files changed, 478 insertions(+), 14 deletions(-)
diff --git
a/core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java
b/core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java
similarity index 80%
rename from
core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java
rename to
core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java
index 40dcac8ca0b..b6a189bc024 100644
---
a/core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java
+++
b/core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java
@@ -15,12 +15,11 @@
* limitations under the License.
*/
-package kafka.server.metadata;
+package kafka.server.share;
import kafka.server.MetadataCache;
import org.apache.kafka.common.Node;
-import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
@@ -52,18 +51,19 @@ public class ShareCoordinatorMetadataCacheHelperImpl
implements ShareCoordinator
Function<SharePartitionKey, Integer> keyToPartitionMapper,
ListenerName interBrokerListenerName
) {
- Objects.requireNonNull(metadataCache, "metadataCache must not be
null");
- Objects.requireNonNull(keyToPartitionMapper, "keyToPartitionMapper
must not be null");
- Objects.requireNonNull(interBrokerListenerName,
"interBrokerListenerName must not be null");
-
- this.metadataCache = metadataCache;
- this.keyToPartitionMapper = keyToPartitionMapper;
- this.interBrokerListenerName = interBrokerListenerName;
+ this.metadataCache = Objects.requireNonNull(metadataCache,
"metadataCache must not be null");
+ this.keyToPartitionMapper =
Objects.requireNonNull(keyToPartitionMapper, "keyToPartitionMapper must not be
null");
+ this.interBrokerListenerName =
Objects.requireNonNull(interBrokerListenerName, "interBrokerListenerName must
not be null");
}
@Override
public boolean containsTopic(String topic) {
- return metadataCache.contains(topic);
+ try {
+ return metadataCache.contains(topic);
+ } catch (Exception e) {
+ log.warn("Exception checking {} in metadata cache", topic, e);
+ }
+ return false;
}
@Override
@@ -99,14 +99,19 @@ public class ShareCoordinatorMetadataCacheHelperImpl
implements ShareCoordinator
}
}
}
- } catch (CoordinatorNotAvailableException e) {
- log.warn("Coordinator not available", e);
+ } catch (Exception e) {
+ log.warn("Exception while getting share coordinator", e);
}
return Node.noNode();
}
@Override
public List<Node> getClusterNodes() {
- return
CollectionConverters.asJava(metadataCache.getAliveBrokerNodes(interBrokerListenerName).toSeq());
+ try {
+ return
CollectionConverters.asJava(metadataCache.getAliveBrokerNodes(interBrokerListenerName).toSeq());
+ } catch (Exception e) {
+ log.warn("Exception while getting cluster nodes", e);
+ }
+ return List.of();
}
}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 73a838e6a6e..ee92d1bcf46 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -24,7 +24,7 @@ import kafka.log.remote.RemoteLogManager
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
import kafka.server.metadata._
-import kafka.server.share.SharePartitionManager
+import kafka.server.share.{ShareCoordinatorMetadataCacheHelperImpl,
SharePartitionManager}
import kafka.utils.CoreUtils
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.message.ApiMessageType.ListenerType
diff --git
a/core/src/test/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImplTest.java
b/core/src/test/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImplTest.java
new file mode 100644
index 00000000000..582caf140a2
--- /dev/null
+++
b/core/src/test/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImplTest.java
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.share;
+
+import kafka.server.MetadataCache;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.server.share.SharePartitionKey;
+import
org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+
+import scala.jdk.javaapi.CollectionConverters;
+import scala.jdk.javaapi.OptionConverters;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ShareCoordinatorMetadataCacheHelperImplTest {
+ @Test
+ public void testConstructorThrowsErrorOnNullArgs() {
+ Function<SharePartitionKey, Integer> func = sharePartitionKey -> 0;
+
+ Exception e = assertThrows(NullPointerException.class, () -> new
ShareCoordinatorMetadataCacheHelperImpl(
+ null,
+ func,
+ mock(ListenerName.class)
+ ));
+ assertEquals("metadataCache must not be null", e.getMessage());
+
+ e = assertThrows(NullPointerException.class, () -> new
ShareCoordinatorMetadataCacheHelperImpl(
+ mock(MetadataCache.class),
+ null,
+ mock(ListenerName.class)
+ ));
+ assertEquals("keyToPartitionMapper must not be null", e.getMessage());
+
+ e = assertThrows(NullPointerException.class, () -> new
ShareCoordinatorMetadataCacheHelperImpl(
+ mock(MetadataCache.class),
+ func,
+ null
+ ));
+ assertEquals("interBrokerListenerName must not be null",
e.getMessage());
+ }
+
+ @Test
+ public void testContainsTopicReturnsFalseOnException() {
+ Function<SharePartitionKey, Integer> func = sharePartitionKey -> 0;
+ MetadataCache mockMetadataCache = mock(MetadataCache.class);
+
when(mockMetadataCache.contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME)))
+ .thenReturn(false);
+
+ ShareCoordinatorMetadataCacheHelper cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mockMetadataCache,
+ func,
+ mock(ListenerName.class)
+ );
+
+
when(mockMetadataCache.contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME)))
+ .thenThrow(
+ new RuntimeException("bad stuff")
+ );
+
+ assertFalse(cache.containsTopic(Topic.SHARE_GROUP_STATE_TOPIC_NAME));
+ verify(mockMetadataCache,
times(1)).contains(Topic.SHARE_GROUP_STATE_TOPIC_NAME);
+ }
+
+ @Test
+ public void testContainsTopicSuccess() {
+ Function<SharePartitionKey, Integer> func = sharePartitionKey -> 0;
+ MetadataCache mockMetadataCache = mock(MetadataCache.class);
+
when(mockMetadataCache.contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME)))
+ .thenReturn(false);
+
+ ShareCoordinatorMetadataCacheHelper cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mockMetadataCache,
+ func,
+ mock(ListenerName.class)
+ );
+
+
when(mockMetadataCache.contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME)))
+ .thenReturn(
+ true
+ );
+
+ assertTrue(cache.containsTopic(Topic.SHARE_GROUP_STATE_TOPIC_NAME));
+ verify(mockMetadataCache,
times(1)).contains(Topic.SHARE_GROUP_STATE_TOPIC_NAME);
+ }
+
+ @Test
+ public void testShareCoordinatorReturnsNoNodeWhenNoInternalTopicInCache() {
+ Function<SharePartitionKey, Integer> func = sharePartitionKey -> 0;
+ MetadataCache mockMetadataCache = mock(MetadataCache.class);
+
when(mockMetadataCache.contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME)))
+ .thenReturn(false);
+
+ ShareCoordinatorMetadataCacheHelper cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mockMetadataCache,
+ func,
+ mock(ListenerName.class)
+ );
+
+ assertEquals(
+ Node.noNode(),
+ cache.getShareCoordinator(SharePartitionKey.getInstance("group",
Uuid.randomUuid(), 0), Topic.SHARE_GROUP_STATE_TOPIC_NAME)
+ );
+
+ verify(mockMetadataCache,
times(1)).contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME));
+ }
+
+ @Test
+ public void testShareCoordinatorReturnsNoNodeIfTopicMetadataInvalid() {
+ Function<SharePartitionKey, Integer> func = sharePartitionKey -> 0;
+ MetadataCache mockMetadataCache = mock(MetadataCache.class);
+ ListenerName mockListenerName = mock(ListenerName.class);
+
+ ShareCoordinatorMetadataCacheHelper cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mockMetadataCache,
+ func,
+ mockListenerName
+ );
+
+
when(mockMetadataCache.contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME)))
+ .thenReturn(true);
+
+ // null topic metadata response
+ when(mockMetadataCache.getTopicMetadata(
+ any(),
+ eq(mockListenerName),
+ eq(false),
+ eq(false)
+ )).thenReturn(
+ null
+ );
+
+ assertEquals(
+ Node.noNode(),
+ cache.getShareCoordinator(SharePartitionKey.getInstance("group",
Uuid.randomUuid(), 0), Topic.SHARE_GROUP_STATE_TOPIC_NAME)
+ );
+
+ verify(mockMetadataCache,
times(1)).contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME));
+ verify(mockMetadataCache, times(1)).getTopicMetadata(
+ any(),
+ eq(mockListenerName),
+ eq(false),
+ eq(false)
+ );
+
+ // empty topic metadata response
+ when(mockMetadataCache.getTopicMetadata(
+ any(),
+ eq(mockListenerName),
+ eq(false),
+ eq(false)
+ )).thenReturn(
+ CollectionConverters.asScala(List.of())
+ );
+
+ assertEquals(
+ Node.noNode(),
+ cache.getShareCoordinator(SharePartitionKey.getInstance("group",
Uuid.randomUuid(), 0), Topic.SHARE_GROUP_STATE_TOPIC_NAME)
+ );
+
+ verify(mockMetadataCache,
times(2)).contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME));
+ verify(mockMetadataCache, times(2)).getTopicMetadata(
+ any(),
+ eq(mockListenerName),
+ eq(false),
+ eq(false)
+ );
+
+ // erroneous topic metadata response
+ when(mockMetadataCache.getTopicMetadata(
+ any(),
+ eq(mockListenerName),
+ eq(false),
+ eq(false)
+ )).thenReturn(
+ CollectionConverters.asScala(List.of(
+ new MetadataResponseData.MetadataResponseTopic()
+ .setErrorCode(Errors.BROKER_NOT_AVAILABLE.code())
+ ))
+ );
+
+ assertEquals(
+ Node.noNode(),
+ cache.getShareCoordinator(SharePartitionKey.getInstance("group",
Uuid.randomUuid(), 0), Topic.SHARE_GROUP_STATE_TOPIC_NAME)
+ );
+
+ verify(mockMetadataCache,
times(3)).contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME));
+ verify(mockMetadataCache, times(3)).getTopicMetadata(
+ any(),
+ eq(mockListenerName),
+ eq(false),
+ eq(false)
+ );
+ }
+
+ @Test
+ public void testShareCoordinatorReturnsNoNodeOnGetAliveNodeEmptyResponse()
{
+ Function<SharePartitionKey, Integer> func = sharePartitionKey -> 0;
+ MetadataCache mockMetadataCache = mock(MetadataCache.class);
+ ListenerName mockListenerName = mock(ListenerName.class);
+
+ ShareCoordinatorMetadataCacheHelper cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mockMetadataCache,
+ func,
+ mockListenerName
+ );
+
+
when(mockMetadataCache.contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME)))
+ .thenReturn(true);
+
+ // correct topic metadata response
+ when(mockMetadataCache.getTopicMetadata(
+ any(),
+ eq(mockListenerName),
+ eq(false),
+ eq(false)
+ )).thenReturn(
+ CollectionConverters.asScala(List.of(
+ new MetadataResponseData.MetadataResponseTopic()
+ .setErrorCode(Errors.NONE.code())
+ .setPartitions(List.of(
+ new MetadataResponseData.MetadataResponsePartition()
+ .setPartitionIndex(0)
+ .setLeaderId(1)
+ ))
+ ))
+ );
+
+ // get alive broker node throws exception
+ when(mockMetadataCache.getAliveBrokerNode(
+ eq(1),
+ eq(mockListenerName)
+ )).thenReturn(
+ OptionConverters.toScala(Optional.empty())
+ );
+
+ assertEquals(
+ Node.noNode(),
+ cache.getShareCoordinator(SharePartitionKey.getInstance("group",
Uuid.randomUuid(), 0), Topic.SHARE_GROUP_STATE_TOPIC_NAME)
+ );
+
+ verify(mockMetadataCache,
times(1)).contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME));
+ verify(mockMetadataCache, times(1)).getTopicMetadata(
+ any(),
+ eq(mockListenerName),
+ eq(false),
+ eq(false)
+ );
+ verify(mockMetadataCache, times(1)).getAliveBrokerNode(eq(1),
eq(mockListenerName));
+ }
+
+ @Test
+ public void testShareCoordinatorReturnsNoNodeOnGetAliveNodeException() {
+ Function<SharePartitionKey, Integer> func = sharePartitionKey -> 0;
+ MetadataCache mockMetadataCache = mock(MetadataCache.class);
+ ListenerName mockListenerName = mock(ListenerName.class);
+
+ ShareCoordinatorMetadataCacheHelper cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mockMetadataCache,
+ func,
+ mockListenerName
+ );
+
+
when(mockMetadataCache.contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME)))
+ .thenReturn(true);
+
+ // correct topic metadata response
+ when(mockMetadataCache.getTopicMetadata(
+ any(),
+ eq(mockListenerName),
+ eq(false),
+ eq(false)
+ )).thenReturn(
+ CollectionConverters.asScala(List.of(
+ new MetadataResponseData.MetadataResponseTopic()
+ .setErrorCode(Errors.NONE.code())
+ .setPartitions(List.of(
+ new MetadataResponseData.MetadataResponsePartition()
+ .setPartitionIndex(0)
+ .setLeaderId(1)
+ ))
+ ))
+ );
+
+ // get alive broker node throws exception
+ when(mockMetadataCache.getAliveBrokerNode(
+ eq(1),
+ eq(mockListenerName)
+ )).thenThrow(
+ new CoordinatorNotAvailableException("bad stuff")
+ );
+
+ assertEquals(
+ Node.noNode(),
+ cache.getShareCoordinator(SharePartitionKey.getInstance("group",
Uuid.randomUuid(), 0), Topic.SHARE_GROUP_STATE_TOPIC_NAME)
+ );
+
+ verify(mockMetadataCache,
times(1)).contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME));
+ verify(mockMetadataCache, times(1)).getTopicMetadata(
+ any(),
+ eq(mockListenerName),
+ eq(false),
+ eq(false)
+ );
+ verify(mockMetadataCache, times(1)).getAliveBrokerNode(eq(1),
eq(mockListenerName));
+ }
+
+ @Test
+ public void testShareCoordinatorSuccess() {
+ Function<SharePartitionKey, Integer> func = sharePartitionKey -> 0;
+ MetadataCache mockMetadataCache = mock(MetadataCache.class);
+ ListenerName mockListenerName = mock(ListenerName.class);
+
+ ShareCoordinatorMetadataCacheHelper cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mockMetadataCache,
+ func,
+ mockListenerName
+ );
+
+
when(mockMetadataCache.contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME)))
+ .thenReturn(true);
+
+ // correct topic metadata response
+ when(mockMetadataCache.getTopicMetadata(
+ any(),
+ eq(mockListenerName),
+ eq(false),
+ eq(false)
+ )).thenReturn(
+ CollectionConverters.asScala(List.of(
+ new MetadataResponseData.MetadataResponseTopic()
+ .setErrorCode(Errors.NONE.code())
+ .setPartitions(List.of(
+ new MetadataResponseData.MetadataResponsePartition()
+ .setPartitionIndex(0)
+ .setLeaderId(1)
+ ))
+ ))
+ );
+
+ // get alive broker node throws exception
+ Node node = new Node(2, "some.domain.name", 65534);
+ when(mockMetadataCache.getAliveBrokerNode(
+ eq(1),
+ eq(mockListenerName)
+ )).thenReturn(
+ OptionConverters.toScala(Optional.of(node))
+ );
+
+ assertEquals(
+ node,
+ cache.getShareCoordinator(SharePartitionKey.getInstance("group",
Uuid.randomUuid(), 0), Topic.SHARE_GROUP_STATE_TOPIC_NAME)
+ );
+
+ verify(mockMetadataCache,
times(1)).contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME));
+ verify(mockMetadataCache, times(1)).getTopicMetadata(
+ any(),
+ eq(mockListenerName),
+ eq(false),
+ eq(false)
+ );
+ verify(mockMetadataCache, times(1)).getAliveBrokerNode(eq(1),
eq(mockListenerName));
+ }
+
+ @Test
+ public void testGetClusterNodesEmptyListOnException() {
+ Function<SharePartitionKey, Integer> func = sharePartitionKey -> 0;
+ MetadataCache mockMetadataCache = mock(MetadataCache.class);
+ ListenerName mockListenerName = mock(ListenerName.class);
+
+ ShareCoordinatorMetadataCacheHelper cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mockMetadataCache,
+ func,
+ mockListenerName
+ );
+
+ when(mockMetadataCache.getAliveBrokerNodes(
+ eq(mockListenerName)
+ )).thenThrow(
+ new CoordinatorNotAvailableException("scary stuff")
+ );
+
+ assertEquals(
+ List.of(),
+ cache.getClusterNodes()
+ );
+
+ verify(mockMetadataCache,
times(1)).getAliveBrokerNodes(eq(mockListenerName));
+ }
+
+ @Test
+ public void testGetClusterNodesSuccess() {
+ Function<SharePartitionKey, Integer> func = sharePartitionKey -> 0;
+ MetadataCache mockMetadataCache = mock(MetadataCache.class);
+ ListenerName mockListenerName = mock(ListenerName.class);
+
+ ShareCoordinatorMetadataCacheHelper cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mockMetadataCache,
+ func,
+ mockListenerName
+ );
+
+ List<Node> nodes = List.of(
+ new Node(0, "some.domain.name", 65534),
+ new Node(1, "some.domain.name", 12345)
+ );
+
+ when(mockMetadataCache.getAliveBrokerNodes(
+ eq(mockListenerName)
+ )).thenReturn(
+ CollectionConverters.asScala(
+ nodes
+ )
+ );
+
+ assertEquals(
+ nodes,
+ cache.getClusterNodes()
+ );
+
+ verify(mockMetadataCache,
times(1)).getAliveBrokerNodes(eq(mockListenerName));
+ }
+}