This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e4b8644400e KAFKA-17992 Remove getUnderlying and isKRaftTest from 
ClusterInstance (#17802)
e4b8644400e is described below

commit e4b8644400e4442303947aecb34f4f8d416bd766
Author: Yung <[email protected]>
AuthorDate: Thu Nov 14 17:11:19 2024 +0800

    KAFKA-17992 Remove getUnderlying and isKRaftTest from ClusterInstance 
(#17802)
    
    Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai 
<[email protected]>
---
 .../kafka/admin/ConfigCommandIntegrationTest.java  |  2 +-
 .../kafka/server/LogManagerIntegrationTest.java    |  6 +--
 .../server/AbstractApiVersionsRequestTest.scala    | 13 ++----
 .../server/BrokerRegistrationRequestTest.scala     |  2 +-
 .../unit/kafka/server/ReplicationQuotasTest.scala  | 35 +++++----------
 .../kafka/common/test/api/ClusterInstance.java     | 15 +------
 .../test/api/RaftClusterInvocationContext.java     |  1 -
 .../kafka/tools/BrokerApiVersionsCommandTest.java  |  3 +-
 .../org/apache/kafka/tools/ClusterToolTest.java    | 16 ++-----
 .../org/apache/kafka/tools/GetOffsetShellTest.java | 12 +----
 .../org/apache/kafka/tools/TopicCommandTest.java   |  7 +--
 .../kafka/tools/consumer/ConsoleConsumerTest.java  | 52 +++++++---------------
 12 files changed, 45 insertions(+), 119 deletions(-)

diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java 
b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
index 6b285c448c2..5ecd0337a96 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
@@ -87,7 +87,7 @@ public class ConfigCommandIntegrationTest {
     @ClusterTest
     public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() {
         assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
-            "--entity-name", cluster.isKRaftTest() ? "0" : "1",
+            "--entity-name", "0",
             "--entity-type", "brokers",
             "--alter",
             "--add-config", "security.inter.broker.protocol=PLAINTEXT")),
diff --git a/core/src/test/java/kafka/server/LogManagerIntegrationTest.java 
b/core/src/test/java/kafka/server/LogManagerIntegrationTest.java
index 5a08dada8d2..390794b8f25 100644
--- a/core/src/test/java/kafka/server/LogManagerIntegrationTest.java
+++ b/core/src/test/java/kafka/server/LogManagerIntegrationTest.java
@@ -76,12 +76,12 @@ public class LogManagerIntegrationTest {
         cluster.waitForTopic("foo", 1);
 
         Optional<PartitionMetadataFile> partitionMetadataFile = 
Optional.ofNullable(
-                raftInstance.getUnderlying().brokers().get(0).logManager()
+                raftInstance.brokers().get(0).logManager()
                         .getLog(new TopicPartition("foo", 0), false).get()
                         .partitionMetadataFile().getOrElse(null));
         assertTrue(partitionMetadataFile.isPresent());
 
-        raftInstance.getUnderlying().brokers().get(0).shutdown();
+        raftInstance.brokers().get(0).shutdown();
         try (Admin admin = cluster.admin()) {
             TestUtils.waitForCondition(() -> {
                 List<TopicPartitionInfo> partitionInfos = 
admin.describeTopics(Collections.singletonList("foo"))
@@ -93,7 +93,7 @@ public class LogManagerIntegrationTest {
         // delete partition.metadata file here to simulate the scenario that 
partition.metadata not flush to disk yet
         partitionMetadataFile.get().delete();
         assertFalse(partitionMetadataFile.get().exists());
-        raftInstance.getUnderlying().brokers().get(0).startup();
+        raftInstance.brokers().get(0).startup();
         // make sure there is no error during load logs
         assertDoesNotThrow(() -> 
raftInstance.getUnderlying().fatalFaultHandler().maybeRethrowFirstException());
         try (Admin admin = cluster.admin()) {
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index 05c3bc2eade..4f442d3194e 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -64,7 +64,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: 
ClusterInstance) {
     clientTelemetryEnabled: Boolean = false,
     apiVersion: Short = ApiKeys.API_VERSIONS.latestVersion
   ): Unit = {
-    if (cluster.isKRaftTest && apiVersion >= 3) {
+    if (apiVersion >= 3) {
       assertEquals(3, apiVersionsResponse.data().finalizedFeatures().size())
       assertEquals(MetadataVersion.latestTesting().featureLevel(), 
apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel())
       assertEquals(MetadataVersion.latestTesting().featureLevel(), 
apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel())
@@ -84,12 +84,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: 
ClusterInstance) {
       assertEquals(0, 
apiVersionsResponse.data().supportedFeatures().find(GroupVersion.FEATURE_NAME).minVersion())
       assertEquals(GroupVersion.GV_1.featureLevel(), 
apiVersionsResponse.data().supportedFeatures().find(GroupVersion.FEATURE_NAME).maxVersion())
     }
-    val expectedApis = if (!cluster.isKRaftTest) {
-      ApiVersionsResponse.collectApis(
-        ApiKeys.apisForListener(ApiMessageType.ListenerType.ZK_BROKER),
-        enableUnstableLastVersion
-      )
-    } else if 
(cluster.controllerListenerName().toScala.contains(listenerName)) {
+    val expectedApis = if 
(cluster.controllerListenerName().toScala.contains(listenerName)) {
       ApiVersionsResponse.collectApis(
         ApiKeys.apisForListener(ApiMessageType.ListenerType.CONTROLLER),
         enableUnstableLastVersion
@@ -107,9 +102,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: 
ClusterInstance) {
     assertEquals(expectedApis.size, apiVersionsResponse.data.apiKeys.size,
       "API keys in ApiVersionsResponse must match API keys supported by 
broker.")
 
-    val defaultApiVersionsResponse = if (!cluster.isKRaftTest) {
-      TestUtils.defaultApiVersionsResponse(0, ListenerType.ZK_BROKER, 
enableUnstableLastVersion)
-    } else if 
(cluster.controllerListenerName().toScala.contains(listenerName)) {
+    val defaultApiVersionsResponse = if 
(cluster.controllerListenerName().toScala.contains(listenerName)) {
       TestUtils.defaultApiVersionsResponse(0, ListenerType.CONTROLLER, 
enableUnstableLastVersion)
     } else {
       TestUtils.createApiVersionsResponse(0, expectedApis)
diff --git 
a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
index e697eede581..3fec68da273 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
@@ -57,7 +57,7 @@ class BrokerRegistrationRequestTest {
 
         val saslMechanism: String = ""
 
-        def isZkController: Boolean = !clusterInstance.isKRaftTest
+        def isZkController: Boolean = false
 
         override def getControllerInfo(): ControllerInformation =
           ControllerInformation(node, listenerName, securityProtocol, 
saslMechanism, isZkController)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index b6122575703..64ef5641f23 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -22,7 +22,6 @@ import java.util.{Collections, Properties}
 import java.util.Map.Entry
 import kafka.server.KafkaConfig.fromProps
 import kafka.utils.TestUtils._
-import kafka.utils.CoreUtils._
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
 import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry, NewTopic}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
@@ -113,30 +112,20 @@ class ReplicationQuotasTest extends QuorumTestHarness {
     if (!leaderThrottle) throttle = throttle * 3
 
     Using(createAdminClient(brokers, listenerName)) { admin =>
-      if (isKRaftTest()) {
-        (106 to 107).foreach(registerBroker)
-      }
+      (106 to 107).foreach(registerBroker)
       admin.createTopics(List(new NewTopic(topic, assignment.map(a => 
a._1.asInstanceOf[Integer] ->
         
a._2.map(_.asInstanceOf[Integer]).toList.asJava).asJava)).asJava).all().get()
       //Set the throttle limit on all 8 brokers, but only assign throttled 
replicas to the six leaders, or two followers
       (100 to 107).foreach { brokerId =>
-        if (isKRaftTest()) {
-          val entry = new SimpleImmutableEntry[AlterConfigOp.OpType, 
String](SET, throttle.toString)
-            .asInstanceOf[Entry[AlterConfigOp.OpType, String]]
-          controllerServer.controller.incrementalAlterConfigs(
-            ControllerRequestContextUtil.ANONYMOUS_CONTEXT,
-            Map(new ConfigResource(BROKER, String.valueOf(brokerId)) -> Map(
-              QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG -> entry,
-              QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG -> 
entry).asJava).asJava,
-            false
-          ).get()
-        } else {
-          adminZkClient.changeBrokerConfig(Seq(brokerId),
-            propsWith(
-              (QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, 
throttle.toString),
-              (QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, 
throttle.toString)
-            ))
-        }
+        val entry = new SimpleImmutableEntry[AlterConfigOp.OpType, 
String](SET, throttle.toString)
+          .asInstanceOf[Entry[AlterConfigOp.OpType, String]]
+        controllerServer.controller.incrementalAlterConfigs(
+          ControllerRequestContextUtil.ANONYMOUS_CONTEXT,
+          Map(new ConfigResource(BROKER, String.valueOf(brokerId)) -> Map(
+            QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG -> entry,
+            QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG -> 
entry).asJava).asJava,
+          false
+        ).get()
       }
       //Either throttle the six leaders or the two followers
       val configEntry = if (leaderThrottle)
@@ -224,9 +213,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
     val throttle: Long = msg.length * msgCount / expectedDuration
 
     Using(createAdminClient(brokers, listenerName)) { admin =>
-      if (isKRaftTest()) {
-        registerBroker(101)
-      }
+      registerBroker(101)
       admin.createTopics(
         List(new NewTopic(topic, Collections.singletonMap(0, List(100, 
101).map(_.asInstanceOf[Integer]).asJava))).asJava
       ).all().get()
diff --git 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
index cd0f91b18d0..150e3c5e651 100644
--- 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
+++ 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
@@ -70,10 +70,6 @@ public interface ClusterInstance {
 
     Type type();
 
-    default boolean isKRaftTest() {
-        return type() == Type.KRAFT || type() == Type.CO_KRAFT;
-    }
-
     Map<Integer, KafkaBroker> brokers();
 
     default Map<Integer, KafkaBroker> aliveBrokers() {
@@ -158,15 +154,6 @@ public interface ClusterInstance {
 
     String clusterId();
 
-    /**
-     * The underlying object which is responsible for setting up and tearing 
down the cluster.
-     */
-    Object getUnderlying();
-
-    default <T> T getUnderlying(Class<T> asClass) {
-        return asClass.cast(getUnderlying());
-    }
-
     
//---------------------------[producer/consumer/admin]---------------------------//
 
     default <K, V> Producer<K, V> producer(Map<String, Object> configs) {
@@ -216,7 +203,7 @@ public interface ClusterInstance {
     }
 
     default Set<GroupProtocol> supportedGroupProtocols() {
-        if (isKRaftTest() && brokers().values().stream().allMatch(b -> 
b.dataPlaneRequestProcessor().isConsumerGroupProtocolEnabled())) {
+        if (brokers().values().stream().allMatch(b -> 
b.dataPlaneRequestProcessor().isConsumerGroupProtocolEnabled())) {
             return Set.of(CLASSIC, CONSUMER);
         } else {
             return Collections.singleton(CLASSIC);
diff --git 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
index 927e8bad9fa..19c1f1b9fa3 100644
--- 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
+++ 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
@@ -165,7 +165,6 @@ public class RaftClusterInvocationContext implements 
TestTemplateInvocationConte
             return controllers().keySet();
         }
 
-        @Override
         public KafkaClusterTestKit getUnderlying() {
             return clusterTestKit;
         }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
index 0c2c7d0881d..f83024837ba 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
@@ -56,8 +56,7 @@ public class BrokerApiVersionsCommandTest {
         assertTrue(lineIter.hasNext());
         assertEquals(clusterInstance.bootstrapServers() + " (id: 0 rack: null) 
-> (", lineIter.next());
 
-        ApiMessageType.ListenerType listenerType = 
clusterInstance.isKRaftTest() ?
-                ApiMessageType.ListenerType.BROKER : 
ApiMessageType.ListenerType.ZK_BROKER;
+        ApiMessageType.ListenerType listenerType = 
ApiMessageType.ListenerType.BROKER;
 
         NodeApiVersions nodeApiVersions = new NodeApiVersions(
                 ApiVersionsResponse.collectApis(ApiKeys.clientApis(), true),
diff --git a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
index ea9ad446795..d27839269f4 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
@@ -51,22 +51,14 @@ public class ClusterToolTest {
     @ClusterTest(brokers = 3)
     public void testUnregister(ClusterInstance clusterInstance) {
         int brokerId;
-        if (!clusterInstance.isKRaftTest()) {
-            brokerId = assertDoesNotThrow(() -> 
clusterInstance.brokerIds().stream().findFirst().get());
-        } else {
-            Set<Integer> brokerIds = clusterInstance.brokerIds();
-            brokerIds.removeAll(clusterInstance.controllerIds());
-            brokerId = assertDoesNotThrow(() -> 
brokerIds.stream().findFirst().get());
-        }
+        Set<Integer> brokerIds = clusterInstance.brokerIds();
+        brokerIds.removeAll(clusterInstance.controllerIds());
+        brokerId = assertDoesNotThrow(() -> 
brokerIds.stream().findFirst().get());
         clusterInstance.shutdownBroker(brokerId);
         String output = ToolsTestUtils.captureStandardOut(() ->
                 assertDoesNotThrow(() -> ClusterTool.execute("unregister", 
"--bootstrap-server", clusterInstance.bootstrapServers(), "--id", 
String.valueOf(brokerId))));
 
-        if (clusterInstance.isKRaftTest()) {
-            assertTrue(output.contains("Broker " + brokerId + " is no longer 
registered."));
-        } else {
-            assertTrue(output.contains("The target cluster does not support 
the broker unregistration API."));
-        }
+        assertTrue(output.contains("Broker " + brokerId + " is no longer 
registered."));
     }
 
     @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java 
b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
index dcac2f11e50..a641758a827 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
@@ -205,11 +205,7 @@ public class GetOffsetShellTest {
         setUp();
 
         List<Row> output = executeAndParse();
-        if (!cluster.isKRaftTest()) {
-            assertEquals(expectedOffsetsWithInternal(), output);
-        } else {
-            assertEquals(expectedTestTopicOffsets(), output);
-        }
+        assertEquals(expectedTestTopicOffsets(), output);
     }
 
     @ClusterTest
@@ -247,11 +243,7 @@ public class GetOffsetShellTest {
         setUp();
 
         List<Row> offsets = executeAndParse("--partitions", "0,1");
-        if (!cluster.isKRaftTest()) {
-            assertEquals(expectedOffsetsWithInternal().stream().filter(r -> 
r.partition <= 1).collect(Collectors.toList()), offsets);
-        } else {
-            assertEquals(expectedTestTopicOffsets().stream().filter(r -> 
r.partition <= 1).collect(Collectors.toList()), offsets);
-        }
+        assertEquals(expectedTestTopicOffsets().stream().filter(r -> 
r.partition <= 1).collect(Collectors.toList()), offsets);
     }
 
     @ClusterTest
diff --git a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
index 0675dadbeb5..aec130ad915 100644
--- a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
@@ -908,11 +908,8 @@ public class TopicCommandTest {
              TopicCommand.TopicService topicService = new 
TopicCommand.TopicService(adminClient)) {
 
             // create the offset topic
-            // In ZK mode, Topic.GROUP_METADATA_TOPIC_NAME exist when cluster 
is created.
-            if (clusterInstance.isKRaftTest()) {
-                adminClient.createTopics(Collections.singletonList(new 
NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, defaultNumPartitions, 
defaultReplicationFactor)));
-                clusterInstance.waitForTopic(Topic.GROUP_METADATA_TOPIC_NAME, 
defaultNumPartitions);
-            }
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, defaultNumPartitions, 
defaultReplicationFactor)));
+            clusterInstance.waitForTopic(Topic.GROUP_METADATA_TOPIC_NAME, 
defaultNumPartitions);
 
             // Try to delete the Topic.GROUP_METADATA_TOPIC_NAME which is 
allowed by default.
             // This is a difference between the new and the old command as the 
old one didn't allow internal topic deletion.
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java 
b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java
index 3a679abc121..03fffa2f236 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java
@@ -396,42 +396,22 @@ public class ConsoleConsumerTest {
 
                 JsonNode jsonNode = 
objectMapper.reader().readTree(out.toByteArray());
 
-                // The new group coordinator writes an empty group metadata 
record when the group is created for
-                // the first time whereas the old group coordinator only 
writes a group metadata record when
-                // the first rebalance completes.
-                if (cluster.isKRaftTest()) {
-                    JsonNode keyNode = jsonNode.get("key");
-                    GroupMetadataKey groupMetadataKey =
-                        
GroupMetadataKeyJsonConverter.read(keyNode.get("data"), 
GroupMetadataKey.HIGHEST_SUPPORTED_VERSION);
-                    assertNotNull(groupMetadataKey);
-                    assertEquals(groupId, groupMetadataKey.group());
-
-                    JsonNode valueNode = jsonNode.get("value");
-                    GroupMetadataValue groupMetadataValue =
-                        
GroupMetadataValueJsonConverter.read(valueNode.get("data"), 
GroupMetadataValue.HIGHEST_SUPPORTED_VERSION);
-                    assertNotNull(groupMetadataValue);
-                    assertEquals("", groupMetadataValue.protocolType());
-                    assertEquals(0, groupMetadataValue.generation());
-                    assertNull(groupMetadataValue.protocol());
-                    assertNull(groupMetadataValue.leader());
-                    assertEquals(0, groupMetadataValue.members().size());
-                } else {
-                    JsonNode keyNode = jsonNode.get("key");
-                    GroupMetadataKey groupMetadataKey =
-                        
GroupMetadataKeyJsonConverter.read(keyNode.get("data"), 
GroupMetadataKey.HIGHEST_SUPPORTED_VERSION);
-                    assertNotNull(groupMetadataKey);
-                    assertEquals(groupId, groupMetadataKey.group());
-
-                    JsonNode valueNode = jsonNode.get("value");
-                    GroupMetadataValue groupMetadataValue =
-                        
GroupMetadataValueJsonConverter.read(valueNode.get("data"), 
GroupMetadataValue.HIGHEST_SUPPORTED_VERSION);
-                    assertNotNull(groupMetadataValue);
-                    assertEquals("consumer", 
groupMetadataValue.protocolType());
-                    assertEquals(1, groupMetadataValue.generation());
-                    assertEquals("range", groupMetadataValue.protocol());
-                    assertNotNull(groupMetadataValue.leader());
-                    assertEquals(1, groupMetadataValue.members().size());
-                }
+                // The group coordinator writes an empty group metadata record 
when the group is created for the first time
+                JsonNode keyNode = jsonNode.get("key");
+                GroupMetadataKey groupMetadataKey =
+                    GroupMetadataKeyJsonConverter.read(keyNode.get("data"), 
GroupMetadataKey.HIGHEST_SUPPORTED_VERSION);
+                assertNotNull(groupMetadataKey);
+                assertEquals(groupId, groupMetadataKey.group());
+
+                JsonNode valueNode = jsonNode.get("value");
+                GroupMetadataValue groupMetadataValue =
+                    
GroupMetadataValueJsonConverter.read(valueNode.get("data"), 
GroupMetadataValue.HIGHEST_SUPPORTED_VERSION);
+                assertNotNull(groupMetadataValue);
+                assertEquals("", groupMetadataValue.protocolType());
+                assertEquals(0, groupMetadataValue.generation());
+                assertNull(groupMetadataValue.protocol());
+                assertNull(groupMetadataValue.leader());
+                assertEquals(0, groupMetadataValue.members().size());
             } finally {
                 consumerWrapper.cleanup();
             }

Reply via email to