This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new fa472d26a5b MINOR: Update BrokerRegistration to use a Builder fa472d26a5b is described below commit fa472d26a5b13b2125229527be99cd080eb3a122 Author: Proven Provenzano <pprovenz...@confluent.io> AuthorDate: Wed Nov 8 23:42:20 2023 -0500 MINOR: Update BrokerRegistration to use a Builder Update BrokerRegistration to use a Builder. This fixes the proliferation of different constructors, and makes it clear what arguments are being used where. Reviewers: Colin P. McCabe <cmcc...@confluent.io> --- .../kafka/controller/ClusterControlManager.java | 14 +- .../apache/kafka/metadata/BrokerRegistration.java | 154 +++++++++++++-------- .../controller/ClusterControlManagerTest.java | 13 +- .../metrics/ControllerMetricsChangesTest.java | 31 ++--- .../org/apache/kafka/image/ClusterImageTest.java | 85 ++++++------ .../image/node/ClusterImageBrokersNodeTest.java | 17 +-- .../kafka/metadata/BrokerRegistrationTest.java | 57 +++++--- 7 files changed, 225 insertions(+), 146 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java index fa984c1e4dc..2c667339b0a 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java @@ -449,10 +449,16 @@ public class ClusterControlManager { } // Update broker registrations. BrokerRegistration prevRegistration = brokerRegistrations.put(brokerId, - new BrokerRegistration(brokerId, record.brokerEpoch(), - record.incarnationId(), listenerInfo.listeners(), features, - Optional.ofNullable(record.rack()), record.fenced(), - record.inControlledShutdown(), record.isMigratingZkBroker())); + new BrokerRegistration.Builder(). + setId(brokerId). + setEpoch(record.brokerEpoch()). + setIncarnationId(record.incarnationId()). 
+ setListeners(listenerInfo.listeners()). + setSupportedFeatures(features). + setRack(Optional.ofNullable(record.rack())). + setFenced(record.fenced()). + setInControlledShutdown(record.inControlledShutdown()). + setIsMigratingZkBroker(record.isMigratingZkBroker()).build()); if (heartbeatManager != null) { if (prevRegistration != null) heartbeatManager.remove(brokerId); heartbeatManager.register(brokerId, record.fenced()); diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java index 0ece4dd94de..ed0428bef15 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.image.writer.ImageWriterOptions; import org.apache.kafka.server.common.ApiMessageAndVersion; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -41,12 +40,96 @@ import java.util.stream.Collectors; * An immutable class which represents broker registrations. 
*/ public class BrokerRegistration { - private static Map<String, Endpoint> listenersToMap(Collection<Endpoint> listeners) { - Map<String, Endpoint> listenersMap = new HashMap<>(); - for (Endpoint endpoint : listeners) { - listenersMap.put(endpoint.listenerName().get(), endpoint); + public static class Builder { + private int id = 0; + private long epoch = -1; + private Uuid incarnationId = null; + private Map<String, Endpoint> listeners; + private Map<String, VersionRange> supportedFeatures; + private Optional<String> rack = Optional.empty(); + private boolean fenced = false; + private boolean inControlledShutdown = false; + private boolean isMigratingZkBroker = false; + + public Builder() { + this.id = 0; + this.epoch = -1; + this.incarnationId = null; + this.listeners = new HashMap<>(); + this.supportedFeatures = new HashMap<>(); + this.rack = Optional.empty(); + this.fenced = false; + this.inControlledShutdown = false; + this.isMigratingZkBroker = false; + } + + public Builder setId(int id) { + this.id = id; + return this; + } + + public Builder setEpoch(long epoch) { + this.epoch = epoch; + return this; + } + + public Builder setIncarnationId(Uuid incarnationId) { + this.incarnationId = incarnationId; + return this; + } + + public Builder setListeners(List<Endpoint> listeners) { + Map<String, Endpoint> listenersMap = new HashMap<>(); + for (Endpoint endpoint : listeners) { + listenersMap.put(endpoint.listenerName().get(), endpoint); + } + this.listeners = listenersMap; + return this; + } + + public Builder setListeners(Map<String, Endpoint> listeners) { + this.listeners = listeners; + return this; + } + + public Builder setSupportedFeatures(Map<String, VersionRange> supportedFeatures) { + this.supportedFeatures = supportedFeatures; + return this; + } + + public Builder setRack(Optional<String> rack) { + Objects.requireNonNull(rack); + this.rack = rack; + return this; + } + + public Builder setFenced(boolean fenced) { + this.fenced = fenced; + return this; + } 
+ + public Builder setInControlledShutdown(boolean inControlledShutdown) { + this.inControlledShutdown = inControlledShutdown; + return this; + } + + public Builder setIsMigratingZkBroker(boolean isMigratingZkBroker) { + this.isMigratingZkBroker = isMigratingZkBroker; + return this; + } + + public BrokerRegistration build() { + return new BrokerRegistration( + id, + epoch, + incarnationId, + listeners, + supportedFeatures, + rack, + fenced, + inControlledShutdown, + isMigratingZkBroker); } - return listenersMap; } public static Optional<Long> zkBrokerEpoch(long value) { @@ -67,53 +150,17 @@ public class BrokerRegistration { private final boolean inControlledShutdown; private final boolean isMigratingZkBroker; - // Visible for testing - public BrokerRegistration(int id, - long epoch, - Uuid incarnationId, - List<Endpoint> listeners, - Map<String, VersionRange> supportedFeatures, - Optional<String> rack, - boolean fenced, - boolean inControlledShutdown) { - this(id, epoch, incarnationId, listenersToMap(listeners), supportedFeatures, rack, - fenced, inControlledShutdown, false); - } - - public BrokerRegistration(int id, - long epoch, - Uuid incarnationId, - List<Endpoint> listeners, - Map<String, VersionRange> supportedFeatures, - Optional<String> rack, - boolean fenced, - boolean inControlledShutdown, - boolean isMigratingZkBroker) { - this(id, epoch, incarnationId, listenersToMap(listeners), supportedFeatures, rack, - fenced, inControlledShutdown, isMigratingZkBroker); - } - - // Visible for testing - public BrokerRegistration(int id, - long epoch, - Uuid incarnationId, - Map<String, Endpoint> listeners, - Map<String, VersionRange> supportedFeatures, - Optional<String> rack, - boolean fenced, - boolean inControlledShutdown) { - this(id, epoch, incarnationId, listeners, supportedFeatures, rack, fenced, inControlledShutdown, false); - } - - public BrokerRegistration(int id, - long epoch, - Uuid incarnationId, - Map<String, Endpoint> listeners, - Map<String, 
VersionRange> supportedFeatures, - Optional<String> rack, - boolean fenced, - boolean inControlledShutdown, - boolean isMigratingZkBroker) { + private BrokerRegistration( + int id, + long epoch, + Uuid incarnationId, + Map<String, Endpoint> listeners, + Map<String, VersionRange> supportedFeatures, + Optional<String> rack, + boolean fenced, + boolean inControlledShutdown, + boolean isMigratingZkBroker + ) { this.id = id; this.epoch = epoch; this.incarnationId = incarnationId; @@ -127,7 +174,6 @@ public class BrokerRegistration { this.listeners = Collections.unmodifiableMap(newListeners); Objects.requireNonNull(supportedFeatures); this.supportedFeatures = new HashMap<>(supportedFeatures); - Objects.requireNonNull(rack); this.rack = rack; this.fenced = fenced; this.inControlledShutdown = inControlledShutdown; diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java index 54e73b2617b..69a9d67e3d8 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java @@ -344,10 +344,15 @@ public class ClusterControlManagerTest { build(); clusterControl.activate(); clusterControl.replay(brokerRecord, 100L); - assertEquals(new BrokerRegistration(1, 100, - Uuid.fromString("fPZv1VBsRFmnlRvmGcOW9w"), Collections.singletonMap("PLAINTEXT", - new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "example.com", 9092)), - Collections.emptyMap(), Optional.of("arack"), true, false), + assertEquals(new BrokerRegistration.Builder(). + setId(1). + setEpoch(100). + setIncarnationId(Uuid.fromString("fPZv1VBsRFmnlRvmGcOW9w")). + setListeners(Collections.singletonMap("PLAINTEXT", + new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "example.com", 9092))). + setRack(Optional.of("arack")). + setFenced(true). 
+ setInControlledShutdown(false).build(), clusterControl.brokerRegistrations().get(1)); assertEquals(100L, clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).getAsLong()); UnregisterBrokerRecord unregisterRecord = new UnregisterBrokerRecord(). diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java index 80867fa896e..eaf789619d1 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java @@ -20,7 +20,6 @@ package org.apache.kafka.controller.metrics; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.metadata.PartitionChangeRecord; @@ -52,28 +51,24 @@ public class ControllerMetricsChangesTest { int brokerId, boolean fenced ) { - return new BrokerRegistration(brokerId, - 100L, - Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ"), - Collections.emptyList(), - Collections.emptyMap(), - Optional.empty(), - fenced, - false); + return new BrokerRegistration.Builder(). + setId(brokerId). + setEpoch(100L). + setIncarnationId(Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ")). + setFenced(fenced). + setInControlledShutdown(false).build(); } private static BrokerRegistration zkBrokerRegistration( int brokerId ) { - return new BrokerRegistration(brokerId, - 100L, - Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ"), - Collections.emptyList(), - Collections.emptyMap(), - Optional.empty(), - false, - false, - true); + return new BrokerRegistration.Builder(). + setId(brokerId). + setEpoch(100L). + setIncarnationId(Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ")). + setFenced(false). + setInControlledShutdown(false). 
+ setIsMigratingZkBroker(true).build(); } @Test diff --git a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java index 75c3c39428b..1bde2c0a42c 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java @@ -67,30 +67,33 @@ public class ClusterImageTest { static { Map<Integer, BrokerRegistration> map1 = new HashMap<>(); - map1.put(0, new BrokerRegistration(0, - 1000, - Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q"), - Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9092)), - Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3)), - Optional.empty(), - true, - false)); - map1.put(1, new BrokerRegistration(1, - 1001, - Uuid.fromString("U52uRe20RsGI0RvpcTx33Q"), - Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093)), - Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3)), - Optional.empty(), - false, - false)); - map1.put(2, new BrokerRegistration(2, - 123, - Uuid.fromString("hr4TVh3YQiu3p16Awkka6w"), - Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093)), - Collections.emptyMap(), - Optional.of("arack"), - false, - false)); + map1.put(0, new BrokerRegistration.Builder(). + setId(0). + setEpoch(1000). + setIncarnationId(Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")). + setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9092))). + setSupportedFeatures(Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3))). + setRack(Optional.empty()). + setFenced(true). + setInControlledShutdown(false).build()); + map1.put(1, new BrokerRegistration.Builder(). + setId(1). + setEpoch(1001). + setIncarnationId(Uuid.fromString("U52uRe20RsGI0RvpcTx33Q")). 
+ setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093))). + setSupportedFeatures(Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3))). + setRack(Optional.empty()). + setFenced(false). + setInControlledShutdown(false).build()); + map1.put(2, new BrokerRegistration.Builder(). + setId(2). + setEpoch(123). + setIncarnationId(Uuid.fromString("hr4TVh3YQiu3p16Awkka6w")). + setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093))). + setSupportedFeatures(Collections.emptyMap()). + setRack(Optional.of("arack")). + setFenced(false). + setInControlledShutdown(false).build()); Map<Integer, ControllerRegistration> cmap1 = new HashMap<>(); cmap1.put(1000, new ControllerRegistration.Builder(). setId(1000). @@ -131,22 +134,24 @@ public class ClusterImageTest { RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS); Map<Integer, BrokerRegistration> map2 = new HashMap<>(); - map2.put(0, new BrokerRegistration(0, - 1000, - Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q"), - Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9092)), - Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3)), - Optional.empty(), - false, - true)); - map2.put(1, new BrokerRegistration(1, - 1001, - Uuid.fromString("U52uRe20RsGI0RvpcTx33Q"), - Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093)), - Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3)), - Optional.empty(), - true, - false)); + map2.put(0, new BrokerRegistration.Builder(). + setId(0). + setEpoch(1000). + setIncarnationId(Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")). + setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9092))). + setSupportedFeatures(Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3))). + setRack(Optional.empty()). + setFenced(false). 
+ setInControlledShutdown(true).build()); + map2.put(1, new BrokerRegistration.Builder(). + setId(1). + setEpoch(1001). + setIncarnationId(Uuid.fromString("U52uRe20RsGI0RvpcTx33Q")). + setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093))). + setSupportedFeatures(Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3))). + setRack(Optional.empty()). + setFenced(true). + setInControlledShutdown(false).build()); Map<Integer, ControllerRegistration> cmap2 = new HashMap<>(cmap1); cmap2.put(1001, new ControllerRegistration.Builder(). setId(1001). diff --git a/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java b/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java index b25f7f60110..418dcca45b1 100644 --- a/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java @@ -37,14 +37,15 @@ import static org.junit.jupiter.api.Assertions.assertNull; @Timeout(value = 40) public class ClusterImageBrokersNodeTest { private static final ClusterImage TEST_IMAGE = new ClusterImage( - Collections.singletonMap(1, new BrokerRegistration(1, - 1001, - Uuid.fromString("MJkaH0j0RwuC3W2GHQHtWA"), - Collections.emptyList(), - Collections.singletonMap(MetadataVersion.FEATURE_NAME, VersionRange.of(1, 4)), - Optional.empty(), - false, - false)), + Collections.singletonMap(1, new BrokerRegistration.Builder(). + setId(1). + setEpoch(1001). + setIncarnationId(Uuid.fromString("MJkaH0j0RwuC3W2GHQHtWA")). + setListeners(Collections.emptyList()). + setSupportedFeatures(Collections.singletonMap(MetadataVersion.FEATURE_NAME, VersionRange.of(1, 4))). + setRack(Optional.empty()). + setFenced(false). 
+ setInControlledShutdown(false).build()), Collections.emptyMap()); private final static ClusterImageBrokersNode NODE = new ClusterImageBrokersNode(TEST_IMAGE); diff --git a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java index 262c8513381..4e8f8e6cbef 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java @@ -41,25 +41,46 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; @Timeout(value = 40) public class BrokerRegistrationTest { private static final List<BrokerRegistration> REGISTRATIONS = Arrays.asList( - new BrokerRegistration(0, 0, Uuid.fromString("pc1GhUlBS92cGGaKXl6ipw"), - Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9090)), - Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 2)), - Optional.empty(), false, false), - new BrokerRegistration(1, 0, Uuid.fromString("3MfdxWlNSn2UDYsmDP1pYg"), - Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9091)), - Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 2)), - Optional.empty(), true, false), - new BrokerRegistration(2, 0, Uuid.fromString("eY7oaG1RREie5Kk9uy1l6g"), - Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9092)), - Stream.of(new SimpleEntry<>("foo", VersionRange.of((short) 2, (short) 3)), + new BrokerRegistration.Builder(). + setId(0). + setEpoch(0). + setIncarnationId(Uuid.fromString("pc1GhUlBS92cGGaKXl6ipw")). + setListeners(Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9090))). + setSupportedFeatures(Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 2))). + setRack(Optional.empty()). + setFenced(false). + setInControlledShutdown(false).build(), + new BrokerRegistration.Builder(). 
+ setId(1). + setEpoch(0). + setIncarnationId(Uuid.fromString("3MfdxWlNSn2UDYsmDP1pYg")). + setListeners(Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9091))). + setSupportedFeatures(Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 2))). + setRack(Optional.empty()). + setFenced(true). + setInControlledShutdown(false).build(), + new BrokerRegistration.Builder(). + setId(2). + setEpoch(0). + setIncarnationId(Uuid.fromString("eY7oaG1RREie5Kk9uy1l6g")). + setListeners(Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9092))). + setSupportedFeatures(Stream.of(new SimpleEntry<>("foo", VersionRange.of((short) 2, (short) 3)), new SimpleEntry<>("bar", VersionRange.of((short) 1, (short) 4))).collect( - Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)), - Optional.of("myrack"), false, true), - new BrokerRegistration(3, 0, Uuid.fromString("1t8VyWx2TCSTpUWuqj-FOw"), - Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9093)), - Stream.of(new SimpleEntry<>("metadata.version", VersionRange.of((short) 7, (short) 7))) - .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)), - Optional.empty(), false, true, true)); + Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue))). + setRack(Optional.of("myrack")). + setFenced(false). + setInControlledShutdown(true).build(), + new BrokerRegistration.Builder(). + setId(3). + setEpoch(0). + setIncarnationId(Uuid.fromString("1t8VyWx2TCSTpUWuqj-FOw")). + setListeners(Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9093))). + setSupportedFeatures(Stream.of(new SimpleEntry<>("metadata.version", VersionRange.of((short) 7, (short) 7))) + .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue))). + setRack(Optional.empty()). + setFenced(false). + setInControlledShutdown(true). + setIsMigratingZkBroker(true).build()); @Test public void testValues() {