This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a605c9541c7 MINOR: Separate share consumer metadata class (#20700)
a605c9541c7 is described below
commit a605c9541c74bfb75dc2548a495dce4660feddec
Author: Andrew Schofield <[email protected]>
AuthorDate: Fri Oct 17 11:52:33 2025 +0100
MINOR: Separate share consumer metadata class (#20700)
As part of separating share consumer infrastructure from the regular
consumer, this makes a separate metadata class for share consumers.
Reviewers: Lianet Magrans <[email protected]>
---
.../kafka/clients/consumer/KafkaShareConsumer.java | 4 +-
.../internals/AbstractMembershipManager.java | 5 +-
.../clients/consumer/internals/FetchUtils.java | 3 +-
.../consumer/internals/NetworkClientDelegate.java | 2 +-
.../consumer/internals/RequestManagers.java | 2 +-
.../internals/ShareConsumeRequestManager.java | 4 +-
.../internals/ShareConsumerDelegateCreator.java | 2 +-
.../consumer/internals/ShareConsumerImpl.java | 10 +--
.../consumer/internals/ShareConsumerMetadata.java | 78 ++++++++++++++++++++++
.../consumer/internals/ShareFetchCollector.java | 4 +-
.../consumer/internals/ShareMembershipManager.java | 5 +-
.../events/ApplicationEventProcessor.java | 8 +--
.../consumer/KafkaShareConsumerMetricsTest.java | 28 ++++----
.../clients/consumer/KafkaShareConsumerTest.java | 10 +--
.../internals/ShareConsumeRequestManagerTest.java | 6 +-
.../consumer/internals/ShareConsumerImplTest.java | 2 +-
.../internals/ShareFetchCollectorTest.java | 5 +-
17 files changed, 129 insertions(+), 49 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
index 7f3bad2e318..b18ec93ec7b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
@@ -17,9 +17,9 @@
package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.KafkaClient;
-import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ShareConsumerDelegate;
import
org.apache.kafka.clients.consumer.internals.ShareConsumerDelegateCreator;
+import org.apache.kafka.clients.consumer.internals.ShareConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import
org.apache.kafka.clients.consumer.internals.metrics.KafkaShareConsumerMetrics;
import org.apache.kafka.common.KafkaException;
@@ -392,7 +392,7 @@ public class KafkaShareConsumer<K, V> implements
ShareConsumer<K, V> {
final Time time,
final KafkaClient client,
final SubscriptionState subscriptions,
- final ConsumerMetadata metadata) {
+ final ShareConsumerMetadata metadata) {
delegate = CREATOR.create(
logContext, clientId, groupId, config, keyDeserializer,
valueDeserializer,
time, client, subscriptions, metadata);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index ffe01c089e7..c93a844ff9b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -108,7 +109,7 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
/**
* Metadata that allows us to create the partitions needed for {@link
ConsumerRebalanceListener}.
*/
- private final ConsumerMetadata metadata;
+ private final Metadata metadata;
/**
* Logger.
@@ -204,7 +205,7 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
AbstractMembershipManager(String groupId,
SubscriptionState subscriptions,
- ConsumerMetadata metadata,
+ Metadata metadata,
Logger log,
Time time,
RebalanceMetricsManager metricsManager,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchUtils.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchUtils.java
index 0b2faa58e7d..55083da20b4 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchUtils.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchUtils.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.Metadata;
import org.apache.kafka.common.TopicPartition;
/**
@@ -44,7 +45,7 @@ public class FetchUtils {
* @param subscriptions {@link SubscriptionState} to clear any internal
read replica node
* @param topicPartition {@link TopicPartition} for which this state
change is related
*/
- static void requestMetadataUpdate(final ConsumerMetadata metadata,
+ static void requestMetadataUpdate(final Metadata metadata,
final SubscriptionState subscriptions,
final TopicPartition topicPartition) {
metadata.requestUpdate(false);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index 3c280e39d02..e85f244c804 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -445,7 +445,7 @@ public class NetworkClientDelegate implements AutoCloseable
{
*/
public static Supplier<NetworkClientDelegate> supplier(final Time time,
final LogContext
logContext,
- final
ConsumerMetadata metadata,
+ final Metadata
metadata,
final
ConsumerConfig config,
final ApiVersions
apiVersions,
final Metrics
metrics,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index ae39753f3d8..4c6d8b17b5a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -318,7 +318,7 @@ public class RequestManagers implements Closeable {
public static Supplier<RequestManagers> supplier(final Time time,
final LogContext
logContext,
final
BackgroundEventHandler backgroundEventHandler,
- final ConsumerMetadata
metadata,
+ final
ShareConsumerMetadata metadata,
final SubscriptionState
subscriptions,
final ShareFetchBuffer
fetchBuffer,
final ConsumerConfig
config,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index 51e3fb39dfb..647b5b7e4b4 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -78,7 +78,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
private final Logger log;
private final LogContext logContext;
private final String groupId;
- private final ConsumerMetadata metadata;
+ private final ShareConsumerMetadata metadata;
private final SubscriptionState subscriptions;
private final FetchConfig fetchConfig;
protected final ShareFetchBuffer shareFetchBuffer;
@@ -103,7 +103,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
ShareConsumeRequestManager(final Time time,
final LogContext logContext,
final String groupId,
- final ConsumerMetadata metadata,
+ final ShareConsumerMetadata metadata,
final SubscriptionState subscriptions,
final FetchConfig fetchConfig,
final ShareFetchBuffer shareFetchBuffer,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerDelegateCreator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerDelegateCreator.java
index 9eb5fd13699..e840a7da3dc 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerDelegateCreator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerDelegateCreator.java
@@ -63,7 +63,7 @@ public class ShareConsumerDelegateCreator {
final Time time,
final KafkaClient client,
final SubscriptionState
subscriptions,
- final ConsumerMetadata
metadata) {
+ final
ShareConsumerMetadata metadata) {
try {
Logger log = logContext.logger(getClass());
log.warn("Share groups and KafkaShareConsumer are part of a
preview feature introduced by KIP-932, and are not recommended for use in
production.");
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 12b01b5482e..79697503256 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -189,7 +189,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
private final ShareFetchCollector<K, V> fetchCollector;
private final SubscriptionState subscriptions;
- private final ConsumerMetadata metadata;
+ private final ShareConsumerMetadata metadata;
private final Metrics metrics;
private final int requestTimeoutMs;
private final int defaultApiTimeoutMs;
@@ -257,7 +257,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
ClusterResourceListeners clusterResourceListeners =
ClientUtils.configureClusterResourceListeners(
metrics.reporters(),
Arrays.asList(deserializers.keyDeserializer(),
deserializers.valueDeserializer()));
- this.metadata = new ConsumerMetadata(config, subscriptions,
logContext, clusterResourceListeners);
+ this.metadata = new ShareConsumerMetadata(config, subscriptions,
logContext, clusterResourceListeners);
final List<InetSocketAddress> addresses =
ClientUtils.parseAndValidateAddresses(config);
metadata.bootstrap(addresses);
@@ -350,7 +350,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
final Time time,
final KafkaClient client,
final SubscriptionState subscriptions,
- final ConsumerMetadata metadata) {
+ final ShareConsumerMetadata metadata) {
this.clientId = clientId;
this.groupId = groupId;
this.log = logContext.logger(getClass());
@@ -441,7 +441,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
final CompletableEventReaper backgroundEventReaper,
final Metrics metrics,
final SubscriptionState subscriptions,
- final ConsumerMetadata metadata,
+ final ShareConsumerMetadata metadata,
final int requestTimeoutMs,
final int defaultApiTimeoutMs,
final String groupId,
@@ -492,7 +492,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
ShareFetchCollector<K, V> build(
final LogContext logContext,
- final ConsumerMetadata metadata,
+ final ShareConsumerMetadata metadata,
final SubscriptionState subscriptions,
final FetchConfig fetchConfig,
final Deserializers<K, V> deserializers
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerMetadata.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerMetadata.java
new file mode 100644
index 00000000000..20b99540e19
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerMetadata.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.utils.LogContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ShareConsumerMetadata extends Metadata {
+ private final boolean allowAutoTopicCreation;
+ private final SubscriptionState subscription;
+
+ public ShareConsumerMetadata(long refreshBackoffMs,
+ long refreshBackoffMaxMs,
+ long metadataExpireMs,
+ boolean allowAutoTopicCreation,
+ SubscriptionState subscription,
+ LogContext logContext,
+ ClusterResourceListeners clusterResourceListeners) {
+ super(refreshBackoffMs, refreshBackoffMaxMs, metadataExpireMs, logContext, clusterResourceListeners);
+ this.allowAutoTopicCreation = allowAutoTopicCreation;
+ this.subscription = subscription;
+ }
+
+ public ShareConsumerMetadata(ConsumerConfig config,
+ SubscriptionState subscriptions,
+ LogContext logContext,
+ ClusterResourceListeners clusterResourceListeners) {
+ this(config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG),
+ config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG),
+ config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
+ config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
+ subscriptions,
+ logContext,
+ clusterResourceListeners);
+ }
+
+ public boolean allowAutoTopicCreation() {
+ return allowAutoTopicCreation;
+ }
+
+ /**
+ * Constructs a metadata request builder for fetching cluster metadata for the topics the share consumer needs.
+ */
+ @Override
+ public synchronized MetadataRequest.Builder newMetadataRequestBuilder() {
+ List<String> topics = new ArrayList<>();
+ topics.addAll(subscription.metadataTopics());
+ return MetadataRequest.Builder.forTopicNames(topics, allowAutoTopicCreation);
+ }
+
+ /**
+ * Check if the metadata for the topic should be retained, based on the topic name.
+ */
+ @Override
+ public synchronized boolean retainTopic(String topic, boolean isInternal, long nowMs) {
+ return subscription.needsMetadata(topic);
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java
index c2a17d051b1..8861824600e 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java
@@ -42,13 +42,13 @@ import static
org.apache.kafka.clients.consumer.internals.FetchUtils.requestMeta
public class ShareFetchCollector<K, V> {
private final Logger log;
- private final ConsumerMetadata metadata;
+ private final ShareConsumerMetadata metadata;
private final SubscriptionState subscriptions;
private final FetchConfig fetchConfig;
private final Deserializers<K, V> deserializers;
public ShareFetchCollector(final LogContext logContext,
- final ConsumerMetadata metadata,
+ final ShareConsumerMetadata metadata,
final SubscriptionState subscriptions,
final FetchConfig fetchConfig,
final Deserializers<K, V> deserializers) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
index 47ab87edb35..130656f5b58 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import
org.apache.kafka.clients.consumer.internals.metrics.ShareRebalanceMetricsManager;
import org.apache.kafka.common.Uuid;
@@ -81,7 +82,7 @@ public class ShareMembershipManager extends
AbstractMembershipManager<ShareGroup
String groupId,
String rackId,
SubscriptionState subscriptions,
- ConsumerMetadata metadata,
+ Metadata metadata,
Time time,
Metrics metrics) {
this(logContext,
@@ -98,7 +99,7 @@ public class ShareMembershipManager extends
AbstractMembershipManager<ShareGroup
String groupId,
String rackId,
SubscriptionState subscriptions,
- ConsumerMetadata metadata,
+ Metadata metadata,
Time time,
ShareRebalanceMetricsManager metricsManager) {
super(groupId,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 853c5484df5..31eef662ce8 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -16,11 +16,11 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
+import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.Acknowledgements;
import org.apache.kafka.clients.consumer.internals.CachedSupplier;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
-import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
@@ -56,14 +56,14 @@ import java.util.stream.Collectors;
public class ApplicationEventProcessor implements
EventProcessor<ApplicationEvent> {
private final Logger log;
- private final ConsumerMetadata metadata;
+ private final Metadata metadata;
private final SubscriptionState subscriptions;
private final RequestManagers requestManagers;
private int metadataVersionSnapshot;
public ApplicationEventProcessor(final LogContext logContext,
final RequestManagers requestManagers,
- final ConsumerMetadata metadata,
+ final Metadata metadata,
final SubscriptionState subscriptions) {
this.log = logContext.logger(ApplicationEventProcessor.class);
this.requestManagers = requestManagers;
@@ -740,7 +740,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
* {@link ConsumerNetworkThread}.
*/
public static Supplier<ApplicationEventProcessor> supplier(final
LogContext logContext,
- final
ConsumerMetadata metadata,
+ final Metadata
metadata,
final
SubscriptionState subscriptions,
final
Supplier<RequestManagers> requestManagersSupplier) {
return new CachedSupplier<>() {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java
index a5417c3e00f..3dbe0a01cbb 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java
@@ -20,8 +20,8 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ShareConsumerImpl;
+import org.apache.kafka.clients.consumer.internals.ShareConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
@@ -80,7 +80,7 @@ public class KafkaShareConsumerMetricsTest {
@Test
public void testPollTimeMetrics() {
- ConsumerMetadata metadata = createMetadata(subscription);
+ ShareConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
@@ -125,7 +125,7 @@ public class KafkaShareConsumerMetricsTest {
@Test
public void testPollIdleRatio() {
- ConsumerMetadata metadata = createMetadata(subscription);
+ ShareConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
@@ -172,7 +172,7 @@ public class KafkaShareConsumerMetricsTest {
@Test
public void testClosingConsumerUnregistersConsumerMetrics() {
Time time = new MockTime(1L);
- ConsumerMetadata metadata = createMetadata(subscription);
+ ShareConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
@@ -190,7 +190,7 @@ public class KafkaShareConsumerMetricsTest {
@Test
public void testRegisteringCustomMetricsDoesntAffectConsumerMetrics() {
Time time = new MockTime(1L);
- ConsumerMetadata metadata = createMetadata(subscription);
+ ShareConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
@@ -207,7 +207,7 @@ public class KafkaShareConsumerMetricsTest {
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister()) {
appender.setClassLogger(ShareConsumerImpl.class, Level.DEBUG);
Time time = new MockTime(1L);
- ConsumerMetadata metadata = createMetadata(subscription);
+ ShareConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
@@ -224,7 +224,7 @@ public class KafkaShareConsumerMetricsTest {
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister()) {
appender.setClassLogger(ShareConsumerImpl.class, Level.DEBUG);
Time time = new MockTime(1L);
- ConsumerMetadata metadata = createMetadata(subscription);
+ ShareConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
@@ -244,7 +244,7 @@ public class KafkaShareConsumerMetricsTest {
mockedCommonClientConfigs.when(() ->
CommonClientConfigs.telemetryReporter(anyString(),
any())).thenReturn(Optional.of(clientTelemetryReporter));
Time time = new MockTime(1L);
- ConsumerMetadata metadata = createMetadata(subscription);
+ ShareConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
@@ -265,7 +265,7 @@ public class KafkaShareConsumerMetricsTest {
mockedCommonClientConfigs.when(() ->
CommonClientConfigs.telemetryReporter(anyString(),
any())).thenReturn(Optional.of(clientTelemetryReporter));
Time time = new MockTime(1L);
- ConsumerMetadata metadata = createMetadata(subscription);
+ ShareConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
@@ -280,7 +280,7 @@ public class KafkaShareConsumerMetricsTest {
@Test
public void testUnregisteringNonexistingMetricsDoesntCauseError() {
Time time = new MockTime(1L);
- ConsumerMetadata metadata = createMetadata(subscription);
+ ShareConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
@@ -291,15 +291,15 @@ public class KafkaShareConsumerMetricsTest {
customMetrics.forEach((name, metric) -> assertDoesNotThrow(() ->
consumer.unregisterMetricFromSubscription(metric)));
}
- private ConsumerMetadata createMetadata(SubscriptionState subscription) {
- return new ConsumerMetadata(0, 0, Long.MAX_VALUE, false, false,
+ private ShareConsumerMetadata createMetadata(SubscriptionState
subscription) {
+ return new ShareConsumerMetadata(0, 0, Long.MAX_VALUE, false,
subscription, new LogContext(), new
ClusterResourceListeners());
}
private KafkaShareConsumer<String, String> newShareConsumer(Time time,
KafkaClient
client,
SubscriptionState subscription,
-
ConsumerMetadata metadata) {
+
ShareConsumerMetadata metadata) {
return newShareConsumer(
time,
client,
@@ -313,7 +313,7 @@ public class KafkaShareConsumerMetricsTest {
private KafkaShareConsumer<String, String> newShareConsumer(Time time,
KafkaClient
client,
SubscriptionState subscriptions,
-
ConsumerMetadata metadata,
+
ShareConsumerMetadata metadata,
String groupId,
Optional<Deserializer<String>> valueDeserializerOpt) {
String clientId = "mock-consumer";
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
index 1a6d76dbabf..1346ea7fd09 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
@@ -19,7 +19,7 @@ package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
+import org.apache.kafka.clients.consumer.internals.ShareConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicIdPartition;
@@ -91,7 +91,7 @@ public class KafkaShareConsumerTest {
@Test
public void testVerifyHeartbeats() throws InterruptedException {
- ConsumerMetadata metadata = new ConsumerMetadata(0, 0, Long.MAX_VALUE,
false, false,
+ ShareConsumerMetadata metadata = new ShareConsumerMetadata(0, 0,
Long.MAX_VALUE, false,
subscription, new LogContext(), new ClusterResourceListeners());
MockClient client = new MockClient(time, metadata);
@@ -146,7 +146,7 @@ public class KafkaShareConsumerTest {
// @Flaky("KAFKA-18488")
@Test
public void testVerifyFetchAndCommitSyncImplicit() {
- ConsumerMetadata metadata = new ConsumerMetadata(0, 0, Long.MAX_VALUE,
false, false,
+ ShareConsumerMetadata metadata = new ShareConsumerMetadata(0, 0,
Long.MAX_VALUE, false,
subscription, new LogContext(), new ClusterResourceListeners());
MockClient client = new MockClient(time, metadata);
@@ -223,7 +223,7 @@ public class KafkaShareConsumerTest {
//@Flaky("KAFKA-18794")
@Test
public void testVerifyFetchAndCloseImplicit() {
- ConsumerMetadata metadata = new ConsumerMetadata(0, 0, Long.MAX_VALUE,
false, false,
+ ShareConsumerMetadata metadata = new ShareConsumerMetadata(0, 0,
Long.MAX_VALUE, false,
subscription, new LogContext(), new ClusterResourceListeners());
MockClient client = new MockClient(time, metadata);
@@ -279,7 +279,7 @@ public class KafkaShareConsumerTest {
}
private KafkaShareConsumer<String, String> newShareConsumer(String
clientId,
-
ConsumerMetadata metadata,
+
ShareConsumerMetadata metadata,
KafkaClient
client) {
LogContext logContext = new LogContext();
Deserializer<String> keyDeserializer = new StringDeserializer();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index a4268b7eca0..6c47aea4171 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -161,7 +161,7 @@ public class ShareConsumeRequestManagerTest {
private final long defaultApiTimeoutMs = 60000;
private MockTime time = new MockTime(1);
private SubscriptionState subscriptions;
- private ConsumerMetadata metadata;
+ private ShareConsumerMetadata metadata;
private ShareFetchMetricsManager metricsManager;
private MockClient client;
private Metrics metrics;
@@ -2671,7 +2671,7 @@ public class ShareConsumeRequestManagerTest {
LogContext logContext) {
time = new MockTime(1, 0, 0);
subscriptions = subscriptionState;
- metadata = new ConsumerMetadata(0, 0, Long.MAX_VALUE, false, false,
+ metadata = new ShareConsumerMetadata(0, 0, Long.MAX_VALUE, false,
subscriptions, logContext, new ClusterResourceListeners());
client = new MockClient(time, metadata);
metrics = new Metrics(metricConfig, time);
@@ -2696,7 +2696,7 @@ public class ShareConsumeRequestManagerTest {
public TestableShareConsumeRequestManager(LogContext logContext,
String groupId,
- ConsumerMetadata metadata,
+ ShareConsumerMetadata
metadata,
SubscriptionState
subscriptions,
FetchConfig fetchConfig,
ShareFetchBuffer
shareFetchBuffer,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 5dddd0772df..edb244b06aa 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -102,7 +102,7 @@ public class ShareConsumerImplTest {
private final Time time = new MockTime(1);
private final ShareFetchCollector<String, String> fetchCollector =
mock(ShareFetchCollector.class);
- private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
+ private final ShareConsumerMetadata metadata =
mock(ShareConsumerMetadata.class);
private final ApplicationEventHandler applicationEventHandler =
mock(ApplicationEventHandler.class);
private final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue =
new LinkedBlockingQueue<>();
private final CompletableEventReaper backgroundEventReaper =
mock(CompletableEventReaper.class);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
index 194d9b2a2c4..afac3a76d95 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
@@ -76,7 +76,7 @@ public class ShareFetchCollectorTest {
private SubscriptionState subscriptions;
private FetchConfig fetchConfig;
- private ConsumerMetadata metadata;
+ private ShareConsumerMetadata metadata;
private ShareFetchBuffer fetchBuffer;
private Deserializers<String, String> deserializers;
private ShareFetchCollector<String, String> fetchCollector;
@@ -246,12 +246,11 @@ public class ShareFetchCollectorTest {
subscriptions = createSubscriptionState(config, logContext);
fetchConfig = new FetchConfig(config);
- metadata = new ConsumerMetadata(
+ metadata = new ShareConsumerMetadata(
0,
1000,
10000,
false,
- false,
subscriptions,
logContext,
new ClusterResourceListeners());