This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new f210b7a KAFKA-9261; Client should handle unavailable leader metadata
(#7770)
f210b7a is described below
commit f210b7abbafd82c4b8491169bbb76458b85d1591
Author: Jason Gustafson <[email protected]>
AuthorDate: Wed Feb 5 09:13:11 2020 -0800
KAFKA-9261; Client should handle unavailable leader metadata (#7770)
The client caches metadata fetched from Metadata requests. Previously, each
metadata response overwrote all of the metadata from the previous one, so we
could rely on the expectation that the broker only returned the leaderId for a
partition if it had connection information available. This behavior changed
with KIP-320 since having the leader epoch allows the client to filter out
partition metadata which is known to be stale. However, because of this, we can
no longer rely on the request-level guarantee of leader availability [...]
Fixing this issue was unfortunately not straightforward because the cache
was built to maintain references to broker metadata through the `Node` object
at the partition level. In order to keep the state consistent, each `Node`
reference would need to be updated based on the new broker metadata. Instead of
doing that, this patch changes the cache so that it is structured more closely
with the Metadata response schema. Broker node information is maintained at the
top level in a single collection, and cached partition metadata refers to brokers only by id [...]
Note that one of the side benefits of the refactor here is that we
virtually eliminate one of the hotspots in Metadata request handling in
`MetadataCache.getEndpoints` (which was renamed to `maybeFilterAliveReplicas`).
The only reason this was expensive was because we had to build a new collection
for the `Node` representations of each of the replica lists. This information
was doomed to just get discarded on serialization, so the whole effort was
wasteful. Now, we work with the lower [...]
Reviewers: Rajini Sivaram <[email protected]>, Ismael Juma
<[email protected]>
---
.../java/org/apache/kafka/clients/Metadata.java | 113 +++++++------
.../org/apache/kafka/clients/MetadataCache.java | 105 ++++--------
.../org/apache/kafka/clients/NetworkClient.java | 2 +-
.../admin/internals/MetadataOperationContext.java | 4 +-
.../kafka/clients/consumer/KafkaConsumer.java | 4 +-
.../kafka/clients/consumer/MockConsumer.java | 5 +-
.../kafka/clients/consumer/OffsetAndMetadata.java | 4 +-
.../consumer/internals/ConsumerCoordinator.java | 6 +-
.../kafka/clients/consumer/internals/Fetcher.java | 55 +++---
.../consumer/internals/SubscriptionState.java | 8 +-
.../main/java/org/apache/kafka/common/Cluster.java | 21 +--
.../kafka/common/requests/MetadataResponse.java | 184 ++++++++++-----------
.../apache/kafka/common/requests/RequestUtils.java | 9 +-
.../apache/kafka/clients/MetadataCacheTest.java | 85 ++++++++++
.../org/apache/kafka/clients/MetadataTest.java | 115 ++++++++++---
.../kafka/clients/admin/KafkaAdminClientTest.java | 51 +++---
.../internals/ConsumerCoordinatorTest.java | 5 +-
.../consumer/internals/ConsumerMetadataTest.java | 3 +-
.../clients/consumer/internals/FetcherTest.java | 37 +++--
.../internals/OffsetForLeaderEpochClientTest.java | 8 +-
.../consumer/internals/SubscriptionStateTest.java | 47 +++---
.../kafka/common/requests/RequestResponseTest.java | 26 +--
.../test/java/org/apache/kafka/test/TestUtils.java | 15 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 8 +-
.../main/scala/kafka/server/MetadataCache.scala | 89 ++++++----
.../scala/unit/kafka/admin/AddPartitionsTest.scala | 23 ++-
.../admin/TopicCommandWithAdminClientTest.scala | 2 +-
.../server/AbstractCreateTopicsRequestTest.scala | 4 +-
.../unit/kafka/server/MetadataCacheTest.scala | 33 ++--
.../unit/kafka/server/MetadataRequestTest.scala | 43 +++--
30 files changed, 635 insertions(+), 479 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 82c1b07..72f2d04 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -19,14 +19,12 @@ package org.apache.kafka.clients;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.LogContext;
@@ -43,10 +41,11 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
+import static
org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;
+
/**
* A class encapsulating some of the logic around metadata.
* <p>
@@ -147,11 +146,16 @@ public class Metadata implements Closeable {
}
/**
- * Request an update for the partition metadata iff the given leader epoch
is at newer than the last seen leader epoch
+ * Request an update for the partition metadata iff the given leader epoch
is newer than the last seen leader epoch
*/
public synchronized boolean updateLastSeenEpochIfNewer(TopicPartition
topicPartition, int leaderEpoch) {
Objects.requireNonNull(topicPartition, "TopicPartition cannot be
null");
- return updateLastSeenEpoch(topicPartition, leaderEpoch, oldEpoch ->
leaderEpoch > oldEpoch, true);
+ if (leaderEpoch < 0)
+ throw new IllegalArgumentException("Invalid leader epoch " +
leaderEpoch + " (must be non-negative)");
+
+ boolean updated = updateLastSeenEpoch(topicPartition, leaderEpoch,
oldEpoch -> leaderEpoch > oldEpoch);
+ this.needUpdate = this.needUpdate || updated;
+ return updated;
}
@@ -165,21 +169,16 @@ public class Metadata implements Closeable {
* @param topicPartition topic+partition to update the epoch for
* @param epoch the new epoch
* @param epochTest a predicate to determine if the old epoch
should be replaced
- * @param setRequestUpdateFlag sets the "needUpdate" flag to true if the
epoch is updated
* @return true if the epoch was updated, false otherwise
*/
private synchronized boolean updateLastSeenEpoch(TopicPartition
topicPartition,
int epoch,
- Predicate<Integer>
epochTest,
- boolean
setRequestUpdateFlag) {
+ Predicate<Integer>
epochTest) {
Integer oldEpoch = lastSeenLeaderEpochs.get(topicPartition);
log.trace("Determining if we should replace existing epoch {} with new
epoch {}", oldEpoch, epoch);
if (oldEpoch == null || epochTest.test(oldEpoch)) {
log.debug("Updating last seen epoch from {} to {} for partition
{}", oldEpoch, epoch, topicPartition);
lastSeenLeaderEpochs.put(topicPartition, epoch);
- if (setRequestUpdateFlag) {
- this.needUpdate = true;
- }
return true;
} else {
log.debug("Not replacing existing epoch {} with new epoch {} for
partition {}", oldEpoch, epoch, topicPartition);
@@ -199,16 +198,29 @@ public class Metadata implements Closeable {
/**
* Return the cached partition info if it exists and a newer leader epoch
isn't known about.
*/
- public synchronized Optional<MetadataCache.PartitionInfoAndEpoch>
partitionInfoIfCurrent(TopicPartition topicPartition) {
+ synchronized Optional<MetadataResponse.PartitionMetadata>
partitionMetadataIfCurrent(TopicPartition topicPartition) {
Integer epoch = lastSeenLeaderEpochs.get(topicPartition);
+ Optional<MetadataResponse.PartitionMetadata> partitionMetadata =
cache.partitionMetadata(topicPartition);
if (epoch == null) {
// old cluster format (no epochs)
- return cache.getPartitionInfo(topicPartition);
+ return partitionMetadata;
} else {
- return cache.getPartitionInfoHavingEpoch(topicPartition, epoch);
+ return partitionMetadata.filter(metadata ->
+
metadata.leaderEpoch.orElse(NO_PARTITION_LEADER_EPOCH).equals(epoch));
}
}
+ public synchronized LeaderAndEpoch currentLeader(TopicPartition
topicPartition) {
+ Optional<MetadataResponse.PartitionMetadata> maybeMetadata =
partitionMetadataIfCurrent(topicPartition);
+ if (!maybeMetadata.isPresent())
+ return new LeaderAndEpoch(Optional.empty(),
Optional.ofNullable(lastSeenLeaderEpochs.get(topicPartition)));
+
+ MetadataResponse.PartitionMetadata partitionMetadata =
maybeMetadata.get();
+ Optional<Integer> leaderEpochOpt = partitionMetadata.leaderEpoch;
+ Optional<Node> leaderNodeOpt =
partitionMetadata.leaderId.flatMap(cache::nodeById);
+ return new LeaderAndEpoch(leaderNodeOpt, leaderEpochOpt);
+ }
+
public synchronized void bootstrap(List<InetSocketAddress> addresses) {
this.needUpdate = true;
this.updateVersion += 1;
@@ -245,7 +257,7 @@ public class Metadata implements Closeable {
this.lastSuccessfulRefreshMs = now;
this.updateVersion += 1;
- String previousClusterId =
cache.cluster().clusterResource().clusterId();
+ String previousClusterId = cache.clusterResource().clusterId();
this.cache = handleMetadataResponse(response, topic ->
retainTopic(topic.topic(), topic.isInternal(), now));
@@ -254,11 +266,11 @@ public class Metadata implements Closeable {
this.lastSeenLeaderEpochs.keySet().removeIf(tp ->
!retainTopic(tp.topic(), false, now));
- String newClusterId = cache.cluster().clusterResource().clusterId();
+ String newClusterId = cache.clusterResource().clusterId();
if (!Objects.equals(previousClusterId, newClusterId)) {
log.info("Cluster ID: {}", newClusterId);
}
- clusterResourceListeners.onUpdate(cache.cluster().clusterResource());
+ clusterResourceListeners.onUpdate(cache.clusterResource());
log.debug("Updated cluster metadata updateVersion {} to {}",
this.updateVersion, this.cache);
}
@@ -289,7 +301,7 @@ public class Metadata implements Closeable {
private MetadataCache handleMetadataResponse(MetadataResponse
metadataResponse,
Predicate<MetadataResponse.TopicMetadata> topicsToRetain) {
Set<String> internalTopics = new HashSet<>();
- List<MetadataCache.PartitionInfoAndEpoch> partitions = new
ArrayList<>();
+ List<MetadataResponse.PartitionMetadata> partitions = new
ArrayList<>();
for (MetadataResponse.TopicMetadata metadata :
metadataResponse.topicMetadata()) {
if (!topicsToRetain.test(metadata))
continue;
@@ -300,12 +312,12 @@ public class Metadata implements Closeable {
for (MetadataResponse.PartitionMetadata partitionMetadata :
metadata.partitionMetadata()) {
// Even if the partition's metadata includes an error, we
need to handle
// the update to catch new epochs
- updatePartitionInfo(metadata.topic(), partitionMetadata,
- metadataResponse.hasReliableLeaderEpochs(),
partitions::add);
+ updateLatestMetadata(partitionMetadata,
metadataResponse.hasReliableLeaderEpochs())
+ .ifPresent(partitions::add);
- if (partitionMetadata.error().exception() instanceof
InvalidMetadataException) {
+ if (partitionMetadata.error.exception() instanceof
InvalidMetadataException) {
log.debug("Requesting metadata update for partition {}
due to error {}",
- new TopicPartition(metadata.topic(),
partitionMetadata.partition()), partitionMetadata.error());
+ partitionMetadata.topicPartition,
partitionMetadata.error);
requestUpdate();
}
}
@@ -315,37 +327,36 @@ public class Metadata implements Closeable {
}
}
- return new MetadataCache(metadataResponse.clusterId(), new
ArrayList<>(metadataResponse.brokers()), partitions,
+ return new MetadataCache(metadataResponse.clusterId(),
+ metadataResponse.brokersById(),
+ partitions,
metadataResponse.topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
metadataResponse.topicsByError(Errors.INVALID_TOPIC_EXCEPTION),
- internalTopics, metadataResponse.controller());
+ internalTopics,
+ metadataResponse.controller());
}
/**
- * Compute the correct PartitionInfo to cache for a topic+partition and
pass to the given consumer.
+ * Compute the latest partition metadata to cache given ordering by leader
epochs (if both
+ * available and reliable).
*/
- private void updatePartitionInfo(String topic,
- MetadataResponse.PartitionMetadata
partitionMetadata,
- boolean hasReliableLeaderEpoch,
-
Consumer<MetadataCache.PartitionInfoAndEpoch> partitionInfoConsumer) {
- TopicPartition tp = new TopicPartition(topic,
partitionMetadata.partition());
-
- if (hasReliableLeaderEpoch &&
partitionMetadata.leaderEpoch().isPresent()) {
- int newEpoch = partitionMetadata.leaderEpoch().get();
+ private Optional<MetadataResponse.PartitionMetadata> updateLatestMetadata(
+ MetadataResponse.PartitionMetadata partitionMetadata,
+ boolean hasReliableLeaderEpoch) {
+ TopicPartition tp = partitionMetadata.topicPartition;
+ if (hasReliableLeaderEpoch &&
partitionMetadata.leaderEpoch.isPresent()) {
+ int newEpoch = partitionMetadata.leaderEpoch.get();
// If the received leader epoch is at least the same as the
previous one, update the metadata
- if (updateLastSeenEpoch(tp, newEpoch, oldEpoch -> newEpoch >=
oldEpoch, false)) {
- PartitionInfo info =
MetadataResponse.partitionMetaToInfo(topic, partitionMetadata);
- partitionInfoConsumer.accept(new
MetadataCache.PartitionInfoAndEpoch(info, newEpoch));
+ if (updateLastSeenEpoch(tp, newEpoch, oldEpoch -> newEpoch >=
oldEpoch)) {
+ return Optional.of(partitionMetadata);
} else {
// Otherwise ignore the new metadata and use the previously
cached info
- cache.getPartitionInfo(tp).ifPresent(partitionInfoConsumer);
+ return cache.partitionMetadata(tp);
}
} else {
// Handle old cluster formats as well as error responses where
leader and epoch are missing
lastSeenLeaderEpochs.remove(tp);
- PartitionInfo info = MetadataResponse.partitionMetaToInfo(topic,
partitionMetadata);
- partitionInfoConsumer.accept(new
MetadataCache.PartitionInfoAndEpoch(info,
- RecordBatch.NO_PARTITION_LEADER_EPOCH));
+ return Optional.of(partitionMetadata.withoutLeaderEpoch());
}
}
@@ -491,23 +502,19 @@ public class Metadata implements Closeable {
}
}
- public synchronized LeaderAndEpoch leaderAndEpoch(TopicPartition tp) {
- return partitionInfoIfCurrent(tp)
- .map(infoAndEpoch -> {
- Node leader = infoAndEpoch.partitionInfo().leader();
- return new LeaderAndEpoch(leader == null ? Node.noNode() :
leader, Optional.of(infoAndEpoch.epoch()));
- })
- .orElse(new LeaderAndEpoch(Node.noNode(),
lastSeenLeaderEpoch(tp)));
- }
-
+ /**
+ * Represents current leader state known in metadata. It is possible that
we know the leader, but not the
+ * epoch if the metadata is received from a broker which does not support
a sufficient Metadata API version.
+ * It is also possible that we know of the leader epoch, but not the
leader when it is derived
+ * from an external source (e.g. a committed offset).
+ */
public static class LeaderAndEpoch {
+ private static final LeaderAndEpoch NO_LEADER_OR_EPOCH = new
LeaderAndEpoch(Optional.empty(), Optional.empty());
- public static final LeaderAndEpoch NO_LEADER_OR_EPOCH = new
LeaderAndEpoch(Node.noNode(), Optional.empty());
-
- public final Node leader;
+ public final Optional<Node> leader;
public final Optional<Integer> epoch;
- public LeaderAndEpoch(Node leader, Optional<Integer> epoch) {
+ public LeaderAndEpoch(Optional<Node> leader, Optional<Integer> epoch) {
this.leader = Objects.requireNonNull(leader);
this.epoch = Objects.requireNonNull(epoch);
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
index e58da12..3230087 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
@@ -17,18 +17,18 @@
package org.apache.kafka.clients;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.MetadataResponse;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -39,18 +39,18 @@ import java.util.stream.Collectors;
*/
public class MetadataCache {
private final String clusterId;
- private final List<Node> nodes;
+ private final Map<Integer, Node> nodes;
private final Set<String> unauthorizedTopics;
private final Set<String> invalidTopics;
private final Set<String> internalTopics;
private final Node controller;
- private final Map<TopicPartition, PartitionInfoAndEpoch>
metadataByPartition;
+ private final Map<TopicPartition, MetadataResponse.PartitionMetadata>
metadataByPartition;
private Cluster clusterInstance;
MetadataCache(String clusterId,
- List<Node> nodes,
- Collection<PartitionInfoAndEpoch> partitions,
+ Map<Integer, Node> nodes,
+ Collection<MetadataResponse.PartitionMetadata> partitions,
Set<String> unauthorizedTopics,
Set<String> invalidTopics,
Set<String> internalTopics,
@@ -58,14 +58,14 @@ public class MetadataCache {
this(clusterId, nodes, partitions, unauthorizedTopics, invalidTopics,
internalTopics, controller, null);
}
- MetadataCache(String clusterId,
- List<Node> nodes,
- Collection<PartitionInfoAndEpoch> partitions,
- Set<String> unauthorizedTopics,
- Set<String> invalidTopics,
- Set<String> internalTopics,
- Node controller,
- Cluster clusterInstance) {
+ private MetadataCache(String clusterId,
+ Map<Integer, Node> nodes,
+ Collection<MetadataResponse.PartitionMetadata>
partitions,
+ Set<String> unauthorizedTopics,
+ Set<String> invalidTopics,
+ Set<String> internalTopics,
+ Node controller,
+ Cluster clusterInstance) {
this.clusterId = clusterId;
this.nodes = nodes;
this.unauthorizedTopics = unauthorizedTopics;
@@ -74,8 +74,8 @@ public class MetadataCache {
this.controller = controller;
this.metadataByPartition = new HashMap<>(partitions.size());
- for (PartitionInfoAndEpoch p : partitions) {
- this.metadataByPartition.put(new
TopicPartition(p.partitionInfo().topic(), p.partitionInfo().partition()), p);
+ for (MetadataResponse.PartitionMetadata p : partitions) {
+ this.metadataByPartition.put(p.topicPartition, p);
}
if (clusterInstance == null) {
@@ -85,16 +85,12 @@ public class MetadataCache {
}
}
- /**
- * Return the cached PartitionInfo iff it was for the given epoch
- */
- Optional<PartitionInfoAndEpoch> getPartitionInfoHavingEpoch(TopicPartition
topicPartition, int epoch) {
- PartitionInfoAndEpoch infoAndEpoch =
metadataByPartition.get(topicPartition);
- return Optional.ofNullable(infoAndEpoch).filter(infoEpoch ->
infoEpoch.epoch() == epoch);
+ Optional<MetadataResponse.PartitionMetadata>
partitionMetadata(TopicPartition topicPartition) {
+ return Optional.ofNullable(metadataByPartition.get(topicPartition));
}
- Optional<PartitionInfoAndEpoch> getPartitionInfo(TopicPartition
topicPartition) {
- return Optional.ofNullable(metadataByPartition.get(topicPartition));
+ Optional<Node> nodeById(int id) {
+ return Optional.ofNullable(nodes.get(id));
}
Cluster cluster() {
@@ -105,25 +101,33 @@ public class MetadataCache {
}
}
+ ClusterResource clusterResource() {
+ return new ClusterResource(clusterId);
+ }
+
private void computeClusterView() {
List<PartitionInfo> partitionInfos = metadataByPartition.values()
.stream()
- .map(PartitionInfoAndEpoch::partitionInfo)
+ .map(metadata -> MetadataResponse.toPartitionInfo(metadata,
nodes))
.collect(Collectors.toList());
- this.clusterInstance = new Cluster(clusterId, nodes, partitionInfos,
unauthorizedTopics, invalidTopics, internalTopics, controller);
+ this.clusterInstance = new Cluster(clusterId, nodes.values(),
partitionInfos, unauthorizedTopics,
+ invalidTopics, internalTopics, controller);
}
static MetadataCache bootstrap(List<InetSocketAddress> addresses) {
- List<Node> nodes = new ArrayList<>();
+ Map<Integer, Node> nodes = new HashMap<>();
int nodeId = -1;
- for (InetSocketAddress address : addresses)
- nodes.add(new Node(nodeId--, address.getHostString(),
address.getPort()));
+ for (InetSocketAddress address : addresses) {
+ nodes.put(nodeId, new Node(nodeId, address.getHostString(),
address.getPort()));
+ nodeId--;
+ }
return new MetadataCache(null, nodes, Collections.emptyList(),
- Collections.emptySet(), Collections.emptySet(),
Collections.emptySet(), null, Cluster.bootstrap(addresses));
+ Collections.emptySet(), Collections.emptySet(),
Collections.emptySet(),
+ null, Cluster.bootstrap(addresses));
}
static MetadataCache empty() {
- return new MetadataCache(null, Collections.emptyList(),
Collections.emptyList(),
+ return new MetadataCache(null, Collections.emptyMap(),
Collections.emptyList(),
Collections.emptySet(), Collections.emptySet(),
Collections.emptySet(), null, Cluster.empty());
}
@@ -137,43 +141,4 @@ public class MetadataCache {
'}';
}
- public static class PartitionInfoAndEpoch {
- private final PartitionInfo partitionInfo;
- private final int epoch;
-
- PartitionInfoAndEpoch(PartitionInfo partitionInfo, int epoch) {
- this.partitionInfo = partitionInfo;
- this.epoch = epoch;
- }
-
- public PartitionInfo partitionInfo() {
- return partitionInfo;
- }
-
- public int epoch() {
- return epoch;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- PartitionInfoAndEpoch that = (PartitionInfoAndEpoch) o;
- return epoch == that.epoch &&
- Objects.equals(partitionInfo, that.partitionInfo);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(partitionInfo, epoch);
- }
-
- @Override
- public String toString() {
- return "PartitionInfoAndEpoch{" +
- "partitionInfo=" + partitionInfo +
- ", epoch=" + epoch +
- '}';
- }
- }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 61e8cd1..3060a50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -1055,7 +1055,7 @@ public class NetworkClient implements KafkaClient {
// This could be a transient issue if listeners were added
dynamically to brokers.
List<TopicPartition> missingListenerPartitions =
response.topicMetadata().stream().flatMap(topicMetadata ->
topicMetadata.partitionMetadata().stream()
- .filter(partitionMetadata -> partitionMetadata.error() ==
Errors.LISTENER_NOT_FOUND)
+ .filter(partitionMetadata -> partitionMetadata.error ==
Errors.LISTENER_NOT_FOUND)
.map(partitionMetadata -> new
TopicPartition(topicMetadata.topic(), partitionMetadata.partition())))
.collect(Collectors.toList());
if (!missingListenerPartitions.isEmpty()) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
index e6f4054..c05e5cf 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
@@ -83,8 +83,8 @@ public final class MetadataOperationContext<T, O extends
AbstractOptions<O>> {
public static void handleMetadataErrors(MetadataResponse response) {
for (TopicMetadata tm : response.topicMetadata()) {
for (PartitionMetadata pm : tm.partitionMetadata()) {
- if (shouldRefreshMetadata(pm.error())) {
- throw pm.error().exception();
+ if (shouldRefreshMetadata(pm.error)) {
+ throw pm.error.exception();
}
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 2052af9..20da83f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1604,7 +1604,7 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
SubscriptionState.FetchPosition newPosition = new
SubscriptionState.FetchPosition(
offset,
Optional.empty(), // This will ensure we skip validation
- this.metadata.leaderAndEpoch(partition));
+ this.metadata.currentLeader(partition));
this.subscriptions.seekUnvalidated(partition, newPosition);
} finally {
release();
@@ -1635,7 +1635,7 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
} else {
log.info("Seeking to offset {} for partition {}", offset,
partition);
}
- Metadata.LeaderAndEpoch currentLeaderAndEpoch =
this.metadata.leaderAndEpoch(partition);
+ Metadata.LeaderAndEpoch currentLeaderAndEpoch =
this.metadata.currentLeader(partition);
SubscriptionState.FetchPosition newPosition = new
SubscriptionState.FetchPosition(
offsetAndMetadata.offset(),
offsetAndMetadata.leaderEpoch(),
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index bd8b93c..67b4e9f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -22,7 +22,6 @@ import
org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
@@ -37,6 +36,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -202,8 +202,9 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
if (assignment().contains(entry.getKey()) && rec.offset()
>= position) {
results.computeIfAbsent(entry.getKey(), partition ->
new ArrayList<>()).add(rec);
+ Metadata.LeaderAndEpoch leaderAndEpoch = new
Metadata.LeaderAndEpoch(Optional.empty(), rec.leaderEpoch());
SubscriptionState.FetchPosition newPosition = new
SubscriptionState.FetchPosition(
- rec.offset() + 1, rec.leaderEpoch(), new
Metadata.LeaderAndEpoch(Node.noNode(), rec.leaderEpoch()));
+ rec.offset() + 1, rec.leaderEpoch(),
leaderAndEpoch);
subscriptions.position(entry.getKey(), newPosition);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
index aa91e50..d6b3b94 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
@@ -94,7 +94,9 @@ public class OffsetAndMetadata implements Serializable {
* @return the leader epoch or empty if not known
*/
public Optional<Integer> leaderEpoch() {
- return Optional.ofNullable(leaderEpoch);
+ if (leaderEpoch == null || leaderEpoch < 0)
+ return Optional.empty();
+ return Optional.of(leaderEpoch);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 6610bfb02..94209d2 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -766,10 +766,10 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
// it's possible that the partition is no longer assigned when
the response is received,
// so we need to ignore seeking if that's the case
if (this.subscriptions.isAssigned(tp)) {
- final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch =
metadata.leaderAndEpoch(tp);
+ final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch =
metadata.currentLeader(tp);
final SubscriptionState.FetchPosition position = new
SubscriptionState.FetchPosition(
- offsetAndMetadata.offset(),
offsetAndMetadata.leaderEpoch(),
- leaderAndEpoch);
+ offsetAndMetadata.offset(),
offsetAndMetadata.leaderEpoch(),
+ leaderAndEpoch);
this.subscriptions.seekUnvalidated(tp, position);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 32d3976..7890c9a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -19,7 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.FetchSessionHandler;
-import org.apache.kafka.clients.MetadataCache;
+import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.ApiVersion;
import org.apache.kafka.clients.StaleMetadataException;
@@ -485,7 +485,7 @@ public class Fetcher<K, V> implements Closeable {
// Validate each partition against the current leader and epoch
subscriptions.assignedPartitions().forEach(topicPartition -> {
- ConsumerMetadata.LeaderAndEpoch leaderAndEpoch =
metadata.leaderAndEpoch(topicPartition);
+ ConsumerMetadata.LeaderAndEpoch leaderAndEpoch =
metadata.currentLeader(topicPartition);
subscriptions.maybeValidatePositionForCurrentLeader(topicPartition,
leaderAndEpoch);
});
@@ -716,7 +716,7 @@ public class Fetcher<K, V> implements Closeable {
private void resetOffsetIfNeeded(TopicPartition partition,
OffsetResetStrategy requestedResetStrategy, ListOffsetData offsetData) {
SubscriptionState.FetchPosition position = new
SubscriptionState.FetchPosition(
- offsetData.offset, offsetData.leaderEpoch,
metadata.leaderAndEpoch(partition));
+ offsetData.offset, offsetData.leaderEpoch,
metadata.currentLeader(partition));
offsetData.leaderEpoch.ifPresent(epoch ->
metadata.updateLastSeenEpochIfNewer(partition, epoch));
subscriptions.maybeSeekUnvalidated(partition, position.offset,
requestedResetStrategy);
}
@@ -904,27 +904,26 @@ public class Fetcher<K, V> implements Closeable {
for (Map.Entry<TopicPartition, Long> entry:
timestampsToSearch.entrySet()) {
TopicPartition tp = entry.getKey();
Long offset = entry.getValue();
- Optional<MetadataCache.PartitionInfoAndEpoch> currentInfo =
metadata.partitionInfoIfCurrent(tp);
- if (!currentInfo.isPresent()) {
+ Metadata.LeaderAndEpoch leaderAndEpoch =
metadata.currentLeader(tp);
+
+ if (!leaderAndEpoch.leader.isPresent()) {
log.debug("Leader for partition {} is unknown for fetching
offset {}", tp, offset);
metadata.requestUpdate();
partitionsToRetry.add(tp);
- } else if (currentInfo.get().partitionInfo().leader() == null) {
- log.debug("Leader for partition {} is unavailable for fetching
offset {}", tp, offset);
- metadata.requestUpdate();
- partitionsToRetry.add(tp);
- } else if
(client.isUnavailable(currentInfo.get().partitionInfo().leader())) {
-
client.maybeThrowAuthFailure(currentInfo.get().partitionInfo().leader());
-
- // The connection has failed and we need to await the blackout
period before we can
- // try again. No need to request a metadata update since the
disconnect will have
- // done so already.
- log.debug("Leader {} for partition {} is unavailable for
fetching offset until reconnect backoff expires",
- currentInfo.get().partitionInfo().leader(), tp);
- partitionsToRetry.add(tp);
} else {
- partitionDataMap.put(tp,
- new ListOffsetRequest.PartitionData(offset,
Optional.of(currentInfo.get().epoch())));
+ Node leader = leaderAndEpoch.leader.get();
+ if (client.isUnavailable(leader)) {
+ client.maybeThrowAuthFailure(leader);
+
+ // The connection has failed and we need to await the
blackout period before we can
+ // try again. No need to request a metadata update since
the disconnect will have
+ // done so already.
+ log.debug("Leader {} for partition {} is unavailable for
fetching offset until reconnect backoff expires",
+ leader, tp);
+ partitionsToRetry.add(tp);
+ } else {
+ partitionDataMap.put(tp, new
ListOffsetRequest.PartitionData(offset, leaderAndEpoch.epoch));
+ }
}
}
return regroupPartitionMapByNode(partitionDataMap);
@@ -1100,18 +1099,21 @@ public class Fetcher<K, V> implements Closeable {
// Ensure the position has an up-to-date leader
subscriptions.assignedPartitions().forEach(
- tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp,
metadata.leaderAndEpoch(tp)));
+ tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp,
metadata.currentLeader(tp)));
long currentTimeMs = time.milliseconds();
for (TopicPartition partition : fetchablePartitions()) {
// Use the preferred read replica if set, or the position's leader
SubscriptionState.FetchPosition position =
this.subscriptions.position(partition);
- Node node = selectReadReplica(partition,
position.currentLeader.leader, currentTimeMs);
-
- if (node == null || node.isEmpty()) {
+ Optional<Node> leaderOpt = position.currentLeader.leader;
+ if (!leaderOpt.isPresent()) {
metadata.requestUpdate();
- } else if (client.isUnavailable(node)) {
+ continue;
+ }
+
+ Node node = selectReadReplica(partition, leaderOpt.get(),
currentTimeMs);
+ if (client.isUnavailable(node)) {
client.maybeThrowAuthFailure(node);
// If we try to send during the reconnect blackout window,
then the request is just
@@ -1153,7 +1155,8 @@ public class Fetcher<K, V> implements Closeable {
Map<TopicPartition, SubscriptionState.FetchPosition> partitionMap)
{
return partitionMap.entrySet()
.stream()
- .collect(Collectors.groupingBy(entry ->
entry.getValue().currentLeader.leader,
+ .filter(entry ->
entry.getValue().currentLeader.leader.isPresent())
+ .collect(Collectors.groupingBy(entry ->
entry.getValue().currentLeader.leader.get(),
Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue)));
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index ec7c376..9200eb8 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -16,20 +16,19 @@
*/
package org.apache.kafka.clients.consumer.internals;
-import java.util.ArrayList;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.IsolationLevel;
-import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.PartitionStates;
import org.apache.kafka.common.requests.EpochEndOffset;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -750,8 +749,7 @@ public class SubscriptionState {
return false;
}
- if
(currentLeaderAndEpoch.equals(Metadata.LeaderAndEpoch.noLeaderOrEpoch())) {
- // Ignore empty LeaderAndEpochs
+ if (!currentLeaderAndEpoch.leader.isPresent() &&
!currentLeaderAndEpoch.epoch.isPresent()) {
return false;
}
@@ -985,7 +983,7 @@ public class SubscriptionState {
final Metadata.LeaderAndEpoch currentLeader;
FetchPosition(long offset) {
- this(offset, Optional.empty(), new
Metadata.LeaderAndEpoch(Node.noNode(), Optional.empty()));
+ this(offset, Optional.empty(),
Metadata.LeaderAndEpoch.noLeaderOrEpoch());
}
public FetchPosition(long offset, Optional<Integer> offsetEpoch,
Metadata.LeaderAndEpoch currentLeader) {
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java
b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index c765cdc..a6bf763 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -122,18 +122,15 @@ public final class Cluster {
Map<String, List<PartitionInfo>> tmpPartitionsByTopic = new
HashMap<>();
for (PartitionInfo p : partitions) {
tmpPartitionsByTopicPartition.put(new TopicPartition(p.topic(),
p.partition()), p);
- List<PartitionInfo> partitionsForTopic =
tmpPartitionsByTopic.get(p.topic());
- if (partitionsForTopic == null) {
- partitionsForTopic = new ArrayList<>();
- tmpPartitionsByTopic.put(p.topic(), partitionsForTopic);
- }
- partitionsForTopic.add(p);
- if (p.leader() != null) {
- // The broker guarantees that if a partition has a non-null
leader, it is one of the brokers returned
- // in the metadata response
- List<PartitionInfo> partitionsForNode =
Objects.requireNonNull(tmpPartitionsByNode.get(p.leader().id()));
- partitionsForNode.add(p);
- }
+ tmpPartitionsByTopic.computeIfAbsent(p.topic(), topic -> new
ArrayList<>()).add(p);
+
+ // The leader may not be known
+ if (p.leader() == null || p.leader().isEmpty())
+ continue;
+
+ // If it is known, its node information should be available
+ List<PartitionInfo> partitionsForNode =
Objects.requireNonNull(tmpPartitionsByNode.get(p.leader().id()));
+ partitionsForNode.add(p);
}
// Update the values of `tmpPartitionsByNode` to contain unmodifiable
lists
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index e4e09a5..4fee563 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -19,10 +19,11 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.MetadataResponseData;
-import
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
-import
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
import
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker;
+import
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
+import
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
@@ -40,6 +41,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
/**
@@ -55,7 +57,7 @@ import java.util.stream.Collectors;
*/
public class MetadataResponse extends AbstractResponse {
public static final int NO_CONTROLLER_ID = -1;
-
+ public static final int NO_LEADER_ID = -1;
public static final int AUTHORIZED_OPERATIONS_OMITTED = Integer.MIN_VALUE;
private final MetadataResponseData data;
@@ -131,12 +133,13 @@ public class MetadataResponse extends AbstractResponse {
public Cluster cluster() {
Set<String> internalTopics = new HashSet<>();
List<PartitionInfo> partitions = new ArrayList<>();
+
for (TopicMetadata metadata : topicMetadata()) {
if (metadata.error == Errors.NONE) {
if (metadata.isInternal)
internalTopics.add(metadata.topic);
for (PartitionMetadata partitionMetadata :
metadata.partitionMetadata) {
- partitions.add(partitionMetaToInfo(metadata.topic,
partitionMetadata));
+ partitions.add(toPartitionInfo(partitionMetadata,
holder().brokers));
}
}
}
@@ -144,6 +147,24 @@ public class MetadataResponse extends AbstractResponse {
topicsByError(Errors.INVALID_TOPIC_EXCEPTION), internalTopics,
controller());
}
+ public static PartitionInfo toPartitionInfo(PartitionMetadata metadata,
Map<Integer, Node> nodesById) {
+ return new PartitionInfo(metadata.topic(),
+ metadata.partition(),
+ metadata.leaderId.map(nodesById::get).orElse(null),
+ convertToNodeArray(metadata.replicaIds, nodesById),
+ convertToNodeArray(metadata.inSyncReplicaIds, nodesById),
+ convertToNodeArray(metadata.offlineReplicaIds, nodesById));
+ }
+
+ private static Node[] convertToNodeArray(List<Integer> replicaIds,
Map<Integer, Node> nodesById) {
+ return replicaIds.stream().map(replicaId -> {
+ Node node = nodesById.get(replicaId);
+ if (node == null)
+ return new Node(replicaId, "", -1);
+ return node;
+ }).toArray(Node[]::new);
+ }
+
/**
* Returns a 32-bit bitfield to represent authorized operations for this
topic.
*/
@@ -162,19 +183,6 @@ public class MetadataResponse extends AbstractResponse {
return data.clusterAuthorizedOperations();
}
- /**
- * Transform a topic and PartitionMetadata into PartitionInfo.
- */
- public static PartitionInfo partitionMetaToInfo(String topic,
PartitionMetadata partitionMetadata) {
- return new PartitionInfo(
- topic,
- partitionMetadata.partition(),
- partitionMetadata.leader(),
- partitionMetadata.replicas().toArray(new Node[0]),
- partitionMetadata.isr().toArray(new Node[0]),
- partitionMetadata.offlineReplicas().toArray(new Node[0]));
- }
-
private Holder holder() {
if (holder == null) {
synchronized (data) {
@@ -190,6 +198,10 @@ public class MetadataResponse extends AbstractResponse {
* @return the brokers
*/
public Collection<Node> brokers() {
+ return holder().brokers.values();
+ }
+
+ public Map<Integer, Node> brokersById() {
return holder().brokers;
}
@@ -314,93 +326,78 @@ public class MetadataResponse extends AbstractResponse {
// This is used to describe per-partition state in the MetadataResponse
public static class PartitionMetadata {
- private final Errors error;
- private final int partition;
- private final Node leader;
- private final Optional<Integer> leaderEpoch;
- private final List<Node> replicas;
- private final List<Node> isr;
- private final List<Node> offlineReplicas;
+ public final TopicPartition topicPartition;
+ public final Errors error;
+ public final Optional<Integer> leaderId;
+ public final Optional<Integer> leaderEpoch;
+ public final List<Integer> replicaIds;
+ public final List<Integer> inSyncReplicaIds;
+ public final List<Integer> offlineReplicaIds;
public PartitionMetadata(Errors error,
- int partition,
- Node leader,
+ TopicPartition topicPartition,
+ Optional<Integer> leaderId,
Optional<Integer> leaderEpoch,
- List<Node> replicas,
- List<Node> isr,
- List<Node> offlineReplicas) {
+ List<Integer> replicaIds,
+ List<Integer> inSyncReplicaIds,
+ List<Integer> offlineReplicaIds) {
this.error = error;
- this.partition = partition;
- this.leader = leader;
+ this.topicPartition = topicPartition;
+ this.leaderId = leaderId;
this.leaderEpoch = leaderEpoch;
- this.replicas = replicas;
- this.isr = isr;
- this.offlineReplicas = offlineReplicas;
- }
-
- public Errors error() {
- return error;
+ this.replicaIds = replicaIds;
+ this.inSyncReplicaIds = inSyncReplicaIds;
+ this.offlineReplicaIds = offlineReplicaIds;
}
public int partition() {
- return partition;
- }
-
- public int leaderId() {
- return leader == null ? -1 : leader.id();
- }
-
- public Optional<Integer> leaderEpoch() {
- return leaderEpoch;
- }
-
- public Node leader() {
- return leader;
+ return topicPartition.partition();
}
- public List<Node> replicas() {
- return replicas;
- }
-
- public List<Node> isr() {
- return isr;
+ public String topic() {
+ return topicPartition.topic();
}
- public List<Node> offlineReplicas() {
- return offlineReplicas;
+ public PartitionMetadata withoutLeaderEpoch() {
+ return new PartitionMetadata(error,
+ topicPartition,
+ leaderId,
+ Optional.empty(),
+ replicaIds,
+ inSyncReplicaIds,
+ offlineReplicaIds);
}
@Override
public String toString() {
- return "(type=PartitionMetadata" +
+ return "PartitionMetadata(" +
", error=" + error +
- ", partition=" + partition +
- ", leader=" + leader +
+ ", partition=" + topicPartition +
+ ", leader=" + leaderId +
", leaderEpoch=" + leaderEpoch +
- ", replicas=" + Utils.join(replicas, ",") +
- ", isr=" + Utils.join(isr, ",") +
- ", offlineReplicas=" + Utils.join(offlineReplicas, ",") +
')';
+ ", replicas=" + Utils.join(replicaIds, ",") +
+ ", isr=" + Utils.join(inSyncReplicaIds, ",") +
+ ", offlineReplicas=" + Utils.join(offlineReplicaIds, ",")
+ ')';
}
}
private static class Holder {
- private final Collection<Node> brokers;
+ private final Map<Integer, Node> brokers;
private final Node controller;
private final Collection<TopicMetadata> topicMetadata;
Holder(MetadataResponseData data) {
- this.brokers =
Collections.unmodifiableCollection(createBrokers(data));
- Map<Integer, Node> brokerMap =
brokers.stream().collect(Collectors.toMap(Node::id, b -> b));
- this.topicMetadata = createTopicMetadata(data, brokerMap);
- this.controller = brokerMap.get(data.controllerId());
+ this.brokers = Collections.unmodifiableMap(createBrokers(data));
+ this.topicMetadata = createTopicMetadata(data);
+ this.controller = brokers.get(data.controllerId());
}
- private Collection<Node> createBrokers(MetadataResponseData data) {
- return data.brokers().valuesList().stream().map(b ->
- new Node(b.nodeId(), b.host(), b.port(),
b.rack())).collect(Collectors.toList());
+ private Map<Integer, Node> createBrokers(MetadataResponseData data) {
+ return data.brokers().valuesList().stream().map(b -> new
Node(b.nodeId(), b.host(), b.port(), b.rack()))
+ .collect(Collectors.toMap(Node::id, Function.identity()));
}
- private Collection<TopicMetadata>
createTopicMetadata(MetadataResponseData data, Map<Integer, Node> brokerMap) {
+ private Collection<TopicMetadata>
createTopicMetadata(MetadataResponseData data) {
List<TopicMetadata> topicMetadataList = new ArrayList<>();
for (MetadataResponseTopic topicMetadata : data.topics()) {
Errors topicError = Errors.forCode(topicMetadata.errorCode());
@@ -411,14 +408,15 @@ public class MetadataResponse extends AbstractResponse {
for (MetadataResponsePartition partitionMetadata :
topicMetadata.partitions()) {
Errors partitionError =
Errors.forCode(partitionMetadata.errorCode());
int partitionIndex = partitionMetadata.partitionIndex();
- int leader = partitionMetadata.leaderId();
+
+ int leaderId = partitionMetadata.leaderId();
+ Optional<Integer> leaderIdOpt = leaderId < 0 ?
Optional.empty() : Optional.of(leaderId);
+
Optional<Integer> leaderEpoch =
RequestUtils.getLeaderEpoch(partitionMetadata.leaderEpoch());
- Node leaderNode = leader == -1 ? null :
brokerMap.get(leader);
- List<Node> replicaNodes = convertToNodes(brokerMap,
partitionMetadata.replicaNodes());
- List<Node> isrNodes = convertToNodes(brokerMap,
partitionMetadata.isrNodes());
- List<Node> offlineNodes = convertToNodes(brokerMap,
partitionMetadata.offlineReplicas());
- partitionMetadataList.add(new
PartitionMetadata(partitionError, partitionIndex, leaderNode, leaderEpoch,
- replicaNodes, isrNodes, offlineNodes));
+ TopicPartition topicPartition = new TopicPartition(topic,
partitionIndex);
+ partitionMetadataList.add(new
PartitionMetadata(partitionError, topicPartition, leaderIdOpt,
+ leaderEpoch, partitionMetadata.replicaNodes(),
partitionMetadata.isrNodes(),
+ partitionMetadata.offlineReplicas()));
}
topicMetadataList.add(new TopicMetadata(topicError, topic,
isInternal, partitionMetadataList,
@@ -427,18 +425,6 @@ public class MetadataResponse extends AbstractResponse {
return topicMetadataList;
}
- private List<Node> convertToNodes(Map<Integer, Node> brokers,
List<Integer> brokerIds) {
- List<Node> nodes = new ArrayList<>(brokerIds.size());
- for (Integer brokerId : brokerIds) {
- Node node = brokers.get(brokerId);
- if (node == null)
- nodes.add(new Node(brokerId, "", -1));
- else
- nodes.add(node);
- }
- return nodes;
- }
-
}
public static MetadataResponse prepareResponse(int throttleTimeMs,
Collection<Node> brokers, String clusterId,
@@ -469,12 +455,12 @@ public class MetadataResponse extends AbstractResponse {
for (PartitionMetadata partitionMetadata :
topicMetadata.partitionMetadata) {
metadataResponseTopic.partitions().add(new
MetadataResponsePartition()
.setErrorCode(partitionMetadata.error.code())
- .setPartitionIndex(partitionMetadata.partition)
- .setLeaderId(partitionMetadata.leader == null ? -1 :
partitionMetadata.leader.id())
-
.setLeaderEpoch(partitionMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
-
.setReplicaNodes(partitionMetadata.replicas.stream().map(Node::id).collect(Collectors.toList()))
-
.setIsrNodes(partitionMetadata.isr.stream().map(Node::id).collect(Collectors.toList()))
-
.setOfflineReplicas(partitionMetadata.offlineReplicas.stream().map(Node::id).collect(Collectors.toList())));
+ .setPartitionIndex(partitionMetadata.partition())
+
.setLeaderId(partitionMetadata.leaderId.orElse(NO_LEADER_ID))
+
.setLeaderEpoch(partitionMetadata.leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
+ .setReplicaNodes(partitionMetadata.replicaIds)
+ .setIsrNodes(partitionMetadata.inSyncReplicaIds)
+ .setOfflineReplicas(partitionMetadata.offlineReplicaIds));
}
responseData.topics().add(metadataResponseTopic);
});
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
index 2b1f04c..be72edb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
@@ -17,8 +17,8 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.RecordBatch;
import java.nio.ByteBuffer;
import java.util.Optional;
@@ -33,15 +33,12 @@ public final class RequestUtils {
static Optional<Integer> getLeaderEpoch(Struct struct, Field.Int32
leaderEpochField) {
int leaderEpoch = struct.getOrElse(leaderEpochField,
RecordBatch.NO_PARTITION_LEADER_EPOCH);
- Optional<Integer> leaderEpochOpt = leaderEpoch ==
RecordBatch.NO_PARTITION_LEADER_EPOCH ?
- Optional.empty() : Optional.of(leaderEpoch);
- return leaderEpochOpt;
+ return getLeaderEpoch(leaderEpoch);
}
static Optional<Integer> getLeaderEpoch(int leaderEpoch) {
- Optional<Integer> leaderEpochOpt = leaderEpoch ==
RecordBatch.NO_PARTITION_LEADER_EPOCH ?
+ return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
Optional.empty() : Optional.of(leaderEpoch);
- return leaderEpochOpt;
}
public static ByteBuffer serialize(Struct headerStruct, Struct bodyStruct)
{
diff --git
a/clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java
b/clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java
new file mode 100644
index 0000000..afdf89c
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class MetadataCacheTest {
+
+ @Test
+ public void testMissingLeaderEndpoint() {
+ // Although the broker attempts to ensure leader information is
available, the
+ // client metadata cache may retain partition metadata across multiple
responses.
+ // For example, separate responses may contain conflicting leader
epochs for
+ // separate partitions and the client will always retain the highest.
+
+ TopicPartition topicPartition = new TopicPartition("topic", 0);
+
+ MetadataResponse.PartitionMetadata partitionMetadata = new
MetadataResponse.PartitionMetadata(
+ Errors.NONE,
+ topicPartition,
+ Optional.of(5),
+ Optional.of(10),
+ Arrays.asList(5, 6, 7),
+ Arrays.asList(5, 6, 7),
+ Collections.emptyList());
+
+ Map<Integer, Node> nodesById = new HashMap<>();
+ nodesById.put(6, new Node(6, "localhost", 2077));
+ nodesById.put(7, new Node(7, "localhost", 2078));
+ nodesById.put(8, new Node(8, "localhost", 2079));
+
+ MetadataCache cache = new MetadataCache("clusterId",
+ nodesById,
+ Collections.singleton(partitionMetadata),
+ Collections.emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
+ null);
+
+ Cluster cluster = cache.cluster();
+ assertNull(cluster.leaderFor(topicPartition));
+
+ PartitionInfo partitionInfo = cluster.partition(topicPartition);
+ Map<Integer, Node> replicas = Arrays.stream(partitionInfo.replicas())
+ .collect(Collectors.toMap(Node::id, Function.identity()));
+ assertNull(partitionInfo.leader());
+ assertEquals(3, replicas.size());
+ assertTrue(replicas.get(5).isEmpty());
+ assertEquals(nodesById.get(6), replicas.get(6));
+ assertEquals(nodesById.get(7), replicas.get(7));
+ }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 7067e88..3dc4c9a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -46,7 +46,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
import static org.apache.kafka.test.TestUtils.assertOptional;
import static org.junit.Assert.assertEquals;
@@ -196,9 +195,9 @@ public class MetadataTest {
MetadataResponse response = new MetadataResponse(struct, version);
assertFalse(response.hasReliableLeaderEpochs());
metadata.update(response, 100);
- assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
- MetadataCache.PartitionInfoAndEpoch info =
metadata.partitionInfoIfCurrent(tp).get();
- assertEquals(-1, info.epoch());
+ assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+ MetadataResponse.PartitionMetadata metadata =
this.metadata.partitionMetadataIfCurrent(tp).get();
+ assertEquals(Optional.empty(), metadata.leaderEpoch);
}
for (short version = 9; version <= ApiKeys.METADATA.latestVersion();
version++) {
@@ -206,9 +205,9 @@ public class MetadataTest {
MetadataResponse response = new MetadataResponse(struct, version);
assertTrue(response.hasReliableLeaderEpochs());
metadata.update(response, 100);
- assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
- MetadataCache.PartitionInfoAndEpoch info =
metadata.partitionInfoIfCurrent(tp).get();
- assertEquals(10, info.epoch());
+ assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+ MetadataResponse.PartitionMetadata info =
metadata.partitionMetadataIfCurrent(tp).get();
+ assertEquals(Optional.of(10), info.leaderEpoch);
}
}
@@ -255,13 +254,11 @@ public class MetadataTest {
metadata.update(new MetadataResponse(data), 101);
assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
- assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
- MetadataCache.PartitionInfoAndEpoch info =
metadata.partitionInfoIfCurrent(tp).get();
+ assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+ MetadataResponse.PartitionMetadata metadata =
this.metadata.partitionMetadataIfCurrent(tp).get();
- List<Integer> cachedIsr =
Arrays.stream(info.partitionInfo().inSyncReplicas())
- .map(Node::id).collect(Collectors.toList());
- assertEquals(Arrays.asList(1, 2, 3), cachedIsr);
- assertEquals(10, info.epoch());
+ assertEquals(Arrays.asList(1, 2, 3), metadata.inSyncReplicaIds);
+ assertEquals(Optional.of(10), metadata.leaderEpoch);
}
@Test
@@ -419,14 +416,14 @@ public class MetadataTest {
// Cache of partition stays, but current partition info is not
available since it's stale
assertNotNull(metadata.fetch().partition(tp));
assertEquals(metadata.fetch().partitionCountForTopic("topic-1").longValue(), 5);
- assertFalse(metadata.partitionInfoIfCurrent(tp).isPresent());
+ assertFalse(metadata.partitionMetadataIfCurrent(tp).isPresent());
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
// Metadata with older epoch is rejected, metadata state is unchanged
metadata.update(metadataResponse, 20L);
assertNotNull(metadata.fetch().partition(tp));
assertEquals(metadata.fetch().partitionCountForTopic("topic-1").longValue(), 5);
- assertFalse(metadata.partitionInfoIfCurrent(tp).isPresent());
+ assertFalse(metadata.partitionMetadataIfCurrent(tp).isPresent());
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
// Metadata with equal or newer epoch is accepted
@@ -434,7 +431,7 @@ public class MetadataTest {
metadata.update(metadataResponse, 30L);
assertNotNull(metadata.fetch().partition(tp));
assertEquals(metadata.fetch().partitionCountForTopic("topic-1").longValue(), 5);
- assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
+ assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
}
@@ -450,9 +447,9 @@ public class MetadataTest {
assertFalse(metadata.lastSeenLeaderEpoch(tp).isPresent());
// still works
- assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
-
assertEquals(metadata.partitionInfoIfCurrent(tp).get().partitionInfo().partition(),
0);
-
assertEquals(metadata.partitionInfoIfCurrent(tp).get().partitionInfo().leader().id(),
0);
+ assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+ assertEquals(0,
metadata.partitionMetadataIfCurrent(tp).get().partition());
+ assertEquals(Optional.of(0),
metadata.partitionMetadataIfCurrent(tp).get().leaderId);
}
@Test
@@ -611,8 +608,9 @@ public class MetadataTest {
MetadataResponse metadataResponse =
TestUtils.metadataUpdateWith("dummy", 2, Collections.emptyMap(),
partitionCounts, _tp -> 99,
(error, partition, leader, leaderEpoch, replicas, isr,
offlineReplicas) ->
- new MetadataResponse.PartitionMetadata(error, partition,
node0, leaderEpoch,
- Collections.singletonList(node0), Collections.emptyList(),
Collections.singletonList(node1)));
+ new MetadataResponse.PartitionMetadata(error, partition,
Optional.of(node0.id()), leaderEpoch,
+ Collections.singletonList(node0.id()),
Collections.emptyList(),
+ Collections.singletonList(node1.id())));
metadata.update(emptyMetadataResponse(), 0L);
metadata.update(metadataResponse, 10L);
@@ -623,4 +621,79 @@ public class MetadataTest {
assertEquals(metadata.fetch().nodeById(0).id(), 0);
assertEquals(metadata.fetch().nodeById(1).id(), 1);
}
+
+ @Test
+ public void testLeaderMetadataInconsistentWithBrokerMetadata() {
+ // Tests a reordering scenario which can lead to inconsistent leader
state.
+ // A partition initially has one broker offline. That broker comes
online and
+ // is elected leader. The client sees these two events in the opposite
order.
+
+ TopicPartition tp = new TopicPartition("topic", 0);
+
+ Node node0 = new Node(0, "localhost", 9092);
+ Node node1 = new Node(1, "localhost", 9093);
+ Node node2 = new Node(2, "localhost", 9094);
+
+ // The first metadata response received from the broker (epoch=10)
+ MetadataResponsePartition firstPartitionMetadata = new
MetadataResponsePartition()
+ .setPartitionIndex(tp.partition())
+ .setErrorCode(Errors.NONE.code())
+ .setLeaderEpoch(10)
+ .setLeaderId(0)
+ .setReplicaNodes(Arrays.asList(0, 1, 2))
+ .setIsrNodes(Arrays.asList(0, 1, 2))
+ .setOfflineReplicas(Collections.emptyList());
+
+ // The second metadata response received contains stale metadata (epoch=8)
+ MetadataResponsePartition secondPartitionMetadata = new
MetadataResponsePartition()
+ .setPartitionIndex(tp.partition())
+ .setErrorCode(Errors.NONE.code())
+ .setLeaderEpoch(8)
+ .setLeaderId(1)
+ .setReplicaNodes(Arrays.asList(0, 1, 2))
+ .setIsrNodes(Arrays.asList(1, 2))
+ .setOfflineReplicas(Collections.singletonList(0));
+
+ metadata.update(new MetadataResponse(new MetadataResponseData()
+ .setTopics(buildTopicCollection(tp.topic(),
firstPartitionMetadata))
+ .setBrokers(buildBrokerCollection(Arrays.asList(node0,
node1, node2)))),
+ 10L);
+
+ metadata.update(new MetadataResponse(new MetadataResponseData()
+ .setTopics(buildTopicCollection(tp.topic(),
secondPartitionMetadata))
+ .setBrokers(buildBrokerCollection(Arrays.asList(node1,
node2)))),
+ 20L);
+
+ assertNull(metadata.fetch().leaderFor(tp));
+ assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+ assertFalse(metadata.currentLeader(tp).leader.isPresent());
+ }
+
+ private MetadataResponseTopicCollection buildTopicCollection(String topic,
MetadataResponsePartition partitionMetadata) {
+ MetadataResponseTopic topicMetadata = new MetadataResponseTopic()
+ .setErrorCode(Errors.NONE.code())
+ .setName(topic)
+ .setIsInternal(false);
+
+
topicMetadata.setPartitions(Collections.singletonList(partitionMetadata));
+
+ MetadataResponseTopicCollection topics = new
MetadataResponseTopicCollection();
+ topics.add(topicMetadata);
+ return topics;
+ }
+
+ private MetadataResponseBrokerCollection buildBrokerCollection(List<Node>
nodes) {
+ MetadataResponseBrokerCollection brokers = new
MetadataResponseBrokerCollection();
+ for (Node node : nodes) {
+ MetadataResponseData.MetadataResponseBroker broker = new
MetadataResponseData.MetadataResponseBroker()
+ .setNodeId(node.id())
+ .setHost(node.host())
+ .setPort(node.port())
+ .setRack(node.rack());
+ brokers.add(broker);
+ }
+ return brokers;
+ }
+
+
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index c869aaa..cba9b48 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -347,12 +347,12 @@ public class KafkaAdminClientTest {
List<PartitionMetadata> pms = new ArrayList<>();
for (PartitionInfo pInfo :
cluster.availablePartitionsForTopic(topic)) {
PartitionMetadata pm = new PartitionMetadata(error,
- pInfo.partition(),
- pInfo.leader(),
+ new TopicPartition(topic, pInfo.partition()),
+ Optional.of(pInfo.leader().id()),
Optional.of(234),
- Arrays.asList(pInfo.replicas()),
- Arrays.asList(pInfo.inSyncReplicas()),
- Arrays.asList(pInfo.offlineReplicas()));
+
Arrays.stream(pInfo.replicas()).map(Node::id).collect(Collectors.toList()),
+
Arrays.stream(pInfo.inSyncReplicas()).map(Node::id).collect(Collectors.toList()),
+
Arrays.stream(pInfo.offlineReplicas()).map(Node::id).collect(Collectors.toList()));
pms.add(pm);
}
TopicMetadata tm = new TopicMetadata(error, topic, false, pms);
@@ -614,8 +614,8 @@ public class KafkaAdminClientTest {
// Then we respond to the DescribeTopic request
Node leader = initializedCluster.nodes().get(0);
MetadataResponse.PartitionMetadata partitionMetadata = new
MetadataResponse.PartitionMetadata(
- Errors.NONE, 0, leader, Optional.of(10),
singletonList(leader),
- singletonList(leader), singletonList(leader));
+ Errors.NONE, new TopicPartition(topic, 0),
Optional.of(leader.id()), Optional.of(10),
+ singletonList(leader.id()), singletonList(leader.id()),
singletonList(leader.id()));
env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(initializedCluster.nodes(),
initializedCluster.clusterResource().clusterId(), 1,
singletonList(new
MetadataResponse.TopicMetadata(Errors.NONE, topic, false,
@@ -960,11 +960,12 @@ public class KafkaAdminClientTest {
List<Node> nodes = env.cluster().nodes();
List<MetadataResponse.PartitionMetadata> partitionMetadata = new
ArrayList<>();
- partitionMetadata.add(new
MetadataResponse.PartitionMetadata(Errors.NONE, tp0.partition(), nodes.get(0),
- Optional.of(5), singletonList(nodes.get(0)),
singletonList(nodes.get(0)),
- Collections.emptyList()));
- partitionMetadata.add(new
MetadataResponse.PartitionMetadata(Errors.NONE, tp1.partition(), nodes.get(1),
- Optional.of(5), singletonList(nodes.get(1)),
singletonList(nodes.get(1)), Collections.emptyList()));
+ partitionMetadata.add(new
MetadataResponse.PartitionMetadata(Errors.NONE, tp0,
+ Optional.of(nodes.get(0).id()), Optional.of(5),
singletonList(nodes.get(0).id()),
+ singletonList(nodes.get(0).id()),
Collections.emptyList()));
+ partitionMetadata.add(new
MetadataResponse.PartitionMetadata(Errors.NONE, tp1,
+ Optional.of(nodes.get(1).id()), Optional.of(5),
singletonList(nodes.get(1).id()),
+ singletonList(nodes.get(1).id()),
Collections.emptyList()));
List<MetadataResponse.TopicMetadata> topicMetadata = new
ArrayList<>();
topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE,
topic, false, partitionMetadata));
@@ -1024,17 +1025,21 @@ public class KafkaAdminClientTest {
List<MetadataResponse.TopicMetadata> t = new ArrayList<>();
List<MetadataResponse.PartitionMetadata> p = new ArrayList<>();
- p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0,
nodes.get(0), Optional.of(5),
- singletonList(nodes.get(0)), singletonList(nodes.get(0)),
Collections.emptyList()));
- p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 1,
nodes.get(0), Optional.of(5),
- singletonList(nodes.get(0)), singletonList(nodes.get(0)),
Collections.emptyList()));
- p.add(new
MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, 2, null,
- Optional.empty(), singletonList(nodes.get(0)),
singletonList(nodes.get(0)),
- Collections.emptyList()));
- p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 3,
nodes.get(0), Optional.of(5),
- singletonList(nodes.get(0)), singletonList(nodes.get(0)),
Collections.emptyList()));
- p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 4,
nodes.get(0), Optional.of(5),
- singletonList(nodes.get(0)), singletonList(nodes.get(0)),
Collections.emptyList()));
+ p.add(new MetadataResponse.PartitionMetadata(Errors.NONE,
myTopicPartition0,
+ Optional.of(nodes.get(0).id()), Optional.of(5),
singletonList(nodes.get(0).id()),
+ singletonList(nodes.get(0).id()),
Collections.emptyList()));
+ p.add(new MetadataResponse.PartitionMetadata(Errors.NONE,
myTopicPartition1,
+ Optional.of(nodes.get(0).id()), Optional.of(5),
singletonList(nodes.get(0).id()),
+ singletonList(nodes.get(0).id()),
Collections.emptyList()));
+ p.add(new
MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE,
myTopicPartition2,
+ Optional.empty(), Optional.empty(),
singletonList(nodes.get(0).id()),
+ singletonList(nodes.get(0).id()),
Collections.emptyList()));
+ p.add(new MetadataResponse.PartitionMetadata(Errors.NONE,
myTopicPartition3,
+ Optional.of(nodes.get(0).id()), Optional.of(5),
singletonList(nodes.get(0).id()),
+ singletonList(nodes.get(0).id()),
Collections.emptyList()));
+ p.add(new MetadataResponse.PartitionMetadata(Errors.NONE,
myTopicPartition4,
+ Optional.of(nodes.get(0).id()), Optional.of(5),
singletonList(nodes.get(0).id()),
+ singletonList(nodes.get(0).id()),
Collections.emptyList()));
t.add(new MetadataResponse.TopicMetadata(Errors.NONE, "my_topic",
false, p));
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index b0b6fb8..e6483f5 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1391,8 +1391,9 @@ public class ConsumerCoordinatorTest {
subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
Node node = new Node(0, "localhost", 9999);
MetadataResponse.PartitionMetadata partitionMetadata =
- new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node,
Optional.empty(),
- singletonList(node), singletonList(node),
singletonList(node));
+ new MetadataResponse.PartitionMetadata(Errors.NONE, new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0),
+ Optional.of(node.id()), Optional.empty(),
singletonList(node.id()), singletonList(node.id()),
+ singletonList(node.id()));
MetadataResponse.TopicMetadata topicMetadata = new
MetadataResponse.TopicMetadata(Errors.NONE,
Topic.GROUP_METADATA_TOPIC_NAME, true,
singletonList(partitionMetadata));
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
index b373192..b456e6c 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
@@ -160,7 +160,8 @@ public class ConsumerMetadataTest {
private MetadataResponse.TopicMetadata topicMetadata(String topic, boolean
isInternal) {
MetadataResponse.PartitionMetadata partitionMetadata = new
MetadataResponse.PartitionMetadata(Errors.NONE,
- 0, node, Optional.of(5), singletonList(node),
singletonList(node), singletonList(node));
+ new TopicPartition(topic, 0), Optional.of(node.id()),
Optional.of(5),
+ singletonList(node.id()), singletonList(node.id()),
singletonList(node.id()));
return new MetadataResponse.TopicMetadata(Errors.NONE, topic,
isInternal, singletonList(partitionMetadata));
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 2e45935..843e1bf 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1946,14 +1946,14 @@ public class FetcherTest {
List<MetadataResponse.PartitionMetadata> altPartitions = new
ArrayList<>();
for (MetadataResponse.PartitionMetadata p : partitions) {
altPartitions.add(new MetadataResponse.PartitionMetadata(
- p.error(),
- p.partition(),
- null, //no leader
+ p.error,
+ p.topicPartition,
+ Optional.empty(), //no leader
Optional.empty(),
- p.replicas(),
- p.isr(),
- p.offlineReplicas())
- );
+ p.replicaIds,
+ p.inSyncReplicaIds,
+ p.offlineReplicaIds
+ ));
}
MetadataResponse.TopicMetadata alteredTopic = new
MetadataResponse.TopicMetadata(
item.error(),
@@ -3023,8 +3023,8 @@ public class FetcherTest {
List<ConsumerRecord<byte[], byte[]>> records;
assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
- subscriptions.seekValidated(tp0, new
SubscriptionState.FetchPosition(0, Optional.empty(),
metadata.leaderAndEpoch(tp0)));
- subscriptions.seekValidated(tp1, new
SubscriptionState.FetchPosition(1, Optional.empty(),
metadata.leaderAndEpoch(tp1)));
+ subscriptions.seekValidated(tp0, new
SubscriptionState.FetchPosition(0, Optional.empty(),
metadata.currentLeader(tp0)));
+ subscriptions.seekValidated(tp1, new
SubscriptionState.FetchPosition(1, Optional.empty(),
metadata.currentLeader(tp1)));
// Fetch some records and establish an incremental fetch session.
LinkedHashMap<TopicPartition,
FetchResponse.PartitionData<MemoryRecords>> partitions1 = new LinkedHashMap<>();
@@ -3503,7 +3503,7 @@ public class FetcherTest {
// Seek with a position and leader+epoch
Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
- metadata.leaderAndEpoch(tp0).leader, Optional.of(epochOne));
+ metadata.currentLeader(tp0).leader, Optional.of(epochOne));
subscriptions.seekUnvalidated(tp0, new
SubscriptionState.FetchPosition(20L, Optional.of(epochOne), leaderAndEpoch));
assertFalse(client.isConnected(node.idString()));
assertTrue(subscriptions.awaitingValidation(tp0));
@@ -3552,7 +3552,7 @@ public class FetcherTest {
// Seek with a position and leader+epoch
Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
- metadata.leaderAndEpoch(tp0).leader, Optional.of(epochOne));
+ metadata.currentLeader(tp0).leader, Optional.of(epochOne));
subscriptions.seekUnvalidated(tp0, new
SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch));
// Update metadata to epoch=2, enter validation
@@ -3580,7 +3580,7 @@ public class FetcherTest {
Node node = metadata.fetch().nodes().get(0);
apiVersions.update(node.idString(), NodeApiVersions.create());
- Metadata.LeaderAndEpoch leaderAndEpoch = new
Metadata.LeaderAndEpoch(metadata.leaderAndEpoch(tp0).leader,
Optional.of(epochOne));
+ Metadata.LeaderAndEpoch leaderAndEpoch = new
Metadata.LeaderAndEpoch(metadata.currentLeader(tp0).leader,
Optional.of(epochOne));
subscriptions.seekUnvalidated(tp0, new
SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch));
fetcher.validateOffsetsIfNeeded();
@@ -3623,7 +3623,7 @@ public class FetcherTest {
apiVersions.update(node.idString(), NodeApiVersions.create());
// Seek with a position and leader+epoch
- Metadata.LeaderAndEpoch leaderAndEpoch = new
Metadata.LeaderAndEpoch(metadata.leaderAndEpoch(tp0).leader,
Optional.of(epochOne));
+ Metadata.LeaderAndEpoch leaderAndEpoch = new
Metadata.LeaderAndEpoch(metadata.currentLeader(tp0).leader,
Optional.of(epochOne));
subscriptions.seekValidated(tp0, new
SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch));
// Update metadata to epoch=2, enter validation
@@ -3693,7 +3693,7 @@ public class FetcherTest {
apiVersions.update(node.idString(), NodeApiVersions.create());
// Seek
- Metadata.LeaderAndEpoch leaderAndEpoch = new
Metadata.LeaderAndEpoch(metadata.leaderAndEpoch(tp0).leader, Optional.of(1));
+ Metadata.LeaderAndEpoch leaderAndEpoch = new
Metadata.LeaderAndEpoch(metadata.currentLeader(tp0).leader, Optional.of(1));
subscriptions.seekValidated(tp0, new
SubscriptionState.FetchPosition(0, Optional.of(1), leaderAndEpoch));
// Check for truncation, this should cause tp0 to go into validation
@@ -3817,19 +3817,22 @@ public class FetcherTest {
client.prepareResponse(fullFetchResponse(tp0, buildRecords(1L, 1, 1),
Errors.NONE, 100L, 0));
consumerClient.poll(time.timer(0));
fetchedRecords();
- Node node = fetcher.selectReadReplica(tp0,
subscriptions.position(tp0).currentLeader.leader, time.milliseconds());
+
+ Metadata.LeaderAndEpoch leaderAndEpoch =
subscriptions.position(tp0).currentLeader;
+ assertTrue(leaderAndEpoch.leader.isPresent());
+ Node readReplica = fetcher.selectReadReplica(tp0,
leaderAndEpoch.leader.get(), time.milliseconds());
AtomicBoolean wokenUp = new AtomicBoolean(false);
client.setWakeupHook(() -> {
if (!wokenUp.getAndSet(true)) {
- consumerClient.disconnectAsync(node);
+ consumerClient.disconnectAsync(readReplica);
consumerClient.poll(time.timer(0));
}
});
assertEquals(1, fetcher.sendFetches());
- consumerClient.disconnectAsync(node);
+ consumerClient.disconnectAsync(readReplica);
consumerClient.poll(time.timer(0));
assertEquals(1, fetcher.sendFetches());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
index 55b8754..b3339ac 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
@@ -69,7 +69,7 @@ public class OffsetForLeaderEpochClientTest {
public void testUnexpectedEmptyResponse() {
Map<TopicPartition, SubscriptionState.FetchPosition> positionMap = new
HashMap<>();
positionMap.put(tp0, new SubscriptionState.FetchPosition(0,
Optional.of(1),
- new Metadata.LeaderAndEpoch(Node.noNode(), Optional.of(1))));
+ new Metadata.LeaderAndEpoch(Optional.empty(),
Optional.of(1))));
OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future
=
@@ -88,7 +88,7 @@ public class OffsetForLeaderEpochClientTest {
public void testOkResponse() {
Map<TopicPartition, SubscriptionState.FetchPosition> positionMap = new
HashMap<>();
positionMap.put(tp0, new SubscriptionState.FetchPosition(0,
Optional.of(1),
- new Metadata.LeaderAndEpoch(Node.noNode(), Optional.of(1))));
+ new Metadata.LeaderAndEpoch(Optional.empty(),
Optional.of(1))));
OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future
=
@@ -111,7 +111,7 @@ public class OffsetForLeaderEpochClientTest {
public void testUnauthorizedTopic() {
Map<TopicPartition, SubscriptionState.FetchPosition> positionMap = new
HashMap<>();
positionMap.put(tp0, new SubscriptionState.FetchPosition(0,
Optional.of(1),
- new Metadata.LeaderAndEpoch(Node.noNode(), Optional.of(1))));
+ new Metadata.LeaderAndEpoch(Optional.empty(),
Optional.of(1))));
OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future
=
@@ -131,7 +131,7 @@ public class OffsetForLeaderEpochClientTest {
public void testRetriableError() {
Map<TopicPartition, SubscriptionState.FetchPosition> positionMap = new
HashMap<>();
positionMap.put(tp0, new SubscriptionState.FetchPosition(0,
Optional.of(1),
- new Metadata.LeaderAndEpoch(Node.noNode(), Optional.of(1))));
+ new Metadata.LeaderAndEpoch(Optional.empty(),
Optional.of(1))));
OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future
=
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 6e1e8a3..74048ad 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -51,7 +51,7 @@ public class SubscriptionStateTest {
private final TopicPartition tp1 = new TopicPartition(topic, 1);
private final TopicPartition t1p0 = new TopicPartition(topic1, 0);
private final MockRebalanceListener rebalanceListener = new
MockRebalanceListener();
- private final Metadata.LeaderAndEpoch leaderAndEpoch = new
Metadata.LeaderAndEpoch(Node.noNode(), Optional.empty());
+ private final Metadata.LeaderAndEpoch leaderAndEpoch =
Metadata.LeaderAndEpoch.noLeaderOrEpoch();
@Test
public void partitionAssignment() {
@@ -363,15 +363,17 @@ public class SubscriptionStateTest {
// Seek with no offset epoch requires no validation no matter what the
current leader is
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L,
Optional.empty(),
- new Metadata.LeaderAndEpoch(broker1, Optional.of(5))));
+ new Metadata.LeaderAndEpoch(Optional.of(broker1),
Optional.of(5))));
assertTrue(state.hasValidPosition(tp0));
assertFalse(state.awaitingValidation(tp0));
- assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new
Metadata.LeaderAndEpoch(broker1, Optional.empty())));
+ assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new
Metadata.LeaderAndEpoch(
+ Optional.of(broker1), Optional.empty())));
assertTrue(state.hasValidPosition(tp0));
assertFalse(state.awaitingValidation(tp0));
- assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new
Metadata.LeaderAndEpoch(broker1, Optional.of(10))));
+ assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new
Metadata.LeaderAndEpoch(
+ Optional.of(broker1), Optional.of(10))));
assertTrue(state.hasValidPosition(tp0));
assertFalse(state.awaitingValidation(tp0));
}
@@ -383,12 +385,12 @@ public class SubscriptionStateTest {
// Seek with no offset epoch requires no validation no matter what the
current leader is
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L,
Optional.of(2),
- new Metadata.LeaderAndEpoch(broker1, Optional.of(5))));
+ new Metadata.LeaderAndEpoch(Optional.of(broker1),
Optional.of(5))));
assertFalse(state.hasValidPosition(tp0));
assertTrue(state.awaitingValidation(tp0));
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L,
Optional.empty(),
- new Metadata.LeaderAndEpoch(broker1, Optional.of(5))));
+ new Metadata.LeaderAndEpoch(Optional.of(broker1),
Optional.of(5))));
assertTrue(state.hasValidPosition(tp0));
assertFalse(state.awaitingValidation(tp0));
}
@@ -399,22 +401,25 @@ public class SubscriptionStateTest {
state.assignFromUser(Collections.singleton(tp0));
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L,
Optional.of(2),
- new Metadata.LeaderAndEpoch(broker1, Optional.of(5))));
+ new Metadata.LeaderAndEpoch(Optional.of(broker1),
Optional.of(5))));
assertFalse(state.hasValidPosition(tp0));
assertTrue(state.awaitingValidation(tp0));
// Update using the current leader and epoch
- assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, new
Metadata.LeaderAndEpoch(broker1, Optional.of(5))));
+ assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, new
Metadata.LeaderAndEpoch(
+ Optional.of(broker1), Optional.of(5))));
assertFalse(state.hasValidPosition(tp0));
assertTrue(state.awaitingValidation(tp0));
// Update with a newer leader and epoch
- assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, new
Metadata.LeaderAndEpoch(broker1, Optional.of(15))));
+ assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, new
Metadata.LeaderAndEpoch(
+ Optional.of(broker1), Optional.of(15))));
assertFalse(state.hasValidPosition(tp0));
assertTrue(state.awaitingValidation(tp0));
// If the updated leader has no epoch information, then skip
validation and begin fetching
- assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new
Metadata.LeaderAndEpoch(broker1, Optional.empty())));
+ assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new
Metadata.LeaderAndEpoch(
+ Optional.of(broker1), Optional.empty())));
assertTrue(state.hasValidPosition(tp0));
assertFalse(state.awaitingValidation(tp0));
}
@@ -425,13 +430,13 @@ public class SubscriptionStateTest {
state.assignFromUser(Collections.singleton(tp0));
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L,
Optional.of(5),
- new Metadata.LeaderAndEpoch(broker1, Optional.of(10))));
+ new Metadata.LeaderAndEpoch(Optional.of(broker1),
Optional.of(10))));
assertFalse(state.hasValidPosition(tp0));
assertTrue(state.awaitingValidation(tp0));
assertEquals(10L, state.position(tp0).offset);
state.seekValidated(tp0, new SubscriptionState.FetchPosition(8L,
Optional.of(4),
- new Metadata.LeaderAndEpoch(broker1, Optional.of(10))));
+ new Metadata.LeaderAndEpoch(Optional.of(broker1),
Optional.of(10))));
assertTrue(state.hasValidPosition(tp0));
assertFalse(state.awaitingValidation(tp0));
assertEquals(8L, state.position(tp0).offset);
@@ -443,7 +448,7 @@ public class SubscriptionStateTest {
state.assignFromUser(Collections.singleton(tp0));
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L,
Optional.of(5),
- new Metadata.LeaderAndEpoch(broker1, Optional.of(10))));
+ new Metadata.LeaderAndEpoch(Optional.of(broker1),
Optional.of(10))));
assertFalse(state.hasValidPosition(tp0));
assertTrue(state.awaitingValidation(tp0));
assertEquals(10L, state.position(tp0).offset);
@@ -460,7 +465,7 @@ public class SubscriptionStateTest {
state.assignFromUser(Collections.singleton(tp0));
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L,
Optional.of(5),
- new Metadata.LeaderAndEpoch(broker1, Optional.of(10))));
+ new Metadata.LeaderAndEpoch(Optional.of(broker1),
Optional.of(10))));
assertTrue(state.awaitingValidation(tp0));
state.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
@@ -478,7 +483,7 @@ public class SubscriptionStateTest {
int initialOffsetEpoch = 5;
SubscriptionState.FetchPosition initialPosition = new
SubscriptionState.FetchPosition(initialOffset,
- Optional.of(initialOffsetEpoch), new
Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
+ Optional.of(initialOffsetEpoch), new
Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
state.seekUnvalidated(tp0, initialPosition);
assertTrue(state.awaitingValidation(tp0));
@@ -501,12 +506,12 @@ public class SubscriptionStateTest {
int updateOffsetEpoch = 8;
SubscriptionState.FetchPosition initialPosition = new
SubscriptionState.FetchPosition(initialOffset,
- Optional.of(initialOffsetEpoch), new
Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
+ Optional.of(initialOffsetEpoch), new
Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
state.seekUnvalidated(tp0, initialPosition);
assertTrue(state.awaitingValidation(tp0));
SubscriptionState.FetchPosition updatePosition = new
SubscriptionState.FetchPosition(updateOffset,
- Optional.of(updateOffsetEpoch), new
Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
+ Optional.of(updateOffsetEpoch), new
Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
state.seekUnvalidated(tp0, updatePosition);
Optional<OffsetAndMetadata> divergentOffsetMetadataOpt =
state.maybeCompleteValidation(tp0, initialPosition,
@@ -526,7 +531,7 @@ public class SubscriptionStateTest {
int initialOffsetEpoch = 5;
SubscriptionState.FetchPosition initialPosition = new
SubscriptionState.FetchPosition(initialOffset,
- Optional.of(initialOffsetEpoch), new
Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
+ Optional.of(initialOffsetEpoch), new
Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
state.seekUnvalidated(tp0, initialPosition);
assertTrue(state.awaitingValidation(tp0));
@@ -551,7 +556,7 @@ public class SubscriptionStateTest {
int divergentOffsetEpoch = 7;
SubscriptionState.FetchPosition initialPosition = new
SubscriptionState.FetchPosition(initialOffset,
- Optional.of(initialOffsetEpoch), new
Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
+ Optional.of(initialOffsetEpoch), new
Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
state.seekUnvalidated(tp0, initialPosition);
assertTrue(state.awaitingValidation(tp0));
@@ -561,7 +566,7 @@ public class SubscriptionStateTest {
assertFalse(state.awaitingValidation(tp0));
SubscriptionState.FetchPosition updatedPosition = new
SubscriptionState.FetchPosition(divergentOffset,
- Optional.of(divergentOffsetEpoch), new
Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
+ Optional.of(divergentOffsetEpoch), new
Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
assertEquals(updatedPosition, state.position(tp0));
}
@@ -578,7 +583,7 @@ public class SubscriptionStateTest {
int divergentOffsetEpoch = 7;
SubscriptionState.FetchPosition initialPosition = new
SubscriptionState.FetchPosition(initialOffset,
- Optional.of(initialOffsetEpoch), new
Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch)));
+ Optional.of(initialOffsetEpoch), new
Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
state.seekUnvalidated(tp0, initialPosition);
assertTrue(state.awaitingValidation(tp0));
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 8330e6a..487eabf 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -169,6 +169,8 @@ import java.util.Optional;
import java.util.Set;
import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
import static org.apache.kafka.common.protocol.ApiKeys.FETCH;
import static
org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
import static org.apache.kafka.test.TestUtils.toBuffer;
@@ -637,7 +639,7 @@ public class RequestResponseTest {
responseData.put(new TopicPartition("bar", 1), new
FetchResponse.PartitionData<>(Errors.NONE, 900000,
5, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(),
null, records));
responseData.put(new TopicPartition("foo", 0), new
FetchResponse.PartitionData<>(Errors.NONE, 70000,
- 6, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(),
Collections.emptyList(), records));
+ 6, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(),
emptyList(), records));
FetchResponse<MemoryRecords> response = new
FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID);
FetchResponse deserialized =
FetchResponse.parse(toBuffer(response.toStruct((short) 4)), (short) 4);
@@ -1186,19 +1188,21 @@ public class RequestResponseTest {
private MetadataResponse createMetadataResponse() {
Node node = new Node(1, "host1", 1001);
- List<Node> replicas = asList(node);
- List<Node> isr = asList(node);
- List<Node> offlineReplicas = asList();
+ List<Integer> replicas = singletonList(node.id());
+ List<Integer> isr = singletonList(node.id());
+ List<Integer> offlineReplicas = emptyList();
List<MetadataResponse.TopicMetadata> allTopicMetadata = new
ArrayList<>();
allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE,
"__consumer_offsets", true,
- asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1,
node,
- Optional.of(5), replicas, isr, offlineReplicas))));
+ asList(new MetadataResponse.PartitionMetadata(Errors.NONE,
+ new TopicPartition("__consumer_offsets", 1),
+ Optional.of(node.id()), Optional.of(5), replicas, isr,
offlineReplicas))));
allTopicMetadata.add(new
MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2", false,
- Collections.emptyList()));
+ emptyList()));
allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE,
"topic3", false,
- asList(new
MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, 0, null,
- Optional.empty(), replicas, isr, offlineReplicas))));
+ asList(new
MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE,
+ new TopicPartition("topic3", 0), Optional.empty(),
+ Optional.empty(), replicas, isr, offlineReplicas))));
return MetadataResponse.prepareResponse(asList(node), null,
MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata);
}
@@ -1808,13 +1812,13 @@ public class RequestResponseTest {
Map<ConfigResource, Collection<String>> resources = new HashMap<>();
resources.put(new ConfigResource(ConfigResource.Type.BROKER, "0"),
asList("foo", "bar"));
resources.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic"),
null);
- resources.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic
a"), Collections.emptyList());
+ resources.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic
a"), emptyList());
return new DescribeConfigsRequest.Builder(resources).build((short)
version);
}
private DescribeConfigsResponse createDescribeConfigsResponse() {
Map<ConfigResource, DescribeConfigsResponse.Config> configs = new
HashMap<>();
- List<DescribeConfigsResponse.ConfigSynonym> synonyms =
Collections.emptyList();
+ List<DescribeConfigsResponse.ConfigSynonym> synonyms = emptyList();
List<DescribeConfigsResponse.ConfigEntry> configEntries = asList(
new DescribeConfigsResponse.ConfigEntry("config_name",
"config_value",
DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG, true, false,
synonyms),
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 5782b6d..ad2ad99 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -159,9 +159,10 @@ public class TestUtils {
for (int i = 0; i < numPartitions; i++) {
TopicPartition tp = new TopicPartition(topic, i);
Node leader = nodes.get(i % nodes.size());
- List<Node> replicas = Collections.singletonList(leader);
+ List<Integer> replicaIds =
Collections.singletonList(leader.id());
partitionMetadata.add(partitionSupplier.supply(
- Errors.NONE, i, leader,
Optional.ofNullable(epochSupplier.apply(tp)), replicas, replicas, replicas));
+ Errors.NONE, tp, Optional.of(leader.id()),
Optional.ofNullable(epochSupplier.apply(tp)),
+ replicaIds, replicaIds, replicaIds));
}
topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE,
topic,
@@ -180,12 +181,12 @@ public class TestUtils {
@FunctionalInterface
public interface PartitionMetadataSupplier {
MetadataResponse.PartitionMetadata supply(Errors error,
- int partition,
- Node leader,
+ TopicPartition partition,
+ Optional<Integer> leaderId,
Optional<Integer> leaderEpoch,
- List<Node> replicas,
- List<Node> isr,
- List<Node> offlineReplicas);
+ List<Integer> replicas,
+ List<Integer> isr,
+ List<Integer> offlineReplicas);
}
public static Cluster clusterWith(final int nodes, final String topic,
final int partitions) {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index e3adb04..8a14cd7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1290,11 +1290,13 @@ class KafkaApis(val requestChannel: RequestChannel,
} else {
val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
.find(_.partition == partition)
- .map(_.leader)
- .flatMap(p => Option(p))
+ .filter(_.leaderId.isPresent)
+ .flatMap(metadata =>
metadataCache.getAliveBroker(metadata.leaderId.get))
+ .flatMap(_.getNode(request.context.listenerName))
+ .filterNot(_.isEmpty)
coordinatorEndpoint match {
- case Some(endpoint) if !endpoint.isEmpty =>
+ case Some(endpoint) =>
createFindCoordinatorResponse(Errors.NONE, endpoint)
case _ =>
createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE,
Node.noNode)
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala
b/core/src/main/scala/kafka/server/MetadataCache.scala
index 1ad0e41..10c25b5 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,6 +17,7 @@
package kafka.server
+import java.util
import java.util.{Collections, Optional}
import java.util.concurrent.locks.ReentrantReadWriteLock
@@ -55,18 +56,22 @@ class MetadataCache(brokerId: Int) extends Logging {
// This method is the main hotspot when it comes to the performance of
metadata requests,
// we should be careful about adding additional logic here. Relatedly,
`brokers` is
- // `Iterable[Integer]` instead of `Iterable[Int]` to avoid a collection copy.
+ // `List[Integer]` instead of `List[Int]` to avoid a collection copy.
// filterUnavailableEndpoints exists to support v0 MetadataResponses
- private def getEndpoints(snapshot: MetadataSnapshot, brokers:
Iterable[java.lang.Integer], listenerName: ListenerName,
filterUnavailableEndpoints: Boolean): Seq[Node] = {
- val result = new
mutable.ArrayBuffer[Node](math.min(snapshot.aliveBrokers.size, brokers.size))
- brokers.foreach { brokerId =>
- val endpoint = getAliveEndpoint(snapshot, brokerId, listenerName) match {
- case None => if (!filterUnavailableEndpoints) Some(new Node(brokerId,
"", -1)) else None
- case Some(node) => Some(node)
+ private def maybeFilterAliveReplicas(snapshot: MetadataSnapshot,
+ brokers: java.util.List[Integer],
+ listenerName: ListenerName,
+ filterUnavailableEndpoints: Boolean):
java.util.List[Integer] = {
+ if (!filterUnavailableEndpoints) {
+ brokers
+ } else {
+ val res = new
util.ArrayList[Integer](math.min(snapshot.aliveBrokers.size, brokers.size))
+ for (brokerId <- brokers.asScala) {
+ if (hasAliveEndpoint(snapshot, brokerId, listenerName))
+ res.add(brokerId)
}
- endpoint.foreach(result +=)
+ res
}
- result
}
// errorUnavailableEndpoints exists to support v0 MetadataResponses
@@ -80,53 +85,71 @@ class MetadataCache(brokerId: Int) extends Logging {
val leaderBrokerId = partitionState.leader
val leaderEpoch = partitionState.leaderEpoch
val maybeLeader = getAliveEndpoint(snapshot, leaderBrokerId,
listenerName)
- val replicas = partitionState.replicas.asScala
- val replicaInfo = getEndpoints(snapshot, replicas, listenerName,
errorUnavailableEndpoints)
- val offlineReplicaInfo = getEndpoints(snapshot,
partitionState.offlineReplicas.asScala, listenerName, errorUnavailableEndpoints)
- val isr = partitionState.isr.asScala
- val isrInfo = getEndpoints(snapshot, isr, listenerName,
errorUnavailableEndpoints)
+ val replicas = partitionState.replicas
+ val filteredReplicas = maybeFilterAliveReplicas(snapshot, replicas,
listenerName, errorUnavailableEndpoints)
+
+ val isr = partitionState.isr
+ val filteredIsr = maybeFilterAliveReplicas(snapshot, isr,
listenerName, errorUnavailableEndpoints)
+
+ val offlineReplicas = partitionState.offlineReplicas
+
maybeLeader match {
case None =>
val error = if (!snapshot.aliveBrokers.contains(brokerId)) { // we
are already holding the read lock
debug(s"Error while fetching metadata for $topicPartition:
leader not available")
Errors.LEADER_NOT_AVAILABLE
} else {
- debug(s"Error while fetching metadata for $topicPartition:
listener $listenerName not found on leader $leaderBrokerId")
+ debug(s"Error while fetching metadata for $topicPartition:
listener $listenerName " +
+ s"not found on leader $leaderBrokerId")
if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else
Errors.LEADER_NOT_AVAILABLE
}
- new MetadataResponse.PartitionMetadata(error, partitionId.toInt,
Node.noNode(),
- Optional.empty(), replicaInfo.asJava, isrInfo.asJava,
- offlineReplicaInfo.asJava)
+ new MetadataResponse.PartitionMetadata(error, topicPartition,
Optional.empty(),
+ Optional.of(leaderEpoch), filteredReplicas, filteredIsr,
offlineReplicas)
case Some(leader) =>
- if (replicaInfo.size < replicas.size) {
+ if (filteredReplicas.size < replicas.size) {
debug(s"Error while fetching metadata for $topicPartition:
replica information not available for " +
- s"following brokers
${replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")}")
-
- new
MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE,
partitionId.toInt, leader,
- Optional.empty(), replicaInfo.asJava, isrInfo.asJava,
offlineReplicaInfo.asJava)
- } else if (isrInfo.size < isr.size) {
+ s"following brokers
${replicas.asScala.filterNot(filteredReplicas.contains).mkString(",")}")
+ new
MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, topicPartition,
+ Optional.of(leader.id), Optional.of(leaderEpoch),
filteredReplicas, filteredIsr, offlineReplicas)
+ } else if (filteredIsr.size < isr.size) {
debug(s"Error while fetching metadata for $topicPartition: in
sync replica information not available for " +
- s"following brokers
${isr.filterNot(isrInfo.map(_.id).contains).mkString(",")}")
- new
MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE,
partitionId.toInt, leader,
- Optional.empty(), replicaInfo.asJava, isrInfo.asJava,
offlineReplicaInfo.asJava)
+ s"following brokers
${isr.asScala.filterNot(filteredIsr.contains).mkString(",")}")
+ new
MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, topicPartition,
+ Optional.of(leader.id), Optional.of(leaderEpoch),
filteredReplicas, filteredIsr, offlineReplicas)
} else {
- new MetadataResponse.PartitionMetadata(Errors.NONE,
partitionId.toInt, leader, Optional.of(leaderEpoch),
- replicaInfo.asJava, isrInfo.asJava, offlineReplicaInfo.asJava)
+ new MetadataResponse.PartitionMetadata(Errors.NONE,
topicPartition,
+ Optional.of(leader.id), Optional.of(leaderEpoch),
filteredReplicas, filteredIsr, offlineReplicas)
}
}
}
}
}
- private def getAliveEndpoint(snapshot: MetadataSnapshot, brokerId: Int,
listenerName: ListenerName): Option[Node] =
- // Returns None if broker is not alive or if the broker does not have a listener named `listenerName`.
- // Since listeners can be added dynamically, a broker with a missing listener could be a transient error.
+ /**
+ * Check whether a broker is alive and has a registered listener matching the provided name.
+ * This method was added to avoid unnecessary allocations in [[maybeFilterAliveReplicas]], which is
+ * a hotspot in metadata handling.
+ */
+ private def hasAliveEndpoint(snapshot: MetadataSnapshot, brokerId: Int,
listenerName: ListenerName): Boolean = {
+ snapshot.aliveNodes.get(brokerId).exists(_.contains(listenerName))
+ }
+
+ /**
+ * Get the endpoint matching the provided listener if the broker is alive. Note that listeners can
+ * be added dynamically, so a broker with a missing listener could be a transient error.
+ *
+ * @return None if broker is not alive or if the broker does not have a listener named `listenerName`.
+ */
+ private def getAliveEndpoint(snapshot: MetadataSnapshot, brokerId: Int,
listenerName: ListenerName): Option[Node] = {
snapshot.aliveNodes.get(brokerId).flatMap(_.get(listenerName))
+ }
// errorUnavailableEndpoints exists to support v0 MetadataResponses
- def getTopicMetadata(topics: Set[String], listenerName: ListenerName,
errorUnavailableEndpoints: Boolean = false,
+ def getTopicMetadata(topics: Set[String],
+ listenerName: ListenerName,
+ errorUnavailableEndpoints: Boolean = false,
errorUnavailableListeners: Boolean = false):
Seq[MetadataResponse.TopicMetadata] = {
val snapshot = metadataSnapshot
topics.toSeq.flatMap { topic =>
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index efd721d..cdb4c0a 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -17,6 +17,8 @@
package kafka.admin
+import java.util.Optional
+
import kafka.controller.ReplicaAssignment
import kafka.server.BaseRequestTest
import kafka.utils.TestUtils
@@ -101,9 +103,14 @@ class AddPartitionsTest extends BaseRequestTest {
assertEquals(partitions.size, 3)
assertEquals(1, partitions(1).partition)
assertEquals(2, partitions(2).partition)
- val replicas = partitions(1).replicas
- assertEquals(replicas.size, 2)
- assertTrue(replicas.contains(partitions(1).leader))
+
+ for (partition <- partitions) {
+ val replicas = partition.replicaIds
+ assertEquals(2, replicas.size)
+ assertTrue(partition.leaderId.isPresent)
+ val leaderId = partition.leaderId.get
+ assertTrue(replicas.contains(leaderId))
+ }
}
@Test
@@ -131,10 +138,9 @@ class AddPartitionsTest extends BaseRequestTest {
assertEquals(0, partitionMetadata(0).partition)
assertEquals(1, partitionMetadata(1).partition)
assertEquals(2, partitionMetadata(2).partition)
- val replicas = partitionMetadata(1).replicas
+ val replicas = partitionMetadata(1).replicaIds
assertEquals(2, replicas.size)
- assertTrue(replicas.asScala.head.id == 0 || replicas.asScala.head.id == 1)
- assertTrue(replicas.asScala(1).id == 0 || replicas.asScala(1).id == 1)
+ assertEquals(Set(0, 1), replicas.asScala.toSet)
}
@Test
@@ -185,9 +191,8 @@ class AddPartitionsTest extends BaseRequestTest {
assertTrue(s"Partition $partitionId should exist", partitionOpt.isDefined)
val partition = partitionOpt.get
- assertNotNull("Partition leader should exist", partition.leader)
- assertEquals("Partition leader id should match", expectedLeaderId,
partition.leaderId)
- assertEquals("Replica set should match", expectedReplicas,
partition.replicas.asScala.map(_.id).toSet)
+ assertEquals("Partition leader id should match",
Optional.of(expectedLeaderId), partition.leaderId)
+ assertEquals("Replica set should match", expectedReplicas,
partition.replicaIds.asScala.toSet)
}
}
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
index b110139..5281d80 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
@@ -584,7 +584,7 @@ class TopicCommandWithAdminClientTest extends
KafkaServerTestHarness with Loggin
testPartitionMetadata match {
case None => fail(s"Partition metadata is not found in
metadata cache")
case Some(metadata) => {
- result && metadata.error() == Errors.LEADER_NOT_AVAILABLE
+ result && metadata.error == Errors.LEADER_NOT_AVAILABLE
}
}
}
diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index 5f1f671..8ca2c70 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -131,9 +131,9 @@ abstract class AbstractCreateTopicsRequestTest extends
BaseRequestTest {
if (replication == -1) {
assertEquals("The topic should have the default replication
factor",
- configs.head.defaultReplicationFactor,
metadataForTopic.partitionMetadata.asScala.head.replicas.size)
+ configs.head.defaultReplicationFactor,
metadataForTopic.partitionMetadata.asScala.head.replicaIds.size)
} else {
- assertEquals("The topic should have the correct replication
factor", replication,
metadataForTopic.partitionMetadata.asScala.head.replicas.size)
+ assertEquals("The topic should have the correct replication
factor", replication,
metadataForTopic.partitionMetadata.asScala.head.replicaIds.size)
}
}
}
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index a42a86a..60cff13 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -128,16 +128,12 @@ class MetadataCacheTest {
partitionMetadatas.zipWithIndex.foreach { case (partitionMetadata,
partitionId) =>
assertEquals(Errors.NONE, partitionMetadata.error)
assertEquals(partitionId, partitionMetadata.partition)
- val leader = partitionMetadata.leader
val partitionState = topicPartitionStates.find(_.partitionIndex ==
partitionId).getOrElse(
Assertions.fail(s"Unable to find partition state for partition
$partitionId"))
- assertEquals(partitionState.leader, leader.id)
+ assertEquals(Optional.of(partitionState.leader),
partitionMetadata.leaderId)
assertEquals(Optional.of(partitionState.leaderEpoch),
partitionMetadata.leaderEpoch)
- assertEquals(partitionState.isr,
partitionMetadata.isr.asScala.map(_.id).asJava)
- assertEquals(partitionState.replicas,
partitionMetadata.replicas.asScala.map(_.id).asJava)
- val endpoint =
endpoints(partitionMetadata.leader.id).find(_.listener ==
listenerName.value).get
- assertEquals(endpoint.host, leader.host)
- assertEquals(endpoint.port, leader.port)
+ assertEquals(partitionState.isr, partitionMetadata.inSyncReplicaIds)
+ assertEquals(partitionState.replicas, partitionMetadata.replicaIds)
}
}
@@ -267,9 +263,8 @@ class MetadataCacheTest {
val partitionMetadata = partitionMetadatas.get(0)
assertEquals(0, partitionMetadata.partition)
assertEquals(expectedError, partitionMetadata.error)
- assertFalse(partitionMetadata.isr.isEmpty)
- assertEquals(1, partitionMetadata.replicas.size)
- assertEquals(0, partitionMetadata.replicas.get(0).id)
+ assertFalse(partitionMetadata.inSyncReplicaIds.isEmpty)
+ assertEquals(List(0), partitionMetadata.replicaIds.asScala)
}
@Test
@@ -326,8 +321,8 @@ class MetadataCacheTest {
val partitionMetadata = partitionMetadatas.get(0)
assertEquals(0, partitionMetadata.partition)
assertEquals(Errors.NONE, partitionMetadata.error)
- assertEquals(Set(0, 1), partitionMetadata.replicas.asScala.map(_.id).toSet)
- assertEquals(Set(0), partitionMetadata.isr.asScala.map(_.id).toSet)
+ assertEquals(Set(0, 1), partitionMetadata.replicaIds.asScala.toSet)
+ assertEquals(Set(0), partitionMetadata.inSyncReplicaIds.asScala.toSet)
// Validate errorUnavailableEndpoints = true
val topicMetadatasWithError = cache.getTopicMetadata(Set(topic),
listenerName, errorUnavailableEndpoints = true)
@@ -342,8 +337,8 @@ class MetadataCacheTest {
val partitionMetadataWithError = partitionMetadatasWithError.get(0)
assertEquals(0, partitionMetadataWithError.partition)
assertEquals(Errors.REPLICA_NOT_AVAILABLE,
partitionMetadataWithError.error)
- assertEquals(Set(0),
partitionMetadataWithError.replicas.asScala.map(_.id).toSet)
- assertEquals(Set(0),
partitionMetadataWithError.isr.asScala.map(_.id).toSet)
+ assertEquals(Set(0), partitionMetadataWithError.replicaIds.asScala.toSet)
+ assertEquals(Set(0),
partitionMetadataWithError.inSyncReplicaIds.asScala.toSet)
}
@Test
@@ -400,8 +395,8 @@ class MetadataCacheTest {
val partitionMetadata = partitionMetadatas.get(0)
assertEquals(0, partitionMetadata.partition)
assertEquals(Errors.NONE, partitionMetadata.error)
- assertEquals(Set(0), partitionMetadata.replicas.asScala.map(_.id).toSet)
- assertEquals(Set(0, 1), partitionMetadata.isr.asScala.map(_.id).toSet)
+ assertEquals(Set(0), partitionMetadata.replicaIds.asScala.toSet)
+ assertEquals(Set(0, 1), partitionMetadata.inSyncReplicaIds.asScala.toSet)
// Validate errorUnavailableEndpoints = true
val topicMetadatasWithError = cache.getTopicMetadata(Set(topic),
listenerName, errorUnavailableEndpoints = true)
@@ -416,8 +411,8 @@ class MetadataCacheTest {
val partitionMetadataWithError = partitionMetadatasWithError.get(0)
assertEquals(0, partitionMetadataWithError.partition)
assertEquals(Errors.REPLICA_NOT_AVAILABLE,
partitionMetadataWithError.error)
- assertEquals(Set(0),
partitionMetadataWithError.replicas.asScala.map(_.id).toSet)
- assertEquals(Set(0),
partitionMetadataWithError.isr.asScala.map(_.id).toSet)
+ assertEquals(Set(0), partitionMetadataWithError.replicaIds.asScala.toSet)
+ assertEquals(Set(0),
partitionMetadataWithError.inSyncReplicaIds.asScala.toSet)
}
@Test
@@ -455,7 +450,7 @@ class MetadataCacheTest {
val topicMetadata = cache.getTopicMetadata(Set(topic),
ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
assertEquals(1, topicMetadata.size)
assertEquals(1, topicMetadata.head.partitionMetadata.size)
- assertEquals(-1, topicMetadata.head.partitionMetadata.get(0).leaderId)
+ assertEquals(Optional.empty,
topicMetadata.head.partitionMetadata.get(0).leaderId)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index aa6a53f..599ae17 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -17,11 +17,10 @@
package kafka.server
-import java.util.Properties
+import java.util.{Optional, Properties}
import kafka.network.SocketServer
import kafka.utils.TestUtils
-import org.apache.kafka.common.Node
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.MetadataRequestData
@@ -202,8 +201,9 @@ class MetadataRequestTest extends BaseRequestTest {
assertEquals(1, topicMetadata1.partitionMetadata.size)
val partitionMetadata = topicMetadata1.partitionMetadata.asScala.head
assertEquals(0, partitionMetadata.partition)
- assertEquals(2, partitionMetadata.replicas.size)
- assertNotNull(partitionMetadata.leader)
+ assertEquals(2, partitionMetadata.replicaIds.size)
+ assertTrue(partitionMetadata.leaderId.isPresent)
+ assertTrue(partitionMetadata.leaderId.get >= 0)
}
@Test
@@ -243,9 +243,9 @@ class MetadataRequestTest extends BaseRequestTest {
assertEquals(Set(0, 1),
topicMetadata.partitionMetadata.asScala.map(_.partition).toSet)
topicMetadata.partitionMetadata.asScala.foreach { partitionMetadata =>
val assignment = replicaAssignment(partitionMetadata.partition)
- assertEquals(assignment, partitionMetadata.replicas.asScala.map(_.id))
- assertEquals(assignment, partitionMetadata.isr.asScala.map(_.id))
- assertEquals(assignment.head, partitionMetadata.leader.id)
+ assertEquals(assignment, partitionMetadata.replicaIds.asScala)
+ assertEquals(assignment, partitionMetadata.inSyncReplicaIds.asScala)
+ assertEquals(Optional.of(assignment.head), partitionMetadata.leaderId)
}
}
}
@@ -268,23 +268,22 @@ class MetadataRequestTest extends BaseRequestTest {
createTopic(replicaDownTopic, 1, replicaCount)
// Kill a replica node that is not the leader
- val metadataResponse = sendMetadataRequest(new
MetadataRequest.Builder(List(replicaDownTopic).asJava, true, 1.toShort).build())
+ val metadataResponse = sendMetadataRequest(new
MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build())
val partitionMetadata =
metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
val downNode = servers.find { server =>
val serverId = server.dataPlaneRequestProcessor.brokerId
- val leaderId = partitionMetadata.leader.id
- val replicaIds = partitionMetadata.replicas.asScala.map(_.id)
+ val leaderId = partitionMetadata.leaderId
+ val replicaIds = partitionMetadata.replicaIds.asScala
serverId != leaderId && replicaIds.contains(serverId)
}.get
downNode.shutdown()
TestUtils.waitUntilTrue(() => {
- val response = sendMetadataRequest(new
MetadataRequest.Builder(List(replicaDownTopic).asJava, true, 1.toShort).build())
- val metadata =
response.topicMetadata.asScala.head.partitionMetadata.asScala.head
- val replica = metadata.replicas.asScala.find(_.id ==
downNode.dataPlaneRequestProcessor.brokerId).get
- replica.host == "" & replica.port == -1
+ val response = sendMetadataRequest(new
MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build())
+ !response.brokers.asScala.exists(_.id ==
downNode.dataPlaneRequestProcessor.brokerId)
}, "Replica was not found down", 5000)
+
// Validate version 0 still filters unavailable replicas and contains error
val v0MetadataResponse = sendMetadataRequest(new
MetadataRequest(requestData(List(replicaDownTopic), true), 0.toShort))
val v0BrokerIds = v0MetadataResponse.brokers().asScala.map(_.id).toSeq
@@ -293,37 +292,35 @@ class MetadataRequestTest extends BaseRequestTest {
assertTrue("Response should have one topic",
v0MetadataResponse.topicMetadata.size == 1)
val v0PartitionMetadata =
v0MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
assertTrue("PartitionMetadata should have an error",
v0PartitionMetadata.error == Errors.REPLICA_NOT_AVAILABLE)
- assertTrue(s"Response should have ${replicaCount - 1} replicas",
v0PartitionMetadata.replicas.size == replicaCount - 1)
+ assertTrue(s"Response should have ${replicaCount - 1} replicas",
v0PartitionMetadata.replicaIds.size == replicaCount - 1)
// Validate version 1 returns unavailable replicas with no error
- val v1MetadataResponse = sendMetadataRequest(new
MetadataRequest.Builder(List(replicaDownTopic).asJava, true, 1.toShort).build())
+ val v1MetadataResponse = sendMetadataRequest(new
MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build(1))
val v1BrokerIds = v1MetadataResponse.brokers().asScala.map(_.id).toSeq
assertTrue("Response should have no errors",
v1MetadataResponse.errors.isEmpty)
assertFalse(s"The downed broker should not be in the brokers list",
v1BrokerIds.contains(downNode))
assertEquals("Response should have one topic", 1,
v1MetadataResponse.topicMetadata.size)
val v1PartitionMetadata =
v1MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
assertEquals("PartitionMetadata should have no errors", Errors.NONE,
v1PartitionMetadata.error)
- assertEquals(s"Response should have $replicaCount replicas", replicaCount,
v1PartitionMetadata.replicas.size)
+ assertEquals(s"Response should have $replicaCount replicas", replicaCount,
v1PartitionMetadata.replicaIds.size)
}
@Test
def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = {
def checkIsr(servers: Seq[KafkaServer], topic: String): Unit = {
val activeBrokers = servers.filter(_.brokerState.currentState !=
NotRunning.state)
- val expectedIsr = activeBrokers.map { broker =>
- new Node(broker.config.brokerId, "localhost",
TestUtils.boundPort(broker), broker.config.rack.orNull)
- }.sortBy(_.id)
+ val expectedIsr = activeBrokers.map(_.config.brokerId).toSet
// Assert that topic metadata at new brokers is updated correctly
activeBrokers.foreach { broker =>
- var actualIsr: Seq[Node] = Seq.empty
+ var actualIsr = Set.empty[Int]
TestUtils.waitUntilTrue(() => {
val metadataResponse = sendMetadataRequest(new
MetadataRequest.Builder(Seq(topic).asJava, false).build,
Some(brokerSocketServer(broker.config.brokerId)))
val firstPartitionMetadata =
metadataResponse.topicMetadata.asScala.headOption.flatMap(_.partitionMetadata.asScala.headOption)
actualIsr = firstPartitionMetadata.map { partitionMetadata =>
- partitionMetadata.isr.asScala.sortBy(_.id)
- }.getOrElse(Seq.empty)
+ partitionMetadata.inSyncReplicaIds.asScala.map(Int.unbox).toSet
+ }.getOrElse(Set.empty)
expectedIsr == actualIsr
}, s"Topic metadata not updated correctly in broker $broker\n" +
s"Expected ISR: $expectedIsr \n" +