mumrah commented on code in PR #14376:
URL: https://github.com/apache/kafka/pull/14376#discussion_r1324939918


##########
metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java:
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.ControllerRegistrationRequestData;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterControllerRecord;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+
+
+/**
+ * ListenerInfo contains information about the listeners of either a 
controller or a broker.
+ * ListenerInfo objects are immutable; they cannot be modified once created. 
The intention is
+ * that you store either controller listeners or broker listeners here, but 
not both. On a
+ * combined KRaft node, which has both broker and controller roles, you would 
have two
+ * separate ListenerInfo objects to represent the listeners of each role.
+ *
+ * Listener information is stored in a linked hash map. This maintains 
ordering while still
+ * allowing the traditional O(1) hash map access. By convention, the first 
listener is special,
+ * corresponding to either the inter-controller listener or the inter-broker 
listener.
+ * This is the only listener that other nodes will attempt to use to 
communicate with this node.
+ *
+ * You may wonder why nodes support multiple listeners, given that 
inter-cluster communication only
+ * ever uses the first one. Well, one reason is that external clients may wish 
to use the additional
+ * listeners. It is a good practice to separate external and internal traffic. 
In some cases,
+ * external traffic may be encrypted while internal traffic is not. (Although 
other admins may wish
+ * to encrypt everything.) Another reason is that supporting multiple 
listeners allows us to change
+ * the effective inter-cluster listener via a roll. During such a roll, half 
of the brokers
+ * (or controllers) might be using one listener, while the other half use 
another. This lets us,
+ * for example, transition from using a PLAINTEXT inter broker listener to 
using an SSL one without
+ * taking any downtime.
+ *
+ * The ListenerInfo class is intended to handle translating endpoint 
information between various
+ * different data structures, and also to handle the two big gotchas of Kafka 
endpoints.
+ *
+ * The first gotcha is that the hostname will be null or blank if we are 
listening on 0.0.0.0.
+ * The withWildcardHostnamesResolved function creates a ListenerInfo object 
where all such hostnames
+ * are replaced by specific hostnames. (It's not perfect because we have to 
choose a single hostname
+ * out of multiple possibilities. In production scenarios it would be better 
to set the desired
+ * hostname explicitly in the configuration rather than binding to 0.0.0.0.)
+ *
+ * The second gotcha is that if someone configures an ephemeral port (aka port 
0), we need to fill
+ * The withEphemeralPortsCorrected resolves this by filling in the missing 
information for ephemeral

Review Comment:
   seems some words are missing here? 



##########
metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java:
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.ControllerRegistrationRequestData;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterControllerRecord;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+
+
+/**
+ * ListenerInfo contains information about the listeners of either a 
controller or a broker.
+ * ListenerInfo objects are immutable; they cannot be modified once created. 
The intention is
+ * that you store either controller listeners or broker listeners here, but 
not both. On a
+ * combined KRaft node, which has both broker and controller roles, you would 
have two
+ * separate ListenerInfo objects to represent the listeners of each role.
+ *
+ * Listener information is stored in a linked hash map. This maintains 
ordering while still
+ * allowing the traditional O(1) hash map access. By convention, the first 
listener is special,
+ * corresponding to either the inter-controller listener or the inter-broker 
listener.
+ * This is the only listener that other nodes will attempt to use to 
communicate with this node.
+ *
+ * You may wonder why nodes support multiple listeners, given that 
inter-cluster communication only
+ * ever uses the first one. Well, one reason is that external clients may wish 
to use the additional
+ * listeners. It is a good practice to separate external and internal traffic. 
In some cases,
+ * external traffic may be encrypted while internal traffic is not. (Although 
other admins may wish
+ * to encrypt everything.) Another reason is that supporting multiple 
listeners allows us to change
+ * the effective inter-cluster listener via a roll. During such a roll, half 
of the brokers
+ * (or controllers) might be using one listener, while the other half use 
another. This lets us,
+ * for example, transition from using a PLAINTEXT inter broker listener to 
using an SSL one without
+ * taking any downtime.
+ *
+ * The ListenerInfo class is intended to handle translating endpoint 
information between various
+ * different data structures, and also to handle the two big gotchas of Kafka 
endpoints.
+ *
+ * The first gotcha is that the hostname will be null or blank if we are 
listening on 0.0.0.0.
+ * The withWildcardHostnamesResolved function creates a ListenerInfo object 
where all such hostnames
+ * are replaced by specific hostnames. (It's not perfect because we have to 
choose a single hostname
+ * out of multiple possibilities. In production scenarios it would be better 
to set the desired
+ * hostname explicitly in the configuration rather than binding to 0.0.0.0.)
+ *
+ * The second gotcha is that if someone configures an ephemeral port (aka port 
0), we need to fill
+ * The withEphemeralPortsCorrected resolves this by filling in the missing 
information for ephemeral
+ * ports.
+ */
+final public class ListenerInfo {
+    /**
+     * Create a ListenerInfo from data in a ControllerRegistrationRequest RPC.
+     *
+     * @param collection    The RPC data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromControllerRegistrationRequest(
+        ControllerRegistrationRequestData.ListenerCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                    (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                protocol,
+                listener.host(),
+                listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * Create a ListenerInfo from data in a RegisterControllerRecord.
+     *
+     * @param collection    The record data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromControllerRegistrationRecord(
+        RegisterControllerRecord.ControllerEndpointCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                    (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                protocol,
+                listener.host(),
+                listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * Create a ListenerInfo from data in a BrokerRegistrationRequest RPC.
+     *
+     * @param collection    The RPC data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromBrokerRegistrationRequest(
+        BrokerRegistrationRequestData.ListenerCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                        (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                protocol,
+                listener.host(),
+                listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * Create a ListenerInfo from data in a RegisterBrokerRecord.
+     *
+     * @param collection    The record data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromBrokerRegistrationRecord(
+        RegisterBrokerRecord.BrokerEndpointCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                        (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                    protocol,
+                    listener.host(),
+                    listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    public static ListenerInfo create(
+        List<Endpoint> rawListeners
+    ) {
+        return create(Optional.empty(), rawListeners);
+    }
+
+    public static ListenerInfo create(
+        Optional<String> firstListenerName,
+        List<Endpoint> rawListeners
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        for (Endpoint listener : rawListeners) {
+            String name = listener.listenerName().get();
+            if (Optional.of(name).equals(firstListenerName)) {
+                listeners.put(name, listener);

Review Comment:
   Can probably break here if you find the first listener.



##########
metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java:
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.ControllerRegistrationRequestData;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterControllerRecord;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+
+
+/**
+ * ListenerInfo contains information about the listeners of either a 
controller or a broker.
+ * ListenerInfo objects are immutable; they cannot be modified once created. 
The intention is
+ * that you store either controller listeners or broker listeners here, but 
not both. On a
+ * combined KRaft node, which has both broker and controller roles, you would 
have two
+ * separate ListenerInfo objects to represent the listeners of each role.
+ *
+ * Listener information is stored in a linked hash map. This maintains 
ordering while still
+ * allowing the traditional O(1) hash map access. By convention, the first 
listener is special,
+ * corresponding to either the inter-controller listener or the inter-broker 
listener.
+ * This is the only listener that other nodes will attempt to use to 
communicate with this node.
+ *
+ * You may wonder why nodes support multiple listeners, given that 
inter-cluster communication only
+ * ever uses the first one. Well, one reason is that external clients may wish 
to use the additional
+ * listeners. It is a good practice to separate external and internal traffic. 
In some cases,
+ * external traffic may be encrypted while internal traffic is not. (Although 
other admins may wish
+ * to encrypt everything.) Another reason is that supporting multiple 
listeners allows us to change
+ * the effective inter-cluster listener via a roll. During such a roll, half 
of the brokers
+ * (or controllers) might be using one listener, while the other half use 
another. This lets us,
+ * for example, transition from using a PLAINTEXT inter broker listener to 
using an SSL one without
+ * taking any downtime.
+ *
+ * The ListenerInfo class is intended to handle translating endpoint 
information between various
+ * different data structures, and also to handle the two big gotchas of Kafka 
endpoints.
+ *
+ * The first gotcha is that the hostname will be null or blank if we are 
listening on 0.0.0.0.
+ * The withWildcardHostnamesResolved function creates a ListenerInfo object 
where all such hostnames
+ * are replaced by specific hostnames. (It's not perfect because we have to 
choose a single hostname
+ * out of multiple possibilities. In production scenarios it would be better 
to set the desired
+ * hostname explicitly in the configuration rather than binding to 0.0.0.0.)
+ *
+ * The second gotcha is that if someone configures an ephemeral port (aka port 
0), we need to fill
+ * The withEphemeralPortsCorrected resolves this by filling in the missing 
information for ephemeral
+ * ports.
+ */
+final public class ListenerInfo {
+    /**
+     * Create a ListenerInfo from data in a ControllerRegistrationRequest RPC.
+     *
+     * @param collection    The RPC data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromControllerRegistrationRequest(
+        ControllerRegistrationRequestData.ListenerCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                    (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                protocol,
+                listener.host(),
+                listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * Create a ListenerInfo from data in a RegisterControllerRecord.
+     *
+     * @param collection    The record data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromControllerRegistrationRecord(
+        RegisterControllerRecord.ControllerEndpointCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                    (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                protocol,
+                listener.host(),
+                listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * Create a ListenerInfo from data in a BrokerRegistrationRequest RPC.
+     *
+     * @param collection    The RPC data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromBrokerRegistrationRequest(
+        BrokerRegistrationRequestData.ListenerCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                        (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                protocol,
+                listener.host(),
+                listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * Create a ListenerInfo from data in a RegisterBrokerRecord.
+     *
+     * @param collection    The record data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromBrokerRegistrationRecord(
+        RegisterBrokerRecord.BrokerEndpointCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                        (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                    protocol,
+                    listener.host(),
+                    listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    public static ListenerInfo create(
+        List<Endpoint> rawListeners
+    ) {
+        return create(Optional.empty(), rawListeners);
+    }
+
+    public static ListenerInfo create(
+        Optional<String> firstListenerName,
+        List<Endpoint> rawListeners
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        for (Endpoint listener : rawListeners) {
+            String name = listener.listenerName().get();
+            if (Optional.of(name).equals(firstListenerName)) {
+                listeners.put(name, listener);
+            }
+        }
+        for (Endpoint listener : rawListeners) {
+            String name = listener.listenerName().get();
+            if (!Optional.of(name).equals(firstListenerName)) {
+                listeners.put(name, listener);
+            }
+        }
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * An ordered map containing all of the listeners. The first listener is 
special, indicating
+     * either the inter-broker or inter-controller listener.
+     */
+    private final Map<String, Endpoint> listeners;
+
+    private ListenerInfo(Map<String, Endpoint> listeners) {
+        this.listeners = Collections.unmodifiableMap(listeners);
+    }
+
+    public Map<String, Endpoint> listeners() {
+        return listeners;
+    }
+
+    public Endpoint firstListener() {
+        return listeners.values().iterator().next();
+    }
+
+    /**
+     * Create a new ListenerInfo object where null or blank hostnames 
(signifying that the user
+     * asked to bind to 0.0.0.0) are replaced by specific hostnames.
+     *
+     * @return A new ListenerInfo object.
+     */
+    public ListenerInfo withWildcardHostnamesResolved() throws 
UnknownHostException {
+        LinkedHashMap<String, Endpoint> newListeners = new LinkedHashMap<>();
+        for (Map.Entry<String, Endpoint> entry : listeners.entrySet()) {
+            if (entry.getValue().host() == null || 
entry.getValue().host().trim().isEmpty()) {
+                String newHost = 
InetAddress.getLocalHost().getCanonicalHostName();
+                Endpoint prevEndpoint = entry.getValue();
+                newListeners.put(entry.getKey(), new 
Endpoint(prevEndpoint.listenerName().get(),
+                        prevEndpoint.securityProtocol(),
+                        newHost,
+                        prevEndpoint.port()));
+            } else {
+                newListeners.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return new ListenerInfo(newListeners);
+    }
+
+    /**
+     * Create a new ListenerInfo object where ephemeral ports are populated 
with their true runtime
+     * values.
+     *
+     * In other words, if a port was set to 0, indicating that a random port 
should be assigned by the
+     * operating system, this function will replace it with the value the 
operating system actually
+     * chose.
+     *
+     * @param getBoundPortCallback  The callback used to correct ephemeral 
endpoints.
+     *
+     * @return A new ListenerInfo object.
+     */
+    public ListenerInfo withEphemeralPortsCorrected(Function<String, Integer> 
getBoundPortCallback) {
+        LinkedHashMap<String, Endpoint> newListeners = new LinkedHashMap<>();
+        for (Map.Entry<String, Endpoint> entry : listeners.entrySet()) {
+            if (entry.getValue().port() == 0) {
+                Endpoint prevEndpoint = entry.getValue();
+                int newPort = getBoundPortCallback.apply(entry.getKey());

Review Comment:
   Same logging question as above



##########
metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java:
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.ControllerRegistrationRequestData;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterControllerRecord;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+
+
+/**
+ * ListenerInfo contains information about the listeners of either a 
controller or a broker.
+ * ListenerInfo objects are immutable; they cannot be modified once created. 
The intention is
+ * that you store either controller listeners or broker listeners here, but 
not both. On a
+ * combined KRaft node, which has both broker and controller roles, you would 
have two
+ * separate ListenerInfo objects to represent the listeners of each role.
+ *
+ * Listener information is stored in a linked hash map. This maintains 
ordering while still
+ * allowing the traditional O(1) hash map access. By convention, the first 
listener is special,
+ * corresponding to either the inter-controller listener or the inter-broker 
listener.
+ * This is the only listener that other nodes will attempt to use to 
communicate with this node.
+ *
+ * You may wonder why nodes support multiple listeners, given that 
inter-cluster communication only
+ * ever uses the first one. Well, one reason is that external clients may wish 
to use the additional
+ * listeners. It is a good practice to separate external and internal traffic. 
In some cases,
+ * external traffic may be encrypted while internal traffic is not. (Although 
other admins may wish
+ * to encrypt everything.) Another reason is that supporting multiple 
listeners allows us to change
+ * the effective inter-cluster listener via a roll. During such a roll, half 
of the brokers
+ * (or controllers) might be using one listener, while the other half use 
another. This lets us,
+ * for example, transition from using a PLAINTEXT inter broker listener to 
using an SSL one without
+ * taking any downtime.
+ *
+ * The ListenerInfo class is intended to handle translating endpoint 
information between various
+ * different data structures, and also to handle the two big gotchas of Kafka 
endpoints.
+ *
+ * The first gotcha is that the hostname will be null or blank if we are 
listening on 0.0.0.0.
+ * The withWildcardHostnamesResolved function creates a ListenerInfo object 
where all such hostnames
+ * are replaced by specific hostnames. (It's not perfect because we have to 
choose a single hostname
+ * out of multiple possibilities. In production scenarios it would be better 
to set the desired
+ * hostname explicitly in the configuration rather than binding to 0.0.0.0.)
+ *
+ * The second gotcha is that if someone configures an ephemeral port (aka port 
0), we need to fill
+ * The withEphemeralPortsCorrected resolves this by filling in the missing 
information for ephemeral
+ * ports.
+ */
+final public class ListenerInfo {
+    /**
+     * Create a ListenerInfo from data in a ControllerRegistrationRequest RPC.
+     *
+     * @param collection    The RPC data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromControllerRegistrationRequest(
+        ControllerRegistrationRequestData.ListenerCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                    (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                protocol,
+                listener.host(),
+                listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * Create a ListenerInfo from data in a RegisterControllerRecord.
+     *
+     * @param collection    The record data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromControllerRegistrationRecord(
+        RegisterControllerRecord.ControllerEndpointCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                    (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                protocol,
+                listener.host(),
+                listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * Create a ListenerInfo from data in a BrokerRegistrationRequest RPC.
+     *
+     * @param collection    The RPC data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromBrokerRegistrationRequest(
+        BrokerRegistrationRequestData.ListenerCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                        (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                protocol,
+                listener.host(),
+                listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * Create a ListenerInfo from data in a RegisterBrokerRecord.
+     *
+     * @param collection    The record data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromBrokerRegistrationRecord(
+        RegisterBrokerRecord.BrokerEndpointCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                        (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                    protocol,
+                    listener.host(),
+                    listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    public static ListenerInfo create(
+        List<Endpoint> rawListeners
+    ) {
+        return create(Optional.empty(), rawListeners);
+    }
+
+    public static ListenerInfo create(
+        Optional<String> firstListenerName,
+        List<Endpoint> rawListeners
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        for (Endpoint listener : rawListeners) {
+            String name = listener.listenerName().get();
+            if (Optional.of(name).equals(firstListenerName)) {
+                listeners.put(name, listener);
+            }
+        }
+        for (Endpoint listener : rawListeners) {
+            String name = listener.listenerName().get();
+            if (!Optional.of(name).equals(firstListenerName)) {
+                listeners.put(name, listener);
+            }
+        }
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * An ordered map containing all of the listeners. The first listener is 
special, indicating
+     * either the inter-broker or inter-controller listener.
+     */
+    private final Map<String, Endpoint> listeners;
+
+    private ListenerInfo(Map<String, Endpoint> listeners) {
+        this.listeners = Collections.unmodifiableMap(listeners);
+    }
+
+    public Map<String, Endpoint> listeners() {
+        return listeners;
+    }
+
+    public Endpoint firstListener() {
+        return listeners.values().iterator().next();

Review Comment:
   Is the iterator guaranteed to be non-empty?



##########
metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java:
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.ControllerRegistrationRequestData;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterControllerRecord;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+
+
+/**
+ * ListenerInfo contains information about the listeners of either a 
controller or a broker.
+ * ListenerInfo objects are immutable; they cannot be modified once created. 
The intention is
+ * that you store either controller listeners or broker listeners here, but 
not both. On a
+ * combined KRaft node, which has both broker and controller roles, you would 
have two
+ * separate ListenerInfo objects to represent the listeners of each role.
+ *
+ * Listener information is stored in a linked hash map. This maintains 
ordering while still
+ * allowing the traditional O(1) hash map access. By convention, the first 
listener is special,
+ * corresponding to either the inter-controller listener or the inter-broker 
listener.
+ * This is the only listener that other nodes will attempt to use to 
communicate with this node.
+ *
+ * You may wonder why nodes support multiple listeners, given that 
inter-cluster communication only
+ * ever uses the first one. Well, one reason is that external clients may wish 
to use the additional
+ * listeners. It is a good practice to separate external and internal traffic. 
In some cases,
+ * external traffic may be encrypted while internal traffic is not. (Although 
other admins may wish
+ * to encrypt everything.) Another reason is that supporting multiple 
listeners allows us to change
+ * the effective inter-cluster listener via a roll. During such a roll, half 
of the brokers
+ * (or controllers) might be using one listener, while the other half use 
another. This lets us,
+ * for example, transition from using a PLAINTEXT inter broker listener to 
using an SSL one without
+ * taking any downtime.
+ *
+ * The ListenerInfo class is intended to handle translating endpoint 
information between various
+ * different data structures, and also to handle the two big gotchas of Kafka 
endpoints.
+ *
+ * The first gotcha is that the hostname will be null or blank if we are 
listening on 0.0.0.0.
+ * The withWildcardHostnamesResolved function creates a ListenerInfo object 
where all such hostnames
+ * are replaced by specific hostnames. (It's not perfect because we have to 
choose a single hostname
+ * out of multiple possibilities. In production scenarios it would be better 
to set the desired
+ * hostname explicitly in the configuration rather than binding to 0.0.0.0.)
+ *
+ * The second gotcha is that if someone configures an ephemeral port (aka port 
0), we need to fill
+ * The withEphemeralPortsCorrected resolves this by filling in the missing 
information for ephemeral
+ * ports.
+ */
+final public class ListenerInfo {
+    /**
+     * Create a ListenerInfo from data in a ControllerRegistrationRequest RPC.
+     *
+     * @param collection    The RPC data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromControllerRegistrationRequest(
+        ControllerRegistrationRequestData.ListenerCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                    (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                protocol,
+                listener.host(),
+                listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * Create a ListenerInfo from data in a RegisterControllerRecord.
+     *
+     * @param collection    The record data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromControllerRegistrationRecord(
+        RegisterControllerRecord.ControllerEndpointCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                    (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                protocol,
+                listener.host(),
+                listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * Create a ListenerInfo from data in a BrokerRegistrationRequest RPC.
+     *
+     * @param collection    The RPC data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromBrokerRegistrationRequest(
+        BrokerRegistrationRequestData.ListenerCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                        (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                protocol,
+                listener.host(),
+                listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * Create a ListenerInfo from data in a RegisterBrokerRecord.
+     *
+     * @param collection    The record data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromBrokerRegistrationRecord(
+        RegisterBrokerRecord.BrokerEndpointCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                        (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                    protocol,
+                    listener.host(),
+                    listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    public static ListenerInfo create(
+        List<Endpoint> rawListeners
+    ) {
+        return create(Optional.empty(), rawListeners);
+    }
+
+    public static ListenerInfo create(
+        Optional<String> firstListenerName,
+        List<Endpoint> rawListeners
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        for (Endpoint listener : rawListeners) {
+            String name = listener.listenerName().get();
+            if (Optional.of(name).equals(firstListenerName)) {
+                listeners.put(name, listener);
+            }
+        }
+        for (Endpoint listener : rawListeners) {
+            String name = listener.listenerName().get();
+            if (!Optional.of(name).equals(firstListenerName)) {
+                listeners.put(name, listener);
+            }
+        }
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * An ordered map containing all of the listeners. The first listener is 
special, indicating
+     * either the inter-broker or inter-controller listener.
+     */
+    private final Map<String, Endpoint> listeners;
+
+    private ListenerInfo(Map<String, Endpoint> listeners) {
+        this.listeners = Collections.unmodifiableMap(listeners);
+    }
+
+    public Map<String, Endpoint> listeners() {
+        return listeners;
+    }
+
+    public Endpoint firstListener() {
+        return listeners.values().iterator().next();
+    }
+
+    /**
+     * Create a new ListenerInfo object where null or blank hostnames 
(signifying that the user
+     * asked to bind to 0.0.0.0) are replaced by specific hostnames.
+     *
+     * @return A new ListenerInfo object.
+     */
+    public ListenerInfo withWildcardHostnamesResolved() throws 
UnknownHostException {
+        LinkedHashMap<String, Endpoint> newListeners = new LinkedHashMap<>();
+        for (Map.Entry<String, Endpoint> entry : listeners.entrySet()) {
+            if (entry.getValue().host() == null || 
entry.getValue().host().trim().isEmpty()) {
+                String newHost = 
InetAddress.getLocalHost().getCanonicalHostName();
+                Endpoint prevEndpoint = entry.getValue();
+                newListeners.put(entry.getKey(), new 
Endpoint(prevEndpoint.listenerName().get(),
+                        prevEndpoint.securityProtocol(),
+                        newHost,
+                        prevEndpoint.port()));
+            } else {
+                newListeners.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return new ListenerInfo(newListeners);
+    }
+
+    /**
+     * Create a new ListenerInfo object where ephemeral ports are populated 
with their true runtime
+     * values.
+     *
+     * In other words, if a port was set to 0, indicating that a random port 
should be assigned by the
+     * operating system, this function will replace it with the value the 
operating system actually
+     * chose.
+     *
+     * @param getBoundPortCallback  The callback used to correct ephemeral 
endpoints.
+     *
+     * @return A new ListenerInfo object.
+     */
+    public ListenerInfo withEphemeralPortsCorrected(Function<String, Integer> 
getBoundPortCallback) {
+        LinkedHashMap<String, Endpoint> newListeners = new LinkedHashMap<>();
+        for (Map.Entry<String, Endpoint> entry : listeners.entrySet()) {
+            if (entry.getValue().port() == 0) {
+                Endpoint prevEndpoint = entry.getValue();
+                int newPort = getBoundPortCallback.apply(entry.getKey());
+                checkPortIsSerializable(newPort);
+                newListeners.put(entry.getKey(), new 
Endpoint(prevEndpoint.listenerName().get(),
+                        prevEndpoint.securityProtocol(),
+                        prevEndpoint.host(),
+                        newPort));
+            } else {
+                newListeners.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return new ListenerInfo(newListeners);
+    }
+
+    private static void checkPortIsSerializable(int port) {
+        if (port == 0) {
+            throw new RuntimeException("Cannot serialize ephemeral port 0 in 
ListenerInfo.");
+        } else if (port < 0) {
+            throw new RuntimeException("Cannot serialize negative port number 
" + port +
+                    " in ListenerInfo.");
+        } else if (port > 65535) {
+            throw new RuntimeException("Cannot serialize invalid port number " 
+ port +
+                    " in ListenerInfo.");

Review Comment:
   nit: too much indentation



##########
metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java:
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.ControllerRegistrationRequestData;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterControllerRecord;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+
+
+/**
+ * ListenerInfo contains information about the listeners of either a 
controller or a broker.
+ * ListenerInfo objects are immutable; they cannot be modified once created. 
The intention is
+ * that you store either controller listeners or broker listeners here, but 
not both. On a
+ * combined KRaft node, which has both broker and controller roles, you would 
have two
+ * separate ListenerInfo objects to represent the listeners of each role.
+ *
+ * Listener information is stored in a linked hash map. This maintains 
ordering while still
+ * allowing the traditional O(1) hash map access. By convention, the first 
listener is special,
+ * corresponding to either the inter-controller listener or the inter-broker 
listener.
+ * This is the only listener that other nodes will attempt to use to 
communicate with this node.
+ *
+ * You may wonder why nodes support multiple listeners, given that 
inter-cluster communication only
+ * ever uses the first one. Well, one reason is that external clients may wish 
to use the additional
+ * listeners. It is a good practice to separate external and internal traffic. 
In some cases,
+ * external traffic may be encrypted while internal traffic is not. (Although 
other admins may wish
+ * to encrypt everything.) Another reason is that supporting multiple 
listeners allows us to change
+ * the effective inter-cluster listener via a roll. During such a roll, half 
of the brokers
+ * (or controllers) might be using one listener, while the other half use 
another. This lets us,
+ * for example, transition from using a PLAINTEXT inter broker listener to 
using an SSL one without
+ * taking any downtime.
+ *
+ * The ListenerInfo class is intended to handle translating endpoint 
information between various
+ * different data structures, and also to handle the two big gotchas of Kafka 
endpoints.
+ *
+ * The first gotcha is that the hostname will be null or blank if we are 
listening on 0.0.0.0.
+ * The withWildcardHostnamesResolved function creates a ListenerInfo object 
where all such hostnames
+ * are replaced by specific hostnames. (It's not perfect because we have to 
choose a single hostname
+ * out of multiple possibilities. In production scenarios it would be better 
to set the desired
+ * hostname explicitly in the configuration rather than binding to 0.0.0.0.)
+ *
+ * The second gotcha is that if someone configures an ephemeral port (aka port 
0), we need to fill
+ * The withEphemeralPortsCorrected resolves this by filling in the missing 
information for ephemeral
+ * ports.
+ */
+final public class ListenerInfo {
+    /**
+     * Create a ListenerInfo from data in a ControllerRegistrationRequest RPC.
+     *
+     * @param collection    The RPC data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromControllerRegistrationRequest(
+        ControllerRegistrationRequestData.ListenerCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                    (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                protocol,
+                listener.host(),
+                listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * Create a ListenerInfo from data in a RegisterControllerRecord.
+     *
+     * @param collection    The record data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromControllerRegistrationRecord(
+        RegisterControllerRecord.ControllerEndpointCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                    (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                protocol,
+                listener.host(),
+                listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * Create a ListenerInfo from data in a BrokerRegistrationRequest RPC.
+     *
+     * @param collection    The RPC data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromBrokerRegistrationRequest(
+        BrokerRegistrationRequestData.ListenerCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                        (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                protocol,
+                listener.host(),
+                listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * Create a ListenerInfo from data in a RegisterBrokerRecord.
+     *
+     * @param collection    The record data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromBrokerRegistrationRecord(
+        RegisterBrokerRecord.BrokerEndpointCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                        (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                    protocol,
+                    listener.host(),
+                    listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    public static ListenerInfo create(
+        List<Endpoint> rawListeners
+    ) {
+        return create(Optional.empty(), rawListeners);
+    }
+
+    public static ListenerInfo create(
+        Optional<String> firstListenerName,
+        List<Endpoint> rawListeners
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        for (Endpoint listener : rawListeners) {
+            String name = listener.listenerName().get();
+            if (Optional.of(name).equals(firstListenerName)) {
+                listeners.put(name, listener);
+            }
+        }
+        for (Endpoint listener : rawListeners) {
+            String name = listener.listenerName().get();
+            if (!Optional.of(name).equals(firstListenerName)) {
+                listeners.put(name, listener);
+            }
+        }
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * An ordered map containing all of the listeners. The first listener is 
special, indicating
+     * either the inter-broker or inter-controller listener.
+     */
+    private final Map<String, Endpoint> listeners;
+
+    private ListenerInfo(Map<String, Endpoint> listeners) {
+        this.listeners = Collections.unmodifiableMap(listeners);
+    }
+
+    public Map<String, Endpoint> listeners() {
+        return listeners;
+    }
+
+    public Endpoint firstListener() {
+        return listeners.values().iterator().next();
+    }
+
+    /**
+     * Create a new ListenerInfo object where null or blank hostnames 
(signifying that the user
+     * asked to bind to 0.0.0.0) are replaced by specific hostnames.
+     *
+     * @return A new ListenerInfo object.
+     */
+    public ListenerInfo withWildcardHostnamesResolved() throws 
UnknownHostException {
+        LinkedHashMap<String, Endpoint> newListeners = new LinkedHashMap<>();
+        for (Map.Entry<String, Endpoint> entry : listeners.entrySet()) {
+            if (entry.getValue().host() == null || 
entry.getValue().host().trim().isEmpty()) {
+                String newHost = 
InetAddress.getLocalHost().getCanonicalHostName();

Review Comment:
   Should we log something to the user if we are resolving localhost as the 
host name? As you mention in the class javadoc, normally for production 
use-cases we expect the hostname to be explicitly set.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to