mumrah commented on code in PR #14376: URL: https://github.com/apache/kafka/pull/14376#discussion_r1324939918
########## metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java: ########## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata; + +import org.apache.kafka.common.Endpoint; +import org.apache.kafka.common.message.BrokerRegistrationRequestData; +import org.apache.kafka.common.message.ControllerRegistrationRequestData; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.common.metadata.RegisterControllerRecord; +import org.apache.kafka.common.security.auth.SecurityProtocol; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; + + +/** + * ListenerInfo contains information about the listeners of either a controller or a broker. + * ListenerInfo objects are immutable; they cannot be modified once created. The intention is + * that you store either controller listeners or broker listeners here, but not both. 
On a + * combined KRaft node, which has both broker and controller roles, you would have two + * separate ListenerInfo objects to represent the listeners of each role. + * + * Listener information is stored in a linked hash map. This maintains ordering while still + * allowing the traditional O(1) hash map access. By convention, the first listener is special, + * corresponding to either the inter-controller listener or the inter-broker listener. + * This is the only listener that other nodes will attempt to use to communicate with this node. + * + * You may wonder why nodes support multiple listeners, given that inter-cluster communication only + * ever uses the first one. Well, one reason is that external clients may wish to use the additional + * listeners. It is a good practice to separate external and internal traffic. In some cases, + * external traffic may be encrypted while internal traffic is not. (Although other admins may wish + * to encrypt everything.) Another reason is that supporting multiple listeners allows us to change + * the effective inter-cluster listener via a roll. During such a roll, half of the brokers + * (or controllers) might be using one listener, while the other half use another. This lets us, + * for example, transition from using a PLAINTEXT inter broker listener to using an SSL one without + * taking any downtime. + * + * The ListenerInfo class is intended to handle translating endpoint information between various + * different data structures, and also to handle the two big gotchas of Kafka endpoints. + * + * The first gotcha is that the hostname will be null or blank if we are listening on 0.0.0.0. + * The withWildcardHostnamesResolved function creates a ListenerInfo object where all such hostnames + * are replaced by specific hostnames. (It's not perfect because we have to choose a single hostname + * out of multiple possibilities. 
In production scenarios it would be better to set the desired + * hostname explicitly in the configuration rather than binding to 0.0.0.0.) + * + * The second gotcha is that if someone configures an ephemeral port (aka port 0), we need to fill + * The withEphemeralPortsCorrected resolves this by filling in the missing information for ephemeral Review Comment: seems some words are missing here? ########## metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java: ########## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.metadata; + +import org.apache.kafka.common.Endpoint; +import org.apache.kafka.common.message.BrokerRegistrationRequestData; +import org.apache.kafka.common.message.ControllerRegistrationRequestData; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.common.metadata.RegisterControllerRecord; +import org.apache.kafka.common.security.auth.SecurityProtocol; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; + + +/** + * ListenerInfo contains information about the listeners of either a controller or a broker. + * ListenerInfo objects are immutable; they cannot be modified once created. The intention is + * that you store either controller listeners or broker listeners here, but not both. On a + * combined KRaft node, which has both broker and controller roles, you would have two + * separate ListenerInfo objects to represent the listeners of each role. + * + * Listener information is stored in a linked hash map. This maintains ordering while still + * allowing the traditional O(1) hash map access. By convention, the first listener is special, + * corresponding to either the inter-controller listener or the inter-broker listener. + * This is the only listener that other nodes will attempt to use to communicate with this node. + * + * You may wonder why nodes support multiple listeners, given that inter-cluster communication only + * ever uses the first one. Well, one reason is that external clients may wish to use the additional + * listeners. It is a good practice to separate external and internal traffic. In some cases, + * external traffic may be encrypted while internal traffic is not. (Although other admins may wish + * to encrypt everything.) 
Another reason is that supporting multiple listeners allows us to change + * the effective inter-cluster listener via a roll. During such a roll, half of the brokers + * (or controllers) might be using one listener, while the other half use another. This lets us, + * for example, transition from using a PLAINTEXT inter broker listener to using an SSL one without + * taking any downtime. + * + * The ListenerInfo class is intended to handle translating endpoint information between various + * different data structures, and also to handle the two big gotchas of Kafka endpoints. + * + * The first gotcha is that the hostname will be null or blank if we are listening on 0.0.0.0. + * The withWildcardHostnamesResolved function creates a ListenerInfo object where all such hostnames + * are replaced by specific hostnames. (It's not perfect because we have to choose a single hostname + * out of multiple possibilities. In production scenarios it would be better to set the desired + * hostname explicitly in the configuration rather than binding to 0.0.0.0.) + * + * The second gotcha is that if someone configures an ephemeral port (aka port 0), we need to fill + * The withEphemeralPortsCorrected resolves this by filling in the missing information for ephemeral + * ports. + */ +final public class ListenerInfo { + /** + * Create a ListenerInfo from data in a ControllerRegistrationRequest RPC. + * + * @param collection The RPC data. + * + * @return The ListenerInfo object. 
+ */ + public static ListenerInfo fromControllerRegistrationRequest( + ControllerRegistrationRequestData.ListenerCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + /** + * Create a ListenerInfo from data in a RegisterControllerRecord. + * + * @param collection The record data. + * + * @return The ListenerInfo object. + */ + public static ListenerInfo fromControllerRegistrationRecord( + RegisterControllerRecord.ControllerEndpointCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + /** + * Create a ListenerInfo from data in a BrokerRegistrationRequest RPC. + * + * @param collection The RPC data. + * + * @return The ListenerInfo object. 
+ */ + public static ListenerInfo fromBrokerRegistrationRequest( + BrokerRegistrationRequestData.ListenerCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + /** + * Create a ListenerInfo from data in a RegisterBrokerRecord. + * + * @param collection The record data. + * + * @return The ListenerInfo object. + */ + public static ListenerInfo fromBrokerRegistrationRecord( + RegisterBrokerRecord.BrokerEndpointCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + public static ListenerInfo create( + List<Endpoint> rawListeners + ) { + return create(Optional.empty(), rawListeners); + } + + public static ListenerInfo create( + Optional<String> firstListenerName, + List<Endpoint> rawListeners + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + for (Endpoint listener : rawListeners) { + String name = listener.listenerName().get(); + if (Optional.of(name).equals(firstListenerName)) { + listeners.put(name, listener); Review Comment: Can probably break here if you find the first 
listener. ########## metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java: ########## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata; + +import org.apache.kafka.common.Endpoint; +import org.apache.kafka.common.message.BrokerRegistrationRequestData; +import org.apache.kafka.common.message.ControllerRegistrationRequestData; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.common.metadata.RegisterControllerRecord; +import org.apache.kafka.common.security.auth.SecurityProtocol; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; + + +/** + * ListenerInfo contains information about the listeners of either a controller or a broker. + * ListenerInfo objects are immutable; they cannot be modified once created. The intention is + * that you store either controller listeners or broker listeners here, but not both. 
On a + * combined KRaft node, which has both broker and controller roles, you would have two + * separate ListenerInfo objects to represent the listeners of each role. + * + * Listener information is stored in a linked hash map. This maintains ordering while still + * allowing the traditional O(1) hash map access. By convention, the first listener is special, + * corresponding to either the inter-controller listener or the inter-broker listener. + * This is the only listener that other nodes will attempt to use to communicate with this node. + * + * You may wonder why nodes support multiple listeners, given that inter-cluster communication only + * ever uses the first one. Well, one reason is that external clients may wish to use the additional + * listeners. It is a good practice to separate external and internal traffic. In some cases, + * external traffic may be encrypted while internal traffic is not. (Although other admins may wish + * to encrypt everything.) Another reason is that supporting multiple listeners allows us to change + * the effective inter-cluster listener via a roll. During such a roll, half of the brokers + * (or controllers) might be using one listener, while the other half use another. This lets us, + * for example, transition from using a PLAINTEXT inter broker listener to using an SSL one without + * taking any downtime. + * + * The ListenerInfo class is intended to handle translating endpoint information between various + * different data structures, and also to handle the two big gotchas of Kafka endpoints. + * + * The first gotcha is that the hostname will be null or blank if we are listening on 0.0.0.0. + * The withWildcardHostnamesResolved function creates a ListenerInfo object where all such hostnames + * are replaced by specific hostnames. (It's not perfect because we have to choose a single hostname + * out of multiple possibilities. 
In production scenarios it would be better to set the desired + * hostname explicitly in the configuration rather than binding to 0.0.0.0.) + * + * The second gotcha is that if someone configures an ephemeral port (aka port 0), we need to fill + * The withEphemeralPortsCorrected resolves this by filling in the missing information for ephemeral + * ports. + */ +final public class ListenerInfo { + /** + * Create a ListenerInfo from data in a ControllerRegistrationRequest RPC. + * + * @param collection The RPC data. + * + * @return The ListenerInfo object. + */ + public static ListenerInfo fromControllerRegistrationRequest( + ControllerRegistrationRequestData.ListenerCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + /** + * Create a ListenerInfo from data in a RegisterControllerRecord. + * + * @param collection The record data. + * + * @return The ListenerInfo object. 
+ */ + public static ListenerInfo fromControllerRegistrationRecord( + RegisterControllerRecord.ControllerEndpointCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + /** + * Create a ListenerInfo from data in a BrokerRegistrationRequest RPC. + * + * @param collection The RPC data. + * + * @return The ListenerInfo object. + */ + public static ListenerInfo fromBrokerRegistrationRequest( + BrokerRegistrationRequestData.ListenerCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + /** + * Create a ListenerInfo from data in a RegisterBrokerRecord. + * + * @param collection The record data. + * + * @return The ListenerInfo object. 
+ */ + public static ListenerInfo fromBrokerRegistrationRecord( + RegisterBrokerRecord.BrokerEndpointCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + public static ListenerInfo create( + List<Endpoint> rawListeners + ) { + return create(Optional.empty(), rawListeners); + } + + public static ListenerInfo create( + Optional<String> firstListenerName, + List<Endpoint> rawListeners + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + for (Endpoint listener : rawListeners) { + String name = listener.listenerName().get(); + if (Optional.of(name).equals(firstListenerName)) { + listeners.put(name, listener); + } + } + for (Endpoint listener : rawListeners) { + String name = listener.listenerName().get(); + if (!Optional.of(name).equals(firstListenerName)) { + listeners.put(name, listener); + } + } + return new ListenerInfo(listeners); + } + + /** + * An ordered map containing all of the listeners. The first listener is special, indicating + * either the inter-broker or inter-controller listener. 
+ */ + private final Map<String, Endpoint> listeners; + + private ListenerInfo(Map<String, Endpoint> listeners) { + this.listeners = Collections.unmodifiableMap(listeners); + } + + public Map<String, Endpoint> listeners() { + return listeners; + } + + public Endpoint firstListener() { + return listeners.values().iterator().next(); + } + + /** + * Create a new ListenerInfo object where null or blank hostnames (signifying that the user + * asked to bind to 0.0.0.0) are replaced by specific hostnames. + * + * @return A new ListenerInfo object. + */ + public ListenerInfo withWildcardHostnamesResolved() throws UnknownHostException { + LinkedHashMap<String, Endpoint> newListeners = new LinkedHashMap<>(); + for (Map.Entry<String, Endpoint> entry : listeners.entrySet()) { + if (entry.getValue().host() == null || entry.getValue().host().trim().isEmpty()) { + String newHost = InetAddress.getLocalHost().getCanonicalHostName(); + Endpoint prevEndpoint = entry.getValue(); + newListeners.put(entry.getKey(), new Endpoint(prevEndpoint.listenerName().get(), + prevEndpoint.securityProtocol(), + newHost, + prevEndpoint.port())); + } else { + newListeners.put(entry.getKey(), entry.getValue()); + } + } + return new ListenerInfo(newListeners); + } + + /** + * Create a new ListenerInfo object where ephemeral ports are populated with their true runtime + * values. + * + * In other words, if a port was set to 0, indicating that a random port should be assigned by the + * operating system, this function will replace it with the value the operating system actually + * chose. + * + * @param getBoundPortCallback The callback used to correct ephemeral endpoints. + * + * @return A new ListenerInfo object. 
+ */ + public ListenerInfo withEphemeralPortsCorrected(Function<String, Integer> getBoundPortCallback) { + LinkedHashMap<String, Endpoint> newListeners = new LinkedHashMap<>(); + for (Map.Entry<String, Endpoint> entry : listeners.entrySet()) { + if (entry.getValue().port() == 0) { + Endpoint prevEndpoint = entry.getValue(); + int newPort = getBoundPortCallback.apply(entry.getKey()); Review Comment: Same logging question as above ########## metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java: ########## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.metadata; + +import org.apache.kafka.common.Endpoint; +import org.apache.kafka.common.message.BrokerRegistrationRequestData; +import org.apache.kafka.common.message.ControllerRegistrationRequestData; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.common.metadata.RegisterControllerRecord; +import org.apache.kafka.common.security.auth.SecurityProtocol; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; + + +/** + * ListenerInfo contains information about the listeners of either a controller or a broker. + * ListenerInfo objects are immutable; they cannot be modified once created. The intention is + * that you store either controller listeners or broker listeners here, but not both. On a + * combined KRaft node, which has both broker and controller roles, you would have two + * separate ListenerInfo objects to represent the listeners of each role. + * + * Listener information is stored in a linked hash map. This maintains ordering while still + * allowing the traditional O(1) hash map access. By convention, the first listener is special, + * corresponding to either the inter-controller listener or the inter-broker listener. + * This is the only listener that other nodes will attempt to use to communicate with this node. + * + * You may wonder why nodes support multiple listeners, given that inter-cluster communication only + * ever uses the first one. Well, one reason is that external clients may wish to use the additional + * listeners. It is a good practice to separate external and internal traffic. In some cases, + * external traffic may be encrypted while internal traffic is not. (Although other admins may wish + * to encrypt everything.) 
Another reason is that supporting multiple listeners allows us to change + * the effective inter-cluster listener via a roll. During such a roll, half of the brokers + * (or controllers) might be using one listener, while the other half use another. This lets us, + * for example, transition from using a PLAINTEXT inter broker listener to using an SSL one without + * taking any downtime. + * + * The ListenerInfo class is intended to handle translating endpoint information between various + * different data structures, and also to handle the two big gotchas of Kafka endpoints. + * + * The first gotcha is that the hostname will be null or blank if we are listening on 0.0.0.0. + * The withWildcardHostnamesResolved function creates a ListenerInfo object where all such hostnames + * are replaced by specific hostnames. (It's not perfect because we have to choose a single hostname + * out of multiple possibilities. In production scenarios it would be better to set the desired + * hostname explicitly in the configuration rather than binding to 0.0.0.0.) + * + * The second gotcha is that if someone configures an ephemeral port (aka port 0), we need to fill + * The withEphemeralPortsCorrected resolves this by filling in the missing information for ephemeral + * ports. + */ +final public class ListenerInfo { + /** + * Create a ListenerInfo from data in a ControllerRegistrationRequest RPC. + * + * @param collection The RPC data. + * + * @return The ListenerInfo object. 
+ */ + public static ListenerInfo fromControllerRegistrationRequest( + ControllerRegistrationRequestData.ListenerCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + /** + * Create a ListenerInfo from data in a RegisterControllerRecord. + * + * @param collection The record data. + * + * @return The ListenerInfo object. + */ + public static ListenerInfo fromControllerRegistrationRecord( + RegisterControllerRecord.ControllerEndpointCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + /** + * Create a ListenerInfo from data in a BrokerRegistrationRequest RPC. + * + * @param collection The RPC data. + * + * @return The ListenerInfo object. 
+ */ + public static ListenerInfo fromBrokerRegistrationRequest( + BrokerRegistrationRequestData.ListenerCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + /** + * Create a ListenerInfo from data in a RegisterBrokerRecord. + * + * @param collection The record data. + * + * @return The ListenerInfo object. + */ + public static ListenerInfo fromBrokerRegistrationRecord( + RegisterBrokerRecord.BrokerEndpointCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + public static ListenerInfo create( + List<Endpoint> rawListeners + ) { + return create(Optional.empty(), rawListeners); + } + + public static ListenerInfo create( + Optional<String> firstListenerName, + List<Endpoint> rawListeners + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + for (Endpoint listener : rawListeners) { + String name = listener.listenerName().get(); + if (Optional.of(name).equals(firstListenerName)) { + listeners.put(name, listener); + } + } + for (Endpoint listener : rawListeners) { + String name = 
listener.listenerName().get(); + if (!Optional.of(name).equals(firstListenerName)) { + listeners.put(name, listener); + } + } + return new ListenerInfo(listeners); + } + + /** + * An ordered map containing all of the listeners. The first listener is special, indicating + * either the inter-broker or inter-controller listener. + */ + private final Map<String, Endpoint> listeners; + + private ListenerInfo(Map<String, Endpoint> listeners) { + this.listeners = Collections.unmodifiableMap(listeners); + } + + public Map<String, Endpoint> listeners() { + return listeners; + } + + public Endpoint firstListener() { + return listeners.values().iterator().next(); Review Comment: Is the iterator guaranteed to be non-empty? ########## metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java: ########## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.metadata; + +import org.apache.kafka.common.Endpoint; +import org.apache.kafka.common.message.BrokerRegistrationRequestData; +import org.apache.kafka.common.message.ControllerRegistrationRequestData; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.common.metadata.RegisterControllerRecord; +import org.apache.kafka.common.security.auth.SecurityProtocol; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; + + +/** + * ListenerInfo contains information about the listeners of either a controller or a broker. + * ListenerInfo objects are immutable; they cannot be modified once created. The intention is + * that you store either controller listeners or broker listeners here, but not both. On a + * combined KRaft node, which has both broker and controller roles, you would have two + * separate ListenerInfo objects to represent the listeners of each role. + * + * Listener information is stored in a linked hash map. This maintains ordering while still + * allowing the traditional O(1) hash map access. By convention, the first listener is special, + * corresponding to either the inter-controller listener or the inter-broker listener. + * This is the only listener that other nodes will attempt to use to communicate with this node. + * + * You may wonder why nodes support multiple listeners, given that inter-cluster communication only + * ever uses the first one. Well, one reason is that external clients may wish to use the additional + * listeners. It is a good practice to separate external and internal traffic. In some cases, + * external traffic may be encrypted while internal traffic is not. (Although other admins may wish + * to encrypt everything.) 
Another reason is that supporting multiple listeners allows us to change + * the effective inter-cluster listener via a roll. During such a roll, half of the brokers + * (or controllers) might be using one listener, while the other half use another. This lets us, + * for example, transition from using a PLAINTEXT inter broker listener to using an SSL one without + * taking any downtime. + * + * The ListenerInfo class is intended to handle translating endpoint information between various + * different data structures, and also to handle the two big gotchas of Kafka endpoints. + * + * The first gotcha is that the hostname will be null or blank if we are listening on 0.0.0.0. + * The withWildcardHostnamesResolved function creates a ListenerInfo object where all such hostnames + * are replaced by specific hostnames. (It's not perfect because we have to choose a single hostname + * out of multiple possibilities. In production scenarios it would be better to set the desired + * hostname explicitly in the configuration rather than binding to 0.0.0.0.) + * + * The second gotcha is that if someone configures an ephemeral port (aka port 0), we need to fill + * in the port that was actually chosen at runtime. The withEphemeralPortsCorrected function resolves + * this by filling in the missing information for ephemeral ports. + */ +final public class ListenerInfo { + /** + * Create a ListenerInfo from data in a ControllerRegistrationRequest RPC. + * + * @param collection The RPC data. + * + * @return The ListenerInfo object. 
+ */ + public static ListenerInfo fromControllerRegistrationRequest( + ControllerRegistrationRequestData.ListenerCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + /** + * Create a ListenerInfo from data in a RegisterControllerRecord. + * + * @param collection The record data. + * + * @return The ListenerInfo object. + */ + public static ListenerInfo fromControllerRegistrationRecord( + RegisterControllerRecord.ControllerEndpointCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + /** + * Create a ListenerInfo from data in a BrokerRegistrationRequest RPC. + * + * @param collection The RPC data. + * + * @return The ListenerInfo object. 
+ */ + public static ListenerInfo fromBrokerRegistrationRequest( + BrokerRegistrationRequestData.ListenerCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + /** + * Create a ListenerInfo from data in a RegisterBrokerRecord. + * + * @param collection The record data. + * + * @return The ListenerInfo object. + */ + public static ListenerInfo fromBrokerRegistrationRecord( + RegisterBrokerRecord.BrokerEndpointCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + public static ListenerInfo create( + List<Endpoint> rawListeners + ) { + return create(Optional.empty(), rawListeners); + } + + public static ListenerInfo create( + Optional<String> firstListenerName, + List<Endpoint> rawListeners + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + for (Endpoint listener : rawListeners) { + String name = listener.listenerName().get(); + if (Optional.of(name).equals(firstListenerName)) { + listeners.put(name, listener); + } + } + for (Endpoint listener : rawListeners) { + String name = 
listener.listenerName().get(); + if (!Optional.of(name).equals(firstListenerName)) { + listeners.put(name, listener); + } + } + return new ListenerInfo(listeners); + } + + /** + * An ordered map containing all of the listeners. The first listener is special, indicating + * either the inter-broker or inter-controller listener. + */ + private final Map<String, Endpoint> listeners; + + private ListenerInfo(Map<String, Endpoint> listeners) { + this.listeners = Collections.unmodifiableMap(listeners); + } + + public Map<String, Endpoint> listeners() { + return listeners; + } + + public Endpoint firstListener() { + return listeners.values().iterator().next(); + } + + /** + * Create a new ListenerInfo object where null or blank hostnames (signifying that the user + * asked to bind to 0.0.0.0) are replaced by specific hostnames. + * + * @return A new ListenerInfo object. + */ + public ListenerInfo withWildcardHostnamesResolved() throws UnknownHostException { + LinkedHashMap<String, Endpoint> newListeners = new LinkedHashMap<>(); + for (Map.Entry<String, Endpoint> entry : listeners.entrySet()) { + if (entry.getValue().host() == null || entry.getValue().host().trim().isEmpty()) { + String newHost = InetAddress.getLocalHost().getCanonicalHostName(); + Endpoint prevEndpoint = entry.getValue(); + newListeners.put(entry.getKey(), new Endpoint(prevEndpoint.listenerName().get(), + prevEndpoint.securityProtocol(), + newHost, + prevEndpoint.port())); + } else { + newListeners.put(entry.getKey(), entry.getValue()); + } + } + return new ListenerInfo(newListeners); + } + + /** + * Create a new ListenerInfo object where ephemeral ports are populated with their true runtime + * values. + * + * In other words, if a port was set to 0, indicating that a random port should be assigned by the + * operating system, this function will replace it with the value the operating system actually + * chose. + * + * @param getBoundPortCallback The callback used to correct ephemeral endpoints. 
+ * + * @return A new ListenerInfo object. + */ + public ListenerInfo withEphemeralPortsCorrected(Function<String, Integer> getBoundPortCallback) { + LinkedHashMap<String, Endpoint> newListeners = new LinkedHashMap<>(); + for (Map.Entry<String, Endpoint> entry : listeners.entrySet()) { + if (entry.getValue().port() == 0) { + Endpoint prevEndpoint = entry.getValue(); + int newPort = getBoundPortCallback.apply(entry.getKey()); + checkPortIsSerializable(newPort); + newListeners.put(entry.getKey(), new Endpoint(prevEndpoint.listenerName().get(), + prevEndpoint.securityProtocol(), + prevEndpoint.host(), + newPort)); + } else { + newListeners.put(entry.getKey(), entry.getValue()); + } + } + return new ListenerInfo(newListeners); + } + + private static void checkPortIsSerializable(int port) { + if (port == 0) { + throw new RuntimeException("Cannot serialize ephemeral port 0 in ListenerInfo."); + } else if (port < 0) { + throw new RuntimeException("Cannot serialize negative port number " + port + + " in ListenerInfo."); + } else if (port > 65535) { + throw new RuntimeException("Cannot serialize invalid port number " + port + + " in ListenerInfo."); Review Comment: nit: too much indentation ########## metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java: ########## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata; + +import org.apache.kafka.common.Endpoint; +import org.apache.kafka.common.message.BrokerRegistrationRequestData; +import org.apache.kafka.common.message.ControllerRegistrationRequestData; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.common.metadata.RegisterControllerRecord; +import org.apache.kafka.common.security.auth.SecurityProtocol; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; + + +/** + * ListenerInfo contains information about the listeners of either a controller or a broker. + * ListenerInfo objects are immutable; they cannot be modified once created. The intention is + * that you store either controller listeners or broker listeners here, but not both. On a + * combined KRaft node, which has both broker and controller roles, you would have two + * separate ListenerInfo objects to represent the listeners of each role. + * + * Listener information is stored in a linked hash map. This maintains ordering while still + * allowing the traditional O(1) hash map access. By convention, the first listener is special, + * corresponding to either the inter-controller listener or the inter-broker listener. + * This is the only listener that other nodes will attempt to use to communicate with this node. 
+ * + * You may wonder why nodes support multiple listeners, given that inter-cluster communication only + * ever uses the first one. Well, one reason is that external clients may wish to use the additional + * listeners. It is a good practice to separate external and internal traffic. In some cases, + * external traffic may be encrypted while internal traffic is not. (Although other admins may wish + * to encrypt everything.) Another reason is that supporting multiple listeners allows us to change + * the effective inter-cluster listener via a roll. During such a roll, half of the brokers + * (or controllers) might be using one listener, while the other half use another. This lets us, + * for example, transition from using a PLAINTEXT inter broker listener to using an SSL one without + * taking any downtime. + * + * The ListenerInfo class is intended to handle translating endpoint information between various + * different data structures, and also to handle the two big gotchas of Kafka endpoints. + * + * The first gotcha is that the hostname will be null or blank if we are listening on 0.0.0.0. + * The withWildcardHostnamesResolved function creates a ListenerInfo object where all such hostnames + * are replaced by specific hostnames. (It's not perfect because we have to choose a single hostname + * out of multiple possibilities. In production scenarios it would be better to set the desired + * hostname explicitly in the configuration rather than binding to 0.0.0.0.) + * + * The second gotcha is that if someone configures an ephemeral port (aka port 0), we need to fill + * in the port that was actually chosen at runtime. The withEphemeralPortsCorrected function resolves + * this by filling in the missing information for ephemeral ports. + */ +final public class ListenerInfo { + /** + * Create a ListenerInfo from data in a ControllerRegistrationRequest RPC. + * + * @param collection The RPC data. + * + * @return The ListenerInfo object. 
+ */ + public static ListenerInfo fromControllerRegistrationRequest( + ControllerRegistrationRequestData.ListenerCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + /** + * Create a ListenerInfo from data in a RegisterControllerRecord. + * + * @param collection The record data. + * + * @return The ListenerInfo object. + */ + public static ListenerInfo fromControllerRegistrationRecord( + RegisterControllerRecord.ControllerEndpointCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + /** + * Create a ListenerInfo from data in a BrokerRegistrationRequest RPC. + * + * @param collection The RPC data. + * + * @return The ListenerInfo object. 
+ */ + public static ListenerInfo fromBrokerRegistrationRequest( + BrokerRegistrationRequestData.ListenerCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + /** + * Create a ListenerInfo from data in a RegisterBrokerRecord. + * + * @param collection The record data. + * + * @return The ListenerInfo object. + */ + public static ListenerInfo fromBrokerRegistrationRecord( + RegisterBrokerRecord.BrokerEndpointCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + public static ListenerInfo create( + List<Endpoint> rawListeners + ) { + return create(Optional.empty(), rawListeners); + } + + public static ListenerInfo create( + Optional<String> firstListenerName, + List<Endpoint> rawListeners + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + for (Endpoint listener : rawListeners) { + String name = listener.listenerName().get(); + if (Optional.of(name).equals(firstListenerName)) { + listeners.put(name, listener); + } + } + for (Endpoint listener : rawListeners) { + String name = 
listener.listenerName().get(); + if (!Optional.of(name).equals(firstListenerName)) { + listeners.put(name, listener); + } + } + return new ListenerInfo(listeners); + } + + /** + * An ordered map containing all of the listeners. The first listener is special, indicating + * either the inter-broker or inter-controller listener. + */ + private final Map<String, Endpoint> listeners; + + private ListenerInfo(Map<String, Endpoint> listeners) { + this.listeners = Collections.unmodifiableMap(listeners); + } + + public Map<String, Endpoint> listeners() { + return listeners; + } + + public Endpoint firstListener() { + return listeners.values().iterator().next(); + } + + /** + * Create a new ListenerInfo object where null or blank hostnames (signifying that the user + * asked to bind to 0.0.0.0) are replaced by specific hostnames. + * + * @return A new ListenerInfo object. + */ + public ListenerInfo withWildcardHostnamesResolved() throws UnknownHostException { + LinkedHashMap<String, Endpoint> newListeners = new LinkedHashMap<>(); + for (Map.Entry<String, Endpoint> entry : listeners.entrySet()) { + if (entry.getValue().host() == null || entry.getValue().host().trim().isEmpty()) { + String newHost = InetAddress.getLocalHost().getCanonicalHostName(); Review Comment: Should we log something to the user if we are resolving localhost as the host name? As you mention in the class javadoc, normally for production use-cases we expect the hostname to be explicitly set. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org