http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java index 4c99d4a..1651e75 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java @@ -43,6 +43,6 @@ public class ConsumerMetadataRequest extends AbstractRequestResponse { } public static ConsumerMetadataRequest parse(ByteBuffer buffer) { - return new ConsumerMetadataRequest(((Struct) CURRENT_SCHEMA.read(buffer))); + return new ConsumerMetadataRequest((Struct) CURRENT_SCHEMA.read(buffer)); } }
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java index 173333b..0c250c3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java @@ -65,6 +65,6 @@ public class ConsumerMetadataResponse extends AbstractRequestResponse { } public static ConsumerMetadataResponse parse(ByteBuffer buffer) { - return new ConsumerMetadataResponse(((Struct) CURRENT_SCHEMA.read(buffer))); + return new ConsumerMetadataResponse((Struct) CURRENT_SCHEMA.read(buffer)); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index 2529a09..721e7d3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -135,6 +135,6 @@ public class FetchRequest extends AbstractRequestResponse { } public static FetchRequest parse(ByteBuffer buffer) { - return new FetchRequest(((Struct) CURRENT_SCHEMA.read(buffer))); + return new FetchRequest((Struct) CURRENT_SCHEMA.read(buffer)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index c1e5f44..e67c4c8 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -106,6 +106,6 @@ public class FetchResponse extends AbstractRequestResponse { } public static FetchResponse parse(ByteBuffer buffer) { - return new FetchResponse(((Struct) CURRENT_SCHEMA.read(buffer))); + return new FetchResponse((Struct) CURRENT_SCHEMA.read(buffer)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java index cfdb5de..6943878 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java @@ -60,6 +60,6 @@ public class HeartbeatRequest extends AbstractRequestResponse { } public static HeartbeatRequest parse(ByteBuffer buffer) { - return new HeartbeatRequest(((Struct) CURRENT_SCHEMA.read(buffer))); + return new HeartbeatRequest((Struct) CURRENT_SCHEMA.read(buffer)); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java index ea964f7..0057496 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java @@ -41,6 +41,6 @@ public class HeartbeatResponse extends AbstractRequestResponse { } public static HeartbeatResponse parse(ByteBuffer buffer) { - return new HeartbeatResponse(((Struct) CURRENT_SCHEMA.read(buffer))); + return new HeartbeatResponse((Struct) CURRENT_SCHEMA.read(buffer)); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java index a1d48c9..8c50e9b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -83,6 +83,6 @@ public class JoinGroupRequest extends AbstractRequestResponse { } public static JoinGroupRequest parse(ByteBuffer buffer) { - return new JoinGroupRequest(((Struct) CURRENT_SCHEMA.read(buffer))); + return new JoinGroupRequest((Struct) CURRENT_SCHEMA.read(buffer)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java index 1e9f349..52b1803 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java @@ -98,6 +98,6 @@ public class JoinGroupResponse extends AbstractRequestResponse { } public static JoinGroupResponse parse(ByteBuffer buffer) { - return new JoinGroupResponse(((Struct) CURRENT_SCHEMA.read(buffer))); + return new JoinGroupResponse((Struct) CURRENT_SCHEMA.read(buffer)); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java index 05c5fed..e5dc92e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java @@ -58,7 +58,7 @@ public class ListOffsetRequest extends AbstractRequestResponse { } public ListOffsetRequest(Map<TopicPartition, PartitionData> offsetData) { - this(-1, offsetData); + this(-1, offsetData); } public ListOffsetRequest(int replicaId, Map<TopicPartition, PartitionData> offsetData) { @@ -114,6 +114,6 @@ public class ListOffsetRequest extends AbstractRequestResponse { } public static ListOffsetRequest parse(ByteBuffer buffer) { - return new ListOffsetRequest(((Struct) CURRENT_SCHEMA.read(buffer))); + return new ListOffsetRequest((Struct) CURRENT_SCHEMA.read(buffer)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java index b2e473e..cfac47a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java @@ -104,6 +104,6 @@ public class ListOffsetResponse extends AbstractRequestResponse { } public static ListOffsetResponse parse(ByteBuffer buffer) { - return new ListOffsetResponse(((Struct) CURRENT_SCHEMA.read(buffer))); + return new ListOffsetResponse((Struct) CURRENT_SCHEMA.read(buffer)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index 0186783..5d5f52c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -48,6 +48,6 @@ public class MetadataRequest extends AbstractRequestResponse { } public static MetadataRequest parse(ByteBuffer buffer) { - return new MetadataRequest(((Struct) CURRENT_SCHEMA.read(buffer))); + return new MetadataRequest((Struct) CURRENT_SCHEMA.read(buffer)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 13daf59..90f3141 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -29,7 +29,7 @@ import org.apache.kafka.common.protocol.types.Struct; public class MetadataResponse extends AbstractRequestResponse { - private static Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id); + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id); private static final String BROKERS_KEY_NAME = "brokers"; private static final String TOPIC_METATDATA_KEY_NAME = "topic_metadata"; @@ -69,12 +69,12 @@ public class MetadataResponse extends AbstractRequestResponse { List<Struct> topicArray = new ArrayList<Struct>(); for (String topic: cluster.topics()) { Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME); - topicData.set(TOPIC_ERROR_CODE_KEY_NAME, (short)0); // no error + topicData.set(TOPIC_ERROR_CODE_KEY_NAME, Errors.NONE.code()); topicData.set(TOPIC_KEY_NAME, topic); List<Struct> partitionArray = new ArrayList<Struct>(); for (PartitionInfo fetchPartitionData : cluster.partitionsForTopic(topic)) { Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME); - partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, (short)0); // no error + partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, Errors.NONE.code()); partitionData.set(PARTITION_KEY_NAME, fetchPartitionData.partition()); partitionData.set(LEADER_KEY_NAME, fetchPartitionData.leader().id()); ArrayList<Integer> replicas = new ArrayList<Integer>(); @@ -148,6 +148,6 @@ public class MetadataResponse extends AbstractRequestResponse { } public static MetadataResponse parse(ByteBuffer buffer) { - return new MetadataResponse(((Struct) CURRENT_SCHEMA.read(buffer))); + return new MetadataResponse((Struct) CURRENT_SCHEMA.read(buffer)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java index 4fb48c8..94e9d37 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -172,10 +172,10 @@ public class OffsetCommitRequest extends AbstractRequestResponse { public static OffsetCommitRequest parse(ByteBuffer buffer, int versionId) { Schema schema = ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, versionId); - return new OffsetCommitRequest(((Struct) schema.read(buffer))); + return new OffsetCommitRequest((Struct) schema.read(buffer)); } public static OffsetCommitRequest parse(ByteBuffer buffer) { - return new OffsetCommitRequest(((Struct) CURRENT_SCHEMA.read(buffer))); + return new OffsetCommitRequest((Struct) CURRENT_SCHEMA.read(buffer)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java index 2ab1dc6..4d3b9ec 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java @@ -83,6 +83,6 @@ public class OffsetCommitResponse extends AbstractRequestResponse { } public static OffsetCommitResponse parse(ByteBuffer buffer) { - return new OffsetCommitResponse(((Struct) CURRENT_SCHEMA.read(buffer))); + return new OffsetCommitResponse((Struct) CURRENT_SCHEMA.read(buffer)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java index 333483f..16c807c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java @@ -83,7 +83,7 @@ public class OffsetFetchRequest extends AbstractRequestResponse { } } groupId = struct.getString(GROUP_ID_KEY_NAME); - } + } public String groupId() { return groupId; @@ -94,6 +94,6 @@ public class OffsetFetchRequest extends AbstractRequestResponse { } public static OffsetFetchRequest parse(ByteBuffer buffer) { - return new OffsetFetchRequest(((Struct) CURRENT_SCHEMA.read(buffer))); + return new OffsetFetchRequest((Struct) CURRENT_SCHEMA.read(buffer)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java index 04c88c0..edbed58 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java @@ -108,6 +108,6 @@ public class OffsetFetchResponse extends AbstractRequestResponse { } public static OffsetFetchResponse parse(ByteBuffer buffer) { - return new OffsetFetchResponse(((Struct) CURRENT_SCHEMA.read(buffer))); + return new OffsetFetchResponse((Struct) CURRENT_SCHEMA.read(buffer)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index 03a0ab1..995f89f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -101,6 +101,6 @@ public class ProduceRequest extends AbstractRequestResponse { } public static ProduceRequest parse(ByteBuffer buffer) { - return new ProduceRequest(((Struct) CURRENT_SCHEMA.read(buffer))); + return new ProduceRequest((Struct) CURRENT_SCHEMA.read(buffer)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index e42d7db..a00dcdf 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -108,6 +108,6 @@ public class ProduceResponse extends AbstractRequestResponse { } public static ProduceResponse parse(ByteBuffer buffer) { - return new ProduceResponse(((Struct) CURRENT_SCHEMA.read(buffer))); + return new ProduceResponse((Struct) CURRENT_SCHEMA.read(buffer)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java index f459a2a..14bcde7 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java @@ -26,10 +26,10 @@ import org.apache.kafka.common.protocol.types.Struct; */ public class RequestHeader extends AbstractRequestResponse { - private static Field API_KEY_FIELD = REQUEST_HEADER.get("api_key"); - private static Field API_VERSION_FIELD = REQUEST_HEADER.get("api_version"); - private static Field CLIENT_ID_FIELD = REQUEST_HEADER.get("client_id"); - private static Field CORRELATION_ID_FIELD = REQUEST_HEADER.get("correlation_id"); + private static final Field API_KEY_FIELD = REQUEST_HEADER.get("api_key"); + private static final Field API_VERSION_FIELD = REQUEST_HEADER.get("api_version"); + private static final Field CLIENT_ID_FIELD = REQUEST_HEADER.get("client_id"); + private static final Field CORRELATION_ID_FIELD = REQUEST_HEADER.get("correlation_id"); private final short apiKey; private final short apiVersion; http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java index dd63853..e8a7ef9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java @@ -30,7 +30,7 @@ import org.apache.kafka.common.protocol.types.Struct; */ public class ResponseHeader extends AbstractRequestResponse { - private static Field CORRELATION_KEY_FIELD = RESPONSE_HEADER.get("correlation_id"); + private static final Field CORRELATION_KEY_FIELD = RESPONSE_HEADER.get("correlation_id"); private final int correlationId; @@ -50,7 +50,7 @@ public class ResponseHeader extends AbstractRequestResponse { } public static ResponseHeader parse(ByteBuffer buffer) { - return new ResponseHeader(((Struct) Protocol.RESPONSE_HEADER.read(buffer))); + return new ResponseHeader((Struct) Protocol.RESPONSE_HEADER.read(buffer)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java deleted file mode 100644 index b987e7f..0000000 --- a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.kafka.common.utils; - -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; - -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.config.ConfigException; - -import static org.apache.kafka.common.utils.Utils.getHost; -import static org.apache.kafka.common.utils.Utils.getPort; - -public class ClientUtils { - - public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) { - List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>(); - for (String url : urls) { - if (url != null && url.length() > 0) { - String host = getHost(url); - Integer port = getPort(url); - if (host == null || port == null) - throw new ConfigException("Invalid url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url); - try { - InetSocketAddress address = new InetSocketAddress(host, port); - if (address.isUnresolved()) - throw new ConfigException("DNS resolution failed for url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url); - addresses.add(address); - } catch (NumberFormatException e) { - throw new ConfigException("Invalid port in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url); - } - } - } - if (addresses.size() < 1) - throw new ConfigException("No bootstrap urls given in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG); - return addresses; - } -} \ No newline at end of file
