This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1907afe  Upgrade BookKeeper to 4.12.0 (#8447)
1907afe is described below

commit 1907afe788187f4ed23853c714af6102b8643b05
Author: Enrico Olivelli <[email protected]>
AuthorDate: Mon Nov 16 14:21:29 2020 +0100

    Upgrade BookKeeper to 4.12.0 (#8447)
    
    Upgrade Apache BookKeeper to 4.12.0
    
    Most notable changes that impact this patch are:
    - BP-41 -> "BookieSocketAddress" becomes "BookieId"
    - BP-42 -> LedgerMetadata now carries "ledgerId" (as a transient, 
non-serialized value)
---
 distribution/server/src/assemble/LICENSE.bin.txt   | 54 +++++++------
 .../bookkeeper/mledger/offload/OffloadUtils.java   | 16 ++--
 .../mledger/impl/OffloadPrefixReadTest.java        | 13 ++-
 pom.xml                                            |  2 +-
 .../broker/service/BrokerBookieIsolationTest.java  | 40 +++++-----
 .../pulsar/broker/service/RackAwareTest.java       | 19 +----
 pulsar-sql/presto-distribution/LICENSE             | 23 +++---
 .../zookeeper/ZkBookieRackAffinityMapping.java     |  9 +--
 .../ZkIsolatedBookieEnsemblePlacementPolicy.java   | 37 ++++-----
 ...kIsolatedBookieEnsemblePlacementPolicyTest.java | 93 +++++++++++-----------
 .../bookkeeper/client/PulsarMockLedgerHandle.java  | 14 ++--
 .../impl/FileStoreBackedReadHandleImpl.java        |  2 +-
 .../offload/jcloud/impl/OffloadIndexBlockImpl.java | 23 +++---
 .../offload/jcloud/impl/OffloadIndexTest.java      | 15 ++--
 14 files changed, 182 insertions(+), 178 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index a0ffde0..bb6ef44 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -334,6 +334,8 @@ The Apache Software License, Version 2.0
     - io.sundr-sundr-core-0.21.0.jar
  * Guava
     - com.google.guava-guava-25.1-jre.jar
+    - com.google.guava-failureaccess-1.0.1.jar
+    - 
com.google.guava-listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
  * J2ObjC Annotations -- com.google.j2objc-j2objc-annotations-1.1.jar
  * Netty Reactive Streams -- 
com.typesafe.netty-netty-reactive-streams-2.0.4.jar
  * Swagger
@@ -392,32 +394,32 @@ The Apache Software License, Version 2.0
     - org.apache.logging.log4j-log4j-web-2.10.0.jar
  * Java Native Access JNA -- net.java.dev.jna-jna-4.2.0.jar
  * BookKeeper
-    - org.apache.bookkeeper-bookkeeper-common-4.11.1.jar
-    - org.apache.bookkeeper-bookkeeper-common-allocator-4.11.1.jar
-    - org.apache.bookkeeper-bookkeeper-proto-4.11.1.jar
-    - org.apache.bookkeeper-bookkeeper-server-4.11.1.jar
-    - org.apache.bookkeeper-bookkeeper-tools-framework-4.11.1.jar
-    - org.apache.bookkeeper-circe-checksum-4.11.1.jar
-    - org.apache.bookkeeper-cpu-affinity-4.11.1.jar
-    - org.apache.bookkeeper-statelib-4.11.1.jar
-    - org.apache.bookkeeper-stream-storage-api-4.11.1.jar
-    - org.apache.bookkeeper-stream-storage-common-4.11.1.jar
-    - org.apache.bookkeeper-stream-storage-java-client-4.11.1.jar
-    - org.apache.bookkeeper-stream-storage-java-client-base-4.11.1.jar
-    - org.apache.bookkeeper-stream-storage-proto-4.11.1.jar
-    - org.apache.bookkeeper-stream-storage-server-4.11.1.jar
-    - org.apache.bookkeeper-stream-storage-service-api-4.11.1.jar
-    - org.apache.bookkeeper-stream-storage-service-impl-4.11.1.jar
-    - org.apache.bookkeeper.http-http-server-4.11.1.jar
-    - org.apache.bookkeeper.http-vertx-http-server-4.11.1.jar
-    - org.apache.bookkeeper.stats-bookkeeper-stats-api-4.11.1.jar
-    - org.apache.bookkeeper.stats-prometheus-metrics-provider-4.11.1.jar
-    - org.apache.bookkeeper.tests-stream-storage-tests-common-4.11.1.jar
-    - org.apache.distributedlog-distributedlog-common-4.11.1.jar
-    - org.apache.distributedlog-distributedlog-core-4.11.1-tests.jar
-    - org.apache.distributedlog-distributedlog-core-4.11.1.jar
-    - org.apache.distributedlog-distributedlog-protocol-4.11.1.jar
-    - org.apache.bookkeeper.stats-codahale-metrics-provider-4.11.1.jar
+    - org.apache.bookkeeper-bookkeeper-common-4.12.0.jar
+    - org.apache.bookkeeper-bookkeeper-common-allocator-4.12.0.jar
+    - org.apache.bookkeeper-bookkeeper-proto-4.12.0.jar
+    - org.apache.bookkeeper-bookkeeper-server-4.12.0.jar
+    - org.apache.bookkeeper-bookkeeper-tools-framework-4.12.0.jar
+    - org.apache.bookkeeper-circe-checksum-4.12.0.jar
+    - org.apache.bookkeeper-cpu-affinity-4.12.0.jar
+    - org.apache.bookkeeper-statelib-4.12.0.jar
+    - org.apache.bookkeeper-stream-storage-api-4.12.0.jar
+    - org.apache.bookkeeper-stream-storage-common-4.12.0.jar
+    - org.apache.bookkeeper-stream-storage-java-client-4.12.0.jar
+    - org.apache.bookkeeper-stream-storage-java-client-base-4.12.0.jar
+    - org.apache.bookkeeper-stream-storage-proto-4.12.0.jar
+    - org.apache.bookkeeper-stream-storage-server-4.12.0.jar
+    - org.apache.bookkeeper-stream-storage-service-api-4.12.0.jar
+    - org.apache.bookkeeper-stream-storage-service-impl-4.12.0.jar
+    - org.apache.bookkeeper.http-http-server-4.12.0.jar
+    - org.apache.bookkeeper.http-vertx-http-server-4.12.0.jar
+    - org.apache.bookkeeper.stats-bookkeeper-stats-api-4.12.0.jar
+    - org.apache.bookkeeper.stats-prometheus-metrics-provider-4.12.0.jar
+    - org.apache.bookkeeper.tests-stream-storage-tests-common-4.12.0.jar
+    - org.apache.distributedlog-distributedlog-common-4.12.0.jar
+    - org.apache.distributedlog-distributedlog-core-4.12.0-tests.jar
+    - org.apache.distributedlog-distributedlog-core-4.12.0.jar
+    - org.apache.distributedlog-distributedlog-protocol-4.12.0.jar
+    - org.apache.bookkeeper.stats-codahale-metrics-provider-4.12.0.jar
   * Apache HTTP Client
     - org.apache.httpcomponents-httpclient-4.5.5.jar
     - org.apache.httpcomponents-httpcore-4.4.9.jar
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
index 5adbb5a..83514a9 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
@@ -36,6 +36,7 @@ import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.KeyValue;
 import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadDriverMetadata;
+import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.DataFormats;
 
@@ -120,21 +121,22 @@ public final class OffloadUtils {
                     
.setKey(e.getKey()).setValue(ByteString.copyFrom(e.getValue()));
         }
 
-        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : 
metadata.getAllEnsembles().entrySet()) {
+        for (Map.Entry<Long, ? extends List<BookieId>> e : 
metadata.getAllEnsembles().entrySet()) {
             builder.addSegmentBuilder()
                     .setFirstEntryId(e.getKey())
-                    
.addAllEnsembleMember(e.getValue().stream().map(BookieSocketAddress::toString).collect(Collectors.toList()));
+                    
.addAllEnsembleMember(e.getValue().stream().map(BookieId::toString).collect(Collectors.toList()));
         }
 
         return builder.build().toByteArray();
     }
 
-    public static LedgerMetadata parseLedgerMetadata(byte[] bytes) throws 
IOException {
+    public static LedgerMetadata parseLedgerMetadata(long id, byte[] bytes) 
throws IOException {
         DataFormats.LedgerMetadataFormat ledgerMetadataFormat = 
DataFormats.LedgerMetadataFormat.newBuilder().mergeFrom(bytes).build();
         LedgerMetadataBuilder builder = LedgerMetadataBuilder.create()
                 .withLastEntryId(ledgerMetadataFormat.getLastEntryId())
                 .withPassword(ledgerMetadataFormat.getPassword().toByteArray())
                 .withClosedState()
+                .withId(id)
                 .withMetadataFormatVersion(2)
                 .withLength(ledgerMetadataFormat.getLength())
                 .withAckQuorumSize(ledgerMetadataFormat.getAckQuorumSize())
@@ -142,12 +144,12 @@ public final class OffloadUtils {
                 .withWriteQuorumSize(ledgerMetadataFormat.getQuorumSize())
                 .withEnsembleSize(ledgerMetadataFormat.getEnsembleSize());
         ledgerMetadataFormat.getSegmentList().forEach(segment -> {
-            ArrayList<BookieSocketAddress> addressArrayList = new 
ArrayList<>();
+            ArrayList<BookieId> addressArrayList = new ArrayList<>();
             segment.getEnsembleMemberList().forEach(address -> {
                 try {
-                    addressArrayList.add(new BookieSocketAddress(address));
-                } catch (IOException e) {
-                    log.error("Exception when create BookieSocketAddress. ", 
e);
+                    addressArrayList.add(BookieId.parse(address));
+                } catch (IllegalArgumentException e) {
+                    log.error("Exception when create BookieId {}. ", address, 
e);
                 }
             });
             builder.newEnsembleEntry(segment.getFirstEntryId(), 
addressArrayList);
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index 0a96a1f..d3d24b2 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -52,6 +52,7 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
@@ -254,8 +255,9 @@ public class OffloadPrefixReadTest extends 
MockedBookKeeperTestCase {
         private final State state;
         private final byte[] password;
         private final Map<String, byte[]> customMetadata;
-
+        private final long ledgerId;
         MockMetadata(LedgerMetadata toCopy) {
+            ledgerId = toCopy.getLedgerId();
             ensembleSize = toCopy.getEnsembleSize();
             writeQuorumSize = toCopy.getWriteQuorumSize();
             ackQuorumSize = toCopy.getAckQuorumSize();
@@ -271,6 +273,11 @@ public class OffloadPrefixReadTest extends 
MockedBookKeeperTestCase {
         }
 
         @Override
+        public long getLedgerId() {
+            return ledgerId;
+        }
+
+        @Override
         public boolean hasPassword() { return true; }
 
         @Override
@@ -315,12 +322,12 @@ public class OffloadPrefixReadTest extends 
MockedBookKeeperTestCase {
         public Map<String, byte[]> getCustomMetadata() { return 
customMetadata; }
 
         @Override
-        public List<BookieSocketAddress> getEnsembleAt(long entryId) {
+        public List<BookieId> getEnsembleAt(long entryId) {
             throw new UnsupportedOperationException("Pulsar shouldn't look at 
this");
         }
 
         @Override
-        public NavigableMap<Long, ? extends List<BookieSocketAddress>> 
getAllEnsembles() {
+        public NavigableMap<Long, ? extends List<BookieId>> getAllEnsembles() {
             throw new UnsupportedOperationException("Pulsar shouldn't look at 
this");
         }
 
diff --git a/pom.xml b/pom.xml
index 7ef22c8..f99fa86 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,7 +98,7 @@ flexible messaging model and an intuitive client 
API.</description>
     <!-- apache commons -->
     <commons-compress.version>1.19</commons-compress.version>
 
-    <bookkeeper.version>4.11.1</bookkeeper.version>
+    <bookkeeper.version>4.12.0</bookkeeper.version>
     <zookeeper.version>3.5.7</zookeeper.version>
     <netty.version>4.1.51.Final</netty.version>
     <netty-tc-native.version>2.0.30.Final</netty-tc-native.version>
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index fecd26e..bb0ae00 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -43,7 +43,7 @@ import org.apache.bookkeeper.meta.LedgerManager;
 import 
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.EnsemblePlacementPolicyConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
-import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.pulsar.broker.ManagedLedgerClientFactory;
@@ -133,10 +133,10 @@ public class BrokerBookieIsolationTest {
         BookieServer[] bookies = bkEnsemble.getBookies();
         ZooKeeper zkClient = bkEnsemble.getZkClient();
 
-        Set<BookieSocketAddress> defaultBookies = 
Sets.newHashSet(bookies[0].getLocalAddress(),
-                bookies[1].getLocalAddress());
-        Set<BookieSocketAddress> isolatedBookies = 
Sets.newHashSet(bookies[2].getLocalAddress(),
-                bookies[3].getLocalAddress());
+        Set<BookieId> defaultBookies = 
Sets.newHashSet(bookies[0].getBookieId(),
+                bookies[1].getBookieId());
+        Set<BookieId> isolatedBookies = 
Sets.newHashSet(bookies[2].getBookieId(),
+                bookies[3].getBookieId());
 
         setDefaultIsolationGroup(brokerBookkeeperClientIsolationGroups, 
zkClient, defaultBookies);
         setDefaultIsolationGroup(tenantNamespaceIsolationGroups, zkClient, 
isolatedBookies);
@@ -271,12 +271,12 @@ public class BrokerBookieIsolationTest {
         BookieServer[] bookies = bkEnsemble.getBookies();
         ZooKeeper zkClient = bkEnsemble.getZkClient();
 
-        Set<BookieSocketAddress> defaultBookies = 
Sets.newHashSet(bookies[0].getLocalAddress(),
-                bookies[1].getLocalAddress());
-        Set<BookieSocketAddress> isolatedBookies = 
Sets.newHashSet(bookies[2].getLocalAddress(),
-                bookies[3].getLocalAddress());
-        Set<BookieSocketAddress> downedBookies = Sets.newHashSet(new 
BookieSocketAddress("1.1.1.1:1111"),
-                new BookieSocketAddress("1.1.1.1:1112"));
+        Set<BookieId> defaultBookies = 
Sets.newHashSet(bookies[0].getBookieId(),
+                bookies[1].getBookieId());
+        Set<BookieId> isolatedBookies = 
Sets.newHashSet(bookies[2].getBookieId(),
+                bookies[3].getBookieId());
+        Set<BookieId> downedBookies = 
Sets.newHashSet(BookieId.parse("1.1.1.1:1111"),
+                BookieId.parse("1.1.1.1:1112"));
 
         setDefaultIsolationGroup(brokerBookkeeperClientIsolationGroups, 
zkClient, defaultBookies);
         // primary group empty
@@ -395,10 +395,10 @@ public class BrokerBookieIsolationTest {
         BookieServer[] bookies = bkEnsemble.getBookies();
         ZooKeeper zkClient = bkEnsemble.getZkClient();
 
-        Set<BookieSocketAddress> defaultBookies = 
Sets.newHashSet(bookies[0].getLocalAddress(),
-                bookies[1].getLocalAddress());
-        Set<BookieSocketAddress> isolatedBookies = 
Sets.newHashSet(bookies[2].getLocalAddress(),
-                bookies[3].getLocalAddress());
+        Set<BookieId> defaultBookies = 
Sets.newHashSet(bookies[0].getBookieId(),
+                bookies[1].getBookieId());
+        Set<BookieId> isolatedBookies = 
Sets.newHashSet(bookies[2].getBookieId(),
+                bookies[3].getBookieId());
 
         setDefaultIsolationGroup(brokerBookkeeperClientIsolationGroups, 
zkClient, defaultBookies);
         // primary group empty
@@ -460,12 +460,12 @@ public class BrokerBookieIsolationTest {
     }
 
     private void assertAffinityBookies(LedgerManager ledgerManager, 
List<LedgerInfo> ledgers1,
-            Set<BookieSocketAddress> defaultBookies) throws Exception {
+            Set<BookieId> defaultBookies) throws Exception {
         for (LedgerInfo lInfo : ledgers1) {
             long ledgerId = lInfo.getLedgerId();
             CompletableFuture<Versioned<LedgerMetadata>> ledgerMetaFuture = 
ledgerManager.readLedgerMetadata(ledgerId);
             LedgerMetadata ledgerMetadata = ledgerMetaFuture.get().getValue();
-            Set<BookieSocketAddress> ledgerBookies = Sets.newHashSet();
+            Set<BookieId> ledgerBookies = Sets.newHashSet();
             
ledgerBookies.addAll(ledgerMetadata.getAllEnsembles().values().iterator().next());
             assertEquals(ledgerBookies.size(), defaultBookies.size());
             ledgerBookies.removeAll(defaultBookies);
@@ -493,7 +493,7 @@ public class BrokerBookieIsolationTest {
     }
 
     private void setDefaultIsolationGroup(String 
brokerBookkeeperClientIsolationGroups, ZooKeeper zkClient,
-            Set<BookieSocketAddress> bookieAddresses) throws Exception {
+            Set<BookieId> bookieAddresses) throws Exception {
         BookiesRackConfiguration bookies = null;
         try {
             byte[] data = 
zkClient.getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false, 
null);
@@ -509,8 +509,8 @@ public class BrokerBookieIsolationTest {
         }
 
         Map<String, BookieInfo> bookieInfoMap = Maps.newHashMap();
-        for (BookieSocketAddress bkSocket : bookieAddresses) {
-            BookieInfo info = new BookieInfo("use", bkSocket.getHostName() + 
":" + bkSocket.getPort());
+        for (BookieId bkSocket : bookieAddresses) {
+            BookieInfo info = new BookieInfo("use", bkSocket.toString());
             bookieInfoMap.put(bkSocket.toString(), info);
         }
         bookies.put(brokerBookkeeperClientIsolationGroups, bookieInfoMap);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
index 4632392..8d9ea4e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
@@ -30,7 +30,7 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.discover.BookieServiceInfo;
-import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.pulsar.common.policies.data.BookieInfo;
@@ -108,7 +108,7 @@ public class RackAwareTest extends BkEnsemblesTestBase {
         BookKeeper bkc = this.pulsar.getBookKeeperClient();
 
         // Create few ledgers and verify all of them should have a copy in the 
first bookie
-        BookieSocketAddress fistBookie = bookies.get(0).getLocalAddress();
+        BookieId fistBookie = bookies.get(0).getBookieId();
         for (int i = 0; i < 100; i++) {
             LedgerHandle lh = bkc.createLedger(2, 2, DigestType.DUMMY, new 
byte[0]);
             log.info("Ledger: {} -- Ensemble: {}", i, 
lh.getLedgerMetadata().getEnsembleAt(0));
@@ -118,20 +118,5 @@ public class RackAwareTest extends BkEnsemblesTestBase {
         }
     }
 
-    @Test(enabled = false)
-    public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception {
-        // Ignore test
-    }
-
-    @Test(enabled = false)
-    public void testSkipCorruptDataLedger() throws Exception {
-        // Ignore test
-    }
-
-    @Test(enabled = false)
-    public void testTopicWithWildCardChar() throws Exception {
-        // Ignore test
-    }
-
     private static final Logger log = 
LoggerFactory.getLogger(RackAwareTest.class);
 }
diff --git a/pulsar-sql/presto-distribution/LICENSE 
b/pulsar-sql/presto-distribution/LICENSE
index b1475ee..80c48c0 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -408,17 +408,18 @@ The Apache Software License, Version 2.0
     - async-http-client-2.12.1.jar
     - async-http-client-netty-utils-2.12.1.jar
   * Apache Bookkeeper
-    - bookkeeper-common-4.11.1.jar
-    - bookkeeper-common-allocator-4.11.1.jar
-    - bookkeeper-proto-4.11.1.jar
-    - bookkeeper-server-4.11.1.jar
-    - bookkeeper-stats-api-4.11.1.jar
-    - bookkeeper-tools-framework-4.11.1.jar
-    - circe-checksum-4.11.1.jar
-    - codahale-metrics-provider-4.11.1.jar
-    - cpu-affinity-4.11.1.jar
-    - http-server-4.11.1.jar
-    - prometheus-metrics-provider-4.11.1.jar
+    - bookkeeper-common-4.12.0.jar
+    - bookkeeper-common-allocator-4.12.0.jar
+    - bookkeeper-proto-4.12.0.jar
+    - bookkeeper-server-4.12.0.jar
+    - bookkeeper-stats-api-4.12.0.jar
+    - bookkeeper-tools-framework-4.12.0.jar
+    - circe-checksum-4.12.0.jar
+    - codahale-metrics-provider-4.12.0.jar
+    - cpu-affinity-4.12.0.jar
+    - http-server-4.12.0.jar
+    - prometheus-metrics-provider-4.12.0.jar
+    - codahale-metrics-provider-4.12.0.jar
   * Apache Commons
     - commons-cli-1.2.jar
     - commons-codec-1.10.jar
diff --git 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
index 93400e9..97e48c8 100644
--- 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
+++ 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
@@ -33,6 +33,7 @@ import 
org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.RackChangeNotifier;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
+import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieNode;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
@@ -218,14 +219,10 @@ public class ZkBookieRackAffinityMapping extends 
AbstractDNSToSwitchMapping
     public void onUpdate(String path, BookiesRackConfiguration data, Stat 
stat) {
         if (rackawarePolicy != null) {
             LOG.info("Bookie rack info updated to {}. Notifying rackaware 
policy.", data.toString());
-            List<BookieSocketAddress> bookieAddressList = new ArrayList<>();
+            List<BookieId> bookieAddressList = new ArrayList<>();
             for (Map<String, BookieInfo> bookieMapping : data.values()) {
                 for (String addr : bookieMapping.keySet()) {
-                    try {
-                        bookieAddressList.add(new BookieSocketAddress(addr));
-                    } catch (UnknownHostException e) {
-                        throw new RuntimeException(e);
-                    }
+                    bookieAddressList.add(BookieId.parse(addr));
                 }
             }
             rackawarePolicy.onBookieRackChange(bookieAddressList);
diff --git 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
index f508e64..43d727a 100644
--- 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
+++ 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
@@ -31,7 +31,6 @@ import 
org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
@@ -48,6 +47,8 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import io.netty.util.HashedWheelTimer;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookieAddressResolver;
 
 public class ZkIsolatedBookieEnsemblePlacementPolicy extends 
RackawareEnsemblePlacementPolicy
         implements Deserializer<BookiesRackConfiguration> {
@@ -69,7 +70,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends 
RackawareEnsemblePl
     @Override
     public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration 
conf,
             Optional<DNSToSwitchMapping> optionalDnsResolver, HashedWheelTimer 
timer, FeatureProvider featureProvider,
-            StatsLogger statsLogger) {
+            StatsLogger statsLogger, BookieAddressResolver 
bookieAddressResolver) {
         if (conf.getProperty(ISOLATION_BOOKIE_GROUPS) != null) {
             String isolationGroupsString = 
castToString(conf.getProperty(ISOLATION_BOOKIE_GROUPS));
             if (!isolationGroupsString.isEmpty()) {
@@ -87,7 +88,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends 
RackawareEnsemblePl
                 }
             }
         }
-        return super.initialize(conf, optionalDnsResolver, timer, 
featureProvider, statsLogger);
+        return super.initialize(conf, optionalDnsResolver, timer, 
featureProvider, statsLogger, bookieAddressResolver);
     }
 
     private String castToString(Object obj) {
@@ -130,33 +131,33 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy 
extends RackawareEnsemblePl
     }
 
     @Override
-    public PlacementResult<List<BookieSocketAddress>> newEnsemble(int 
ensembleSize, int writeQuorumSize, int ackQuorumSize,
-            Map<String, byte[]> customMetadata, Set<BookieSocketAddress> 
excludeBookies)
+    public PlacementResult<List<BookieId>> newEnsemble(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
+            Map<String, byte[]> customMetadata, Set<BookieId> excludeBookies)
             throws BKNotEnoughBookiesException {
-        Set<BookieSocketAddress> blacklistedBookies = 
getBlacklistedBookies(ensembleSize);
+        Set<BookieId> blacklistedBookies = getBlacklistedBookies(ensembleSize);
         if (excludeBookies == null) {
-            excludeBookies = new HashSet<BookieSocketAddress>();
+            excludeBookies = new HashSet<BookieId>();
         }
         excludeBookies.addAll(blacklistedBookies);
         return super.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, 
customMetadata, excludeBookies);
     }
 
     @Override
-    public PlacementResult<BookieSocketAddress> replaceBookie(int 
ensembleSize, int writeQuorumSize, int ackQuorumSize,
-            Map<String, byte[]> customMetadata, List<BookieSocketAddress> 
currentEnsemble,
-            BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> 
excludeBookies)
+    public PlacementResult<BookieId> replaceBookie(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
+            Map<String, byte[]> customMetadata, List<BookieId> currentEnsemble,
+            BookieId bookieToReplace, Set<BookieId> excludeBookies)
             throws BKNotEnoughBookiesException {
-        Set<BookieSocketAddress> blacklistedBookies = 
getBlacklistedBookies(ensembleSize);
+        Set<BookieId> blacklistedBookies = getBlacklistedBookies(ensembleSize);
         if (excludeBookies == null) {
-            excludeBookies = new HashSet<BookieSocketAddress>();
+            excludeBookies = new HashSet<BookieId>();
         }
         excludeBookies.addAll(blacklistedBookies);
         return super.replaceBookie(ensembleSize, writeQuorumSize, 
ackQuorumSize, customMetadata, currentEnsemble,
                 bookieToReplace, excludeBookies);
     }
 
-    private Set<BookieSocketAddress> getBlacklistedBookies(int ensembleSize) {
-        Set<BookieSocketAddress> blacklistedBookies = new 
HashSet<BookieSocketAddress>();
+    private Set<BookieId> getBlacklistedBookies(int ensembleSize) {
+        Set<BookieId> blacklistedBookies = new HashSet<BookieId>();
         try {
             if (bookieMappingCache != null) {
                 BookiesRackConfiguration allGroupsBookieMapping = 
bookieMappingCache
@@ -169,12 +170,12 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy 
extends RackawareEnsemblePl
                     Set<String> bookiesInGroup = 
allGroupsBookieMapping.get(group).keySet();
                     if (!primaryIsolationGroups.contains(group)) {
                         for (String bookieAddress : bookiesInGroup) {
-                            blacklistedBookies.add(new 
BookieSocketAddress(bookieAddress));
+                            
blacklistedBookies.add(BookieId.parse(bookieAddress));
                         }
                     } else {
                         for (String groupBookie : bookiesInGroup) {
                             totalAvailableBookiesInPrimaryGroup += knownBookies
-                                    .containsKey(new 
BookieSocketAddress(groupBookie)) ? 1 : 0;
+                                    .containsKey(BookieId.parse(groupBookie)) 
? 1 : 0;
                         }
                     }
                 }
@@ -186,7 +187,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy 
extends RackawareEnsemblePl
                     Map<String, BookieInfo> bookieGroup = 
allGroupsBookieMapping.get(group);
                     if (bookieGroup != null && !bookieGroup.isEmpty()) {
                         for (String bookieAddress : bookieGroup.keySet()) {
-                            blacklistedBookies.remove(new 
BookieSocketAddress(bookieAddress));
+                            
blacklistedBookies.remove(BookieId.parse(bookieAddress));
                         }
                     }
                 }
@@ -199,7 +200,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy 
extends RackawareEnsemblePl
                         Map<String, BookieInfo> bookieGroup = 
allGroupsBookieMapping.get(group);
                         if (bookieGroup != null && !bookieGroup.isEmpty()) {
                             for (String bookieAddress : bookieGroup.keySet()) {
-                                blacklistedBookies.remove(new 
BookieSocketAddress(bookieAddress));
+                                
blacklistedBookies.remove(BookieId.parse(bookieAddress));
                             }
                         }
                     }
diff --git 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
index b8057ea..e3f1a9c 100644
--- 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
+++ 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
@@ -39,6 +39,7 @@ import java.util.Set;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.feature.SettableFeatureProvider;
+import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.ZkUtils;
@@ -64,8 +65,8 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
     private ZooKeeper localZkc;
 
     private final ObjectMapper jsonMapper = ObjectMapperFactory.create();
-    Set<BookieSocketAddress> writableBookies = new HashSet<>();
-    Set<BookieSocketAddress> readOnlyBookies = new HashSet<>();
+    Set<BookieId> writableBookies = new HashSet<>();
+    Set<BookieId> readOnlyBookies = new HashSet<>();
     List<String> isolationGroups = new ArrayList<>();
 
     HashedWheelTimer timer;
@@ -77,10 +78,10 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
         localZkS.start();
 
         localZkc = ZooKeeperClient.newBuilder().connectString("127.0.0.1" + 
":" + localZkS.getZookeeperPort()).build();
-        writableBookies.add(new BookieSocketAddress(BOOKIE1));
-        writableBookies.add(new BookieSocketAddress(BOOKIE2));
-        writableBookies.add(new BookieSocketAddress(BOOKIE3));
-        writableBookies.add(new BookieSocketAddress(BOOKIE4));
+        writableBookies.add(new BookieSocketAddress(BOOKIE1).toBookieId());
+        writableBookies.add(new BookieSocketAddress(BOOKIE2).toBookieId());
+        writableBookies.add(new BookieSocketAddress(BOOKIE3).toBookieId());
+        writableBookies.add(new BookieSocketAddress(BOOKIE4).toBookieId());
         isolationGroups.add("group1");
     }
 
@@ -116,16 +117,16 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
         bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new 
ZooKeeperCache("test", localZkc, 30) {
         });
         
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
 isolationGroups);
-        isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, 
SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE);
+        isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, 
SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE, 
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
         isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
 
-        List<BookieSocketAddress> ensemble = isolationPolicy.newEnsemble(3, 3, 
2, Collections.emptyMap(), new HashSet<>()).getResult();
-        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1)));
-        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
-        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE4)));
+        List<BookieId> ensemble = isolationPolicy.newEnsemble(3, 3, 2, 
Collections.emptyMap(), new HashSet<>()).getResult();
+        assertTrue(ensemble.contains(new 
BookieSocketAddress(BOOKIE1).toBookieId()));
+        assertTrue(ensemble.contains(new 
BookieSocketAddress(BOOKIE2).toBookieId()));
+        assertTrue(ensemble.contains(new 
BookieSocketAddress(BOOKIE4).toBookieId()));
 
         ensemble = isolationPolicy.newEnsemble(1, 1, 1, 
Collections.emptyMap(), new HashSet<>()).getResult();
-        assertFalse(ensemble.contains(new BookieSocketAddress(BOOKIE3)));
+        assertFalse(ensemble.contains(new 
BookieSocketAddress(BOOKIE3).toBookieId()));
 
         try {
             isolationPolicy.newEnsemble(4, 4, 4, Collections.emptyMap(), new 
HashSet<>());
@@ -134,11 +135,11 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
             // ok
         }
 
-        Set<BookieSocketAddress> bookieToExclude = new HashSet<>();
-        bookieToExclude.add(new BookieSocketAddress(BOOKIE1));
+        Set<BookieId> bookieToExclude = new HashSet<>();
+        bookieToExclude.add(new BookieSocketAddress(BOOKIE1).toBookieId());
         ensemble = isolationPolicy.newEnsemble(2, 2, 2, 
Collections.emptyMap(), bookieToExclude).getResult();
-        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE4)));
-        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
+        assertTrue(ensemble.contains(new 
BookieSocketAddress(BOOKIE4).toBookieId()));
+        assertTrue(ensemble.contains(new 
BookieSocketAddress(BOOKIE2).toBookieId()));
 
         secondaryBookieGroup.put(BOOKIE4, new BookieInfo("rack0", null));
         bookieMapping.put("group2", secondaryBookieGroup);
@@ -150,8 +151,8 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
 
         ensemble = isolationPolicy.newEnsemble(2, 2, 2, 
Collections.emptyMap(), null).getResult();
 
-        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1)));
-        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
+        assertTrue(ensemble.contains(new 
BookieSocketAddress(BOOKIE1).toBookieId()));
+        assertTrue(ensemble.contains(new 
BookieSocketAddress(BOOKIE2).toBookieId()));
 
         try {
             isolationPolicy.newEnsemble(3, 3, 3, Collections.emptyMap(), new 
HashSet<>());
@@ -162,18 +163,18 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
 
         try {
             isolationPolicy.replaceBookie(3, 3, 3, Collections.emptyMap(), 
ensemble,
-                    new BookieSocketAddress(BOOKIE5), new HashSet<>());
+                    new BookieSocketAddress(BOOKIE5).toBookieId(), new 
HashSet<>());
             fail("should not pass");
         } catch (BKNotEnoughBookiesException e) {
             // ok
         }
         bookieToExclude = new HashSet<>();
-        bookieToExclude.add(new BookieSocketAddress(BOOKIE1));
+        bookieToExclude.add(new BookieSocketAddress(BOOKIE1).toBookieId());
 
         ensemble = isolationPolicy.newEnsemble(1, 1, 1, 
Collections.emptyMap(), bookieToExclude).getResult();
-        BookieSocketAddress chosenBookie = isolationPolicy.replaceBookie(1, 1, 
1, Collections.emptyMap(),
+        BookieId chosenBookie = isolationPolicy.replaceBookie(1, 1, 1, 
Collections.emptyMap(),
                 ensemble, ensemble.get(0), new HashSet<>()).getResult();
-        assertEquals(new BookieSocketAddress(BOOKIE1), chosenBookie);
+        assertEquals(new BookieSocketAddress(BOOKIE1).toBookieId(), 
chosenBookie);
 
         localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1);
     }
@@ -185,7 +186,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
         bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new 
ZooKeeperCache("test", localZkc, 30) {
         });
         
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
 isolationGroups);
-        isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, 
SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE);
+        isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, 
SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE, 
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
         isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
 
         isolationPolicy.newEnsemble(4, 4, 4, Collections.emptyMap(), new 
HashSet<>());
@@ -201,9 +202,9 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
 
         Thread.sleep(100);
 
-        List<BookieSocketAddress> ensemble = isolationPolicy.newEnsemble(2, 2, 
2, Collections.emptyMap(), new HashSet<>()).getResult();
-        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1)));
-        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
+        List<BookieId> ensemble = isolationPolicy.newEnsemble(2, 2, 2, 
Collections.emptyMap(), new HashSet<>()).getResult();
+        assertTrue(ensemble.contains(new 
BookieSocketAddress(BOOKIE1).toBookieId()));
+        assertTrue(ensemble.contains(new 
BookieSocketAddress(BOOKIE2).toBookieId()));
 
         try {
             isolationPolicy.newEnsemble(3, 3, 3, Collections.emptyMap(), new 
HashSet<>());
@@ -239,12 +240,12 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
         bkClientConf.setZkServers("127.0.0.1" + ":" + 
localZkS.getZookeeperPort());
         bkClientConf.setZkTimeout(1000);
         
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
 isolationGroups);
