This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7d89bdc3f0d KAFKA-14960: TopicMetadata request manager (#14386)
7d89bdc3f0d is described below

commit 7d89bdc3f0d1b7c477f728449841c45fcfbb5724
Author: Philip Nee <[email protected]>
AuthorDate: Thu Sep 21 08:07:21 2023 -0700

    KAFKA-14960: TopicMetadata request manager (#14386)
    
    TopicMetadataRequestManager is responsible for sending topic metadata 
requests. The manager manages API requests and builds the requests accordingly. 
Topic metadata requests for the same topic are chained to avoid sending the 
same request repeatedly.
    
    Co-authored-by: Lianet Magrans <[email protected]>
    Co-authored-by: Kirk True <[email protected]>
    
    Reviewers: Kirk True <[email protected]>, Lianet Magrans 
<[email protected]>, Jun Rao <[email protected]>
---
 .../consumer/internals/CommitRequestManager.java   |  73 +++----
 .../internals/CoordinatorRequestManager.java       |   6 +-
 .../internals/DefaultBackgroundThread.java         |  26 ++-
 .../consumer/internals/NetworkClientDelegate.java  |  22 +-
 .../consumer/internals/OffsetsRequestManager.java  |   2 +-
 .../consumer/internals/PrototypeAsyncConsumer.java |   1 -
 .../consumer/internals/RequestManagers.java        |   4 +
 .../internals/TopicMetadataRequestManager.java     | 233 +++++++++++++++++++++
 .../internals/events/ApplicationEvent.java         |   2 +-
 .../events/ApplicationEventProcessor.java          |  20 +-
 .../internals/events/CommitApplicationEvent.java   |   1 +
 ...ent.java => TopicMetadataApplicationEvent.java} |  51 ++---
 .../internals/CommitRequestManagerTest.java        |   2 +-
 .../internals/DefaultBackgroundThreadTest.java     |  59 ++++--
 .../internals/TopicMetadataRequestManagerTest.java | 221 +++++++++++++++++++
 15 files changed, 600 insertions(+), 123 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 83672fbc080..64fc41d3f9e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
@@ -129,7 +128,7 @@ public class CommitRequestManager implements RequestManager 
{
      * Handles {@link 
org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It 
creates an
      * {@link OffsetCommitRequestState} and enqueue it to send later.
      */
-    public CompletableFuture<ClientResponse> addOffsetCommitRequest(final 
Map<TopicPartition, OffsetAndMetadata> offsets) {
+    public CompletableFuture<Void> addOffsetCommitRequest(final 
Map<TopicPartition, OffsetAndMetadata> offsets) {
         return pendingRequests.addOffsetCommitRequest(offsets);
     }
 
@@ -145,19 +144,13 @@ public class CommitRequestManager implements 
RequestManager {
         this.autoCommitState.ifPresent(t -> t.ack(currentTimeMs));
     }
 
-
-    // Visible for testing
-    List<OffsetFetchRequestState> unsentOffsetFetchRequests() {
-        return pendingRequests.unsentOffsetFetches;
-    }
-
     // Visible for testing
     Queue<OffsetCommitRequestState> unsentOffsetCommitRequests() {
         return pendingRequests.unsentOffsetCommits;
     }
 
     // Visible for testing
-    CompletableFuture<ClientResponse> sendAutoCommit(final Map<TopicPartition, 
OffsetAndMetadata> allConsumedOffsets) {
+    CompletableFuture<Void> sendAutoCommit(final Map<TopicPartition, 
OffsetAndMetadata> allConsumedOffsets) {
         log.debug("Enqueuing autocommit offsets: {}", allConsumedOffsets);
         return this.addOffsetCommitRequest(allConsumedOffsets)
                 .whenComplete((response, throwable) -> {
@@ -182,23 +175,19 @@ public class CommitRequestManager implements 
RequestManager {
         private final String groupId;
         private final GroupState.Generation generation;
         private final String groupInstanceId;
-        private final NetworkClientDelegate.FutureCompletionHandler future;
+        private final CompletableFuture<Void> future;
 
         public OffsetCommitRequestState(final Map<TopicPartition, 
OffsetAndMetadata> offsets,
                                         final String groupId,
                                         final String groupInstanceId,
                                         final GroupState.Generation 
generation) {
             this.offsets = offsets;
-            this.future = new NetworkClientDelegate.FutureCompletionHandler();
+            this.future = new CompletableFuture<>();
             this.groupId = groupId;
             this.generation = generation;
             this.groupInstanceId = groupInstanceId;
         }
 
-        public CompletableFuture<ClientResponse> future() {
-            return future.future();
-        }
-
         public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
             Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> 
requestTopicDataMap = new HashMap<>();
             for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : 
offsets.entrySet()) {
@@ -230,15 +219,20 @@ public class CommitRequestManager implements 
RequestManager {
             return new NetworkClientDelegate.UnsentRequest(
                     builder,
                     coordinatorRequestManager.coordinator(),
-                    future);
+                    (response, throwable) -> {
+                        if (throwable == null) {
+                            future.complete(null);
+                        } else {
+                            future.completeExceptionally(throwable);
+                        }
+                    });
         }
     }
 
     private class OffsetFetchRequestState extends RequestState {
         public final Set<TopicPartition> requestedPartitions;
         public final GroupState.Generation requestedGeneration;
-        public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
future;
-
+        private final CompletableFuture<Map<TopicPartition, 
OffsetAndMetadata>> future;
         public OffsetFetchRequestState(final Set<TopicPartition> partitions,
                                        final GroupState.Generation generation,
                                        final long retryBackoffMs,
@@ -253,19 +247,16 @@ public class CommitRequestManager implements 
RequestManager {
             return Objects.equals(requestedGeneration, 
request.requestedGeneration) && 
requestedPartitions.equals(request.requestedPartitions);
         }
 
-        public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long 
currentTimeMs) {
+        public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
             OffsetFetchRequest.Builder builder = new 
OffsetFetchRequest.Builder(
                     groupState.groupId,
                     true,
                     new ArrayList<>(this.requestedPartitions),
                     throwOnFetchStableOffsetUnsupported);
-            NetworkClientDelegate.UnsentRequest unsentRequest = new 
NetworkClientDelegate.UnsentRequest(
+            return new NetworkClientDelegate.UnsentRequest(
                     builder,
-                    coordinatorRequestManager.coordinator());
-            unsentRequest.future().whenComplete((r, t) -> {
-                onResponse(currentTimeMs, (OffsetFetchResponse) 
r.responseBody());
-            });
-            return unsentRequest;
+                    coordinatorRequestManager.coordinator(),
+                    (r, t) -> onResponse(r.receivedTimeMs(), 
(OffsetFetchResponse) r.responseBody()));
         }
 
         public void onResponse(
@@ -359,12 +350,12 @@ public class CommitRequestManager implements 
RequestManager {
             }
         }
 
-        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
chainFuture(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
future) {
+        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
chainFuture(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
otherFuture) {
             return this.future.whenComplete((r, t) -> {
                 if (t != null) {
-                    future.completeExceptionally(t);
+                    otherFuture.completeExceptionally(t);
                 } else {
-                    future.complete(r);
+                    otherFuture.complete(r);
                 }
             });
         }
@@ -384,8 +375,8 @@ public class CommitRequestManager implements RequestManager 
{
      * <p>This is used to stage the unsent {@link OffsetCommitRequestState} 
and {@link OffsetFetchRequestState}.
      * <li>unsentOffsetCommits holds the offset commit requests that have not 
been sent out</>
      * <li>unsentOffsetFetches holds the offset fetch requests that have not 
been sent out</li>
-     * <li>inflightOffsetFetches holds the offset fetch requests that have 
been sent out but incompleted</>.
-     *
+     * <li>inflightOffsetFetches holds the offset fetch requests that have 
been sent out but not completed</>.
+     * <p>
      * {@code addOffsetFetchRequest} dedupes the requests to avoid sending the 
same requests.
      */
 
@@ -395,11 +386,12 @@ public class CommitRequestManager implements 
RequestManager {
         List<OffsetFetchRequestState> unsentOffsetFetches = new ArrayList<>();
         List<OffsetFetchRequestState> inflightOffsetFetches = new 
ArrayList<>();
 
-        public boolean hasUnsentRequests() {
+        // Visible for testing
+        boolean hasUnsentRequests() {
             return !unsentOffsetCommits.isEmpty() || 
!unsentOffsetFetches.isEmpty();
         }
 
-        public CompletableFuture<ClientResponse> addOffsetCommitRequest(final 
Map<TopicPartition, OffsetAndMetadata> offsets) {
+        CompletableFuture<Void> addOffsetCommitRequest(final 
Map<TopicPartition, OffsetAndMetadata> offsets) {
             // TODO: Dedupe committing the same offsets to the same partitions
             OffsetCommitRequestState request = new OffsetCommitRequestState(
                     offsets,
@@ -407,15 +399,15 @@ public class CommitRequestManager implements 
RequestManager {
                     groupState.groupInstanceId.orElse(null),
                     groupState.generation);
             unsentOffsetCommits.add(request);
-            return request.future();
+            return request.future;
         }
 
         /**
-         *  <p>Adding an offset fetch request to the outgoing buffer.  If the 
same request was made, we chain the future
-         *  to the existing one.
+         * <p>Adding an offset fetch request to the outgoing buffer.  If the 
same request was made, we chain the future
+         * to the existing one.
          *
-         *  <p>If the request is new, it invokes a callback to remove itself 
from the {@code inflightOffsetFetches}
-         *  upon completion.</>
+         * <p>If the request is new, it invokes a callback to remove itself 
from the {@code inflightOffsetFetches}
+         * upon completion.
          */
         private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
addOffsetFetchRequest(final OffsetFetchRequestState request) {
             Optional<OffsetFetchRequestState> dupe =
@@ -450,12 +442,11 @@ public class CommitRequestManager implements 
RequestManager {
 
         /**
          * Clear {@code unsentOffsetCommits} and moves all the sendable 
request in {@code unsentOffsetFetches} to the
-         * {@code inflightOffsetFetches} to bookkeep all of the inflight 
requests.
-         *
+         * {@code inflightOffsetFetches} to bookkeep all the inflight requests.
          * Note: Sendable requests are determined by their timer as we are 
expecting backoff on failed attempt. See
          * {@link RequestState}.
          **/
-        public List<NetworkClientDelegate.UnsentRequest> drain(final long 
currentTimeMs) {
+        List<NetworkClientDelegate.UnsentRequest> drain(final long 
currentTimeMs) {
             List<NetworkClientDelegate.UnsentRequest> unsentRequests = new 
ArrayList<>();
 
             // Add all unsent offset commit requests to the unsentRequests list
@@ -472,7 +463,7 @@ public class CommitRequestManager implements RequestManager 
{
             // Add all sendable offset fetch requests to the unsentRequests 
list and to the inflightOffsetFetches list
             for (OffsetFetchRequestState request : 
partitionedBySendability.get(true)) {
                 request.onSendAttempt(currentTimeMs);
-                unsentRequests.add(request.toUnsentRequest(currentTimeMs));
+                unsentRequests.add(request.toUnsentRequest());
                 inflightOffsetFetches.add(request);
             }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
index 241760d4c22..c119a619892 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
@@ -38,8 +38,8 @@ import java.util.Optional;
  * Whether there is an existing coordinator.
  * Whether there is an inflight request.
  * Whether the backoff timer has expired.
- * The {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
contains either a wait timer
- * or a singleton list of {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ * The {@link NetworkClientDelegate.PollResult} contains either a wait timer
+ * or a singleton list of {@link NetworkClientDelegate.UnsentRequest}.
  * <p/>
  * The {@link FindCoordinatorRequest} will be handled by the {@link 
#onResponse(long, FindCoordinatorResponse)}  callback, which
  * subsequently invokes {@code onResponse} to handle the exception and 
response. Note that the coordinator node will be
@@ -86,7 +86,7 @@ public class CoordinatorRequestManager implements 
RequestManager {
      * Note that this method does not involve any actual network IO, and it 
only determines if we need to send a new request or not.
      *
      * @param currentTimeMs current time in ms.
-     * @return {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}. 
This will not be {@code null}.
+     * @return {@link NetworkClientDelegate.PollResult}. This will not be 
{@code null}.
      */
     @Override
     public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
index 9b4cd89361f..8150f56debb 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
@@ -77,6 +77,7 @@ public class DefaultBackgroundThread extends KafkaThread {
     private final RequestManagers requestManagers;
 
     // Visible for testing
+    @SuppressWarnings("ParameterNumber")
     DefaultBackgroundThread(final Time time,
                             final ConsumerConfig config,
                             final LogContext logContext,
@@ -89,7 +90,8 @@ public class DefaultBackgroundThread extends KafkaThread {
                             final GroupState groupState,
                             final CoordinatorRequestManager coordinatorManager,
                             final CommitRequestManager commitRequestManager,
-                            final OffsetsRequestManager offsetsRequestManager) 
{
+                            final OffsetsRequestManager offsetsRequestManager,
+                            final TopicMetadataRequestManager 
topicMetadataRequestManager) {
         super(BACKGROUND_THREAD_NAME, true);
         this.time = time;
         this.running = true;
@@ -102,9 +104,9 @@ public class DefaultBackgroundThread extends KafkaThread {
         this.networkClientDelegate = networkClient;
         this.errorEventHandler = errorEventHandler;
         this.groupState = groupState;
-
         this.requestManagers = new RequestManagers(
                 offsetsRequestManager,
+                topicMetadataRequestManager,
                 Optional.ofNullable(coordinatorManager),
                 Optional.ofNullable(commitRequestManager));
     }
@@ -169,6 +171,9 @@ public class DefaultBackgroundThread extends KafkaThread {
                             logContext);
             CoordinatorRequestManager coordinatorRequestManager = null;
             CommitRequestManager commitRequestManager = null;
+            TopicMetadataRequestManager topicMetadataRequestManger = new 
TopicMetadataRequestManager(
+                logContext,
+                config);
 
             if (groupState.groupId != null) {
                 coordinatorRequestManager = new CoordinatorRequestManager(
@@ -188,15 +193,14 @@ public class DefaultBackgroundThread extends KafkaThread {
             }
 
             this.requestManagers = new RequestManagers(
-                    offsetsRequestManager,
-                    Optional.ofNullable(coordinatorRequestManager),
-                    Optional.ofNullable(commitRequestManager));
-
+                offsetsRequestManager,
+                topicMetadataRequestManger,
+                Optional.ofNullable(coordinatorRequestManager),
+                Optional.ofNullable(commitRequestManager));
             this.applicationEventProcessor = new ApplicationEventProcessor(
-                    backgroundEventQueue,
-                    requestManagers,
-                    metadata);
-
+                backgroundEventQueue,
+                requestManagers,
+                metadata);
         } catch (final Exception e) {
             close();
             throw new KafkaException("Failed to construct background 
processor", e.getCause());
@@ -217,7 +221,7 @@ public class DefaultBackgroundThread extends KafkaThread {
             }
         } catch (final Throwable t) {
             log.error("The background thread failed due to unexpected error", 
t);
-            throw new RuntimeException(t);
+            throw new KafkaException(t);
         } finally {
             close();
             log.debug("{} closed", getClass());
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index 445005bf545..24b6b942481 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -43,6 +43,7 @@ import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
 
 /**
  * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to 
handle network poll and send operations.
@@ -211,17 +212,19 @@ public class NetworkClientDelegate implements 
AutoCloseable {
         private Optional<Node> node; // empty if random node can be chosen
         private Timer timer;
 
-        public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder, 
final Optional<Node> node) {
-            this(requestBuilder, node, new FutureCompletionHandler());
-        }
-
         public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder,
-                             final Optional<Node> node,
-                             final FutureCompletionHandler handler) {
+                             final Optional<Node> node) {
             Objects.requireNonNull(requestBuilder);
             this.requestBuilder = requestBuilder;
             this.node = node;
-            this.handler = handler;
+            this.handler = new FutureCompletionHandler();
+        }
+
+        public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder,
+                             final Optional<Node> node,
+                             final BiConsumer<ClientResponse, Throwable> 
callback) {
+            this(requestBuilder, node);
+            this.handler.future.whenComplete(callback);
         }
 
         public void setTimer(final Time time, final long requestTimeoutMs) {
@@ -263,10 +266,6 @@ public class NetworkClientDelegate implements 
AutoCloseable {
             future.completeExceptionally(e);
         }
 
-        public CompletableFuture<ClientResponse> future() {
-            return future;
-        }
-
         @Override
         public void onComplete(final ClientResponse response) {
             if (response.authenticationException() != null) {
@@ -280,5 +279,4 @@ public class NetworkClientDelegate implements AutoCloseable 
{
             }
         }
     }
-
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index a846f8c2bb2..3a4420a556b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -121,7 +121,7 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
 
     /**
      * Determine if there are pending fetch offsets requests to be sent and 
build a
-     * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}
+     * {@link NetworkClientDelegate.PollResult}
      * containing it.
      */
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
index 40b411b6dd5..4fe8fe80c44 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
@@ -306,7 +306,6 @@ public class PrototypeAsyncConsumer<K, V> implements 
Consumer<K, V> {
                 commitCallback.onComplete(offsets, null);
             }
         }).exceptionally(e -> {
-            System.out.println(e);
             throw new KafkaException(e);
         });
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 0df55a86184..3864b1fcaa6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -33,20 +33,24 @@ public class RequestManagers {
     public final Optional<CoordinatorRequestManager> coordinatorRequestManager;
     public final Optional<CommitRequestManager> commitRequestManager;
     public final OffsetsRequestManager offsetsRequestManager;
+    public final TopicMetadataRequestManager topicMetadataRequestManager;
     private final List<Optional<? extends RequestManager>> entries;
 
     public RequestManagers(OffsetsRequestManager offsetsRequestManager,
+                           TopicMetadataRequestManager 
topicMetadataRequestManager,
                            Optional<CoordinatorRequestManager> 
coordinatorRequestManager,
                            Optional<CommitRequestManager> 
commitRequestManager) {
         this.offsetsRequestManager = requireNonNull(offsetsRequestManager,
                 "OffsetsRequestManager cannot be null");
         this.coordinatorRequestManager = coordinatorRequestManager;
         this.commitRequestManager = commitRequestManager;
+        this.topicMetadataRequestManager = topicMetadataRequestManager;
 
         List<Optional<? extends RequestManager>> list = new ArrayList<>();
         list.add(coordinatorRequestManager);
         list.add(commitRequestManager);
         list.add(Optional.of(offsetsRequestManager));
+        list.add(Optional.of(topicMetadataRequestManager));
         entries = Collections.unmodifiableList(list);
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
new file mode 100644
index 00000000000..2b0cbf5dcb0
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * <p>
+ * Manages the state of topic metadata requests. This manager returns a
+ * {@link NetworkClientDelegate.PollResult} when a request is ready to
+ * be sent. Specifically, this manager handles the following user API calls:
+ * </p>
+ * <ul>
+ * <li>listTopics</li>
+ * <li>partitionsFor</li>
+ * </ul>
+ * <p>
+ * The manager checks the state of the {@link TopicMetadataRequestState} 
before sending a new one to
+ * prevent sending it without backing off from previous attempts.
+ * It also checks the state of inflight requests to avoid overwhelming the 
broker with duplicate requests.
+ * The {@code inflightRequests} are keyed by topic name. If all topics are 
requested, then we use
+ * {@code Optional.empty()} as the key.
+ * Once a request completes, either successfully or with a non-retriable error, its corresponding entry is 
removed.
+ * </p>
+ */
+
+public class TopicMetadataRequestManager implements RequestManager {
+    private final boolean allowAutoTopicCreation;
+    private final Map<Optional<String>, TopicMetadataRequestState> 
inflightRequests;
+    private final long retryBackoffMs;
+    private final long retryBackoffMaxMs;
+    private final Logger log;
+    private final LogContext logContext;
+
+    public TopicMetadataRequestManager(final LogContext context, final 
ConsumerConfig config) {
+        logContext = context;
+        log = logContext.logger(getClass());
+        inflightRequests = new HashMap<>();
+        retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+        retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+        allowAutoTopicCreation = 
config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG);
+    }
+
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        List<NetworkClientDelegate.UnsentRequest> requests = 
inflightRequests.values().stream()
+            .map(req -> req.send(currentTimeMs))
+            .filter(Optional::isPresent)
+            .map(Optional::get)
+            .collect(Collectors.toList());
+        return requests.isEmpty() ?
+            new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new 
ArrayList<>()) :
+            new NetworkClientDelegate.PollResult(0, 
Collections.unmodifiableList(requests));
+    }
+
+    /**
+     * return the future of the metadata request. Return the existing future 
if a request for the same topic is already
+     * inflight.
+     *
+     * @param topic to be requested. If empty, return the metadata for all 
topics.
+     * @return the future of the metadata request.
+     */
+    public CompletableFuture<Map<String, List<PartitionInfo>>> 
requestTopicMetadata(final Optional<String> topic) {
+        if (inflightRequests.containsKey(topic)) {
+            return inflightRequests.get(topic).future;
+        }
+
+        TopicMetadataRequestState newRequest = new TopicMetadataRequestState(
+                logContext,
+                topic,
+                retryBackoffMs,
+                retryBackoffMaxMs);
+        inflightRequests.put(topic, newRequest);
+        return newRequest.future;
+    }
+
+    // Visible for testing
+    List<TopicMetadataRequestState> inflightRequests() {
+        return new ArrayList<>(inflightRequests.values());
+    }
+
+    /**
+     * Tracks a single topic metadata lookup: the topic being requested (an empty {@code Optional}
+     * requests all topics), the future returned to callers, and the send/retry bookkeeping performed
+     * through {@code canSendRequest}, {@code onSendAttempt} and {@code onFailedAttempt}.
+     */
+    class TopicMetadataRequestState extends RequestState {
+        private final Optional<String> topic;
+        CompletableFuture<Map<String, List<PartitionInfo>>> future;
+
+        public TopicMetadataRequestState(final LogContext logContext,
+                                         final Optional<String> topic,
+                                         final long retryBackoffMs,
+                                         final long retryBackoffMaxMs) {
+            super(logContext, TopicMetadataRequestState.class.getSimpleName(), retryBackoffMs,
+                retryBackoffMaxMs);
+            future = new CompletableFuture<>();
+            this.topic = topic;
+        }
+
+        /**
+         * Prepares the metadata request and returns an
+         * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
+         * if a request may be sent at {@code currentTimeMs}; otherwise returns an empty {@code Optional}.
+         */
+        private Optional<NetworkClientDelegate.UnsentRequest> send(final long currentTimeMs) {
+            if (!canSendRequest(currentTimeMs)) {
+                return Optional.empty();
+            }
+            onSendAttempt(currentTimeMs);
+
+            // A present topic yields a single-topic request; an empty one asks for all topics.
+            final MetadataRequest.Builder request =
+                topic.map(t -> new MetadataRequest.Builder(Collections.singletonList(t), allowAutoTopicCreation))
+                    .orElseGet(MetadataRequest.Builder::allTopics);
+
+            return Optional.of(createUnsentRequest(request));
+        }
+
+        /**
+         * Wraps the request builder in an unsent request with no target node, routing its outcome to
+         * {@link #processResponseOrException(ClientResponse, Throwable)}.
+         */
+        private NetworkClientDelegate.UnsentRequest createUnsentRequest(
+                final MetadataRequest.Builder request) {
+            return new NetworkClientDelegate.UnsentRequest(
+                    request,
+                    Optional.empty(),
+                    this::processResponseOrException
+            );
+        }
+
+        /**
+         * Callback for the outcome of the metadata request. A response is handed to
+         * {@link #handleResponse(ClientResponse, long)}; a retriable exception records a failed attempt so
+         * the request is retried; any other exception fails the future and drops the request.
+         */
+        private void processResponseOrException(final ClientResponse response,
+                                                final Throwable exception) {
+            if (exception == null) {
+                handleResponse(response, response.receivedTimeMs());
+                return;
+            }
+
+            if (exception instanceof RetriableException) {
+                // We continue to retry on RetriableException
+                // TODO: TimeoutException will continue to retry despite user API timeout.
+                // When the request fails without a broker response (e.g. its future is completed
+                // exceptionally) there is presumably no ClientResponse to read a timestamp from, so
+                // dereferencing it would throw and the failed attempt would never be recorded.
+                // NOTE(review): the wall clock is a stopgap; prefer the caller's Time once it is
+                // available to this class.
+                final long failedAtMs = response != null
+                    ? response.receivedTimeMs()
+                    : System.currentTimeMillis();
+                onFailedAttempt(failedAtMs);
+            } else {
+                completeFutureAndRemoveRequest(new KafkaException(exception));
+            }
+        }
+
+        /**
+         * Completes the future with the parsed metadata and removes the request, records a failed
+         * attempt when parsing raises a retriable exception, or fails the future on any other exception.
+         */
+        private void handleResponse(final ClientResponse response, final long responseTimeMs) {
+            try {
+                Map<String, List<PartitionInfo>> res =
+                    handleTopicMetadataResponse((MetadataResponse) response.responseBody());
+                future.complete(res);
+                inflightRequests.remove(topic);
+            } catch (RetriableException e) {
+                onFailedAttempt(responseTimeMs);
+            } catch (Exception t) {
+                completeFutureAndRemoveRequest(t);
+            }
+        }
+
+        /** Fails the future with {@code throwable} and stops tracking this request. */
+        private void completeFutureAndRemoveRequest(final Throwable throwable) {
+            future.completeExceptionally(throwable);
+            inflightRequests.remove(topic);
+        }
+
+        /**
+         * Converts a metadata response into a map from topic name to its partitions.
+         *
+         * @throws TopicAuthorizationException if the response reports unauthorized topics
+         * @throws InvalidTopicException if a topic is reported as invalid
+         * @throws KafkaException for any other non-retriable topic error; retriable errors are rethrown
+         *         as their own exception so the caller can retry
+         */
+        private Map<String, List<PartitionInfo>> handleTopicMetadataResponse(final MetadataResponse response) {
+            Cluster cluster = response.buildCluster();
+
+            final Set<String> unauthorizedTopics = cluster.unauthorizedTopics();
+            if (!unauthorizedTopics.isEmpty())
+                throw new TopicAuthorizationException(unauthorizedTopics);
+
+            Map<String, Errors> errors = response.errors();
+            if (!errors.isEmpty()) {
+                // if there were errors, we need to check whether they were fatal or whether
+                // we should just retry
+
+                log.debug("Topic metadata fetch included errors: {}", errors);
+
+                for (Map.Entry<String, Errors> errorEntry : errors.entrySet()) {
+                    // Named erroredTopic so it does not shadow the enclosing 'topic' field.
+                    String erroredTopic = errorEntry.getKey();
+                    Errors error = errorEntry.getValue();
+
+                    if (error == Errors.INVALID_TOPIC_EXCEPTION) {
+                        throw new InvalidTopicException("Topic '" + erroredTopic + "' is invalid");
+                    } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                        // if a requested topic is unknown, we just continue and let it be absent
+                        // in the returned map
+                        continue;
+                    } else if (error.exception() instanceof RetriableException) {
+                        throw error.exception();
+                    } else {
+                        throw new KafkaException("Unexpected error fetching metadata for topic " + erroredTopic,
+                            error.exception());
+                    }
+                }
+            }
+
+            HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
+            for (String knownTopic : cluster.topics())
+                topicsPartitionInfos.put(knownTopic, cluster.partitionsForTopic(knownTopic));
+            return topicsPartitionInfos;
+        }
+
+        /** Returns the requested topic; empty when the request is for all topics. */
+        public Optional<String> topic() {
+            return topic;
+        }
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index 65ba01959cd..bf9bb1d4962 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -25,7 +25,7 @@ public abstract class ApplicationEvent {
 
     public enum Type {
         NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, 
ASSIGNMENT_CHANGE,
-        LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS,
+        LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA
     }
 
     private final Type type;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 234a228ba4f..f2f55860004 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -21,10 +21,13 @@ import 
org.apache.kafka.clients.consumer.internals.CommitRequestManager;
 import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 
@@ -59,6 +62,8 @@ public class ApplicationEventProcessor {
                 return process((NewTopicsMetadataUpdateRequestEvent) event);
             case ASSIGNMENT_CHANGE:
                 return process((AssignmentChangeApplicationEvent) event);
+            case TOPIC_METADATA:
+                return process((TopicMetadataApplicationEvent) event);
             case LIST_OFFSETS:
                 return process((ListOffsetsApplicationEvent) event);
             case RESET_POSITIONS:
@@ -100,13 +105,7 @@ public class ApplicationEventProcessor {
         }
 
         CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
-        manager.addOffsetCommitRequest(event.offsets()).whenComplete((r, e) -> 
{
-            if (e != null) {
-                event.future().completeExceptionally(e);
-                return;
-            }
-            event.future().complete(null);
-        });
+        event.chain(manager.addOffsetCommitRequest(event.offsets()));
         return true;
     }
 
@@ -153,4 +152,11 @@ public class ApplicationEventProcessor {
         requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
         return true;
     }
+
+    /**
+     * Asks the topic metadata request manager for the event's topic and chains the resulting
+     * future onto the event so the caller is completed with the lookup's outcome.
+     */
+    private boolean process(final TopicMetadataApplicationEvent event) {
+        final Optional<String> requestedTopic = Optional.of(event.topic());
+        event.chain(requestManagers.topicMetadataRequestManager.requestTopicMetadata(requestedTopic));
+        return true;
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java
index 5f9bad09326..48e62fe78bf 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java
@@ -29,6 +29,7 @@ public class CommitApplicationEvent extends 
CompletableApplicationEvent<Void> {
     public CommitApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> 
offsets) {
         super(Type.COMMIT);
         this.offsets = Collections.unmodifiableMap(offsets);
+
         for (OffsetAndMetadata offsetAndMetadata : offsets.values()) {
             if (offsetAndMetadata.offset() < 0) {
                 throw new IllegalArgumentException("Invalid offset: " + 
offsetAndMetadata.offset());
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataApplicationEvent.java
similarity index 55%
copy from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
copy to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataApplicationEvent.java
index 65ba01959cd..6486fe60c47 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataApplicationEvent.java
@@ -16,51 +16,42 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-import java.util.Objects;
+import org.apache.kafka.common.PartitionInfo;
 
-/**
- * This is the abstract definition of the events created by the KafkaConsumer 
API
- */
-public abstract class ApplicationEvent {
+import java.util.List;
+import java.util.Map;
 
-    public enum Type {
-        NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, 
ASSIGNMENT_CHANGE,
-        LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS,
+public class TopicMetadataApplicationEvent extends 
CompletableApplicationEvent<Map<String, List<PartitionInfo>>> {
+    private final String topic;
+    public TopicMetadataApplicationEvent(final String topic) {
+        super(Type.TOPIC_METADATA);
+        this.topic = topic;
     }
 
-    private final Type type;
-
-    protected ApplicationEvent(Type type) {
-        this.type = Objects.requireNonNull(type);
+    public String topic() {
+        return topic;
     }
 
-    public Type type() {
-        return type;
+    @Override
+    public String toString() {
+        return "TopicMetadataApplicationEvent(topic=" + topic + ")";
     }
 
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (!(o instanceof TopicMetadataApplicationEvent)) return false;
+        if (!super.equals(o)) return false;
 
-        ApplicationEvent that = (ApplicationEvent) o;
+        TopicMetadataApplicationEvent that = (TopicMetadataApplicationEvent) o;
 
-        return type == that.type;
+        return topic.equals(that.topic);
     }
 
     @Override
     public int hashCode() {
-        return type.hashCode();
-    }
-
-    protected String toStringBase() {
-        return "type=" + type;
-    }
-
-    @Override
-    public String toString() {
-        return "ApplicationEvent{" +
-                toStringBase() +
-                '}';
+        int result = super.hashCode();
+        result = 31 * result + topic.hashCode();
+        return result;
     }
-}
\ No newline at end of file
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 66e6ab760d2..8b2e08bd520 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -134,7 +134,7 @@ public class CommitRequestManagerTest {
         offsets2.put(new TopicPartition("test", 4), new 
OffsetAndMetadata(20L));
 
         // Add the requests to the CommitRequestManager and store their futures
-        ArrayList<CompletableFuture<ClientResponse>> commitFutures = new 
ArrayList<>();
+        ArrayList<CompletableFuture<Void>> commitFutures = new ArrayList<>();
         ArrayList<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> 
fetchFutures = new ArrayList<>();
         commitFutures.add(commitManager.addOffsetCommitRequest(offsets1));
         
fetchFutures.add(commitManager.addOffsetFetchRequest(Collections.singleton(new 
TopicPartition("test", 0))));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
index b5a1ced617a..137bd106d6e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
@@ -29,6 +29,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplication
 import 
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
 import org.apache.kafka.clients.consumer.internals.events.NoopApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.TopicMetadataApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -68,6 +69,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+@SuppressWarnings("ClassDataAbstractionCoupling")
 public class DefaultBackgroundThreadTest {
     private static final long RETRY_BACKOFF_MS = 100;
     private final Properties properties = new Properties();
@@ -83,6 +85,7 @@ public class DefaultBackgroundThreadTest {
     private final int requestTimeoutMs = 500;
     private GroupState groupState;
     private CommitRequestManager commitManager;
+    private TopicMetadataRequestManager topicMetadataRequestManager;
 
     @BeforeEach
     @SuppressWarnings("unchecked")
@@ -107,6 +110,7 @@ public class DefaultBackgroundThreadTest {
                 true);
         this.groupState = new GroupState(rebalanceConfig);
         this.commitManager = mock(CommitRequestManager.class);
+        this.topicMetadataRequestManager = 
mock(TopicMetadataRequestManager.class);
     }
 
     @Test
@@ -114,6 +118,7 @@ public class DefaultBackgroundThreadTest {
         
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
         when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
         
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
         DefaultBackgroundThread backgroundThread = mockBackgroundThread();
         backgroundThread.start();
         TestUtils.waitForCondition(backgroundThread::isRunning, "Failed 
awaiting for the background thread to be running");
@@ -125,9 +130,10 @@ public class DefaultBackgroundThreadTest {
     public void testApplicationEvent() {
         this.applicationEventsQueue = new LinkedBlockingQueue<>();
         this.backgroundEventsQueue = new LinkedBlockingQueue<>();
-        
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
-        when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        
when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+        
when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
         DefaultBackgroundThread backgroundThread = mockBackgroundThread();
         ApplicationEvent e = new NoopApplicationEvent("noop event");
         this.applicationEventsQueue.add(e);
@@ -144,9 +150,10 @@ public class DefaultBackgroundThreadTest {
                 this.backgroundEventsQueue,
                 mockRequestManagers(),
                 metadata);
-        
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
-        when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        
when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+        
when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
         DefaultBackgroundThread backgroundThread = mockBackgroundThread();
         ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
         this.applicationEventsQueue.add(e);
@@ -159,9 +166,10 @@ public class DefaultBackgroundThreadTest {
     public void testCommitEvent() {
         this.applicationEventsQueue = new LinkedBlockingQueue<>();
         this.backgroundEventsQueue = new LinkedBlockingQueue<>();
-        
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
-        when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
-        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        
when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+        
when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
         DefaultBackgroundThread backgroundThread = mockBackgroundThread();
         ApplicationEvent e = new CommitApplicationEvent(new HashMap<>());
         this.applicationEventsQueue.add(e);
@@ -176,6 +184,7 @@ public class DefaultBackgroundThreadTest {
         
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
         when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
         
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
         this.applicationEventsQueue = new LinkedBlockingQueue<>();
         this.backgroundEventsQueue = new LinkedBlockingQueue<>();
         DefaultBackgroundThread backgroundThread = mockBackgroundThread();
@@ -193,6 +202,7 @@ public class DefaultBackgroundThreadTest {
         
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
         when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
         
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
         this.applicationEventsQueue = new LinkedBlockingQueue<>();
         this.backgroundEventsQueue = new LinkedBlockingQueue<>();
         DefaultBackgroundThread backgroundThread = mockBackgroundThread();
@@ -233,6 +243,7 @@ public class DefaultBackgroundThreadTest {
         
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
         when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
         
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
         this.applicationEventsQueue = new LinkedBlockingQueue<>();
         this.backgroundEventsQueue = new LinkedBlockingQueue<>();
         DefaultBackgroundThread backgroundThread = mockBackgroundThread();
@@ -275,7 +286,7 @@ public class DefaultBackgroundThreadTest {
         this.applicationEventProcessor = spy(new ApplicationEventProcessor(
                 this.backgroundEventsQueue,
                 mockRequestManagers(),
-            metadata));
+                metadata));
 
         DefaultBackgroundThread backgroundThread = mockBackgroundThread();
         HashMap<TopicPartition, OffsetAndMetadata> offset = 
mockTopicPartitionOffset();
@@ -287,6 +298,7 @@ public class DefaultBackgroundThreadTest {
         
when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
         
when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
         
when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
 
         backgroundThread.runOnce();
         
verify(applicationEventProcessor).process(any(AssignmentChangeApplicationEvent.class));
@@ -303,12 +315,27 @@ public class DefaultBackgroundThreadTest {
         
when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
         
when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
         
when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        
when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
         backgroundThread.runOnce();
         Mockito.verify(coordinatorManager, times(1)).poll(anyLong());
         Mockito.verify(networkClient, times(1)).poll(anyLong(), anyLong());
         backgroundThread.close();
     }
 
+    /**
+     * A queued TopicMetadataApplicationEvent must be handed to the application event processor
+     * during a single run of the background thread.
+     */
+    @Test
+    void testFetchTopicMetadata() {
+        // Stub every request manager's poll before the thread is built, as the other tests here do.
+        when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+        when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+
+        this.applicationEventsQueue = new LinkedBlockingQueue<>();
+        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+        this.applicationEventsQueue.add(new TopicMetadataApplicationEvent("topic"));
+
+        backgroundThread.runOnce();
+
+        verify(applicationEventProcessor).process(any(TopicMetadataApplicationEvent.class));
+        backgroundThread.close();
+    }
+
     @Test
     void testPollResultTimer() {
         DefaultBackgroundThread backgroundThread = mockBackgroundThread();
@@ -336,6 +363,7 @@ public class DefaultBackgroundThreadTest {
     private RequestManagers mockRequestManagers() {
         return new RequestManagers(
                 offsetsRequestManager,
+                topicMetadataRequestManager,
                 Optional.of(coordinatorManager),
                 Optional.of(commitManager));
     }
@@ -372,19 +400,20 @@ public class DefaultBackgroundThreadTest {
                 this.groupState,
                 this.coordinatorManager,
                 this.commitManager,
-                this.offsetsRequestManager);
+                this.offsetsRequestManager,
+                this.topicMetadataRequestManager);
     }
 
     private NetworkClientDelegate.PollResult mockPollCoordinatorResult() {
         return new NetworkClientDelegate.PollResult(
-                RETRY_BACKOFF_MS,
-                Collections.singletonList(findCoordinatorUnsentRequest(time, 
requestTimeoutMs)));
+            RETRY_BACKOFF_MS,
+            Collections.singletonList(findCoordinatorUnsentRequest(time, 
requestTimeoutMs)));
     }
 
     private NetworkClientDelegate.PollResult mockPollCommitResult() {
         return new NetworkClientDelegate.PollResult(
-                RETRY_BACKOFF_MS,
-                Collections.singletonList(findCoordinatorUnsentRequest(time, 
requestTimeoutMs)));
+            RETRY_BACKOFF_MS,
+            Collections.singletonList(findCoordinatorUnsentRequest(time, 
requestTimeoutMs)));
     }
 
     private NetworkClientDelegate.PollResult emptyPollOffsetsRequestResult() {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java
new file mode 100644
index 00000000000..7f0fcd84c43
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.NetworkException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.spy;
+
+public class TopicMetadataRequestManagerTest {
+    private MockTime time;
+    private TopicMetadataRequestManager topicMetadataRequestManager;
+
+    /**
+     * Builds a fresh mock clock and a spied TopicMetadataRequestManager configured with a 100 ms
+     * retry backoff and auto topic creation disabled.
+     */
+    @BeforeEach
+    public void setup() {
+        final Properties consumerProps = new Properties();
+        consumerProps.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
+        consumerProps.put(ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
+        consumerProps.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        consumerProps.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        this.time = new MockTime();
+        final TopicMetadataRequestManager manager =
+            new TopicMetadataRequestManager(new LogContext(), new ConsumerConfig(consumerProps));
+        this.topicMetadataRequestManager = spy(manager);
+    }
+
+    /**
+     * After a metadata lookup is registered and the clock advances by 100 ms, polling must yield
+     * exactly one unsent request.
+     */
+    @ParameterizedTest
+    @MethodSource("topicsProvider")
+    public void testPoll_SuccessfulRequestTopicMetadata(Optional<String> topic) {
+        this.topicMetadataRequestManager.requestTopicMetadata(topic);
+        this.time.sleep(100);
+
+        final long nowMs = this.time.milliseconds();
+        final NetworkClientDelegate.PollResult pollResult = this.topicMetadataRequestManager.poll(nowMs);
+
+        assertEquals(1, pollResult.unsentRequests.size());
+    }
+
+    /**
+     * Completes the metadata request with a response carrying {@code error} and checks whether the
+     * request is still tracked (retry expected) or has been dropped.
+     */
+    @ParameterizedTest
+    @MethodSource("exceptionProvider")
+    public void testExceptionAndInflightRequests(final Errors error, final boolean shouldRetry) {
+        final String topic = "hello";
+        this.topicMetadataRequestManager.requestTopicMetadata(Optional.of(topic));
+        this.time.sleep(100);
+
+        final NetworkClientDelegate.PollResult pollResult =
+            this.topicMetadataRequestManager.poll(this.time.milliseconds());
+        final NetworkClientDelegate.UnsentRequest unsent = pollResult.unsentRequests.get(0);
+        unsent.future().complete(buildTopicMetadataClientResponse(unsent, Optional.of(topic), error));
+
+        final List<TopicMetadataRequestManager.TopicMetadataRequestState> tracked =
+            this.topicMetadataRequestManager.inflightRequests();
+
+        if (!shouldRetry) {
+            assertEquals(0, tracked.size());
+            return;
+        }
+        assertEquals(1, tracked.size());
+        assertEquals(topic, tracked.get(0).topic().orElse(null));
+    }
+
+    /**
+     * Requesting the same topic twice must produce a single unsent request, and a successful
+     * response must complete both returned futures normally.
+     */
+    @ParameterizedTest
+    @MethodSource("topicsProvider")
+    public void testSendingTheSameRequest(Optional<String> topic) {
+        CompletableFuture<Map<String, List<PartitionInfo>>> future =
+            this.topicMetadataRequestManager.requestTopicMetadata(topic);
+        CompletableFuture<Map<String, List<PartitionInfo>>> future2 =
+            this.topicMetadataRequestManager.requestTopicMetadata(topic);
+        this.time.sleep(100);
+        NetworkClientDelegate.PollResult res = this.topicMetadataRequestManager.poll(this.time.milliseconds());
+        assertEquals(1, res.unsentRequests.size());
+
+        res.unsentRequests.get(0).future().complete(buildTopicMetadataClientResponse(
+            res.unsentRequests.get(0),
+            topic,
+            Errors.NONE));
+
+        assertTrue(future.isDone());
+        assertFalse(future.isCompletedExceptionally());
+        try {
+            future.get();
+        } catch (Throwable e) {
+            // fail() does not expand "{}" placeholders, so the cause is appended to the message
+            // explicitly and also passed along to keep its stack trace.
+            fail("Expecting to succeed, but got: " + e, e);
+        }
+        assertTrue(future2.isDone());
+        assertFalse(future2.isCompletedExceptionally());
+    }
+
+    /**
+     * Fails the request future with {@code exception}: a retriable exception must leave the request
+     * tracked, any other exception must remove it.
+     */
+    @ParameterizedTest
+    @MethodSource("hardFailureExceptionProvider")
+    void testHardFailures(Exception exception) {
+        final Optional<String> topic = Optional.of("hello");
+        this.topicMetadataRequestManager.requestTopicMetadata(topic);
+
+        final NetworkClientDelegate.PollResult pollResult =
+            this.topicMetadataRequestManager.poll(this.time.milliseconds());
+        assertEquals(1, pollResult.unsentRequests.size());
+
+        pollResult.unsentRequests.get(0).future().completeExceptionally(exception);
+
+        final boolean stillTracked = !topicMetadataRequestManager.inflightRequests().isEmpty();
+        if (exception instanceof RetriableException) {
+            assertTrue(stillTracked);
+        } else {
+            assertFalse(stillTracked);
+        }
+    }
+
+    /**
+     * Builds a ClientResponse for {@code request} whose metadata body reports {@code error} for the
+     * given topic, or for "topic1" and "topic2" when no topic is given.
+     */
+    private ClientResponse buildTopicMetadataClientResponse(
+        final NetworkClientDelegate.UnsentRequest request,
+        final Optional<String> topic,
+        final Errors error) {
+        final AbstractRequest built = request.requestBuilder().build();
+        assertTrue(built instanceof MetadataRequest);
+        final MetadataRequest metadataRequest = (MetadataRequest) built;
+        final Cluster cluster = mockCluster(3, 0);
+
+        // An empty topic means the request was for all topics, so answer with two sample topics.
+        final List<String> topicNames = topic
+            .map(Collections::singletonList)
+            .orElseGet(() -> Arrays.asList("topic1", "topic2"));
+        final List<MetadataResponse.TopicMetadata> topics = new ArrayList<>();
+        for (String name : topicNames) {
+            topics.add(new MetadataResponse.TopicMetadata(error, name, false, Collections.emptyList()));
+        }
+
+        final MetadataResponse metadataResponse = RequestTestUtils.metadataResponse(
+            cluster.nodes(),
+            cluster.clusterResource().clusterId(),
+            cluster.controller().id(),
+            topics);
+        final RequestHeader header =
+            new RequestHeader(ApiKeys.METADATA, metadataRequest.version(), "mockClientId", 1);
+        return new ClientResponse(
+            header,
+            request.callback(),
+            "-1",
+            time.milliseconds(),
+            time.milliseconds(),
+            false,
+            null,
+            null,
+            metadataResponse);
+    }
+
+    /**
+     * Creates a Cluster with id "mockClusterId" made of {@code numNodes} nodes.
+     * Node ids run from 0 to numNodes - 1, all on "localhost", with ports starting at 8121.
+     *
+     * @param numNodes        number of nodes to create
+     * @param controllerIndex id of the node passed to the Cluster as its controller
+     * @return the constructed cluster, with the remaining set arguments left empty
+     */
+    private static Cluster mockCluster(final int numNodes, final int controllerIndex) {
+        Map<Integer, Node> nodesById = new HashMap<>();
+        for (int id = 0; id < numNodes; id++) {
+            nodesById.put(id, new Node(id, "localhost", 8121 + id));
+        }
+        Node controller = nodesById.get(controllerIndex);
+        return new Cluster(
+            "mockClusterId",
+            nodesById.values(),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            controller);
+    }
+
+
+    /**
+     * Argument source for the topic-parameterized tests: one case with the named
+     * topic "topic1" and one case with an empty Optional.
+     */
+    private static Collection<Arguments> topicsProvider() {
+        List<Arguments> cases = new ArrayList<>();
+        cases.add(Arguments.of(Optional.of("topic1")));
+        cases.add(Arguments.of(Optional.empty()));
+        return cases;
+    }
+
+    /**
+     * Argument source pairing an Errors code with a boolean flag.
+     * NOTE(review): the flag is true only for NETWORK_EXCEPTION; presumably it marks
+     * whether the request is expected to remain in flight - confirm against the consuming test.
+     */
+    private static Collection<Arguments> exceptionProvider() {
+        List<Arguments> cases = new ArrayList<>();
+        cases.add(Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, false));
+        cases.add(Arguments.of(Errors.INVALID_TOPIC_EXCEPTION, false));
+        cases.add(Arguments.of(Errors.UNKNOWN_SERVER_ERROR, false));
+        cases.add(Arguments.of(Errors.NETWORK_EXCEPTION, true));
+        cases.add(Arguments.of(Errors.NONE, false));
+        return cases;
+    }
+
+    /**
+     * Argument source for testHardFailures: the exceptions used to fail the
+     * outgoing request future, one per test invocation.
+     */
+    private static Collection<Arguments> hardFailureExceptionProvider() {
+        List<Arguments> cases = new ArrayList<>();
+        cases.add(Arguments.of(new TimeoutException("timeout")));
+        cases.add(Arguments.of(new KafkaException("non-retriable exception")));
+        cases.add(Arguments.of(new NetworkException("retriable-exception")));
+        return cases;
+    }
+
+}

Reply via email to