This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a605c9541c7 MINOR: Separate share consumer metadata class (#20700)
a605c9541c7 is described below

commit a605c9541c74bfb75dc2548a495dce4660feddec
Author: Andrew Schofield <[email protected]>
AuthorDate: Fri Oct 17 11:52:33 2025 +0100

    MINOR: Separate share consumer metadata class (#20700)
    
    As part of separating share consumer infrastructure from the regular
    consumer, this makes a separate metadata class for share consumers.
    
    Reviewers: Lianet Magrans <[email protected]>
---
 .../kafka/clients/consumer/KafkaShareConsumer.java |  4 +-
 .../internals/AbstractMembershipManager.java       |  5 +-
 .../clients/consumer/internals/FetchUtils.java     |  3 +-
 .../consumer/internals/NetworkClientDelegate.java  |  2 +-
 .../consumer/internals/RequestManagers.java        |  2 +-
 .../internals/ShareConsumeRequestManager.java      |  4 +-
 .../internals/ShareConsumerDelegateCreator.java    |  2 +-
 .../consumer/internals/ShareConsumerImpl.java      | 10 +--
 .../consumer/internals/ShareConsumerMetadata.java  | 78 ++++++++++++++++++++++
 .../consumer/internals/ShareFetchCollector.java    |  4 +-
 .../consumer/internals/ShareMembershipManager.java |  5 +-
 .../events/ApplicationEventProcessor.java          |  8 +--
 .../consumer/KafkaShareConsumerMetricsTest.java    | 28 ++++----
 .../clients/consumer/KafkaShareConsumerTest.java   | 10 +--
 .../internals/ShareConsumeRequestManagerTest.java  |  6 +-
 .../consumer/internals/ShareConsumerImplTest.java  |  2 +-
 .../internals/ShareFetchCollectorTest.java         |  5 +-
 17 files changed, 129 insertions(+), 49 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
index 7f3bad2e318..b18ec93ec7b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
@@ -17,9 +17,9 @@
 package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.clients.KafkaClient;
-import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.ShareConsumerDelegate;
 import 
org.apache.kafka.clients.consumer.internals.ShareConsumerDelegateCreator;
+import org.apache.kafka.clients.consumer.internals.ShareConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import 
org.apache.kafka.clients.consumer.internals.metrics.KafkaShareConsumerMetrics;
 import org.apache.kafka.common.KafkaException;
@@ -392,7 +392,7 @@ public class KafkaShareConsumer<K, V> implements 
ShareConsumer<K, V> {
                        final Time time,
                        final KafkaClient client,
                        final SubscriptionState subscriptions,
-                       final ConsumerMetadata metadata) {
+                       final ShareConsumerMetadata metadata) {
         delegate = CREATOR.create(
                 logContext, clientId, groupId, config, keyDeserializer, 
valueDeserializer,
                 time, client, subscriptions, metadata);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index ffe01c089e7..c93a844ff9b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.CloseOptions;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -108,7 +109,7 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
     /**
      * Metadata that allows us to create the partitions needed for {@link 
ConsumerRebalanceListener}.
      */
-    private final ConsumerMetadata metadata;
+    private final Metadata metadata;
 
     /**
      * Logger.
@@ -204,7 +205,7 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
 
     AbstractMembershipManager(String groupId,
                               SubscriptionState subscriptions,
-                              ConsumerMetadata metadata,
+                              Metadata metadata,
                               Logger log,
                               Time time,
                               RebalanceMetricsManager metricsManager,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchUtils.java
index 0b2faa58e7d..55083da20b4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchUtils.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.common.TopicPartition;
 
 /**
@@ -44,7 +45,7 @@ public class FetchUtils {
      * @param subscriptions {@link SubscriptionState} to clear any internal 
read replica node
      * @param topicPartition {@link TopicPartition} for which this state 
change is related
      */
-    static void requestMetadataUpdate(final ConsumerMetadata metadata,
+    static void requestMetadataUpdate(final Metadata metadata,
                                       final SubscriptionState subscriptions,
                                       final TopicPartition topicPartition) {
         metadata.requestUpdate(false);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index 3c280e39d02..e85f244c804 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -445,7 +445,7 @@ public class NetworkClientDelegate implements AutoCloseable 
{
      */
     public static Supplier<NetworkClientDelegate> supplier(final Time time,
                                                            final LogContext 
logContext,
-                                                           final 
ConsumerMetadata metadata,
+                                                           final Metadata 
metadata,
                                                            final 
ConsumerConfig config,
                                                            final ApiVersions 
apiVersions,
                                                            final Metrics 
metrics,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index ae39753f3d8..4c6d8b17b5a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -318,7 +318,7 @@ public class RequestManagers implements Closeable {
     public static Supplier<RequestManagers> supplier(final Time time,
                                                      final LogContext 
logContext,
                                                      final 
BackgroundEventHandler backgroundEventHandler,
-                                                     final ConsumerMetadata 
metadata,
+                                                     final 
ShareConsumerMetadata metadata,
                                                      final SubscriptionState 
subscriptions,
                                                      final ShareFetchBuffer 
fetchBuffer,
                                                      final ConsumerConfig 
config,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index 51e3fb39dfb..647b5b7e4b4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -78,7 +78,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
     private final Logger log;
     private final LogContext logContext;
     private final String groupId;
-    private final ConsumerMetadata metadata;
+    private final ShareConsumerMetadata metadata;
     private final SubscriptionState subscriptions;
     private final FetchConfig fetchConfig;
     protected final ShareFetchBuffer shareFetchBuffer;
@@ -103,7 +103,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
     ShareConsumeRequestManager(final Time time,
                                final LogContext logContext,
                                final String groupId,
-                               final ConsumerMetadata metadata,
+                               final ShareConsumerMetadata metadata,
                                final SubscriptionState subscriptions,
                                final FetchConfig fetchConfig,
                                final ShareFetchBuffer shareFetchBuffer,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerDelegateCreator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerDelegateCreator.java
index 9eb5fd13699..e840a7da3dc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerDelegateCreator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerDelegateCreator.java
@@ -63,7 +63,7 @@ public class ShareConsumerDelegateCreator {
                                                      final Time time,
                                                      final KafkaClient client,
                                                      final SubscriptionState 
subscriptions,
-                                                     final ConsumerMetadata 
metadata) {
+                                                     final 
ShareConsumerMetadata metadata) {
         try {
             Logger log = logContext.logger(getClass());
             log.warn("Share groups and KafkaShareConsumer are part of a 
preview feature introduced by KIP-932, and are not recommended for use in 
production.");
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 12b01b5482e..79697503256 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -189,7 +189,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
     private final ShareFetchCollector<K, V> fetchCollector;
 
     private final SubscriptionState subscriptions;
-    private final ConsumerMetadata metadata;
+    private final ShareConsumerMetadata metadata;
     private final Metrics metrics;
     private final int requestTimeoutMs;
     private final int defaultApiTimeoutMs;
@@ -257,7 +257,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
             ClusterResourceListeners clusterResourceListeners = 
ClientUtils.configureClusterResourceListeners(
                     metrics.reporters(),
                     Arrays.asList(deserializers.keyDeserializer(), 
deserializers.valueDeserializer()));
-            this.metadata = new ConsumerMetadata(config, subscriptions, 
logContext, clusterResourceListeners);
+            this.metadata = new ShareConsumerMetadata(config, subscriptions, 
logContext, clusterResourceListeners);
             final List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(config);
             metadata.bootstrap(addresses);
 
@@ -350,7 +350,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                       final Time time,
                       final KafkaClient client,
                       final SubscriptionState subscriptions,
-                      final ConsumerMetadata metadata) {
+                      final ShareConsumerMetadata metadata) {
         this.clientId = clientId;
         this.groupId = groupId;
         this.log = logContext.logger(getClass());
@@ -441,7 +441,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                       final CompletableEventReaper backgroundEventReaper,
                       final Metrics metrics,
                       final SubscriptionState subscriptions,
-                      final ConsumerMetadata metadata,
+                      final ShareConsumerMetadata metadata,
                       final int requestTimeoutMs,
                       final int defaultApiTimeoutMs,
                       final String groupId,
@@ -492,7 +492,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
 
         ShareFetchCollector<K, V> build(
                 final LogContext logContext,
-                final ConsumerMetadata metadata,
+                final ShareConsumerMetadata metadata,
                 final SubscriptionState subscriptions,
                 final FetchConfig fetchConfig,
                 final Deserializers<K, V> deserializers
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerMetadata.java
new file mode 100644
index 00000000000..20b99540e19
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerMetadata.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.utils.LogContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ShareConsumerMetadata extends Metadata {
+    private final boolean allowAutoTopicCreation;
+    private final SubscriptionState subscription;
+
+    public ShareConsumerMetadata(long refreshBackoffMs,
+                                 long refreshBackoffMaxMs,
+                                 long metadataExpireMs,
+                                 boolean allowAutoTopicCreation,
+                                 SubscriptionState subscription,
+                                 LogContext logContext,
+                                 ClusterResourceListeners clusterResourceListeners) {
+        super(refreshBackoffMs, refreshBackoffMaxMs, metadataExpireMs, logContext, clusterResourceListeners);
+        this.allowAutoTopicCreation = allowAutoTopicCreation;
+        this.subscription = subscription;
+    }
+
+    public ShareConsumerMetadata(ConsumerConfig config,
+                                 SubscriptionState subscriptions,
+                                 LogContext logContext,
+                                 ClusterResourceListeners clusterResourceListeners) {
+        this(config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG),
+            config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG),
+            config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
+            config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
+            subscriptions,
+            logContext,
+            clusterResourceListeners);
+    }
+
+    public boolean allowAutoTopicCreation() {
+        return allowAutoTopicCreation;
+    }
+
+    /**
+     * Constructs a metadata request builder for fetching cluster metadata for the topics the share consumer needs.
+     */
+    @Override
+    public synchronized MetadataRequest.Builder newMetadataRequestBuilder() {
+        List<String> topics = new ArrayList<>();
+        topics.addAll(subscription.metadataTopics());
+        return MetadataRequest.Builder.forTopicNames(topics, allowAutoTopicCreation);
+    }
+
+    /**
+     * Check if the metadata for the topic should be retained, based on the topic name.
+     */
+    @Override
+    public synchronized boolean retainTopic(String topic, boolean isInternal, long nowMs) {
+        return subscription.needsMetadata(topic);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java
index c2a17d051b1..8861824600e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java
@@ -42,13 +42,13 @@ import static 
org.apache.kafka.clients.consumer.internals.FetchUtils.requestMeta
 public class ShareFetchCollector<K, V> {
 
     private final Logger log;
-    private final ConsumerMetadata metadata;
+    private final ShareConsumerMetadata metadata;
     private final SubscriptionState subscriptions;
     private final FetchConfig fetchConfig;
     private final Deserializers<K, V> deserializers;
 
     public ShareFetchCollector(final LogContext logContext,
-                               final ConsumerMetadata metadata,
+                               final ShareConsumerMetadata metadata,
                                final SubscriptionState subscriptions,
                                final FetchConfig fetchConfig,
                                final Deserializers<K, V> deserializers) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
index 47ab87edb35..130656f5b58 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import 
org.apache.kafka.clients.consumer.internals.metrics.ShareRebalanceMetricsManager;
 import org.apache.kafka.common.Uuid;
@@ -81,7 +82,7 @@ public class ShareMembershipManager extends 
AbstractMembershipManager<ShareGroup
                                   String groupId,
                                   String rackId,
                                   SubscriptionState subscriptions,
-                                  ConsumerMetadata metadata,
+                                  Metadata metadata,
                                   Time time,
                                   Metrics metrics) {
         this(logContext,
@@ -98,7 +99,7 @@ public class ShareMembershipManager extends 
AbstractMembershipManager<ShareGroup
                            String groupId,
                            String rackId,
                            SubscriptionState subscriptions,
-                           ConsumerMetadata metadata,
+                           Metadata metadata,
                            Time time,
                            ShareRebalanceMetricsManager metricsManager) {
         super(groupId,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 853c5484df5..31eef662ce8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -16,11 +16,11 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
+import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.Acknowledgements;
 import org.apache.kafka.clients.consumer.internals.CachedSupplier;
 import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
-import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
 import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
@@ -56,14 +56,14 @@ import java.util.stream.Collectors;
 public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEvent> {
 
     private final Logger log;
-    private final ConsumerMetadata metadata;
+    private final Metadata metadata;
     private final SubscriptionState subscriptions;
     private final RequestManagers requestManagers;
     private int metadataVersionSnapshot;
 
     public ApplicationEventProcessor(final LogContext logContext,
                                      final RequestManagers requestManagers,
-                                     final ConsumerMetadata metadata,
+                                     final Metadata metadata,
                                      final SubscriptionState subscriptions) {
         this.log = logContext.logger(ApplicationEventProcessor.class);
         this.requestManagers = requestManagers;
@@ -740,7 +740,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
      * {@link ConsumerNetworkThread}.
      */
     public static Supplier<ApplicationEventProcessor> supplier(final 
LogContext logContext,
-                                                               final 
ConsumerMetadata metadata,
+                                                               final Metadata 
metadata,
                                                                final 
SubscriptionState subscriptions,
                                                                final 
Supplier<RequestManagers> requestManagersSupplier) {
         return new CachedSupplier<>() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java
index a5417c3e00f..3dbe0a01cbb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java
@@ -20,8 +20,8 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.ShareConsumerImpl;
+import org.apache.kafka.clients.consumer.internals.ShareConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
@@ -80,7 +80,7 @@ public class KafkaShareConsumerMetricsTest {
 
     @Test
     public void testPollTimeMetrics() {
-        ConsumerMetadata metadata = createMetadata(subscription);
+        ShareConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
         initMetadata(client, Collections.singletonMap(topic, 1));
 
@@ -125,7 +125,7 @@ public class KafkaShareConsumerMetricsTest {
 
     @Test
     public void testPollIdleRatio() {
-        ConsumerMetadata metadata = createMetadata(subscription);
+        ShareConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
         initMetadata(client, Collections.singletonMap(topic, 1));
 
@@ -172,7 +172,7 @@ public class KafkaShareConsumerMetricsTest {
     @Test
     public void testClosingConsumerUnregistersConsumerMetrics() {
         Time time = new MockTime(1L);
-        ConsumerMetadata metadata = createMetadata(subscription);
+        ShareConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
         initMetadata(client, Collections.singletonMap(topic, 1));
 
@@ -190,7 +190,7 @@ public class KafkaShareConsumerMetricsTest {
     @Test
     public void testRegisteringCustomMetricsDoesntAffectConsumerMetrics() {
         Time time = new MockTime(1L);
-        ConsumerMetadata metadata = createMetadata(subscription);
+        ShareConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
         initMetadata(client, Collections.singletonMap(topic, 1));
 
@@ -207,7 +207,7 @@ public class KafkaShareConsumerMetricsTest {
         try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister()) {
             appender.setClassLogger(ShareConsumerImpl.class, Level.DEBUG);
             Time time = new MockTime(1L);
-            ConsumerMetadata metadata = createMetadata(subscription);
+            ShareConsumerMetadata metadata = createMetadata(subscription);
             MockClient client = new MockClient(time, metadata);
             initMetadata(client, Collections.singletonMap(topic, 1));
 
@@ -224,7 +224,7 @@ public class KafkaShareConsumerMetricsTest {
         try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister()) {
             appender.setClassLogger(ShareConsumerImpl.class, Level.DEBUG);
             Time time = new MockTime(1L);
-            ConsumerMetadata metadata = createMetadata(subscription);
+            ShareConsumerMetadata metadata = createMetadata(subscription);
             MockClient client = new MockClient(time, metadata);
             initMetadata(client, Collections.singletonMap(topic, 1));
 
@@ -244,7 +244,7 @@ public class KafkaShareConsumerMetricsTest {
             mockedCommonClientConfigs.when(() -> 
CommonClientConfigs.telemetryReporter(anyString(), 
any())).thenReturn(Optional.of(clientTelemetryReporter));
 
             Time time = new MockTime(1L);
-            ConsumerMetadata metadata = createMetadata(subscription);
+            ShareConsumerMetadata metadata = createMetadata(subscription);
             MockClient client = new MockClient(time, metadata);
             initMetadata(client, Collections.singletonMap(topic, 1));
 
@@ -265,7 +265,7 @@ public class KafkaShareConsumerMetricsTest {
             mockedCommonClientConfigs.when(() -> 
CommonClientConfigs.telemetryReporter(anyString(), 
any())).thenReturn(Optional.of(clientTelemetryReporter));
 
             Time time = new MockTime(1L);
-            ConsumerMetadata metadata = createMetadata(subscription);
+            ShareConsumerMetadata metadata = createMetadata(subscription);
             MockClient client = new MockClient(time, metadata);
             initMetadata(client, Collections.singletonMap(topic, 1));
 
@@ -280,7 +280,7 @@ public class KafkaShareConsumerMetricsTest {
     @Test
     public void testUnregisteringNonexistingMetricsDoesntCauseError() {
         Time time = new MockTime(1L);
-        ConsumerMetadata metadata = createMetadata(subscription);
+        ShareConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
         initMetadata(client, Collections.singletonMap(topic, 1));
 
@@ -291,15 +291,15 @@ public class KafkaShareConsumerMetricsTest {
         customMetrics.forEach((name, metric) -> assertDoesNotThrow(() -> 
consumer.unregisterMetricFromSubscription(metric)));
     }
 
-    private ConsumerMetadata createMetadata(SubscriptionState subscription) {
-        return new ConsumerMetadata(0, 0, Long.MAX_VALUE, false, false,
+    private ShareConsumerMetadata createMetadata(SubscriptionState 
subscription) {
+        return new ShareConsumerMetadata(0, 0, Long.MAX_VALUE, false,
                 subscription, new LogContext(), new 
ClusterResourceListeners());
     }
 
     private KafkaShareConsumer<String, String> newShareConsumer(Time time,
                                                                 KafkaClient 
client,
                                                                 
SubscriptionState subscription,
-                                                                
ConsumerMetadata metadata) {
+                                                                
ShareConsumerMetadata metadata) {
         return newShareConsumer(
                 time,
                 client,
@@ -313,7 +313,7 @@ public class KafkaShareConsumerMetricsTest {
     private KafkaShareConsumer<String, String> newShareConsumer(Time time,
                                                                 KafkaClient 
client,
                                                                 
SubscriptionState subscriptions,
-                                                                
ConsumerMetadata metadata,
+                                                                
ShareConsumerMetadata metadata,
                                                                 String groupId,
                                                                 
Optional<Deserializer<String>> valueDeserializerOpt) {
         String clientId = "mock-consumer";
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
index 1a6d76dbabf..1346ea7fd09 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
@@ -19,7 +19,7 @@ package org.apache.kafka.clients.consumer;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
+import org.apache.kafka.clients.consumer.internals.ShareConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicIdPartition;
@@ -91,7 +91,7 @@ public class KafkaShareConsumerTest {
 
     @Test
     public void testVerifyHeartbeats() throws InterruptedException {
-        ConsumerMetadata metadata = new ConsumerMetadata(0, 0, Long.MAX_VALUE, 
false, false,
+        ShareConsumerMetadata metadata = new ShareConsumerMetadata(0, 0, 
Long.MAX_VALUE, false,
             subscription, new LogContext(), new ClusterResourceListeners());
         MockClient client = new MockClient(time, metadata);
 
@@ -146,7 +146,7 @@ public class KafkaShareConsumerTest {
     // @Flaky("KAFKA-18488")
     @Test
     public void testVerifyFetchAndCommitSyncImplicit() {
-        ConsumerMetadata metadata = new ConsumerMetadata(0, 0, Long.MAX_VALUE, 
false, false,
+        ShareConsumerMetadata metadata = new ShareConsumerMetadata(0, 0, 
Long.MAX_VALUE, false,
             subscription, new LogContext(), new ClusterResourceListeners());
         MockClient client = new MockClient(time, metadata);
 
@@ -223,7 +223,7 @@ public class KafkaShareConsumerTest {
     //@Flaky("KAFKA-18794")
     @Test
     public void testVerifyFetchAndCloseImplicit() {
-        ConsumerMetadata metadata = new ConsumerMetadata(0, 0, Long.MAX_VALUE, 
false, false,
+        ShareConsumerMetadata metadata = new ShareConsumerMetadata(0, 0, 
Long.MAX_VALUE, false,
             subscription, new LogContext(), new ClusterResourceListeners());
         MockClient client = new MockClient(time, metadata);
 
@@ -279,7 +279,7 @@ public class KafkaShareConsumerTest {
     }
 
     private KafkaShareConsumer<String, String> newShareConsumer(String 
clientId,
-                                                                
ConsumerMetadata metadata,
+                                                                
ShareConsumerMetadata metadata,
                                                                 KafkaClient 
client) {
         LogContext logContext = new LogContext();
         Deserializer<String> keyDeserializer = new StringDeserializer();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index a4268b7eca0..6c47aea4171 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -161,7 +161,7 @@ public class ShareConsumeRequestManagerTest {
     private final long defaultApiTimeoutMs = 60000;
     private MockTime time = new MockTime(1);
     private SubscriptionState subscriptions;
-    private ConsumerMetadata metadata;
+    private ShareConsumerMetadata metadata;
     private ShareFetchMetricsManager metricsManager;
     private MockClient client;
     private Metrics metrics;
@@ -2671,7 +2671,7 @@ public class ShareConsumeRequestManagerTest {
                                    LogContext logContext) {
         time = new MockTime(1, 0, 0);
         subscriptions = subscriptionState;
-        metadata = new ConsumerMetadata(0, 0, Long.MAX_VALUE, false, false,
+        metadata = new ShareConsumerMetadata(0, 0, Long.MAX_VALUE, false,
                 subscriptions, logContext, new ClusterResourceListeners());
         client = new MockClient(time, metadata);
         metrics = new Metrics(metricConfig, time);
@@ -2696,7 +2696,7 @@ public class ShareConsumeRequestManagerTest {
 
         public TestableShareConsumeRequestManager(LogContext logContext,
                                                   String groupId,
-                                                  ConsumerMetadata metadata,
+                                                  ShareConsumerMetadata 
metadata,
                                                   SubscriptionState 
subscriptions,
                                                   FetchConfig fetchConfig,
                                                   ShareFetchBuffer 
shareFetchBuffer,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 5dddd0772df..edb244b06aa 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -102,7 +102,7 @@ public class ShareConsumerImplTest {
 
     private final Time time = new MockTime(1);
     private final ShareFetchCollector<String, String> fetchCollector = 
mock(ShareFetchCollector.class);
-    private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
+    private final ShareConsumerMetadata metadata = 
mock(ShareConsumerMetadata.class);
     private final ApplicationEventHandler applicationEventHandler = 
mock(ApplicationEventHandler.class);
     private final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = 
new LinkedBlockingQueue<>();
     private final CompletableEventReaper backgroundEventReaper = 
mock(CompletableEventReaper.class);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
index 194d9b2a2c4..afac3a76d95 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
@@ -76,7 +76,7 @@ public class ShareFetchCollectorTest {
 
     private SubscriptionState subscriptions;
     private FetchConfig fetchConfig;
-    private ConsumerMetadata metadata;
+    private ShareConsumerMetadata metadata;
     private ShareFetchBuffer fetchBuffer;
     private Deserializers<String, String> deserializers;
     private ShareFetchCollector<String, String> fetchCollector;
@@ -246,12 +246,11 @@ public class ShareFetchCollectorTest {
         subscriptions = createSubscriptionState(config, logContext);
         fetchConfig = new FetchConfig(config);
 
-        metadata = new ConsumerMetadata(
+        metadata = new ShareConsumerMetadata(
                 0,
                 1000,
                 10000,
                 false,
-                false,
                 subscriptions,
                 logContext,
                 new ClusterResourceListeners());


Reply via email to