This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3b5de44 [pulsar-package-management] Support to access a separated
bookkeeper cluster (#13089)
3b5de44 is described below
commit 3b5de44ac1daf87997516277f74981004fff573d
Author: Fangbin Sun <[email protected]>
AuthorDate: Wed Dec 8 23:16:25 2021 +0800
[pulsar-package-management] Support to access a separated bookkeeper
cluster (#13089)
### Motivation
Currently, the bookkeeper client used by package management cannot be initialized correctly
when using a separated bookkeeper cluster via `bookkeeperMetadataServiceUri`
whose zookeeper connection may be different from the pulsar zookeeper servers.
e.g.
```
17:08:07.661 [ForkJoinPool.commonPool-worker-43] WARN
org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl - Failed to
find 1 bookies : excludeBookies [], allBookies [].
17:08:07.662 [ForkJoinPool.commonPool-worker-43] ERROR
org.apache.bookkeeper.client.LedgerCreateOp - Not enough bookies to create
ledger with ensembleSize=1, writeQuorumSize=1 and ackQuorumSize=1
17:08:07.662 [ForkJoinPool.commonPool-worker-43] ERROR
org.apache.distributedlog.bk.SimpleLedgerAllocator - Error creating ledger for
allocating
/kobe/pulsar/packages/function/public/default/redis/v1.0/meta/<default>/allocation
:
org.apache.bookkeeper.client.BKException$BKNotEnoughBookiesException: Not
enough non-faulty bookies available
at
org.apache.bookkeeper.client.BKException.create(BKException.java:72)
~[org.apache.bookkeeper-bookkeeper-server-4.14.3.jar:4.14.3]
at
org.apache.distributedlog.BookKeeperClient$1.createComplete(BookKeeperClient.java:223)
~[org.apache.distributedlog-distributedlog-core-4.14.3.jar:4.14.3]
at
org.apache.bookkeeper.client.LedgerCreateOp.createComplete(LedgerCreateOp.java:272)
~[org.apache.bookkeeper-bookkeeper-server-4.14.3.jar:4.14.3]
at
org.apache.bookkeeper.client.LedgerCreateOp.initiate(LedgerCreateOp.java:166)
~[org.apache.bookkeeper-bookkeeper-server-4.14.3.jar:4.14.3]
```
### Modifications
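Build the `BKDLConfig` from the `bookkeeperMetadataServiceUri` property when it is set; otherwise fall back to the configured zookeeper servers and packages management ledger root path.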
---
.../bookkeeper/BookKeeperPackagesStorage.java | 15 +++++-
.../bookkeeper/BookKeeperPackagesStorageTest.java | 54 ++++++++++++++++++++++
2 files changed, 67 insertions(+), 2 deletions(-)
diff --git
a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
index f0db593..3da1384 100644
---
a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
+++
b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.DistributedLogConstants;
import org.apache.distributedlog.api.DistributedLogManager;
@@ -82,8 +83,18 @@ public class BookKeeperPackagesStorage implements
PackagesStorage {
}
private URI initializeDlogNamespace() throws IOException {
- BKDLConfig bkdlConfig = new
BKDLConfig(configuration.getZookeeperServers(),
- configuration.getPackagesManagementLedgerRootPath());
+ String bookkeeperMetadataServiceUri =
configuration.getProperty("bookkeeperMetadataServiceUri");
+ String ledgersRootPath;
+ String ledgersStoreServers;
+ if (StringUtils.isNotBlank(bookkeeperMetadataServiceUri)) {
+ URI metadataServiceUri = URI.create(bookkeeperMetadataServiceUri);
+ ledgersStoreServers =
metadataServiceUri.getAuthority().replace(";", ",");
+ ledgersRootPath = metadataServiceUri.getPath();
+ } else {
+ ledgersRootPath =
configuration.getPackagesManagementLedgerRootPath();
+ ledgersStoreServers = configuration.getZookeeperServers();
+ }
+ BKDLConfig bkdlConfig = new BKDLConfig(ledgersStoreServers,
ledgersRootPath);
DLMetadata dlMetadata = DLMetadata.create(bkdlConfig);
URI dlogURI =
URI.create(String.format("distributedlog://%s/pulsar/packages",
configuration.getZookeeperServers()));
diff --git
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
index 339bafc..6931207 100644
---
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
+++
b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
@@ -192,4 +192,58 @@ public class BookKeeperPackagesStorageTest extends
BookKeeperClusterTestCase {
assertTrue(exist);
}
+ @Test(timeOut = 60000)
+ public void testReadWriteOperationsWithSeparatedBkCluster() throws
Exception {
+ PackagesStorageProvider provider = PackagesStorageProvider
+
.newProvider(BookKeeperPackagesStorageProvider.class.getName());
+ DefaultPackagesStorageConfiguration configuration = new
DefaultPackagesStorageConfiguration();
+ // set the unavailable bk cluster with mock zookeeper path
+ configuration.setProperty("zookeeperServers",
zkUtil.getZooKeeperConnectString() + "/mock");
+ configuration.setProperty("packagesReplicas", "1");
+ configuration.setProperty("packagesManagementLedgerRootPath",
"/ledgers");
+ PackagesStorage storage1 = provider.getStorage(configuration);
+ storage1.initialize();
+
+ String mockData = "mock-data";
+ ByteArrayInputStream mockDataStream = new
ByteArrayInputStream(mockData.getBytes(StandardCharsets.UTF_8));
+ String mockPath = "mock-path";
+
+ // write some data to the dlog will fail
+ try {
+ storage1.writeAsync(mockPath, mockDataStream).get();
+ } catch (Exception e) {
+ String errMsg = e.getCause().getMessage();
+ assertTrue(errMsg.contains("Error on allocating ledger") ||
errMsg.contains("Write rejected"));
+ } finally {
+ storage1.closeAsync().get();
+ }
+
+ // set the available bk cluster with bookkeeperMetadataServiceUri
using actual zookeeper path
+ String bookkeeperMetadataServiceUri =
String.format("zk+null://%s/ledgers", zkUtil.getZooKeeperConnectString());
+ DefaultPackagesStorageConfiguration configuration2 = new
DefaultPackagesStorageConfiguration();
+ configuration2.setProperty("zookeeperServers",
zkUtil.getZooKeeperConnectString());
+ configuration2.setProperty("bookkeeperMetadataServiceUri",
bookkeeperMetadataServiceUri);
+ configuration2.setProperty("packagesReplicas", "1");
+ PackagesStorage storage2 = provider.getStorage(configuration2);
+ storage2.initialize();
+
+ String testData = "test-data";
+ ByteArrayInputStream testDataStream = new
ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8));
+ String testPath = "test-path";
+
+ // write some data to the dlog will success
+ try {
+ storage2.writeAsync(testPath, testDataStream).get();
+
+ // read the data from the dlog
+ ByteArrayOutputStream readData = new ByteArrayOutputStream();
+ storage2.readAsync(testPath, readData).get();
+ String readResult = new String(readData.toByteArray(),
StandardCharsets.UTF_8);
+
+ assertEquals(testData, readResult);
+ } finally {
+ storage2.closeAsync().get();
+ }
+ }
+
}