-        isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, 
SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE);
+        isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, 
SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE, 
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
         isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
 
-        List<BookieSocketAddress> ensemble = isolationPolicy.newEnsemble(2, 2, 
2, Collections.emptyMap(), new HashSet<>()).getResult();
-        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1)));
-        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
+        List<BookieId> ensemble = isolationPolicy.newEnsemble(2, 2, 2, 
Collections.emptyMap(), new HashSet<>()).getResult();
+        assertTrue(ensemble.contains(new 
BookieSocketAddress(BOOKIE1).toBookieId()));
+        assertTrue(ensemble.contains(new 
BookieSocketAddress(BOOKIE2).toBookieId()));
 
         try {
             isolationPolicy.newEnsemble(3, 3, 3, Collections.emptyMap(), new 
HashSet<>());
@@ -265,9 +266,9 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
         Thread.sleep(100);
 
         ensemble = isolationPolicy.newEnsemble(3, 3, 3, 
Collections.emptyMap(), new HashSet<>()).getResult();
-        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1)));
-        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
-        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE3)));
+        assertTrue(ensemble.contains(new 
BookieSocketAddress(BOOKIE1).toBookieId()));
+        assertTrue(ensemble.contains(new 
BookieSocketAddress(BOOKIE2).toBookieId()));
+        assertTrue(ensemble.contains(new 
BookieSocketAddress(BOOKIE3).toBookieId()));
 
         localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1);
 
