This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1907afe Upgrade BookKeeper to 4.12.0 (#8447)
1907afe is described below
commit 1907afe788187f4ed23853c714af6102b8643b05
Author: Enrico Olivelli <[email protected]>
AuthorDate: Mon Nov 16 14:21:29 2020 +0100
Upgrade BookKeeper to 4.12.0 (#8447)
Upgrade Apache BookKeeper to 4.12.0
Most notable changes that impact this patch are:
- BP-41 -> "BookieSocketAddress" becomes "BookieId"
- BP-42 -> LedgerMetadata now carries "ledgerId" (as a transient,
non-serialized value)
---
distribution/server/src/assemble/LICENSE.bin.txt | 54 +++++++------
.../bookkeeper/mledger/offload/OffloadUtils.java | 16 ++--
.../mledger/impl/OffloadPrefixReadTest.java | 13 ++-
pom.xml | 2 +-
.../broker/service/BrokerBookieIsolationTest.java | 40 +++++-----
.../pulsar/broker/service/RackAwareTest.java | 19 +----
pulsar-sql/presto-distribution/LICENSE | 23 +++---
.../zookeeper/ZkBookieRackAffinityMapping.java | 9 +--
.../ZkIsolatedBookieEnsemblePlacementPolicy.java | 37 ++++-----
...kIsolatedBookieEnsemblePlacementPolicyTest.java | 93 +++++++++++-----------
.../bookkeeper/client/PulsarMockLedgerHandle.java | 14 ++--
.../impl/FileStoreBackedReadHandleImpl.java | 2 +-
.../offload/jcloud/impl/OffloadIndexBlockImpl.java | 23 +++---
.../offload/jcloud/impl/OffloadIndexTest.java | 15 ++--
14 files changed, 182 insertions(+), 178 deletions(-)
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt
b/distribution/server/src/assemble/LICENSE.bin.txt
index a0ffde0..bb6ef44 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -334,6 +334,8 @@ The Apache Software License, Version 2.0
- io.sundr-sundr-core-0.21.0.jar
* Guava
- com.google.guava-guava-25.1-jre.jar
+ - com.google.guava-failureaccess-1.0.1.jar
+ -
com.google.guava-listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
* J2ObjC Annotations -- com.google.j2objc-j2objc-annotations-1.1.jar
* Netty Reactive Streams --
com.typesafe.netty-netty-reactive-streams-2.0.4.jar
* Swagger
@@ -392,32 +394,32 @@ The Apache Software License, Version 2.0
- org.apache.logging.log4j-log4j-web-2.10.0.jar
* Java Native Access JNA -- net.java.dev.jna-jna-4.2.0.jar
* BookKeeper
- - org.apache.bookkeeper-bookkeeper-common-4.11.1.jar
- - org.apache.bookkeeper-bookkeeper-common-allocator-4.11.1.jar
- - org.apache.bookkeeper-bookkeeper-proto-4.11.1.jar
- - org.apache.bookkeeper-bookkeeper-server-4.11.1.jar
- - org.apache.bookkeeper-bookkeeper-tools-framework-4.11.1.jar
- - org.apache.bookkeeper-circe-checksum-4.11.1.jar
- - org.apache.bookkeeper-cpu-affinity-4.11.1.jar
- - org.apache.bookkeeper-statelib-4.11.1.jar
- - org.apache.bookkeeper-stream-storage-api-4.11.1.jar
- - org.apache.bookkeeper-stream-storage-common-4.11.1.jar
- - org.apache.bookkeeper-stream-storage-java-client-4.11.1.jar
- - org.apache.bookkeeper-stream-storage-java-client-base-4.11.1.jar
- - org.apache.bookkeeper-stream-storage-proto-4.11.1.jar
- - org.apache.bookkeeper-stream-storage-server-4.11.1.jar
- - org.apache.bookkeeper-stream-storage-service-api-4.11.1.jar
- - org.apache.bookkeeper-stream-storage-service-impl-4.11.1.jar
- - org.apache.bookkeeper.http-http-server-4.11.1.jar
- - org.apache.bookkeeper.http-vertx-http-server-4.11.1.jar
- - org.apache.bookkeeper.stats-bookkeeper-stats-api-4.11.1.jar
- - org.apache.bookkeeper.stats-prometheus-metrics-provider-4.11.1.jar
- - org.apache.bookkeeper.tests-stream-storage-tests-common-4.11.1.jar
- - org.apache.distributedlog-distributedlog-common-4.11.1.jar
- - org.apache.distributedlog-distributedlog-core-4.11.1-tests.jar
- - org.apache.distributedlog-distributedlog-core-4.11.1.jar
- - org.apache.distributedlog-distributedlog-protocol-4.11.1.jar
- - org.apache.bookkeeper.stats-codahale-metrics-provider-4.11.1.jar
+ - org.apache.bookkeeper-bookkeeper-common-4.12.0.jar
+ - org.apache.bookkeeper-bookkeeper-common-allocator-4.12.0.jar
+ - org.apache.bookkeeper-bookkeeper-proto-4.12.0.jar
+ - org.apache.bookkeeper-bookkeeper-server-4.12.0.jar
+ - org.apache.bookkeeper-bookkeeper-tools-framework-4.12.0.jar
+ - org.apache.bookkeeper-circe-checksum-4.12.0.jar
+ - org.apache.bookkeeper-cpu-affinity-4.12.0.jar
+ - org.apache.bookkeeper-statelib-4.12.0.jar
+ - org.apache.bookkeeper-stream-storage-api-4.12.0.jar
+ - org.apache.bookkeeper-stream-storage-common-4.12.0.jar
+ - org.apache.bookkeeper-stream-storage-java-client-4.12.0.jar
+ - org.apache.bookkeeper-stream-storage-java-client-base-4.12.0.jar
+ - org.apache.bookkeeper-stream-storage-proto-4.12.0.jar
+ - org.apache.bookkeeper-stream-storage-server-4.12.0.jar
+ - org.apache.bookkeeper-stream-storage-service-api-4.12.0.jar
+ - org.apache.bookkeeper-stream-storage-service-impl-4.12.0.jar
+ - org.apache.bookkeeper.http-http-server-4.12.0.jar
+ - org.apache.bookkeeper.http-vertx-http-server-4.12.0.jar
+ - org.apache.bookkeeper.stats-bookkeeper-stats-api-4.12.0.jar
+ - org.apache.bookkeeper.stats-prometheus-metrics-provider-4.12.0.jar
+ - org.apache.bookkeeper.tests-stream-storage-tests-common-4.12.0.jar
+ - org.apache.distributedlog-distributedlog-common-4.12.0.jar
+ - org.apache.distributedlog-distributedlog-core-4.12.0-tests.jar
+ - org.apache.distributedlog-distributedlog-core-4.12.0.jar
+ - org.apache.distributedlog-distributedlog-protocol-4.12.0.jar
+ - org.apache.bookkeeper.stats-codahale-metrics-provider-4.12.0.jar
* Apache HTTP Client
- org.apache.httpcomponents-httpclient-4.5.5.jar
- org.apache.httpcomponents-httpcore-4.4.9.jar
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
index 5adbb5a..83514a9 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
@@ -36,6 +36,7 @@ import
org.apache.bookkeeper.mledger.proto.MLDataFormats.KeyValue;
import
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadDriverMetadata;
+import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.DataFormats;
@@ -120,21 +121,22 @@ public final class OffloadUtils {
.setKey(e.getKey()).setValue(ByteString.copyFrom(e.getValue()));
}
- for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e :
metadata.getAllEnsembles().entrySet()) {
+ for (Map.Entry<Long, ? extends List<BookieId>> e :
metadata.getAllEnsembles().entrySet()) {
builder.addSegmentBuilder()
.setFirstEntryId(e.getKey())
-
.addAllEnsembleMember(e.getValue().stream().map(BookieSocketAddress::toString).collect(Collectors.toList()));
+
.addAllEnsembleMember(e.getValue().stream().map(BookieId::toString).collect(Collectors.toList()));
}
return builder.build().toByteArray();
}
- public static LedgerMetadata parseLedgerMetadata(byte[] bytes) throws
IOException {
+ public static LedgerMetadata parseLedgerMetadata(long id, byte[] bytes)
throws IOException {
DataFormats.LedgerMetadataFormat ledgerMetadataFormat =
DataFormats.LedgerMetadataFormat.newBuilder().mergeFrom(bytes).build();
LedgerMetadataBuilder builder = LedgerMetadataBuilder.create()
.withLastEntryId(ledgerMetadataFormat.getLastEntryId())
.withPassword(ledgerMetadataFormat.getPassword().toByteArray())
.withClosedState()
+ .withId(id)
.withMetadataFormatVersion(2)
.withLength(ledgerMetadataFormat.getLength())
.withAckQuorumSize(ledgerMetadataFormat.getAckQuorumSize())
@@ -142,12 +144,12 @@ public final class OffloadUtils {
.withWriteQuorumSize(ledgerMetadataFormat.getQuorumSize())
.withEnsembleSize(ledgerMetadataFormat.getEnsembleSize());
ledgerMetadataFormat.getSegmentList().forEach(segment -> {
- ArrayList<BookieSocketAddress> addressArrayList = new
ArrayList<>();
+ ArrayList<BookieId> addressArrayList = new ArrayList<>();
segment.getEnsembleMemberList().forEach(address -> {
try {
- addressArrayList.add(new BookieSocketAddress(address));
- } catch (IOException e) {
- log.error("Exception when create BookieSocketAddress. ",
e);
+ addressArrayList.add(BookieId.parse(address));
+ } catch (IllegalArgumentException e) {
+ log.error("Exception when create BookieId {}. ", address,
e);
}
});
builder.newEnsembleEntry(segment.getFirstEntryId(),
addressArrayList);
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index 0a96a1f..d3d24b2 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -52,6 +52,7 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
@@ -254,8 +255,9 @@ public class OffloadPrefixReadTest extends
MockedBookKeeperTestCase {
private final State state;
private final byte[] password;
private final Map<String, byte[]> customMetadata;
-
+ private final long ledgerId;
MockMetadata(LedgerMetadata toCopy) {
+ ledgerId = toCopy.getLedgerId();
ensembleSize = toCopy.getEnsembleSize();
writeQuorumSize = toCopy.getWriteQuorumSize();
ackQuorumSize = toCopy.getAckQuorumSize();
@@ -271,6 +273,11 @@ public class OffloadPrefixReadTest extends
MockedBookKeeperTestCase {
}
@Override
+ public long getLedgerId() {
+ return ledgerId;
+ }
+
+ @Override
public boolean hasPassword() { return true; }
@Override
@@ -315,12 +322,12 @@ public class OffloadPrefixReadTest extends
MockedBookKeeperTestCase {
public Map<String, byte[]> getCustomMetadata() { return
customMetadata; }
@Override
- public List<BookieSocketAddress> getEnsembleAt(long entryId) {
+ public List<BookieId> getEnsembleAt(long entryId) {
throw new UnsupportedOperationException("Pulsar shouldn't look at
this");
}
@Override
- public NavigableMap<Long, ? extends List<BookieSocketAddress>>
getAllEnsembles() {
+ public NavigableMap<Long, ? extends List<BookieId>> getAllEnsembles() {
throw new UnsupportedOperationException("Pulsar shouldn't look at
this");
}
diff --git a/pom.xml b/pom.xml
index 7ef22c8..f99fa86 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,7 +98,7 @@ flexible messaging model and an intuitive client
API.</description>
<!-- apache commons -->
<commons-compress.version>1.19</commons-compress.version>
- <bookkeeper.version>4.11.1</bookkeeper.version>
+ <bookkeeper.version>4.12.0</bookkeeper.version>
<zookeeper.version>3.5.7</zookeeper.version>
<netty.version>4.1.51.Final</netty.version>
<netty-tc-native.version>2.0.30.Final</netty-tc-native.version>
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index fecd26e..bb0ae00 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -43,7 +43,7 @@ import org.apache.bookkeeper.meta.LedgerManager;
import
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.EnsemblePlacementPolicyConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
-import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.broker.ManagedLedgerClientFactory;
@@ -133,10 +133,10 @@ public class BrokerBookieIsolationTest {
BookieServer[] bookies = bkEnsemble.getBookies();
ZooKeeper zkClient = bkEnsemble.getZkClient();
- Set<BookieSocketAddress> defaultBookies =
Sets.newHashSet(bookies[0].getLocalAddress(),
- bookies[1].getLocalAddress());
- Set<BookieSocketAddress> isolatedBookies =
Sets.newHashSet(bookies[2].getLocalAddress(),
- bookies[3].getLocalAddress());
+ Set<BookieId> defaultBookies =
Sets.newHashSet(bookies[0].getBookieId(),
+ bookies[1].getBookieId());
+ Set<BookieId> isolatedBookies =
Sets.newHashSet(bookies[2].getBookieId(),
+ bookies[3].getBookieId());
setDefaultIsolationGroup(brokerBookkeeperClientIsolationGroups,
zkClient, defaultBookies);
setDefaultIsolationGroup(tenantNamespaceIsolationGroups, zkClient,
isolatedBookies);
@@ -271,12 +271,12 @@ public class BrokerBookieIsolationTest {
BookieServer[] bookies = bkEnsemble.getBookies();
ZooKeeper zkClient = bkEnsemble.getZkClient();
- Set<BookieSocketAddress> defaultBookies =
Sets.newHashSet(bookies[0].getLocalAddress(),
- bookies[1].getLocalAddress());
- Set<BookieSocketAddress> isolatedBookies =
Sets.newHashSet(bookies[2].getLocalAddress(),
- bookies[3].getLocalAddress());
- Set<BookieSocketAddress> downedBookies = Sets.newHashSet(new
BookieSocketAddress("1.1.1.1:1111"),
- new BookieSocketAddress("1.1.1.1:1112"));
+ Set<BookieId> defaultBookies =
Sets.newHashSet(bookies[0].getBookieId(),
+ bookies[1].getBookieId());
+ Set<BookieId> isolatedBookies =
Sets.newHashSet(bookies[2].getBookieId(),
+ bookies[3].getBookieId());
+ Set<BookieId> downedBookies =
Sets.newHashSet(BookieId.parse("1.1.1.1:1111"),
+ BookieId.parse("1.1.1.1:1112"));
setDefaultIsolationGroup(brokerBookkeeperClientIsolationGroups,
zkClient, defaultBookies);
// primary group empty
@@ -395,10 +395,10 @@ public class BrokerBookieIsolationTest {
BookieServer[] bookies = bkEnsemble.getBookies();
ZooKeeper zkClient = bkEnsemble.getZkClient();
- Set<BookieSocketAddress> defaultBookies =
Sets.newHashSet(bookies[0].getLocalAddress(),
- bookies[1].getLocalAddress());
- Set<BookieSocketAddress> isolatedBookies =
Sets.newHashSet(bookies[2].getLocalAddress(),
- bookies[3].getLocalAddress());
+ Set<BookieId> defaultBookies =
Sets.newHashSet(bookies[0].getBookieId(),
+ bookies[1].getBookieId());
+ Set<BookieId> isolatedBookies =
Sets.newHashSet(bookies[2].getBookieId(),
+ bookies[3].getBookieId());
setDefaultIsolationGroup(brokerBookkeeperClientIsolationGroups,
zkClient, defaultBookies);
// primary group empty
@@ -460,12 +460,12 @@ public class BrokerBookieIsolationTest {
}
private void assertAffinityBookies(LedgerManager ledgerManager,
List<LedgerInfo> ledgers1,
- Set<BookieSocketAddress> defaultBookies) throws Exception {
+ Set<BookieId> defaultBookies) throws Exception {
for (LedgerInfo lInfo : ledgers1) {
long ledgerId = lInfo.getLedgerId();
CompletableFuture<Versioned<LedgerMetadata>> ledgerMetaFuture =
ledgerManager.readLedgerMetadata(ledgerId);
LedgerMetadata ledgerMetadata = ledgerMetaFuture.get().getValue();
- Set<BookieSocketAddress> ledgerBookies = Sets.newHashSet();
+ Set<BookieId> ledgerBookies = Sets.newHashSet();
ledgerBookies.addAll(ledgerMetadata.getAllEnsembles().values().iterator().next());
assertEquals(ledgerBookies.size(), defaultBookies.size());
ledgerBookies.removeAll(defaultBookies);
@@ -493,7 +493,7 @@ public class BrokerBookieIsolationTest {
}
private void setDefaultIsolationGroup(String
brokerBookkeeperClientIsolationGroups, ZooKeeper zkClient,
- Set<BookieSocketAddress> bookieAddresses) throws Exception {
+ Set<BookieId> bookieAddresses) throws Exception {
BookiesRackConfiguration bookies = null;
try {
byte[] data =
zkClient.getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false,
null);
@@ -509,8 +509,8 @@ public class BrokerBookieIsolationTest {
}
Map<String, BookieInfo> bookieInfoMap = Maps.newHashMap();
- for (BookieSocketAddress bkSocket : bookieAddresses) {
- BookieInfo info = new BookieInfo("use", bkSocket.getHostName() +
":" + bkSocket.getPort());
+ for (BookieId bkSocket : bookieAddresses) {
+ BookieInfo info = new BookieInfo("use", bkSocket.toString());
bookieInfoMap.put(bkSocket.toString(), info);
}
bookies.put(brokerBookkeeperClientIsolationGroups, bookieInfoMap);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
index 4632392..8d9ea4e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
@@ -30,7 +30,7 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.discover.BookieServiceInfo;
-import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.pulsar.common.policies.data.BookieInfo;
@@ -108,7 +108,7 @@ public class RackAwareTest extends BkEnsemblesTestBase {
BookKeeper bkc = this.pulsar.getBookKeeperClient();
// Create few ledgers and verify all of them should have a copy in the
first bookie
- BookieSocketAddress fistBookie = bookies.get(0).getLocalAddress();
+ BookieId fistBookie = bookies.get(0).getBookieId();
for (int i = 0; i < 100; i++) {
LedgerHandle lh = bkc.createLedger(2, 2, DigestType.DUMMY, new
byte[0]);
log.info("Ledger: {} -- Ensemble: {}", i,
lh.getLedgerMetadata().getEnsembleAt(0));
@@ -118,20 +118,5 @@ public class RackAwareTest extends BkEnsemblesTestBase {
}
}
- @Test(enabled = false)
- public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception {
- // Ignore test
- }
-
- @Test(enabled = false)
- public void testSkipCorruptDataLedger() throws Exception {
- // Ignore test
- }
-
- @Test(enabled = false)
- public void testTopicWithWildCardChar() throws Exception {
- // Ignore test
- }
-
private static final Logger log =
LoggerFactory.getLogger(RackAwareTest.class);
}
diff --git a/pulsar-sql/presto-distribution/LICENSE
b/pulsar-sql/presto-distribution/LICENSE
index b1475ee..80c48c0 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -408,17 +408,18 @@ The Apache Software License, Version 2.0
- async-http-client-2.12.1.jar
- async-http-client-netty-utils-2.12.1.jar
* Apache Bookkeeper
- - bookkeeper-common-4.11.1.jar
- - bookkeeper-common-allocator-4.11.1.jar
- - bookkeeper-proto-4.11.1.jar
- - bookkeeper-server-4.11.1.jar
- - bookkeeper-stats-api-4.11.1.jar
- - bookkeeper-tools-framework-4.11.1.jar
- - circe-checksum-4.11.1.jar
- - codahale-metrics-provider-4.11.1.jar
- - cpu-affinity-4.11.1.jar
- - http-server-4.11.1.jar
- - prometheus-metrics-provider-4.11.1.jar
+ - bookkeeper-common-4.12.0.jar
+ - bookkeeper-common-allocator-4.12.0.jar
+ - bookkeeper-proto-4.12.0.jar
+ - bookkeeper-server-4.12.0.jar
+ - bookkeeper-stats-api-4.12.0.jar
+ - bookkeeper-tools-framework-4.12.0.jar
+ - circe-checksum-4.12.0.jar
+ - codahale-metrics-provider-4.12.0jar
+ - cpu-affinity-4.12.0.jar
+ - http-server-4.12.0.jar
+ - prometheus-metrics-provider-4.12.0.jar
+ - codahale-metrics-provider-4.12.0.jar
* Apache Commons
- commons-cli-1.2.jar
- commons-codec-1.10.jar
diff --git
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
index 93400e9..97e48c8 100644
---
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
+++
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
@@ -33,6 +33,7 @@ import
org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackChangeNotifier;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
+import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieNode;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
@@ -218,14 +219,10 @@ public class ZkBookieRackAffinityMapping extends
AbstractDNSToSwitchMapping
public void onUpdate(String path, BookiesRackConfiguration data, Stat
stat) {
if (rackawarePolicy != null) {
LOG.info("Bookie rack info updated to {}. Notifying rackaware
policy.", data.toString());
- List<BookieSocketAddress> bookieAddressList = new ArrayList<>();
+ List<BookieId> bookieAddressList = new ArrayList<>();
for (Map<String, BookieInfo> bookieMapping : data.values()) {
for (String addr : bookieMapping.keySet()) {
- try {
- bookieAddressList.add(new BookieSocketAddress(addr));
- } catch (UnknownHostException e) {
- throw new RuntimeException(e);
- }
+ bookieAddressList.add(BookieId.parse(addr));
}
}
rackawarePolicy.onBookieRackChange(bookieAddressList);
diff --git
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
index f508e64..43d727a 100644
---
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
+++
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
@@ -31,7 +31,6 @@ import
org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
@@ -48,6 +47,8 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.util.HashedWheelTimer;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookieAddressResolver;
public class ZkIsolatedBookieEnsemblePlacementPolicy extends
RackawareEnsemblePlacementPolicy
implements Deserializer<BookiesRackConfiguration> {
@@ -69,7 +70,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends
RackawareEnsemblePl
@Override
public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration
conf,
Optional<DNSToSwitchMapping> optionalDnsResolver, HashedWheelTimer
timer, FeatureProvider featureProvider,
- StatsLogger statsLogger) {
+ StatsLogger statsLogger, BookieAddressResolver
bookieAddressResolver) {
if (conf.getProperty(ISOLATION_BOOKIE_GROUPS) != null) {
String isolationGroupsString =
castToString(conf.getProperty(ISOLATION_BOOKIE_GROUPS));
if (!isolationGroupsString.isEmpty()) {
@@ -87,7 +88,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends
RackawareEnsemblePl
}
}
}
- return super.initialize(conf, optionalDnsResolver, timer,
featureProvider, statsLogger);
+ return super.initialize(conf, optionalDnsResolver, timer,
featureProvider, statsLogger, bookieAddressResolver);
}
private String castToString(Object obj) {
@@ -130,33 +131,33 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy
extends RackawareEnsemblePl
}
@Override
- public PlacementResult<List<BookieSocketAddress>> newEnsemble(int
ensembleSize, int writeQuorumSize, int ackQuorumSize,
- Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
excludeBookies)
+ public PlacementResult<List<BookieId>> newEnsemble(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
+ Map<String, byte[]> customMetadata, Set<BookieId> excludeBookies)
throws BKNotEnoughBookiesException {
- Set<BookieSocketAddress> blacklistedBookies =
getBlacklistedBookies(ensembleSize);
+ Set<BookieId> blacklistedBookies = getBlacklistedBookies(ensembleSize);
if (excludeBookies == null) {
- excludeBookies = new HashSet<BookieSocketAddress>();
+ excludeBookies = new HashSet<BookieId>();
}
excludeBookies.addAll(blacklistedBookies);
return super.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize,
customMetadata, excludeBookies);
}
@Override
- public PlacementResult<BookieSocketAddress> replaceBookie(int
ensembleSize, int writeQuorumSize, int ackQuorumSize,
- Map<String, byte[]> customMetadata, List<BookieSocketAddress>
currentEnsemble,
- BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
+ public PlacementResult<BookieId> replaceBookie(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
+ Map<String, byte[]> customMetadata, List<BookieId> currentEnsemble,
+ BookieId bookieToReplace, Set<BookieId> excludeBookies)
throws BKNotEnoughBookiesException {
- Set<BookieSocketAddress> blacklistedBookies =
getBlacklistedBookies(ensembleSize);
+ Set<BookieId> blacklistedBookies = getBlacklistedBookies(ensembleSize);
if (excludeBookies == null) {
- excludeBookies = new HashSet<BookieSocketAddress>();
+ excludeBookies = new HashSet<BookieId>();
}
excludeBookies.addAll(blacklistedBookies);
return super.replaceBookie(ensembleSize, writeQuorumSize,
ackQuorumSize, customMetadata, currentEnsemble,
bookieToReplace, excludeBookies);
}
- private Set<BookieSocketAddress> getBlacklistedBookies(int ensembleSize) {
- Set<BookieSocketAddress> blacklistedBookies = new
HashSet<BookieSocketAddress>();
+ private Set<BookieId> getBlacklistedBookies(int ensembleSize) {
+ Set<BookieId> blacklistedBookies = new HashSet<BookieId>();
try {
if (bookieMappingCache != null) {
BookiesRackConfiguration allGroupsBookieMapping =
bookieMappingCache
@@ -169,12 +170,12 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy
extends RackawareEnsemblePl
Set<String> bookiesInGroup =
allGroupsBookieMapping.get(group).keySet();
if (!primaryIsolationGroups.contains(group)) {
for (String bookieAddress : bookiesInGroup) {
- blacklistedBookies.add(new
BookieSocketAddress(bookieAddress));
+
blacklistedBookies.add(BookieId.parse(bookieAddress));
}
} else {
for (String groupBookie : bookiesInGroup) {
totalAvailableBookiesInPrimaryGroup += knownBookies
- .containsKey(new
BookieSocketAddress(groupBookie)) ? 1 : 0;
+ .containsKey(BookieId.parse(groupBookie))
? 1 : 0;
}
}
}
@@ -186,7 +187,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy
extends RackawareEnsemblePl
Map<String, BookieInfo> bookieGroup =
allGroupsBookieMapping.get(group);
if (bookieGroup != null && !bookieGroup.isEmpty()) {
for (String bookieAddress : bookieGroup.keySet()) {
- blacklistedBookies.remove(new
BookieSocketAddress(bookieAddress));
+
blacklistedBookies.remove(BookieId.parse(bookieAddress));
}
}
}
@@ -199,7 +200,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy
extends RackawareEnsemblePl
Map<String, BookieInfo> bookieGroup =
allGroupsBookieMapping.get(group);
if (bookieGroup != null && !bookieGroup.isEmpty()) {
for (String bookieAddress : bookieGroup.keySet()) {
- blacklistedBookies.remove(new
BookieSocketAddress(bookieAddress));
+
blacklistedBookies.remove(BookieId.parse(bookieAddress));
}
}
}
diff --git
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
index b8057ea..e3f1a9c 100644
---
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
+++
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
@@ -39,6 +39,7 @@ import java.util.Set;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
+import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.ZkUtils;
@@ -64,8 +65,8 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
private ZooKeeper localZkc;
private final ObjectMapper jsonMapper = ObjectMapperFactory.create();
- Set<BookieSocketAddress> writableBookies = new HashSet<>();
- Set<BookieSocketAddress> readOnlyBookies = new HashSet<>();
+ Set<BookieId> writableBookies = new HashSet<>();
+ Set<BookieId> readOnlyBookies = new HashSet<>();
List<String> isolationGroups = new ArrayList<>();
HashedWheelTimer timer;
@@ -77,10 +78,10 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
localZkS.start();
localZkc = ZooKeeperClient.newBuilder().connectString("127.0.0.1" +
":" + localZkS.getZookeeperPort()).build();
- writableBookies.add(new BookieSocketAddress(BOOKIE1));
- writableBookies.add(new BookieSocketAddress(BOOKIE2));
- writableBookies.add(new BookieSocketAddress(BOOKIE3));
- writableBookies.add(new BookieSocketAddress(BOOKIE4));
+ writableBookies.add(new BookieSocketAddress(BOOKIE1).toBookieId());
+ writableBookies.add(new BookieSocketAddress(BOOKIE2).toBookieId());
+ writableBookies.add(new BookieSocketAddress(BOOKIE3).toBookieId());
+ writableBookies.add(new BookieSocketAddress(BOOKIE4).toBookieId());
isolationGroups.add("group1");
}
@@ -116,16 +117,16 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new
ZooKeeperCache("test", localZkc, 30) {
});
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
isolationGroups);
- isolationPolicy.initialize(bkClientConf, Optional.empty(), timer,
SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE);
+ isolationPolicy.initialize(bkClientConf, Optional.empty(), timer,
SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
- List<BookieSocketAddress> ensemble = isolationPolicy.newEnsemble(3, 3,
2, Collections.emptyMap(), new HashSet<>()).getResult();
- assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1)));
- assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
- assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE4)));
+ List<BookieId> ensemble = isolationPolicy.newEnsemble(3, 3, 2,
Collections.emptyMap(), new HashSet<>()).getResult();
+ assertTrue(ensemble.contains(new
BookieSocketAddress(BOOKIE1).toBookieId()));
+ assertTrue(ensemble.contains(new
BookieSocketAddress(BOOKIE2).toBookieId()));
+ assertTrue(ensemble.contains(new
BookieSocketAddress(BOOKIE4).toBookieId()));
ensemble = isolationPolicy.newEnsemble(1, 1, 1,
Collections.emptyMap(), new HashSet<>()).getResult();
- assertFalse(ensemble.contains(new BookieSocketAddress(BOOKIE3)));
+ assertFalse(ensemble.contains(new
BookieSocketAddress(BOOKIE3).toBookieId()));
try {
isolationPolicy.newEnsemble(4, 4, 4, Collections.emptyMap(), new
HashSet<>());
@@ -134,11 +135,11 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
// ok
}
- Set<BookieSocketAddress> bookieToExclude = new HashSet<>();
- bookieToExclude.add(new BookieSocketAddress(BOOKIE1));
+ Set<BookieId> bookieToExclude = new HashSet<>();
+ bookieToExclude.add(new BookieSocketAddress(BOOKIE1).toBookieId());
ensemble = isolationPolicy.newEnsemble(2, 2, 2,
Collections.emptyMap(), bookieToExclude).getResult();
- assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE4)));
- assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
+ assertTrue(ensemble.contains(new
BookieSocketAddress(BOOKIE4).toBookieId()));
+ assertTrue(ensemble.contains(new
BookieSocketAddress(BOOKIE2).toBookieId()));
secondaryBookieGroup.put(BOOKIE4, new BookieInfo("rack0", null));
bookieMapping.put("group2", secondaryBookieGroup);
@@ -150,8 +151,8 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
ensemble = isolationPolicy.newEnsemble(2, 2, 2,
Collections.emptyMap(), null).getResult();
- assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1)));
- assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
+ assertTrue(ensemble.contains(new
BookieSocketAddress(BOOKIE1).toBookieId()));
+ assertTrue(ensemble.contains(new
BookieSocketAddress(BOOKIE2).toBookieId()));
try {
isolationPolicy.newEnsemble(3, 3, 3, Collections.emptyMap(), new
HashSet<>());
@@ -162,18 +163,18 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
try {
isolationPolicy.replaceBookie(3, 3, 3, Collections.emptyMap(),
ensemble,
- new BookieSocketAddress(BOOKIE5), new HashSet<>());
+ new BookieSocketAddress(BOOKIE5).toBookieId(), new
HashSet<>());
fail("should not pass");
} catch (BKNotEnoughBookiesException e) {
// ok
}
bookieToExclude = new HashSet<>();
- bookieToExclude.add(new BookieSocketAddress(BOOKIE1));
+ bookieToExclude.add(new BookieSocketAddress(BOOKIE1).toBookieId());
ensemble = isolationPolicy.newEnsemble(1, 1, 1,
Collections.emptyMap(), bookieToExclude).getResult();
- BookieSocketAddress chosenBookie = isolationPolicy.replaceBookie(1, 1,
1, Collections.emptyMap(),
+ BookieId chosenBookie = isolationPolicy.replaceBookie(1, 1, 1,
Collections.emptyMap(),
ensemble, ensemble.get(0), new HashSet<>()).getResult();
- assertEquals(new BookieSocketAddress(BOOKIE1), chosenBookie);
+ assertEquals(new BookieSocketAddress(BOOKIE1).toBookieId(),
chosenBookie);
localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1);
}
@@ -185,7 +186,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new
ZooKeeperCache("test", localZkc, 30) {
});
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
isolationGroups);
- isolationPolicy.initialize(bkClientConf, Optional.empty(), timer,
SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE);
+ isolationPolicy.initialize(bkClientConf, Optional.empty(), timer,
SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
isolationPolicy.newEnsemble(4, 4, 4, Collections.emptyMap(), new
HashSet<>());
@@ -201,9 +202,9 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
Thread.sleep(100);
- List<BookieSocketAddress> ensemble = isolationPolicy.newEnsemble(2, 2,
2, Collections.emptyMap(), new HashSet<>()).getResult();
- assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1)));
- assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
+ List<BookieId> ensemble = isolationPolicy.newEnsemble(2, 2, 2,
Collections.emptyMap(), new HashSet<>()).getResult();
+ assertTrue(ensemble.contains(new
BookieSocketAddress(BOOKIE1).toBookieId()));
+ assertTrue(ensemble.contains(new
BookieSocketAddress(BOOKIE2).toBookieId()));
try {
isolationPolicy.newEnsemble(3, 3, 3, Collections.emptyMap(), new
HashSet<>());
@@ -239,12 +240,12 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
bkClientConf.setZkServers("127.0.0.1" + ":" +
localZkS.getZookeeperPort());
bkClientConf.setZkTimeout(1000);
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
isolationGroups);
- isolationPolicy.initialize(bkClientConf, Optional.empty(), timer,
SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE);
+ isolationPolicy.initialize(bkClientConf, Optional.empty(), timer,
SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
- List<BookieSocketAddress> ensemble = isolationPolicy.newEnsemble(2, 2,
2, Collections.emptyMap(), new HashSet<>()).getResult();
- assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1)));
- assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
+ List<BookieId> ensemble = isolationPolicy.newEnsemble(2, 2, 2,
Collections.emptyMap(), new HashSet<>()).getResult();
+ assertTrue(ensemble.contains(new
BookieSocketAddress(BOOKIE1).toBookieId()));
+ assertTrue(ensemble.contains(new
BookieSocketAddress(BOOKIE2).toBookieId()));
try {
isolationPolicy.newEnsemble(3, 3, 3, Collections.emptyMap(), new
HashSet<>());
@@ -265,9 +266,9 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
Thread.sleep(100);
ensemble = isolationPolicy.newEnsemble(3, 3, 3,
Collections.emptyMap(), new HashSet<>()).getResult();
- assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1)));
- assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
- assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE3)));
+ assertTrue(ensemble.contains(new
BookieSocketAddress(BOOKIE1).toBookieId()));
+ assertTrue(ensemble.contains(new
BookieSocketAddress(BOOKIE2).toBookieId()));
+ assertTrue(ensemble.contains(new
BookieSocketAddress(BOOKIE3).toBookieId()));
localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1);
@@ -294,7 +295,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new
ZooKeeperCache("test", localZkc, 30) {
});
isolationPolicy.initialize(bkClientConf, Optional.empty(), timer,
SettableFeatureProvider.DISABLE_ALL,
- NullStatsLogger.INSTANCE);
+ NullStatsLogger.INSTANCE,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
isolationPolicy.newEnsemble(4, 4, 4, Collections.emptyMap(), new
HashSet<>());
@@ -342,14 +343,14 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
});
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
isolatedGroup);
isolationPolicy.initialize(bkClientConf, Optional.empty(), timer,
SettableFeatureProvider.DISABLE_ALL,
- NullStatsLogger.INSTANCE);
+ NullStatsLogger.INSTANCE,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
- List<BookieSocketAddress> ensemble = isolationPolicy
+ List<BookieId> ensemble = isolationPolicy
.newEnsemble(3, 3, 2, Collections.emptyMap(), new
HashSet<>()).getResult();
- assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1)));
- assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
- assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE4)));
+ assertTrue(ensemble.contains(new
BookieSocketAddress(BOOKIE1).toBookieId()));
+ assertTrue(ensemble.contains(new
BookieSocketAddress(BOOKIE2).toBookieId()));
+ assertTrue(ensemble.contains(new
BookieSocketAddress(BOOKIE4).toBookieId()));
localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1);
}
@@ -390,14 +391,14 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
isolatedGroup);
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
secondaryIsolatedGroup);
isolationPolicy.initialize(bkClientConf, Optional.empty(), timer,
SettableFeatureProvider.DISABLE_ALL,
- NullStatsLogger.INSTANCE);
+ NullStatsLogger.INSTANCE,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
- List<BookieSocketAddress> ensemble = isolationPolicy
+ List<BookieId> ensemble = isolationPolicy
.newEnsemble(3, 3, 2, Collections.emptyMap(), new
HashSet<>()).getResult();
- assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1)));
- assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
- assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE4)));
+ assertTrue(ensemble.contains(new
BookieSocketAddress(BOOKIE1).toBookieId()));
+ assertTrue(ensemble.contains(new
BookieSocketAddress(BOOKIE2).toBookieId()));
+ assertTrue(ensemble.contains(new
BookieSocketAddress(BOOKIE4).toBookieId()));
localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1);
}
@@ -435,11 +436,11 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
secondaryIsolatedGroup);
isolationPolicy.initialize(bkClientConf, Optional.empty(), timer,
SettableFeatureProvider.DISABLE_ALL,
- NullStatsLogger.INSTANCE);
+ NullStatsLogger.INSTANCE,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
try {
- List<BookieSocketAddress> ensemble = isolationPolicy
+ isolationPolicy
.newEnsemble(3, 3, 2, Collections.emptyMap(), new
HashSet<>()).getResult();
Assert.fail("Should have thrown BKNotEnoughBookiesException");
} catch (BKNotEnoughBookiesException ne) {
diff --git
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index 4692851..59d03ec 100644
---
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -43,6 +43,7 @@ import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Versioned;
@@ -65,7 +66,7 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
DigestType digest, byte[] passwd) throws
GeneralSecurityException {
- super(bk.getClientCtx(), id, new Versioned<>(createMetadata(digest,
passwd), new LongVersion(0L)),
+ super(bk.getClientCtx(), id, new Versioned<>(createMetadata(id,
digest, passwd), new LongVersion(0L)),
digest, passwd, WriteFlag.NONE);
this.bk = bk;
this.id = id;
@@ -253,14 +254,15 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
return readHandle.readLastAddConfirmedAndEntryAsync(entryId,
timeOutInMillis, parallel);
}
- private static LedgerMetadata createMetadata(DigestType digest, byte[]
passwd) {
- List<BookieSocketAddress> ensemble = Lists.newArrayList(
- new BookieSocketAddress("192.0.2.1", 1234),
- new BookieSocketAddress("192.0.2.2", 1234),
- new BookieSocketAddress("192.0.2.3", 1234));
+ private static LedgerMetadata createMetadata(long id, DigestType digest,
byte[] passwd) {
+ List<BookieId> ensemble = Lists.newArrayList(
+ new BookieSocketAddress("192.0.2.1", 1234).toBookieId(),
+ new BookieSocketAddress("192.0.2.2", 1234).toBookieId(),
+ new BookieSocketAddress("192.0.2.3", 1234).toBookieId());
return LedgerMetadataBuilder.create()
.withDigestType(digest.toApiDigestType())
.withPassword(passwd)
+ .withId(id)
.newEnsembleEntry(0L, ensemble)
.build();
}
diff --git
a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
index 64399d4..c035f7a 100644
---
a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
+++
b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
@@ -60,7 +60,7 @@ public class FileStoreBackedReadHandleImpl implements
ReadHandle {
try {
key.set(FileSystemManagedLedgerOffloader.METADATA_KEY_INDEX);
reader.get(key, value);
- this.ledgerMetadata = parseLedgerMetadata(value.copyBytes());
+ this.ledgerMetadata = parseLedgerMetadata(ledgerId,
value.copyBytes());
} catch (IOException e) {
log.error("Fail to read LedgerMetadata for ledgerId {}",
ledgerId);
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
index 0a4e90b..7194889 100644
---
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
+++
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
@@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkState;
import static
org.apache.bookkeeper.mledger.offload.OffloadUtils.buildLedgerMetadataFormat;
-import static
org.apache.bookkeeper.mledger.offload.OffloadUtils.parseLedgerMetadata;
+import org.apache.bookkeeper.net.BookieId;
public class OffloadIndexBlockImpl implements OffloadIndexBlock {
private static final Logger log =
LoggerFactory.getLogger(OffloadIndexBlockImpl.class);
@@ -188,9 +188,9 @@ public class OffloadIndexBlockImpl implements
OffloadIndexBlock {
private long ctime;
private State state;
private Map<String, byte[]> customMetadata = Maps.newHashMap();
- private TreeMap<Long, ArrayList<BookieSocketAddress>> ensembles =
- new TreeMap<Long, ArrayList<BookieSocketAddress>>();
-
+ private TreeMap<Long, ArrayList<BookieId>> ensembles =
+ new TreeMap<Long, ArrayList<BookieId>>();
+
InternalLedgerMetadata(LedgerMetadataFormat ledgerMetadataFormat) {
this.ensembleSize = ledgerMetadataFormat.getEnsembleSize();
this.writeQuorumSize = ledgerMetadataFormat.getQuorumSize();
@@ -208,11 +208,11 @@ public class OffloadIndexBlockImpl implements
OffloadIndexBlock {
}
ledgerMetadataFormat.getSegmentList().forEach(segment -> {
- ArrayList<BookieSocketAddress> addressArrayList = new
ArrayList<BookieSocketAddress>();
+ ArrayList<BookieId> addressArrayList = new ArrayList<>();
segment.getEnsembleMemberList().forEach(address -> {
try {
- addressArrayList.add(new BookieSocketAddress(address));
- } catch (IOException e) {
+ addressArrayList.add(BookieId.parse(address));
+ } catch (IllegalArgumentException e) {
log.error("Exception when create BookieSocketAddress.
", e);
}
});
@@ -221,6 +221,11 @@ public class OffloadIndexBlockImpl implements
OffloadIndexBlock {
}
@Override
+ public long getLedgerId() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public int getEnsembleSize() {
return this.ensembleSize;
}
@@ -277,12 +282,12 @@ public class OffloadIndexBlockImpl implements
OffloadIndexBlock {
}
@Override
- public List<BookieSocketAddress> getEnsembleAt(long entryId) {
+ public List<BookieId> getEnsembleAt(long entryId) {
return ensembles.get(ensembles.headMap(entryId + 1).lastKey());
}
@Override
- public NavigableMap<Long, ? extends List<BookieSocketAddress>>
getAllEnsembles() {
+ public NavigableMap<Long, ? extends List<BookieId>> getAllEnsembles() {
return this.ensembles;
}
diff --git
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexTest.java
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexTest.java
index ad1bdc7..4d6f3e7 100644
---
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexTest.java
+++
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexTest.java
@@ -39,6 +39,7 @@ import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
+import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.testng.annotations.Test;
@@ -81,21 +82,21 @@ public class OffloadIndexTest {
// }
// }
- private LedgerMetadata createLedgerMetadata() throws Exception {
+ private LedgerMetadata createLedgerMetadata(long id) throws Exception {
Map<String, byte[]> metadataCustom = Maps.newHashMap();
metadataCustom.put("key1", "value1".getBytes(UTF_8));
metadataCustom.put("key7", "value7".getBytes(UTF_8));
- ArrayList<BookieSocketAddress> bookies = Lists.newArrayList();
- bookies.add(0, new BookieSocketAddress("127.0.0.1:3181"));
- bookies.add(1, new BookieSocketAddress("127.0.0.2:3181"));
- bookies.add(2, new BookieSocketAddress("127.0.0.3:3181"));
+ ArrayList<BookieId> bookies = Lists.newArrayList();
+ bookies.add(0, new BookieSocketAddress("127.0.0.1:3181").toBookieId());
+ bookies.add(1, new BookieSocketAddress("127.0.0.2:3181").toBookieId());
+ bookies.add(2, new BookieSocketAddress("127.0.0.3:3181").toBookieId());
return
LedgerMetadataBuilder.create().withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2)
.withDigestType(DigestType.CRC32C).withPassword("password".getBytes(UTF_8))
.withCustomMetadata(metadataCustom).withClosedState().withLastEntryId(5000).withLength(100)
- .newEnsembleEntry(0L, bookies).build();
+ .newEnsembleEntry(0L, bookies).withId(id).build();
}
@@ -104,7 +105,7 @@ public class OffloadIndexTest {
@Test
public void offloadIndexBlockImplTest() throws Exception {
OffloadIndexBlockBuilder blockBuilder =
OffloadIndexBlockBuilder.create();
- LedgerMetadata metadata = createLedgerMetadata();
+ LedgerMetadata metadata = createLedgerMetadata(1); // use dummy
ledgerId, from BK 4.12 the ledger id is required
log.debug("created metadata: {}", metadata.toString());
blockBuilder.withLedgerMetadata(metadata).withDataObjectLength(1).withDataBlockHeaderLength(23455);