This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fa472d26a5b MINOR: Update BrokerRegistration to use a Builder
fa472d26a5b is described below

commit fa472d26a5b13b2125229527be99cd080eb3a122
Author: Proven Provenzano <pprovenz...@confluent.io>
AuthorDate: Wed Nov 8 23:42:20 2023 -0500

    MINOR: Update BrokerRegistration to use a Builder
    
    Update BrokerRegistration to use a Builder. This fixes the proliferation of 
different constructors,
    and makes it clear what arguments are being used where.
    
    Reviewers: Colin P. McCabe <cmcc...@confluent.io>
---
 .../kafka/controller/ClusterControlManager.java    |  14 +-
 .../apache/kafka/metadata/BrokerRegistration.java  | 154 +++++++++++++--------
 .../controller/ClusterControlManagerTest.java      |  13 +-
 .../metrics/ControllerMetricsChangesTest.java      |  31 ++---
 .../org/apache/kafka/image/ClusterImageTest.java   |  85 ++++++------
 .../image/node/ClusterImageBrokersNodeTest.java    |  17 +--
 .../kafka/metadata/BrokerRegistrationTest.java     |  57 +++++---
 7 files changed, 225 insertions(+), 146 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index fa984c1e4dc..2c667339b0a 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -449,10 +449,16 @@ public class ClusterControlManager {
         }
         // Update broker registrations.
         BrokerRegistration prevRegistration = brokerRegistrations.put(brokerId,
-                new BrokerRegistration(brokerId, record.brokerEpoch(),
-                    record.incarnationId(), listenerInfo.listeners(), features,
-                    Optional.ofNullable(record.rack()), record.fenced(),
-                    record.inControlledShutdown(), 
record.isMigratingZkBroker()));
+            new BrokerRegistration.Builder().
+                setId(brokerId).
+                setEpoch(record.brokerEpoch()).
+                setIncarnationId(record.incarnationId()).
+                setListeners(listenerInfo.listeners()).
+                setSupportedFeatures(features).
+                setRack(Optional.ofNullable(record.rack())).
+                setFenced(record.fenced()).
+                setInControlledShutdown(record.inControlledShutdown()).
+                setIsMigratingZkBroker(record.isMigratingZkBroker()).build());
         if (heartbeatManager != null) {
             if (prevRegistration != null) heartbeatManager.remove(brokerId);
             heartbeatManager.register(brokerId, record.fenced());
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java 
b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
index 0ece4dd94de..ed0428bef15 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -41,12 +40,96 @@ import java.util.stream.Collectors;
  * An immutable class which represents broker registrations.
  */
 public class BrokerRegistration {
-    private static Map<String, Endpoint> listenersToMap(Collection<Endpoint> 
listeners) {
-        Map<String, Endpoint> listenersMap = new HashMap<>();
-        for (Endpoint endpoint : listeners) {
-            listenersMap.put(endpoint.listenerName().get(), endpoint);
+    public static class Builder {
+        private int id = 0;
+        private long epoch = -1;
+        private Uuid incarnationId = null;
+        private Map<String, Endpoint> listeners;
+        private Map<String, VersionRange> supportedFeatures;
+        private Optional<String> rack = Optional.empty();
+        private boolean fenced = false;
+        private boolean inControlledShutdown = false;
+        private boolean isMigratingZkBroker = false;
+
+        public Builder() {
+            this.id = 0;
+            this.epoch = -1;
+            this.incarnationId = null;
+            this.listeners = new HashMap<>();
+            this.supportedFeatures = new HashMap<>();
+            this.rack = Optional.empty();
+            this.fenced = false;
+            this.inControlledShutdown = false;
+            this.isMigratingZkBroker = false;
+        }
+
+        public Builder setId(int id) {
+            this.id = id;
+            return this;
+        }
+
+        public Builder setEpoch(long epoch) {
+            this.epoch = epoch;
+            return this;
+        }
+
+        public Builder setIncarnationId(Uuid incarnationId) {
+            this.incarnationId = incarnationId;
+            return this;
+        }
+
+        public Builder setListeners(List<Endpoint> listeners) {
+            Map<String, Endpoint> listenersMap = new HashMap<>();
+            for (Endpoint endpoint : listeners) {
+                listenersMap.put(endpoint.listenerName().get(), endpoint);
+            }
+            this.listeners = listenersMap;
+            return this;
+        }
+
+        public Builder setListeners(Map<String, Endpoint> listeners) {
+            this.listeners = listeners;
+            return this;
+        }
+
+        public Builder setSupportedFeatures(Map<String, VersionRange> 
supportedFeatures) {
+            this.supportedFeatures = supportedFeatures;
+            return this;
+        }
+
+        public Builder setRack(Optional<String> rack) {
+            Objects.requireNonNull(rack);
+            this.rack = rack;
+            return this;
+        }
+
+        public Builder setFenced(boolean fenced) {
+            this.fenced = fenced;
+            return this;
+        }
+
+        public Builder setInControlledShutdown(boolean inControlledShutdown) {
+            this.inControlledShutdown = inControlledShutdown;
+            return this;
+        }
+
+        public Builder setIsMigratingZkBroker(boolean isMigratingZkBroker) {
+            this.isMigratingZkBroker = isMigratingZkBroker;
+            return this;
+        }
+
+        public BrokerRegistration build() {
+            return new BrokerRegistration(
+                id,
+                epoch,
+                incarnationId,
+                listeners,
+                supportedFeatures,
+                rack,
+                fenced,
+                inControlledShutdown,
+                isMigratingZkBroker);
         }
-        return listenersMap;
     }
 
     public static Optional<Long> zkBrokerEpoch(long value) {
@@ -67,53 +150,17 @@ public class BrokerRegistration {
     private final boolean inControlledShutdown;
     private final boolean isMigratingZkBroker;
 
-    // Visible for testing
-    public BrokerRegistration(int id,
-                              long epoch,
-                              Uuid incarnationId,
-                              List<Endpoint> listeners,
-                              Map<String, VersionRange> supportedFeatures,
-                              Optional<String> rack,
-                              boolean fenced,
-                              boolean inControlledShutdown) {
-        this(id, epoch, incarnationId, listenersToMap(listeners), 
supportedFeatures, rack,
-            fenced, inControlledShutdown, false);
-    }
-
-    public BrokerRegistration(int id,
-                              long epoch,
-                              Uuid incarnationId,
-                              List<Endpoint> listeners,
-                              Map<String, VersionRange> supportedFeatures,
-                              Optional<String> rack,
-                              boolean fenced,
-                              boolean inControlledShutdown,
-                              boolean isMigratingZkBroker) {
-        this(id, epoch, incarnationId, listenersToMap(listeners), 
supportedFeatures, rack,
-            fenced, inControlledShutdown, isMigratingZkBroker);
-    }
-
-    // Visible for testing
-    public BrokerRegistration(int id,
-                              long epoch,
-                              Uuid incarnationId,
-                              Map<String, Endpoint> listeners,
-                              Map<String, VersionRange> supportedFeatures,
-                              Optional<String> rack,
-                              boolean fenced,
-                              boolean inControlledShutdown) {
-        this(id, epoch, incarnationId, listeners, supportedFeatures, rack, 
fenced, inControlledShutdown, false);
-    }
-
-    public BrokerRegistration(int id,
-                              long epoch,
-                              Uuid incarnationId,
-                              Map<String, Endpoint> listeners,
-                              Map<String, VersionRange> supportedFeatures,
-                              Optional<String> rack,
-                              boolean fenced,
-                              boolean inControlledShutdown,
-                              boolean isMigratingZkBroker) {
+    private BrokerRegistration(
+        int id,
+        long epoch,
+        Uuid incarnationId,
+        Map<String, Endpoint> listeners,
+        Map<String, VersionRange> supportedFeatures,
+        Optional<String> rack,
+        boolean fenced,
+        boolean inControlledShutdown,
+        boolean isMigratingZkBroker
+    ) {
         this.id = id;
         this.epoch = epoch;
         this.incarnationId = incarnationId;
@@ -127,7 +174,6 @@ public class BrokerRegistration {
         this.listeners = Collections.unmodifiableMap(newListeners);
         Objects.requireNonNull(supportedFeatures);
         this.supportedFeatures = new HashMap<>(supportedFeatures);
-        Objects.requireNonNull(rack);
         this.rack = rack;
         this.fenced = fenced;
         this.inControlledShutdown = inControlledShutdown;
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 54e73b2617b..69a9d67e3d8 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -344,10 +344,15 @@ public class ClusterControlManagerTest {
             build();
         clusterControl.activate();
         clusterControl.replay(brokerRecord, 100L);
-        assertEquals(new BrokerRegistration(1, 100,
-                Uuid.fromString("fPZv1VBsRFmnlRvmGcOW9w"), 
Collections.singletonMap("PLAINTEXT",
-                new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, 
"example.com", 9092)),
-                Collections.emptyMap(), Optional.of("arack"), true, false),
+        assertEquals(new BrokerRegistration.Builder().
+            setId(1).
+            setEpoch(100).
+            setIncarnationId(Uuid.fromString("fPZv1VBsRFmnlRvmGcOW9w")).
+            setListeners(Collections.singletonMap("PLAINTEXT",
+                new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, 
"example.com", 9092))).
+            setRack(Optional.of("arack")).
+            setFenced(true).
+            setInControlledShutdown(false).build(),
             clusterControl.brokerRegistrations().get(1));
         assertEquals(100L, 
clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).getAsLong());
         UnregisterBrokerRecord unregisterRecord = new UnregisterBrokerRecord().
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java
index 80867fa896e..eaf789619d1 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java
@@ -20,7 +20,6 @@ package org.apache.kafka.controller.metrics;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
@@ -52,28 +51,24 @@ public class ControllerMetricsChangesTest {
         int brokerId,
         boolean fenced
     ) {
-        return new BrokerRegistration(brokerId,
-            100L,
-            Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ"),
-            Collections.emptyList(),
-            Collections.emptyMap(),
-            Optional.empty(),
-            fenced,
-            false);
+        return new BrokerRegistration.Builder().
+            setId(brokerId).
+            setEpoch(100L).
+            setIncarnationId(Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ")).
+            setFenced(fenced).
+            setInControlledShutdown(false).build();
     }
 
     private static BrokerRegistration zkBrokerRegistration(
         int brokerId
     ) {
-        return new BrokerRegistration(brokerId,
-            100L,
-            Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ"),
-            Collections.emptyList(),
-            Collections.emptyMap(),
-            Optional.empty(),
-            false,
-            false,
-            true);
+        return new BrokerRegistration.Builder().
+            setId(brokerId).
+            setEpoch(100L).
+            setIncarnationId(Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ")).
+            setFenced(false).
+            setInControlledShutdown(false).
+            setIsMigratingZkBroker(true).build();
     }
 
     @Test
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java 
b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
index 75c3c39428b..1bde2c0a42c 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
@@ -67,30 +67,33 @@ public class ClusterImageTest {
 
     static {
         Map<Integer, BrokerRegistration> map1 = new HashMap<>();
-        map1.put(0, new BrokerRegistration(0,
-            1000,
-            Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q"),
-            Arrays.asList(new Endpoint("PLAINTEXT", 
SecurityProtocol.PLAINTEXT, "localhost", 9092)),
-            Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 
3)),
-            Optional.empty(),
-            true,
-            false));
-        map1.put(1, new BrokerRegistration(1,
-            1001,
-            Uuid.fromString("U52uRe20RsGI0RvpcTx33Q"),
-            Arrays.asList(new Endpoint("PLAINTEXT", 
SecurityProtocol.PLAINTEXT, "localhost", 9093)),
-            Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 
3)),
-            Optional.empty(),
-            false,
-            false));
-        map1.put(2, new BrokerRegistration(2,
-            123,
-            Uuid.fromString("hr4TVh3YQiu3p16Awkka6w"),
-            Arrays.asList(new Endpoint("PLAINTEXT", 
SecurityProtocol.PLAINTEXT, "localhost", 9093)),
-            Collections.emptyMap(),
-            Optional.of("arack"),
-            false,
-            false));
+        map1.put(0, new BrokerRegistration.Builder().
+            setId(0).
+            setEpoch(1000).
+            setIncarnationId(Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")).
+            setListeners(Arrays.asList(new Endpoint("PLAINTEXT", 
SecurityProtocol.PLAINTEXT, "localhost", 9092))).
+            setSupportedFeatures(Collections.singletonMap("foo", 
VersionRange.of((short) 1, (short) 3))).
+            setRack(Optional.empty()).
+            setFenced(true).
+            setInControlledShutdown(false).build());
+        map1.put(1, new BrokerRegistration.Builder().
+            setId(1).
+            setEpoch(1001).
+            setIncarnationId(Uuid.fromString("U52uRe20RsGI0RvpcTx33Q")).
+            setListeners(Arrays.asList(new Endpoint("PLAINTEXT", 
SecurityProtocol.PLAINTEXT, "localhost", 9093))).
+            setSupportedFeatures(Collections.singletonMap("foo", 
VersionRange.of((short) 1, (short) 3))).
+            setRack(Optional.empty()).
+            setFenced(false).
+            setInControlledShutdown(false).build());
+        map1.put(2, new BrokerRegistration.Builder().
+            setId(2).
+            setEpoch(123).
+            setIncarnationId(Uuid.fromString("hr4TVh3YQiu3p16Awkka6w")).
+            setListeners(Arrays.asList(new Endpoint("PLAINTEXT", 
SecurityProtocol.PLAINTEXT, "localhost", 9093))).
+            setSupportedFeatures(Collections.emptyMap()).
+            setRack(Optional.of("arack")).
+            setFenced(false).
+            setInControlledShutdown(false).build());
         Map<Integer, ControllerRegistration> cmap1 = new HashMap<>();
         cmap1.put(1000, new ControllerRegistration.Builder().
             setId(1000).
@@ -131,22 +134,24 @@ public class ClusterImageTest {
         RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
 
         Map<Integer, BrokerRegistration> map2 = new HashMap<>();
-        map2.put(0, new BrokerRegistration(0,
-            1000,
-            Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q"),
-            Arrays.asList(new Endpoint("PLAINTEXT", 
SecurityProtocol.PLAINTEXT, "localhost", 9092)),
-            Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 
3)),
-            Optional.empty(),
-            false,
-            true));
-        map2.put(1, new BrokerRegistration(1,
-            1001,
-            Uuid.fromString("U52uRe20RsGI0RvpcTx33Q"),
-            Arrays.asList(new Endpoint("PLAINTEXT", 
SecurityProtocol.PLAINTEXT, "localhost", 9093)),
-            Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 
3)),
-            Optional.empty(),
-            true,
-            false));
+        map2.put(0, new BrokerRegistration.Builder().
+            setId(0).
+            setEpoch(1000).
+            setIncarnationId(Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")).
+            setListeners(Arrays.asList(new Endpoint("PLAINTEXT", 
SecurityProtocol.PLAINTEXT, "localhost", 9092))).
+            setSupportedFeatures(Collections.singletonMap("foo", 
VersionRange.of((short) 1, (short) 3))).
+            setRack(Optional.empty()).
+            setFenced(false).
+            setInControlledShutdown(true).build());
+        map2.put(1, new BrokerRegistration.Builder().
+            setId(1).
+            setEpoch(1001).
+            setIncarnationId(Uuid.fromString("U52uRe20RsGI0RvpcTx33Q")).
+            setListeners(Arrays.asList(new Endpoint("PLAINTEXT", 
SecurityProtocol.PLAINTEXT, "localhost", 9093))).
+            setSupportedFeatures(Collections.singletonMap("foo", 
VersionRange.of((short) 1, (short) 3))).
+            setRack(Optional.empty()).
+            setFenced(true).
+            setInControlledShutdown(false).build());
         Map<Integer, ControllerRegistration> cmap2 = new HashMap<>(cmap1);
         cmap2.put(1001, new ControllerRegistration.Builder().
             setId(1001).
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java
 
b/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java
index b25f7f60110..418dcca45b1 100644
--- 
a/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java
@@ -37,14 +37,15 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 @Timeout(value = 40)
 public class ClusterImageBrokersNodeTest {
     private static final ClusterImage TEST_IMAGE = new ClusterImage(
-            Collections.singletonMap(1, new BrokerRegistration(1,
-                    1001,
-                    Uuid.fromString("MJkaH0j0RwuC3W2GHQHtWA"),
-                    Collections.emptyList(),
-                    Collections.singletonMap(MetadataVersion.FEATURE_NAME, 
VersionRange.of(1, 4)),
-                    Optional.empty(),
-                    false,
-                    false)),
+            Collections.singletonMap(1, new BrokerRegistration.Builder().
+                    setId(1).
+                    setEpoch(1001).
+                    
setIncarnationId(Uuid.fromString("MJkaH0j0RwuC3W2GHQHtWA")).
+                    setListeners(Collections.emptyList()).
+                    
setSupportedFeatures(Collections.singletonMap(MetadataVersion.FEATURE_NAME, 
VersionRange.of(1, 4))).
+                    setRack(Optional.empty()).
+                    setFenced(false).
+                    setInControlledShutdown(false).build()),
             Collections.emptyMap());
 
     private final static ClusterImageBrokersNode NODE = new 
ClusterImageBrokersNode(TEST_IMAGE);
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java 
b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
index 262c8513381..4e8f8e6cbef 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
@@ -41,25 +41,46 @@ import static 
org.junit.jupiter.api.Assertions.assertNotEquals;
 @Timeout(value = 40)
 public class BrokerRegistrationTest {
     private static final List<BrokerRegistration> REGISTRATIONS = 
Arrays.asList(
-        new BrokerRegistration(0, 0, Uuid.fromString("pc1GhUlBS92cGGaKXl6ipw"),
-            Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, 
"localhost", 9090)),
-            Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 
2)),
-            Optional.empty(), false, false),
-        new BrokerRegistration(1, 0, Uuid.fromString("3MfdxWlNSn2UDYsmDP1pYg"),
-            Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, 
"localhost", 9091)),
-            Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 
2)),
-            Optional.empty(), true, false),
-        new BrokerRegistration(2, 0, Uuid.fromString("eY7oaG1RREie5Kk9uy1l6g"),
-            Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, 
"localhost", 9092)),
-            Stream.of(new SimpleEntry<>("foo", VersionRange.of((short) 2, 
(short) 3)),
+        new BrokerRegistration.Builder().
+            setId(0).
+            setEpoch(0).
+            setIncarnationId(Uuid.fromString("pc1GhUlBS92cGGaKXl6ipw")).
+            setListeners(Arrays.asList(new Endpoint("INTERNAL", 
SecurityProtocol.PLAINTEXT, "localhost", 9090))).
+            setSupportedFeatures(Collections.singletonMap("foo", 
VersionRange.of((short) 1, (short) 2))).
+            setRack(Optional.empty()).
+            setFenced(false).
+            setInControlledShutdown(false).build(),
+        new BrokerRegistration.Builder().
+            setId(1).
+            setEpoch(0).
+            setIncarnationId(Uuid.fromString("3MfdxWlNSn2UDYsmDP1pYg")).
+            setListeners(Arrays.asList(new Endpoint("INTERNAL", 
SecurityProtocol.PLAINTEXT, "localhost", 9091))).
+            setSupportedFeatures(Collections.singletonMap("foo", 
VersionRange.of((short) 1, (short) 2))).
+            setRack(Optional.empty()).
+            setFenced(true).
+            setInControlledShutdown(false).build(),
+        new BrokerRegistration.Builder().
+            setId(2).
+            setEpoch(0).
+            setIncarnationId(Uuid.fromString("eY7oaG1RREie5Kk9uy1l6g")).
+            setListeners(Arrays.asList(new Endpoint("INTERNAL", 
SecurityProtocol.PLAINTEXT, "localhost", 9092))).
+            setSupportedFeatures(Stream.of(new SimpleEntry<>("foo", 
VersionRange.of((short) 2, (short) 3)),
                 new SimpleEntry<>("bar", VersionRange.of((short) 1, (short) 
4))).collect(
-                        Collectors.toMap(SimpleEntry::getKey, 
SimpleEntry::getValue)),
-            Optional.of("myrack"), false, true),
-        new BrokerRegistration(3, 0, Uuid.fromString("1t8VyWx2TCSTpUWuqj-FOw"),
-            Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, 
"localhost", 9093)),
-            Stream.of(new SimpleEntry<>("metadata.version", 
VersionRange.of((short) 7, (short) 7)))
-                .collect(Collectors.toMap(SimpleEntry::getKey, 
SimpleEntry::getValue)),
-            Optional.empty(), false, true, true));
+                        Collectors.toMap(SimpleEntry::getKey, 
SimpleEntry::getValue))).
+            setRack(Optional.of("myrack")).
+            setFenced(false).
+            setInControlledShutdown(true).build(),
+        new BrokerRegistration.Builder().
+            setId(3).
+            setEpoch(0).
+            setIncarnationId(Uuid.fromString("1t8VyWx2TCSTpUWuqj-FOw")).
+            setListeners(Arrays.asList(new Endpoint("INTERNAL", 
SecurityProtocol.PLAINTEXT, "localhost", 9093))).
+            setSupportedFeatures(Stream.of(new 
SimpleEntry<>("metadata.version", VersionRange.of((short) 7, (short) 7)))
+                .collect(Collectors.toMap(SimpleEntry::getKey, 
SimpleEntry::getValue))).
+            setRack(Optional.empty()).
+            setFenced(false).
+            setInControlledShutdown(true).
+            setIsMigratingZkBroker(true).build());
 
     @Test
     public void testValues() {

Reply via email to