@@ -294,7 +295,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
         bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new 
ZooKeeperCache("test", localZkc, 30) {
         });
         isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, 
SettableFeatureProvider.DISABLE_ALL,
-                NullStatsLogger.INSTANCE);
+                NullStatsLogger.INSTANCE, 
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
         isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
 
         isolationPolicy.newEnsemble(4, 4, 4, Collections.emptyMap(), new 
HashSet<>());
@@ -342,14 +343,14 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
         });
         
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
 isolatedGroup);
         isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, 
SettableFeatureProvider.DISABLE_ALL,
-                NullStatsLogger.INSTANCE);
+                NullStatsLogger.INSTANCE, 
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
         isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
 
-        List<BookieSocketAddress> ensemble = isolationPolicy
+        List<BookieId> ensemble = isolationPolicy
                 .newEnsemble(3, 3, 2, Collections.emptyMap(), new 
HashSet<>()).getResult();
-        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1)));
-        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
-        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE4)));
+        assertTrue(ensemble.contains(new 
BookieSocketAddress(BOOKIE1).toBookieId()));
+        assertTrue(ensemble.contains(new 
BookieSocketAddress(BOOKIE2).toBookieId()));
+        assertTrue(ensemble.contains(new 
BookieSocketAddress(BOOKIE4).toBookieId()));
 
         localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1);
     }
