KAFKA-3306: Change metadata response to include required additional fi…
…elds - Adds boolean type to the protocol - Allows protocol arrays to be null (optionally) - Adds support to ask for no topics in the metadata request - Adds new fields to the Metadata response protocol - Adds server code to handle new fields - Support no-topic metadata requests - Track controller id in the metadata cache - Check if a topic is considered internal - Included rack information if present - Include all replicas and ISRs, even if node is down - Adds test code to test new functionality independent of the client Author: Grant Henke <[email protected]> Reviewers: Gwen Shapira, Ismael Juma, Ashish Singh Closes #1095 from granthenke/metadata-changes Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/33d745e2 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/33d745e2 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/33d745e2 Branch: refs/heads/trunk Commit: 33d745e2dcfa7a9cac90af5594903330ad774cd2 Parents: 5b375d7 Author: Grant Henke <[email protected]> Authored: Tue Apr 26 17:03:18 2016 -0700 Committer: Gwen Shapira <[email protected]> Committed: Tue Apr 26 17:03:18 2016 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/clients/NetworkClient.java | 25 +-- .../kafka/clients/consumer/KafkaConsumer.java | 3 +- .../clients/consumer/internals/Fetcher.java | 17 +- .../main/java/org/apache/kafka/common/Node.java | 32 +++- .../apache/kafka/common/protocol/Protocol.java | 44 ++++- .../kafka/common/protocol/types/ArrayOf.java | 33 +++- .../kafka/common/protocol/types/Struct.java | 34 ++-- .../kafka/common/protocol/types/Type.java | 43 ++++- .../kafka/common/requests/MetadataRequest.java | 40 ++++- .../kafka/common/requests/MetadataResponse.java | 101 ++++++++++- .../clients/consumer/internals/FetcherTest.java | 11 +- .../types/ProtocolSerializationTest.java | 13 +- .../common/requests/RequestResponseTest.java | 21 ++-
.../src/main/scala/kafka/admin/AdminUtils.scala | 4 +- .../main/scala/kafka/admin/TopicCommand.scala | 2 +- core/src/main/scala/kafka/cluster/Broker.scala | 4 +- core/src/main/scala/kafka/common/Topic.scala | 5 +- .../src/main/scala/kafka/server/KafkaApis.scala | 65 ++++--- .../main/scala/kafka/server/MetadataCache.scala | 39 +++-- .../scala/kafka/server/ReplicaManager.scala | 4 +- .../unit/kafka/server/BaseRequestTest.scala | 106 ++++++++++++ .../unit/kafka/server/MetadataCacheTest.scala | 46 ++++- .../unit/kafka/server/MetadataRequestTest.scala | 168 +++++++++++++++++++ 23 files changed, 732 insertions(+), 128 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index cc5dc6f..b134631 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -35,11 +35,9 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; -import java.util.Set; /** * A network client for asynchronous request/response network i/o. 
This is an internal class used to implement the @@ -53,7 +51,7 @@ public class NetworkClient implements KafkaClient { /* the selector used to perform network i/o */ private final Selectable selector; - + private final MetadataUpdater metadataUpdater; private final Random randOffset; @@ -78,7 +76,7 @@ public class NetworkClient implements KafkaClient { /* max time in ms for the producer to wait for acknowledgement from server*/ private final int requestTimeoutMs; - + private final Time time; public NetworkClient(Selectable selector, @@ -114,7 +112,7 @@ public class NetworkClient implements KafkaClient { int maxInFlightRequestsPerConnection, long reconnectBackoffMs, int socketSendBuffer, - int socketReceiveBuffer, + int socketReceiveBuffer, int requestTimeoutMs, Time time) { @@ -370,7 +368,7 @@ public class NetworkClient implements KafkaClient { found = node; } } - + return found; } @@ -546,7 +544,7 @@ public class NetworkClient implements KafkaClient { // if there is no node available to connect, back off refreshing metadata long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), waitForMetadataFetch); - + if (metadataTimeout == 0) { // Beware that the behavior of this method and the computation of timeouts for poll() are // highly dependent on the behavior of leastLoadedNode. 
@@ -614,8 +612,7 @@ public class NetworkClient implements KafkaClient { /** * Create a metadata request for the given topics */ - private ClientRequest request(long now, String node, Set<String> topics) { - MetadataRequest metadata = new MetadataRequest(new ArrayList<>(topics)); + private ClientRequest request(long now, String node, MetadataRequest metadata) { RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct()); return new ClientRequest(now, true, send, null, true); } @@ -633,11 +630,15 @@ public class NetworkClient implements KafkaClient { String nodeConnectionId = node.idString(); if (canSendRequest(nodeConnectionId)) { - Set<String> topics = metadata.needMetadataForAllTopics() ? new HashSet<String>() : metadata.topics(); this.metadataFetchInProgress = true; - ClientRequest metadataRequest = request(now, nodeConnectionId, topics); + MetadataRequest metadataRequest; + if (metadata.needMetadataForAllTopics()) + metadataRequest = MetadataRequest.allTopics(); + else + metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics())); + ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest); log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); - doSend(metadataRequest, now); + doSend(clientRequest, now); } else if (connectionStates.canConnect(nodeConnectionId, now)) { // we don't have a connection to this node right now, make one log.debug("Initialize connection to node {} for sending metadata request", node.id()); http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index d9b74e2..ad44d16 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.Selector; +import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.SystemTime; @@ -1190,7 +1191,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { if (parts != null) return parts; - Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topic), requestTimeoutMs); + Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topic)), requestTimeoutMs); return topicMetadata.get(topic); } finally { release(); http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 4985275..f6d3387 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -183,25 +183,26 @@ public class Fetcher<K, V> { * @return The map of topics with their partition information */ public Map<String, List<PartitionInfo>> getAllTopicMetadata(long timeout) { - return getTopicMetadata(null, timeout); + return getTopicMetadata(MetadataRequest.allTopics(), timeout); } /** * Get metadata for all topics present in Kafka cluster * - * 
@param topics The list of topics to fetch or null to fetch all + * @param request The MetadataRequest to send * @param timeout time for which getting topic metadata is attempted * @return The map of topics with their partition information */ - public Map<String, List<PartitionInfo>> getTopicMetadata(List<String> topics, long timeout) { - if (topics != null && topics.isEmpty()) + public Map<String, List<PartitionInfo>> getTopicMetadata(MetadataRequest request, long timeout) { + // Save the round trip if no topics are requested. + if (!request.isAllTopics() && request.topics().isEmpty()) return Collections.emptyMap(); long start = time.milliseconds(); long remaining = timeout; do { - RequestFuture<ClientResponse> future = sendMetadataRequest(topics); + RequestFuture<ClientResponse> future = sendMetadataRequest(request); client.poll(future, remaining); if (future.failed() && !future.isRetriable()) @@ -266,14 +267,12 @@ public class Fetcher<K, V> { * Send Metadata Request to least loaded node in Kafka cluster asynchronously * @return A future that indicates result of sent metadata request */ - private RequestFuture<ClientResponse> sendMetadataRequest(List<String> topics) { - if (topics == null) - topics = Collections.emptyList(); + private RequestFuture<ClientResponse> sendMetadataRequest(MetadataRequest request) { final Node node = client.leastLoadedNode(); if (node == null) return RequestFuture.noBrokersAvailable(); else - return client.send(node, ApiKeys.METADATA, new MetadataRequest(topics)); + return client.send(node, ApiKeys.METADATA, request); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/common/Node.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java b/clients/src/main/java/org/apache/kafka/common/Node.java index 6c3fd0b..f569ddd 100644 --- a/clients/src/main/java/org/apache/kafka/common/Node.java +++ 
b/clients/src/main/java/org/apache/kafka/common/Node.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -23,13 +23,19 @@ public class Node { private final String idString; private final String host; private final int port; + private final String rack; public Node(int id, String host, int port) { + this(id, host, port, null); + } + + public Node(int id, String host, int port, String rack) { super(); this.id = id; this.idString = Integer.toString(id); this.host = host; this.port = port; + this.rack = rack; } public static Node noNode() { @@ -74,6 +80,20 @@ public class Node { return port; } + /** + * True if this node has a defined rack + */ + public boolean hasRack() { + return rack != null; + } + + /** + * The rack for this node + */ + public String rack() { + return rack; + } + @Override public int hashCode() { final int prime = 31; @@ -81,6 +101,7 @@ public class Node { result = prime * result + ((host == null) ? 0 : host.hashCode()); result = prime * result + id; result = prime * result + port; + result = prime * result + ((rack == null) ? 
0 : rack.hashCode()); return result; } @@ -102,12 +123,17 @@ public class Node { return false; if (port != other.port) return false; + if (rack == null) { + if (other.rack != null) + return false; + } else if (!rack.equals(other.rack)) + return false; return true; } @Override public String toString() { - return host + ":" + port + " (id: " + idString + ")"; + return host + ":" + port + " (id: " + idString + " rack: " + rack + ")"; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index bf76557..d322095 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -26,6 +26,7 @@ import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; +import static org.apache.kafka.common.protocol.types.Type.BOOLEAN; import static org.apache.kafka.common.protocol.types.Type.BYTES; import static org.apache.kafka.common.protocol.types.Type.INT16; import static org.apache.kafka.common.protocol.types.Type.INT32; @@ -56,10 +57,13 @@ public class Protocol { new ArrayOf(STRING), "An array of topics to fetch metadata for. If no topics are specified fetch metadata for all topics.")); - public static final Schema BROKER = new Schema(new Field("node_id", INT32, "The broker id."), + public static final Schema METADATA_REQUEST_V1 = new Schema(new Field("topics", + ArrayOf.nullable(STRING), + "An array of topics to fetch metadata for. 
If the topics array is null fetch metadata for all topics.")); + + public static final Schema METADATA_BROKER_V0 = new Schema(new Field("node_id", INT32, "The broker id."), new Field("host", STRING, "The hostname of the broker."), - new Field("port", - INT32, + new Field("port", INT32, "The port on which the broker accepts requests.")); public static final Schema PARTITION_METADATA_V0 = new Schema(new Field("partition_error_code", @@ -87,13 +91,34 @@ public class Protocol { "Metadata for each partition of the topic.")); public static final Schema METADATA_RESPONSE_V0 = new Schema(new Field("brokers", - new ArrayOf(BROKER), + new ArrayOf(METADATA_BROKER_V0), "Host and port information for all brokers."), new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V0))); - public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0}; - public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0}; + public static final Schema METADATA_BROKER_V1 = new Schema(new Field("node_id", INT32, "The broker id."), + new Field("host", STRING, "The hostname of the broker."), + new Field("port", INT32, + "The port on which the broker accepts requests."), + new Field("rack", NULLABLE_STRING, "The rack of the broker.")); + + public static final Schema PARTITION_METADATA_V1 = PARTITION_METADATA_V0; + + public static final Schema TOPIC_METADATA_V1 = new Schema(new Field("topic_error_code", INT16, "The error code for the given topic."), + new Field("topic", STRING, "The name of the topic"), + new Field("is_internal", BOOLEAN, + "Indicates if the topic is considered a Kafka internal topic"), + new Field("partition_metadata", new ArrayOf(PARTITION_METADATA_V1), + "Metadata for each partition of the topic.")); + + public static final Schema METADATA_RESPONSE_V1 = new Schema(new Field("brokers", new ArrayOf(METADATA_BROKER_V1), + "Host and port information for all brokers."), + new Field("controller_id", INT32, + "The broker id of the controller 
broker."), + new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1))); + + public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0, METADATA_REQUEST_V1}; + public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1}; /* Produce api */ @@ -496,9 +521,14 @@ public class Protocol { STRING, "The unique group id.")); + public static final Schema GROUP_COORDINATOR_BROKER_V0 = new Schema(new Field("node_id", INT32, "The broker id."), + new Field("host", STRING, "The hostname of the broker."), + new Field("port", INT32, + "The port on which the broker accepts requests.")); + public static final Schema GROUP_COORDINATOR_RESPONSE_V0 = new Schema(new Field("error_code", INT16), new Field("coordinator", - BROKER, + GROUP_COORDINATOR_BROKER_V0, "Host and port information for the coordinator for a consumer group.")); public static final Schema[] GROUP_COORDINATOR_REQUEST = new Schema[] {GROUP_COORDINATOR_REQUEST_V0}; http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java index a08f876..207f108 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java @@ -24,13 +24,33 @@ import java.nio.ByteBuffer; public class ArrayOf extends Type { private final Type type; + private final boolean nullable; public ArrayOf(Type type) { + this(type, false); + } + + public static ArrayOf nullable(Type type) { + return new ArrayOf(type, true); + } + + private ArrayOf(Type type, boolean nullable) { this.type = type; + this.nullable = nullable; + } + + @Override + public boolean isNullable() { + return nullable; 
} @Override public void write(ByteBuffer buffer, Object o) { + if (o == null) { + buffer.putInt(-1); + return; + } + Object[] objs = (Object[]) o; int size = objs.length; buffer.putInt(size); @@ -41,8 +61,11 @@ public class ArrayOf extends Type { @Override public Object read(ByteBuffer buffer) { int size = buffer.getInt(); - if (size < 0) + if (size < 0 && isNullable()) + return null; + else if (size < 0) throw new SchemaException("Array size " + size + " cannot be negative"); + if (size > buffer.remaining()) throw new SchemaException("Error reading array of size " + size + ", only " + buffer.remaining() + " bytes available"); Object[] objs = new Object[size]; @@ -53,8 +76,11 @@ public class ArrayOf extends Type { @Override public int sizeOf(Object o) { - Object[] objs = (Object[]) o; int size = 4; + if (o == null) + return size; + + Object[] objs = (Object[]) o; for (int i = 0; i < objs.length; i++) size += type.sizeOf(objs[i]); return size; @@ -72,6 +98,9 @@ public class ArrayOf extends Type { @Override public Object[] validate(Object item) { try { + if (isNullable() && item == null) + return null; + Object[] array = (Object[]) item; for (int i = 0; i < array.length; i++) type.validate(array[i]); http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index 79f0638..7eee09f 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -41,7 +41,7 @@ public class Struct { /** * Return the value of the given pre-validated field, or if the value is missing return the default value. - * + * * @param field The field for which to get the default value * @throws SchemaException if the field has no value and has no default. */ @@ -59,7 +59,7 @@ public class Struct { /** * Get the value for the field directly by the field index with no lookup needed (faster!) - * + * * @param field The field to look up * @return The value for that field. * @throws SchemaException if the field has no value and has no default. @@ -71,7 +71,7 @@ public class Struct { /** * Get the record value for the field with the given name by doing a hash table lookup (slower!) 
- * + * * @param name The name of the field * @return The value in the record * @throws SchemaException If no such field exists @@ -148,6 +148,14 @@ public class Struct { return (String) get(name); } + public Boolean getBoolean(Field field) { + return (Boolean) get(field); + } + + public Boolean getBoolean(String name) { + return (Boolean) get(name); + } + public ByteBuffer getBytes(Field field) { Object result = get(field); if (result instanceof byte[]) @@ -164,7 +172,7 @@ public class Struct { /** * Set the given field to the specified value - * + * * @param field The field * @param value The value * @throws SchemaException If the validation of the field failed @@ -177,7 +185,7 @@ public class Struct { /** * Set the field specified by the given name to the value - * + * * @param name The name of the field * @param value The value to set * @throws SchemaException If the field is not known @@ -194,7 +202,7 @@ public class Struct { * Create a struct for the schema of a container type (struct or array). Note that for array type, this method * assumes that the type is an array of schema and creates a struct of that schema. Arrays of other types can't be * instantiated with this method. 
- * + * * @param field The field to create an instance of * @return The struct * @throws SchemaException If the given field is not a container type @@ -213,7 +221,7 @@ public class Struct { /** * Create a struct instance for the given field which must be a container type (struct or array) - * + * * @param field The name of the field to create (field must be a schema type) * @return The struct * @throws SchemaException If the given field is not a container type @@ -307,9 +315,11 @@ public class Struct { for (int i = 0; i < this.values.length; i++) { Field f = this.schema.get(i); if (f.type() instanceof ArrayOf) { - Object[] arrayObject = (Object []) this.get(f); - for (Object arrayItem: arrayObject) - result = prime * result + arrayItem.hashCode(); + if (this.get(f) != null) { + Object[] arrayObject = (Object []) this.get(f); + for (Object arrayItem: arrayObject) + result = prime * result + arrayItem.hashCode(); + } } else { Object field = this.get(f); if (field != null) { http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java index 92c1f7c..43b4a37 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java @@ -59,6 +59,47 @@ public abstract class Type { return false; } + /** + * The Boolean type represents a boolean value in a byte by using + * the value of 0 to represent false, and 1 to represent true. + * + * If for some reason a value that is not 0 or 1 is read, + * then any non-zero value will return true. 
+ */ + public static final Type BOOLEAN = new Type() { + @Override + public void write(ByteBuffer buffer, Object o) { + if ((Boolean) o) + buffer.put((byte) 1); + else + buffer.put((byte) 0); + } + + @Override + public Object read(ByteBuffer buffer) { + byte value = buffer.get(); + return value != 0; + } + + @Override + public int sizeOf(Object o) { + return 1; + } + + @Override + public String toString() { + return "BOOLEAN"; + } + + @Override + public Boolean validate(Object item) { + if (item instanceof Boolean) + return (Boolean) item; + else + throw new SchemaException(item + " is not a Boolean."); + } + }; + public static final Type INT8 = new Type() { @Override public void write(ByteBuffer buffer, Object o) { @@ -196,7 +237,7 @@ public abstract class Type { throw new SchemaException("String length " + length + " cannot be negative"); if (length > buffer.remaining()) throw new SchemaException("Error reading string of length " + length + ", only " + buffer.remaining() + " bytes available"); - + byte[] bytes = new byte[length]; buffer.get(bytes); return Utils.utf8(bytes); http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index 92d8c6d..f0cb8fc 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -25,24 +25,41 @@ import java.util.Collections; import java.util.List; public class MetadataRequest extends AbstractRequest { - + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id); private static final String TOPICS_KEY_NAME = "topics"; + private static final MetadataRequest 
ALL_TOPICS_REQUEST = new MetadataRequest((List<String>) null); // Unusual cast to work around constructor ambiguity + private final List<String> topics; + public static MetadataRequest allTopics() { + return ALL_TOPICS_REQUEST; + } + + /** + * In v0 null is not allowed and and empty list indicates requesting all topics. + * In v1 null indicates requesting all topics, and an empty list indicates requesting no topics. + */ public MetadataRequest(List<String> topics) { super(new Struct(CURRENT_SCHEMA)); - struct.set(TOPICS_KEY_NAME, topics.toArray()); + if (topics == null) + struct.set(TOPICS_KEY_NAME, null); + else + struct.set(TOPICS_KEY_NAME, topics.toArray()); this.topics = topics; } public MetadataRequest(Struct struct) { super(struct); Object[] topicArray = struct.getArray(TOPICS_KEY_NAME); - topics = new ArrayList<>(); - for (Object topicObj: topicArray) { - topics.add((String) topicObj); + if (topicArray != null) { + topics = new ArrayList<>(); + for (Object topicObj: topicArray) { + topics.add((String) topicObj); + } + } else { + topics = null; } } @@ -52,18 +69,25 @@ public class MetadataRequest extends AbstractRequest { Errors error = Errors.forException(e); List<MetadataResponse.PartitionMetadata> partitions = Collections.emptyList(); - for (String topic : topics) - topicMetadatas.add(new MetadataResponse.TopicMetadata(error, topic, partitions)); + if (topics != null) { + for (String topic : topics) + topicMetadatas.add(new MetadataResponse.TopicMetadata(error, topic, false, partitions)); + } switch (versionId) { case 0: - return new MetadataResponse(Collections.<Node>emptyList(), topicMetadatas); + case 1: + return new MetadataResponse(Collections.<Node>emptyList(), MetadataResponse.NO_CONTROLLER_ID, topicMetadatas, versionId); default: throw new IllegalArgumentException(String.format("Version %d is not valid. 
Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.METADATA.id))); } } + public boolean isAllTopics() { + return topics == null; + } + public List<String> topics() { return topics; } http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 13e0d8f..09a5bee 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -18,7 +18,6 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -32,7 +31,7 @@ import java.util.Set; public class MetadataResponse extends AbstractRequestResponse { - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id); + private static final short CURRENT_VERSION = ProtoUtils.latestVersion(ApiKeys.METADATA.id); private static final String BROKERS_KEY_NAME = "brokers"; private static final String TOPIC_METADATA_KEY_NAME = "topic_metadata"; @@ -40,6 +39,10 @@ public class MetadataResponse extends AbstractRequestResponse { private static final String NODE_ID_KEY_NAME = "node_id"; private static final String HOST_KEY_NAME = "host"; private static final String PORT_KEY_NAME = "port"; + private static final String RACK_KEY_NAME = "rack"; + + private static final String CONTROLLER_ID_KEY_NAME = "controller_id"; + public static final int 
NO_CONTROLLER_ID = -1; // topic level field names private static final String TOPIC_ERROR_CODE_KEY_NAME = "topic_error_code"; @@ -54,6 +57,7 @@ public class MetadataResponse extends AbstractRequestResponse { */ private static final String TOPIC_KEY_NAME = "topic"; + private static final String IS_INTERNAL_KEY_NAME = "is_internal"; private static final String PARTITION_METADATA_KEY_NAME = "partition_metadata"; // partition level field names @@ -72,13 +76,24 @@ public class MetadataResponse extends AbstractRequestResponse { private static final String ISR_KEY_NAME = "isr"; private final Collection<Node> brokers; + private final Node controller; private final List<TopicMetadata> topicMetadata; + /** + * Constructor for the latest version + */ + public MetadataResponse(List<Node> brokers, int controllerId, List<TopicMetadata> topicMetadata) { + this(brokers, controllerId, topicMetadata, CURRENT_VERSION); + } - public MetadataResponse(List<Node> brokers, List<TopicMetadata> topicMetadata) { - super(new Struct(CURRENT_SCHEMA)); + /** + * Constructor for a specific version + */ + public MetadataResponse(List<Node> brokers, int controllerId, List<TopicMetadata> topicMetadata, int version) { + super(new Struct(ProtoUtils.responseSchema(ApiKeys.METADATA.id, version))); this.brokers = brokers; + this.controller = getControllerNode(controllerId, brokers); this.topicMetadata = topicMetadata; List<Struct> brokerArray = new ArrayList<>(); @@ -87,15 +102,25 @@ public class MetadataResponse extends AbstractRequestResponse { broker.set(NODE_ID_KEY_NAME, node.id()); broker.set(HOST_KEY_NAME, node.host()); broker.set(PORT_KEY_NAME, node.port()); + // This field only exists in v1+ + if (broker.hasField(RACK_KEY_NAME)) + broker.set(RACK_KEY_NAME, node.rack()); brokerArray.add(broker); } struct.set(BROKERS_KEY_NAME, brokerArray.toArray()); + // This field only exists in v1+ + if (struct.hasField(CONTROLLER_ID_KEY_NAME)) + struct.set(CONTROLLER_ID_KEY_NAME, controllerId); + List<Struct> 
topicMetadataArray = new ArrayList<>(topicMetadata.size()); for (TopicMetadata metadata : topicMetadata) { Struct topicData = struct.instance(TOPIC_METADATA_KEY_NAME); topicData.set(TOPIC_KEY_NAME, metadata.topic); topicData.set(TOPIC_ERROR_CODE_KEY_NAME, metadata.error.code()); + // This field only exists in v1+ + if (topicData.hasField(IS_INTERNAL_KEY_NAME)) + topicData.set(IS_INTERNAL_KEY_NAME, metadata.isInternal()); List<Struct> partitionMetadataArray = new ArrayList<>(metadata.partitionMetadata.size()); for (PartitionMetadata partitionMetadata : metadata.partitionMetadata()) { @@ -130,15 +155,28 @@ public class MetadataResponse extends AbstractRequestResponse { int nodeId = broker.getInt(NODE_ID_KEY_NAME); String host = broker.getString(HOST_KEY_NAME); int port = broker.getInt(PORT_KEY_NAME); - brokers.put(nodeId, new Node(nodeId, host, port)); + // This field only exists in v1+ + // When we can't know if a rack exists in a v0 response we default to null + String rack = broker.hasField(RACK_KEY_NAME) ? broker.getString(RACK_KEY_NAME) : null; + brokers.put(nodeId, new Node(nodeId, host, port, rack)); } + // This field only exists in v1+ + // When we can't know the controller id in a v0 response we default to NO_CONTROLLER_ID + int controllerId = NO_CONTROLLER_ID; + if (struct.hasField(CONTROLLER_ID_KEY_NAME)) + controllerId = struct.getInt(CONTROLLER_ID_KEY_NAME); + List<TopicMetadata> topicMetadata = new ArrayList<>(); Object[] topicInfos = (Object[]) struct.get(TOPIC_METADATA_KEY_NAME); for (int i = 0; i < topicInfos.length; i++) { Struct topicInfo = (Struct) topicInfos[i]; Errors topicError = Errors.forCode(topicInfo.getShort(TOPIC_ERROR_CODE_KEY_NAME)); String topic = topicInfo.getString(TOPIC_KEY_NAME); + // This field only exists in v1+ + // When we can't know if a topic is internal or not in a v0 response we default to false + boolean isInternal = topicInfo.hasField(IS_INTERNAL_KEY_NAME) ? 
topicInfo.getBoolean(IS_INTERNAL_KEY_NAME) : false; + List<PartitionMetadata> partitionMetadata = new ArrayList<>(); Object[] partitionInfos = (Object[]) topicInfo.get(PARTITION_METADATA_KEY_NAME); @@ -149,23 +187,41 @@ public class MetadataResponse extends AbstractRequestResponse { int leader = partitionInfo.getInt(LEADER_KEY_NAME); Node leaderNode = leader == -1 ? null : brokers.get(leader); Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME); + List<Node> replicaNodes = new ArrayList<>(replicas.length); for (Object replicaNodeId : replicas) - replicaNodes.add(brokers.get(replicaNodeId)); + if (brokers.containsKey(replicaNodeId)) + replicaNodes.add(brokers.get(replicaNodeId)); + else + replicaNodes.add(new Node((int) replicaNodeId, "", -1)); + Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME); List<Node> isrNodes = new ArrayList<>(isr.length); for (Object isrNode : isr) - isrNodes.add(brokers.get(isrNode)); + if (brokers.containsKey(isrNode)) + isrNodes.add(brokers.get(isrNode)); + else + isrNodes.add(new Node((int) isrNode, "", -1)); + partitionMetadata.add(new PartitionMetadata(partitionError, partition, leaderNode, replicaNodes, isrNodes)); } - topicMetadata.add(new TopicMetadata(topicError, topic, partitionMetadata)); + topicMetadata.add(new TopicMetadata(topicError, topic, isInternal, partitionMetadata)); } this.brokers = brokers.values(); + this.controller = getControllerNode(controllerId, brokers.values()); this.topicMetadata = topicMetadata; } + private Node getControllerNode(int controllerId, Collection<Node> brokers) { + for (Node broker : brokers) { + if (broker.id() == controllerId) + return broker; + } + return null; + } + /** * Get a map of the topics which had metadata errors * @return the map @@ -211,20 +267,43 @@ public class MetadataResponse extends AbstractRequestResponse { return brokers; } + /** + * Get all topic metadata returned in the metadata response + * @return the topicMetadata + */ + public 
Collection<TopicMetadata> topicMetadata() { + return topicMetadata; + } + + /** + * The controller node returned in metadata response + * @return the controller node or null if it doesn't exist + */ + public Node controller() { + return controller; + } + public static MetadataResponse parse(ByteBuffer buffer) { - return new MetadataResponse(CURRENT_SCHEMA.read(buffer)); + return parse(buffer, CURRENT_VERSION); + } + + public static MetadataResponse parse(ByteBuffer buffer, int version) { + return new MetadataResponse(ProtoUtils.responseSchema(ApiKeys.METADATA.id, version).read(buffer)); } public static class TopicMetadata { private final Errors error; private final String topic; + private final boolean isInternal; private final List<PartitionMetadata> partitionMetadata; public TopicMetadata(Errors error, String topic, + boolean isInternal, List<PartitionMetadata> partitionMetadata) { this.error = error; this.topic = topic; + this.isInternal = isInternal; this.partitionMetadata = partitionMetadata; } @@ -236,6 +315,10 @@ public class MetadataResponse extends AbstractRequestResponse { return topic; } + public boolean isInternal() { + return isInternal; + } + public List<PartitionMetadata> partitionMetadata() { return partitionMetadata; } http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 9002e81..49bff10 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -44,6 +44,7 @@ import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; import 
org.apache.kafka.common.requests.ListOffsetRequest; import org.apache.kafka.common.requests.ListOffsetResponse; +import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.MockTime; @@ -478,14 +479,14 @@ public class FetcherTest { @Test(expected = InvalidTopicException.class) public void testGetTopicMetadataInvalidTopic() { client.prepareResponse(newMetadataResponse(topicName, Errors.INVALID_TOPIC_EXCEPTION).toStruct()); - fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L); + fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topicName)), 5000L); } @Test public void testGetTopicMetadataUnknownTopic() { client.prepareResponse(newMetadataResponse(topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION).toStruct()); - Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L); + Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topicName)), 5000L); assertNull(topicMetadata.get(topicName)); } @@ -494,7 +495,7 @@ public class FetcherTest { client.prepareResponse(newMetadataResponse(topicName, Errors.LEADER_NOT_AVAILABLE).toStruct()); client.prepareResponse(newMetadataResponse(topicName, Errors.NONE).toStruct()); - Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L); + Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topicName)), 5000L); assertTrue(topicMetadata.containsKey(topicName)); } @@ -570,8 +571,8 @@ public class FetcherTest { } } - MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(error, topic, partitionsMetadata); - return new MetadataResponse(cluster.nodes(), 
Arrays.asList(topicMetadata)); + MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(error, topic, false, partitionsMetadata); + return new MetadataResponse(cluster.nodes(), MetadataResponse.NO_CONTROLLER_ID, Arrays.asList(topicMetadata)); } private Fetcher<byte[], byte[]> createFetcher(int maxPollRecords, http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java index 5c34277..e91b2fb 100644 --- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java @@ -33,7 +33,8 @@ public class ProtocolSerializationTest { @Before public void setup() { - this.schema = new Schema(new Field("int8", Type.INT8), + this.schema = new Schema(new Field("boolean", Type.BOOLEAN), + new Field("int8", Type.INT8), new Field("int16", Type.INT16), new Field("int32", Type.INT32), new Field("int64", Type.INT64), @@ -42,8 +43,10 @@ public class ProtocolSerializationTest { new Field("bytes", Type.BYTES), new Field("nullable_bytes", Type.NULLABLE_BYTES), new Field("array", new ArrayOf(Type.INT32)), + new Field("null_array", ArrayOf.nullable(Type.INT32)), new Field("struct", new Schema(new Field("field", new ArrayOf(Type.INT32))))); - this.struct = new Struct(this.schema).set("int8", (byte) 1) + this.struct = new Struct(this.schema).set("boolean", true) + .set("int8", (byte) 1) .set("int16", (short) 1) .set("int32", 1) .set("int64", 1L) @@ -51,12 +54,15 @@ public class ProtocolSerializationTest { .set("nullable_string", null) .set("bytes", ByteBuffer.wrap("1".getBytes())) 
.set("nullable_bytes", null) - .set("array", new Object[] {1}); + .set("array", new Object[] {1}) + .set("null_array", null); this.struct.set("struct", this.struct.instance("struct").set("field", new Object[] {1, 2, 3})); } @Test public void testSimple() { + check(Type.BOOLEAN, false); + check(Type.BOOLEAN, true); check(Type.INT8, (byte) -111); check(Type.INT16, (short) -11111); check(Type.INT32, -11111111); @@ -75,6 +81,7 @@ public class ProtocolSerializationTest { check(new ArrayOf(Type.INT32), new Object[] {1, 2, 3, 4}); check(new ArrayOf(Type.STRING), new Object[] {}); check(new ArrayOf(Type.STRING), new Object[] {"hello", "there", "beautiful"}); + check(ArrayOf.nullable(Type.STRING), null); } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 92f3101..0018f53 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -70,9 +70,10 @@ public class RequestResponseTest { createListOffsetRequest(), createListOffsetRequest().getErrorResponse(0, new UnknownServerException()), createListOffsetResponse(), - createMetadataRequest(), - createMetadataRequest().getErrorResponse(0, new UnknownServerException()), - createMetadataResponse(), + MetadataRequest.allTopics(), + createMetadataRequest(Arrays.asList("topic1")), + createMetadataRequest(Arrays.asList("topic1")).getErrorResponse(1, new UnknownServerException()), + createMetadataResponse(1), createOffsetCommitRequest(2), createOffsetCommitRequest(2).getErrorResponse(2, new UnknownServerException()), createOffsetCommitResponse(), @@ -100,6 +101,8 @@ public 
class RequestResponseTest { for (AbstractRequestResponse req : requestResponseList) checkSerialization(req, null); + createMetadataResponse(0); + createMetadataRequest(Arrays.asList("topic1")).getErrorResponse(0, new UnknownServerException()); checkSerialization(createFetchRequest().getErrorResponse(0, new UnknownServerException()), 0); checkSerialization(createOffsetCommitRequest(0), 0); checkSerialization(createOffsetCommitRequest(0).getErrorResponse(0, new UnknownServerException()), 0); @@ -281,22 +284,22 @@ public class RequestResponseTest { return new ListOffsetResponse(responseData); } - private AbstractRequest createMetadataRequest() { - return new MetadataRequest(Arrays.asList("topic1")); + private AbstractRequest createMetadataRequest(List<String> topics) { + return new MetadataRequest(topics); } - private AbstractRequestResponse createMetadataResponse() { + private AbstractRequestResponse createMetadataResponse(int version) { Node node = new Node(1, "host1", 1001); List<Node> replicas = Arrays.asList(node); List<Node> isr = Arrays.asList(node); List<MetadataResponse.TopicMetadata> allTopicMetadata = new ArrayList<>(); - allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "topic1", + allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "__consumer_offsets", true, Arrays.asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, replicas, isr)))); - allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2", + allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2", false, Collections.<MetadataResponse.PartitionMetadata>emptyList())); - return new MetadataResponse(Arrays.asList(node), allTopicMetadata); + return new MetadataResponse(Arrays.asList(node), MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata, version); } private AbstractRequest createOffsetCommitRequest(int version) { 
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/main/scala/kafka/admin/AdminUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 24174be..a8a282e 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -605,10 +605,10 @@ object AdminUtils extends Logging { new MetadataResponse.PartitionMetadata(Errors.forException(e), partition, leaderInfo, replicaInfo.asJava, isrInfo.asJava) } } - new MetadataResponse.TopicMetadata(Errors.NONE, topic, partitionMetadata.toList.asJava) + new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.asJava) } else { // topic doesn't exist, send appropriate error code - new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, java.util.Collections.emptyList()) + new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, Topic.isInternal(topic), java.util.Collections.emptyList()) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/main/scala/kafka/admin/TopicCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 232db4a..9f1014f 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -170,7 +170,7 @@ object TopicCommand extends Logging { } topics.foreach { topic => try { - if (TopicConstants.INTERNAL_TOPICS.contains(topic)) { + if (Topic.isInternal(topic)) { throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic)) } else { zkUtils.createPersistentPath(getDeleteTopicPath(topic)) 
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/main/scala/kafka/cluster/Broker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala index 77b85e0..61290c1 100755 --- a/core/src/main/scala/kafka/cluster/Broker.scala +++ b/core/src/main/scala/kafka/cluster/Broker.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -123,7 +123,7 @@ case class Broker(id: Int, endPoints: collection.Map[SecurityProtocol, EndPoint] def getNode(protocolType: SecurityProtocol): Node = { val endpoint = endPoints.getOrElse(protocolType, throw new BrokerEndPointNotAvailableException(s"End point with security protocol $protocolType not found for broker $id")) - new Node(id, endpoint.host, endpoint.port) + new Node(id, endpoint.host, endpoint.port, rack.orNull) } def getBrokerEndPoint(protocolType: SecurityProtocol): BrokerEndPoint = { http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/main/scala/kafka/common/Topic.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala index 6067712..054c5eb 100644 --- a/core/src/main/scala/kafka/common/Topic.scala +++ b/core/src/main/scala/kafka/common/Topic.scala @@ -18,7 +18,7 @@ package kafka.common import util.matching.Regex -import kafka.coordinator.GroupCoordinator +import org.apache.kafka.common.internals.TopicConstants.INTERNAL_TOPICS object Topic { val legalChars = "[a-zA-Z0-9\\._\\-]" @@ -62,4 +62,7 @@ object Topic { topicA.replace('.', '_') == topicB.replace('.', 
'_') } + def isInternal(topic: String): Boolean = + INTERNAL_TOPICS.contains(topic) + } http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 9afefa5..406b1bd 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -24,6 +24,7 @@ import java.util.Properties import kafka.admin.{RackAwareMode, AdminUtils} import kafka.api._ import kafka.cluster.Partition +import kafka.common import kafka.common._ import kafka.controller.KafkaController import kafka.coordinator.{GroupCoordinator, JoinGroupResult} @@ -631,12 +632,15 @@ class KafkaApis(val requestChannel: RequestChannel, AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe) info("Auto creation of topic %s with %d partitions and replication factor %d is successful" .format(topic, numPartitions, replicationFactor)) - new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, java.util.Collections.emptyList()) + new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, common.Topic.isInternal(topic), + java.util.Collections.emptyList()) } catch { case e: TopicExistsException => // let it go, possibly another broker created this topic - new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, java.util.Collections.emptyList()) + new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, common.Topic.isInternal(topic), + java.util.Collections.emptyList()) case itex: InvalidTopicException => - new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION, topic, java.util.Collections.emptyList()) + new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION, topic, common.Topic.isInternal(topic), + 
java.util.Collections.emptyList()) } } @@ -656,8 +660,8 @@ class KafkaApis(val requestChannel: RequestChannel, topicMetadata.headOption.getOrElse(createGroupMetadataTopic()) } - private def getTopicMetadata(topics: Set[String], securityProtocol: SecurityProtocol): Seq[MetadataResponse.TopicMetadata] = { - val topicResponses = metadataCache.getTopicMetadata(topics, securityProtocol) + private def getTopicMetadata(topics: Set[String], securityProtocol: SecurityProtocol, errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = { + val topicResponses = metadataCache.getTopicMetadata(topics, securityProtocol, errorUnavailableEndpoints) if (topics.isEmpty || topicResponses.size == topics.size) { topicResponses } else { @@ -668,7 +672,8 @@ class KafkaApis(val requestChannel: RequestChannel, } else if (config.autoCreateTopicsEnable) { createTopic(topic, config.numPartitions, config.defaultReplicationFactor) } else { - new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, java.util.Collections.emptyList()) + new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, common.Topic.isInternal(topic), + java.util.Collections.emptyList()) } } topicResponses ++ responsesForNonExistentTopics @@ -680,16 +685,24 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handleTopicMetadataRequest(request: RequestChannel.Request) { val metadataRequest = request.body.asInstanceOf[MetadataRequest] + val requestVersion = request.header.apiVersion() - val topics = metadataRequest.topics.asScala.toSet - var (authorizedTopics, unauthorizedTopics) = if (metadataRequest.topics.isEmpty) { - //if topics is empty -> fetch all topics metadata but filter out the topic response that are not authorized - val authorized = metadataCache.getAllTopics() - .filter(topic => authorize(request.session, Describe, new Resource(Topic, topic))) - (authorized, mutable.Set[String]()) - } else { + val topics = + // Handle old metadata request logic. 
Version 0 has no way to specify "no topics". + if (requestVersion == 0) { + if (metadataRequest.topics() == null || metadataRequest.topics().isEmpty) + metadataCache.getAllTopics() + else + metadataRequest.topics.asScala.toSet + } else { + if (metadataRequest.isAllTopics) + metadataCache.getAllTopics() + else + metadataRequest.topics.asScala.toSet + } + + var (authorizedTopics, unauthorizedTopics) = topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic))) - } if (authorizedTopics.nonEmpty) { val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics) @@ -704,22 +717,32 @@ class KafkaApis(val requestChannel: RequestChannel, } val unauthorizedTopicMetadata = unauthorizedTopics.map(topic => - new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, java.util.Collections.emptyList())) + new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, common.Topic.isInternal(topic), + java.util.Collections.emptyList())) + + // In version 0, we returned an error when brokers with replicas were unavailable, + // while in higher versions we simply don't include the broker in the returned broker list + val errorUnavailableEndpoints = requestVersion == 0 + val topicMetadata = + if (authorizedTopics.isEmpty) + Seq.empty[MetadataResponse.TopicMetadata] + else + getTopicMetadata(authorizedTopics, request.securityProtocol, errorUnavailableEndpoints) - val topicMetadata = if (authorizedTopics.isEmpty) - Seq.empty[MetadataResponse.TopicMetadata] - else - getTopicMetadata(authorizedTopics, request.securityProtocol) + val completeTopicMetadata = topicMetadata ++ unauthorizedTopicMetadata val brokers = metadataCache.getAliveBrokers - trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), + trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(completeTopicMetadata.mkString(","), brokers.mkString(","), 
request.header.correlationId, request.header.clientId)) val responseHeader = new ResponseHeader(request.header.correlationId) + val responseBody = new MetadataResponse( brokers.map(_.getNode(request.securityProtocol)).asJava, - (topicMetadata ++ unauthorizedTopicMetadata).asJava + metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID), + completeTopicMetadata.asJava, + requestVersion ) requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) } http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/main/scala/kafka/server/MetadataCache.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 06fae42..b387f2e 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -24,11 +24,11 @@ import scala.collection.{Seq, Set, mutable} import scala.collection.JavaConverters._ import kafka.cluster.{Broker, EndPoint} import kafka.api._ -import kafka.common.{BrokerEndPointNotAvailableException, TopicAndPartition} +import kafka.common.{BrokerEndPointNotAvailableException, Topic, TopicAndPartition} import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch} import kafka.utils.CoreUtils._ import kafka.utils.Logging -import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.Node import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} import org.apache.kafka.common.requests.UpdateMetadataRequest.PartitionState import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} @@ -40,16 +40,24 @@ import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest private[server] class MetadataCache(brokerId: Int) extends Logging { private val stateChangeLogger = 
KafkaController.stateChangeLogger private val cache = mutable.Map[String, mutable.Map[Int, PartitionStateInfo]]() + private var controllerId: Option[Int] = None private val aliveBrokers = mutable.Map[Int, Broker]() private val aliveNodes = mutable.Map[Int, collection.Map[SecurityProtocol, Node]]() private val partitionMetadataLock = new ReentrantReadWriteLock() this.logIdent = s"[Kafka Metadata Cache on broker $brokerId] " - private def getAliveEndpoints(brokers: Iterable[Int], protocol: SecurityProtocol): Seq[Node] = { + // This method is the main hotspot when it comes to the performance of metadata requests, + // we should be careful about adding additional logic here. + // filterUnavailableEndpoints exists to support v0 MetadataResponses + private def getEndpoints(brokers: Iterable[Int], protocol: SecurityProtocol, filterUnavailableEndpoints: Boolean): Seq[Node] = { val result = new mutable.ArrayBuffer[Node](math.min(aliveBrokers.size, brokers.size)) brokers.foreach { brokerId => - getAliveEndpoint(brokerId, protocol).foreach(result +=) + val endpoint = getAliveEndpoint(brokerId, protocol) match { + case None => if (!filterUnavailableEndpoints) Some(new Node(brokerId, "", -1)) else None + case Some(node) => Some(node) + } + endpoint.foreach(result +=) } result } @@ -60,7 +68,8 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId` does not support security protocol `$protocol`")) } - private def getPartitionMetadata(topic: String, protocol: SecurityProtocol): Option[Iterable[MetadataResponse.PartitionMetadata]] = { + // errorUnavailableEndpoints exists to support v0 MetadataResponses + private def getPartitionMetadata(topic: String, protocol: SecurityProtocol, errorUnavailableEndpoints: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = { cache.get(topic).map { partitions => partitions.map { case (partitionId, partitionState) => val topicPartition = 
TopicAndPartition(topic, partitionId) @@ -69,7 +78,7 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { val maybeLeader = getAliveEndpoint(leaderAndIsr.leader, protocol) val replicas = partitionState.allReplicas - val replicaInfo = getAliveEndpoints(replicas, protocol) + val replicaInfo = getEndpoints(replicas, protocol, errorUnavailableEndpoints) maybeLeader match { case None => @@ -79,7 +88,7 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { case Some(leader) => val isr = leaderAndIsr.isr - val isrInfo = getAliveEndpoints(isr, protocol) + val isrInfo = getEndpoints(isr, protocol, errorUnavailableEndpoints) if (replicaInfo.size < replicas.size) { debug(s"Error while fetching metadata for $topicPartition: replica information not available for " + @@ -101,12 +110,12 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { } } - def getTopicMetadata(topics: Set[String], protocol: SecurityProtocol): Seq[MetadataResponse.TopicMetadata] = { + // errorUnavailableEndpoints exists to support v0 MetadataResponses + def getTopicMetadata(topics: Set[String], protocol: SecurityProtocol, errorUnavailableEndpoints: Boolean = false): Seq[MetadataResponse.TopicMetadata] = { inReadLock(partitionMetadataLock) { - val topicsRequested = if (topics.isEmpty) cache.keySet else topics - topicsRequested.toSeq.flatMap { topic => - getPartitionMetadata(topic, protocol).map { partitionMetadata => - new MetadataResponse.TopicMetadata(Errors.NONE, topic, partitionMetadata.toBuffer.asJava) + topics.toSeq.flatMap { topic => + getPartitionMetadata(topic, protocol, errorUnavailableEndpoints).map { partitionMetadata => + new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.toBuffer.asJava) } } } @@ -151,8 +160,14 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { } } + def getControllerId: Option[Int] = controllerId + def updateCache(correlationId: Int, updateMetadataRequest: 
UpdateMetadataRequest) { inWriteLock(partitionMetadataLock) { + controllerId = updateMetadataRequest.controllerId match { + case id if id < 0 => None + case id => Some(id) + } aliveNodes.clear() aliveBrokers.clear() updateMetadataRequest.liveBrokers.asScala.foreach { broker => http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 9bbd29e..888912b 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -28,6 +28,7 @@ import kafka.log.{LogAppendInfo, LogManager} import kafka.message.{ByteBufferMessageSet, InvalidMessageException, Message, MessageSet} import kafka.metrics.KafkaMetricsGroup import kafka.utils._ +import org.I0Itec.zkclient.IZkChildListener import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordBatchTooLargeException, ReplicaNotAvailableException, RecordTooLargeException, InvalidTopicException, ControllerMovedException, NotLeaderForPartitionException, CorruptRecordException, UnknownTopicOrPartitionException, InvalidTimestampException} @@ -39,7 +40,6 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.{Time => JTime} import scala.collection._ import scala.collection.JavaConverters._ -import org.apache.kafka.common.internals.TopicConstants /* * Result metadata of a log append operation on the log @@ -394,7 +394,7 @@ class ReplicaManager(val config: KafkaConfig, BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark() // reject appending to internal topics if it is not allowed - if (TopicConstants.INTERNAL_TOPICS.contains(topicPartition.topic) && !internalTopicsAllowed) { + if (Topic.isInternal(topicPartition.topic) && 
!internalTopicsAllowed) { (topicPartition, LogAppendResult( LogAppendInfo.UnknownLogAppendInfo, Some(new InvalidTopicException("Cannot append to internal topic %s".format(topicPartition.topic))))) http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala new file mode 100644 index 0000000..3d05c1d --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import java.io.{DataInputStream, DataOutputStream} +import java.net.Socket +import java.nio.ByteBuffer +import java.util.Properties + +import kafka.integration.KafkaServerTestHarness +import kafka.network.SocketServer +import kafka.utils._ +import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} +import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader, ResponseHeader} +import org.junit.Before + +abstract class BaseRequestTest extends KafkaServerTestHarness { + val numBrokers = 3 + private var correlationId = 0 + + // Override properties by mutating the passed Properties object + def propertyOverrides(properties: Properties): Unit + + def generateConfigs() = { + val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, enableControlledShutdown = false) + props.foreach(propertyOverrides) + props.map(KafkaConfig.fromProps) + } + + @Before + override def setUp() { + super.setUp() + TestUtils.waitUntilTrue(() => servers.head.metadataCache.getAliveBrokers.size == numBrokers, "Wait for cache to update") + } + + def socketServer = { + servers.find { server => + val state = server.brokerState.currentState + state != NotRunning.state && state != BrokerShuttingDown.state + }.map(_.socketServer).getOrElse(throw new IllegalStateException("No live broker is available")) + } + + private def connect(s: SocketServer = socketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Socket = { + new Socket("localhost", s.boundPort(protocol)) + } + + private def sendRequest(socket: Socket, request: Array[Byte]) { + val outgoing = new DataOutputStream(socket.getOutputStream) + outgoing.writeInt(request.length) + outgoing.write(request) + outgoing.flush() + } + + private def receiveResponse(socket: Socket): Array[Byte] = { + val incoming = new DataInputStream(socket.getInputStream) + val len = incoming.readInt() + val response = new Array[Byte](len) + incoming.readFully(response) + response + } + + private def 
requestAndReceive(request: Array[Byte]): Array[Byte] = { + val plainSocket = connect() + try { + sendRequest(plainSocket, request) + receiveResponse(plainSocket) + } finally { + plainSocket.close() + } + } + + /** + * Serializes and sends the request to the given api. A ByteBuffer containing the response is returned. + */ + def send(request: AbstractRequest, apiKey: ApiKeys, version: Short): ByteBuffer = { + correlationId += 1 + val serializedBytes = { + val header = new RequestHeader(apiKey.id, version, "", correlationId) + val byteBuffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf) + header.writeTo(byteBuffer) + request.writeTo(byteBuffer) + byteBuffer.array() + } + + val response = requestAndReceive(serializedBytes) + + val responseBuffer = ByteBuffer.wrap(response) + ResponseHeader.parse(responseBuffer) // Parse the header to ensure it's valid and move the buffer forward + responseBuffer + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index 017faea..770513c 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -158,7 +158,8 @@ class MetadataCacheTest { val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava) cache.updateCache(15, updateMetadataRequest) - val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT) + // Validate errorUnavailableEndpoints = false + val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT, errorUnavailableEndpoints = false) assertEquals(1, topicMetadatas.size) val topicMetadata = topicMetadatas.head @@ -169,9 +170,25 @@ class 
MetadataCacheTest { val partitionMetadata = partitionMetadatas.get(0) assertEquals(0, partitionMetadata.partition) - assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadata.error) - assertEquals(Set(0), partitionMetadata.replicas.asScala.map(_.id).toSet) + assertEquals(Errors.NONE, partitionMetadata.error) + assertEquals(Set(0, 1), partitionMetadata.replicas.asScala.map(_.id).toSet) assertEquals(Set(0), partitionMetadata.isr.asScala.map(_.id).toSet) + + // Validate errorUnavailableEndpoints = true + val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT, errorUnavailableEndpoints = true) + assertEquals(1, topicMetadatasWithError.size) + + val topicMetadataWithError = topicMetadatasWithError.head + assertEquals(Errors.NONE, topicMetadataWithError.error) + + val partitionMetadatasWithError = topicMetadataWithError.partitionMetadata + assertEquals(1, partitionMetadatasWithError.size) + + val partitionMetadataWithError = partitionMetadatasWithError.get(0) + assertEquals(0, partitionMetadataWithError.partition) + assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadataWithError.error) + assertEquals(Set(0), partitionMetadataWithError.replicas.asScala.map(_.id).toSet) + assertEquals(Set(0), partitionMetadataWithError.isr.asScala.map(_.id).toSet) } @Test @@ -197,7 +214,8 @@ class MetadataCacheTest { val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava) cache.updateCache(15, updateMetadataRequest) - val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT) + // Validate errorUnavailableEndpoints = false + val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT, errorUnavailableEndpoints = false) assertEquals(1, topicMetadatas.size) val topicMetadata = topicMetadatas.head @@ -208,9 +226,25 @@ class MetadataCacheTest { val partitionMetadata = partitionMetadatas.get(0) assertEquals(0, 
partitionMetadata.partition) - assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadata.error) + assertEquals(Errors.NONE, partitionMetadata.error) assertEquals(Set(0), partitionMetadata.replicas.asScala.map(_.id).toSet) - assertEquals(Set(0), partitionMetadata.isr.asScala.map(_.id).toSet) + assertEquals(Set(0, 1), partitionMetadata.isr.asScala.map(_.id).toSet) + + // Validate errorUnavailableEndpoints = true + val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT, errorUnavailableEndpoints = true) + assertEquals(1, topicMetadatasWithError.size) + + val topicMetadataWithError = topicMetadatasWithError.head + assertEquals(Errors.NONE, topicMetadataWithError.error) + + val partitionMetadatasWithError = topicMetadataWithError.partitionMetadata + assertEquals(1, partitionMetadatasWithError.size) + + val partitionMetadataWithError = partitionMetadatasWithError.get(0) + assertEquals(0, partitionMetadataWithError.partition) + assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadataWithError.error) + assertEquals(Set(0), partitionMetadataWithError.replicas.asScala.map(_.id).toSet) + assertEquals(Set(0), partitionMetadataWithError.isr.asScala.map(_.id).toSet) } @Test