@@ -390,14 +391,14 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
         
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
 isolatedGroup);
         
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
 secondaryIsolatedGroup);
         isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, 
SettableFeatureProvider.DISABLE_ALL,
-                NullStatsLogger.INSTANCE);
+                NullStatsLogger.INSTANCE, 
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
         isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
 
-        List<BookieSocketAddress> ensemble = isolationPolicy
+        List<BookieId> ensemble = isolationPolicy
                 .newEnsemble(3, 3, 2, Collections.emptyMap(), new 
HashSet<>()).getResult();
-        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1)));
-        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
-        assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE4)));
+        assertTrue(ensemble.contains(new 
BookieSocketAddress(BOOKIE1).toBookieId()));
+        assertTrue(ensemble.contains(new 
BookieSocketAddress(BOOKIE2).toBookieId()));
+        assertTrue(ensemble.contains(new 
BookieSocketAddress(BOOKIE4).toBookieId()));
 
         localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1);
     }
@@ -435,11 +436,11 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
         
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
                 secondaryIsolatedGroup);
         isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, 
SettableFeatureProvider.DISABLE_ALL,
-                NullStatsLogger.INSTANCE);
+                NullStatsLogger.INSTANCE, 
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
         isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
 
         try {
-            List<BookieSocketAddress> ensemble = isolationPolicy
+            isolationPolicy
                     .newEnsemble(3, 3, 2, Collections.emptyMap(), new 
HashSet<>()).getResult();
             Assert.fail("Should have thrown BKNotEnoughBookiesException");
         } catch (BKNotEnoughBookiesException ne) {
diff --git 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index 4692851..59d03ec 100644
--- 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -43,6 +43,7 @@ import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Versioned;
@@ -65,7 +66,7 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
 
     public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
                            DigestType digest, byte[] passwd) throws 
GeneralSecurityException {
-        super(bk.getClientCtx(), id, new Versioned<>(createMetadata(digest, 
passwd), new LongVersion(0L)),
+        super(bk.getClientCtx(), id, new Versioned<>(createMetadata(id, 
digest, passwd), new LongVersion(0L)),
               digest, passwd, WriteFlag.NONE);
         this.bk = bk;
         this.id = id;
@@ -253,14 +254,15 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
         return readHandle.readLastAddConfirmedAndEntryAsync(entryId, 
timeOutInMillis, parallel);
     }
 
-    private static LedgerMetadata createMetadata(DigestType digest, byte[] 
passwd) {
-        List<BookieSocketAddress> ensemble = Lists.newArrayList(
-                new BookieSocketAddress("192.0.2.1", 1234),
-                new BookieSocketAddress("192.0.2.2", 1234),
-                new BookieSocketAddress("192.0.2.3", 1234));
+    private static LedgerMetadata createMetadata(long id, DigestType digest, 
byte[] passwd) {
+        List<BookieId> ensemble = Lists.newArrayList(
+                new BookieSocketAddress("192.0.2.1", 1234).toBookieId(),
+                new BookieSocketAddress("192.0.2.2", 1234).toBookieId(),
+                new BookieSocketAddress("192.0.2.3", 1234).toBookieId());
         return LedgerMetadataBuilder.create()
             .withDigestType(digest.toApiDigestType())
             .withPassword(passwd)
+            .withId(id)
             .newEnsembleEntry(0L, ensemble)
             .build();
     }
diff --git 
a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
 
b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
index 64399d4..c035f7a 100644
--- 
a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
+++ 
b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
@@ -60,7 +60,7 @@ public class FileStoreBackedReadHandleImpl implements 
ReadHandle {
         try {
             key.set(FileSystemManagedLedgerOffloader.METADATA_KEY_INDEX);
             reader.get(key, value);
-            this.ledgerMetadata = parseLedgerMetadata(value.copyBytes());
+            this.ledgerMetadata = parseLedgerMetadata(ledgerId, 
value.copyBytes());
         } catch (IOException e) {
             log.error("Fail to read LedgerMetadata for ledgerId {}",
                     ledgerId);
diff --git 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
index 0a4e90b..7194889 100644
--- 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
+++ 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
@@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory;
 
 import static com.google.common.base.Preconditions.checkState;
 import static 
org.apache.bookkeeper.mledger.offload.OffloadUtils.buildLedgerMetadataFormat;
-import static 
org.apache.bookkeeper.mledger.offload.OffloadUtils.parseLedgerMetadata;
+import org.apache.bookkeeper.net.BookieId;
 
 public class OffloadIndexBlockImpl implements OffloadIndexBlock {
     private static final Logger log = 
LoggerFactory.getLogger(OffloadIndexBlockImpl.class);
@@ -188,9 +188,9 @@ public class OffloadIndexBlockImpl implements 
OffloadIndexBlock {
         private long ctime;
         private State state;
         private Map<String, byte[]> customMetadata = Maps.newHashMap();
-        private TreeMap<Long, ArrayList<BookieSocketAddress>> ensembles =
-                new TreeMap<Long, ArrayList<BookieSocketAddress>>();
-
+        private TreeMap<Long, ArrayList<BookieId>> ensembles =
+                new TreeMap<Long, ArrayList<BookieId>>();
+        
         InternalLedgerMetadata(LedgerMetadataFormat ledgerMetadataFormat) {
             this.ensembleSize = ledgerMetadataFormat.getEnsembleSize();
             this.writeQuorumSize = ledgerMetadataFormat.getQuorumSize();
@@ -208,11 +208,11 @@ public class OffloadIndexBlockImpl implements 
OffloadIndexBlock {
             }
 
             ledgerMetadataFormat.getSegmentList().forEach(segment -> {
-                ArrayList<BookieSocketAddress> addressArrayList = new 
ArrayList<BookieSocketAddress>();
+                ArrayList<BookieId> addressArrayList = new ArrayList<>();
                 segment.getEnsembleMemberList().forEach(address -> {
                     try {
-                        addressArrayList.add(new BookieSocketAddress(address));
-                    } catch (IOException e) {
+                        addressArrayList.add(BookieId.parse(address));
+                    } catch (IllegalArgumentException e) {
                         log.error("Exception when create BookieSocketAddress. 
", e);
                     }
                 });
@@ -221,6 +221,11 @@ public class OffloadIndexBlockImpl implements 
OffloadIndexBlock {
         }
 
         @Override
+        public long getLedgerId() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
         public int getEnsembleSize() {
             return this.ensembleSize;
         }
@@ -277,12 +282,12 @@ public class OffloadIndexBlockImpl implements 
OffloadIndexBlock {
         }
 
         @Override
-        public List<BookieSocketAddress> getEnsembleAt(long entryId) {
+        public List<BookieId> getEnsembleAt(long entryId) {
             return ensembles.get(ensembles.headMap(entryId + 1).lastKey());
         }
 
         @Override
-        public NavigableMap<Long, ? extends List<BookieSocketAddress>> 
getAllEnsembles() {
+        public NavigableMap<Long, ? extends List<BookieId>> getAllEnsembles() {
             return this.ensembles;
         }
 
diff --git 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexTest.java
 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexTest.java
index ad1bdc7..4d6f3e7 100644
--- 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexTest.java
+++ 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexTest.java
@@ -39,6 +39,7 @@ import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
+import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.testng.annotations.Test;
 
@@ -81,21 +82,21 @@ public class OffloadIndexTest {
 //        }
 //    }
 
-    private LedgerMetadata createLedgerMetadata() throws Exception {
+    private LedgerMetadata createLedgerMetadata(long id) throws Exception {
 
         Map<String, byte[]> metadataCustom = Maps.newHashMap();
         metadataCustom.put("key1", "value1".getBytes(UTF_8));
         metadataCustom.put("key7", "value7".getBytes(UTF_8));
 
-        ArrayList<BookieSocketAddress> bookies = Lists.newArrayList();
-        bookies.add(0, new BookieSocketAddress("127.0.0.1:3181"));
-        bookies.add(1, new BookieSocketAddress("127.0.0.2:3181"));
-        bookies.add(2, new BookieSocketAddress("127.0.0.3:3181"));
+        ArrayList<BookieId> bookies = Lists.newArrayList();
+        bookies.add(0, new BookieSocketAddress("127.0.0.1:3181").toBookieId());
+        bookies.add(1, new BookieSocketAddress("127.0.0.2:3181").toBookieId());
+        bookies.add(2, new BookieSocketAddress("127.0.0.3:3181").toBookieId());
         
         return 
LedgerMetadataBuilder.create().withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2)
                 
.withDigestType(DigestType.CRC32C).withPassword("password".getBytes(UTF_8))
                 
.withCustomMetadata(metadataCustom).withClosedState().withLastEntryId(5000).withLength(100)
-                .newEnsembleEntry(0L, bookies).build();
+                .newEnsembleEntry(0L, bookies).withId(id).build();
 
     }
 
@@ -104,7 +105,7 @@ public class OffloadIndexTest {
     @Test
     public void offloadIndexBlockImplTest() throws Exception {
         OffloadIndexBlockBuilder blockBuilder = 
OffloadIndexBlockBuilder.create();
-        LedgerMetadata metadata = createLedgerMetadata();
+        LedgerMetadata metadata = createLedgerMetadata(1); // use dummy 
ledgerId, from BK 4.12 the ledger id is required
         log.debug("created metadata: {}", metadata.toString());
 
         
blockBuilder.withLedgerMetadata(metadata).withDataObjectLength(1).withDataBlockHeaderLength(23455);

Reply via email to