This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b5de44  [pulsar-package-management] Support to access a separated 
bookkeeper cluster (#13089)
3b5de44 is described below

commit 3b5de44ac1daf87997516277f74981004fff573d
Author: Fangbin Sun <[email protected]>
AuthorDate: Wed Dec 8 23:16:25 2021 +0800

    [pulsar-package-management] Support to access a separated bookkeeper 
cluster (#13089)
    
    ### Motivation
    
    Currently, the bookkeeper client used by can not be initialized correctly 
when using a separated bookkeeper cluster via `bookkeeperMetadataServiceUri` 
whose zookkeeper connection may be different with pulsar zookkeeper servers.  
e.g.
    ```
    17:08:07.661 [ForkJoinPool.commonPool-worker-43] WARN  
org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl - Failed to 
find 1 bookies : excludeBookies [], allBookies [].
    17:08:07.662 [ForkJoinPool.commonPool-worker-43] ERROR 
org.apache.bookkeeper.client.LedgerCreateOp - Not enough bookies to create 
ledger with ensembleSize=1, writeQuorumSize=1 and ackQuorumSize=1
    17:08:07.662 [ForkJoinPool.commonPool-worker-43] ERROR 
org.apache.distributedlog.bk.SimpleLedgerAllocator - Error creating ledger for 
allocating 
/kobe/pulsar/packages/function/public/default/redis/v1.0/meta/<default>/allocation
 :
    org.apache.bookkeeper.client.BKException$BKNotEnoughBookiesException: Not 
enough non-faulty bookies available
            at 
org.apache.bookkeeper.client.BKException.create(BKException.java:72) 
~[org.apache.bookkeeper-bookkeeper-server-4.14.3.jar:4.14.3]
            at 
org.apache.distributedlog.BookKeeperClient$1.createComplete(BookKeeperClient.java:223)
 ~[org.apache.distributedlog-distributedlog-core-4.14.3.jar:4.14.3]
            at 
org.apache.bookkeeper.client.LedgerCreateOp.createComplete(LedgerCreateOp.java:272)
 ~[org.apache.bookkeeper-bookkeeper-server-4.14.3.jar:4.14.3]
            at 
org.apache.bookkeeper.client.LedgerCreateOp.initiate(LedgerCreateOp.java:166) 
~[org.apache.bookkeeper-bookkeeper-server-4.14.3.jar:4.14.3]
    ```
    ### Modifications
    
    Use the correct `BKDLConfig` from property `bookkeeperMetadataServiceUri`.
---
 .../bookkeeper/BookKeeperPackagesStorage.java      | 15 +++++-
 .../bookkeeper/BookKeeperPackagesStorageTest.java  | 54 ++++++++++++++++++++++
 2 files changed, 67 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
 
b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
index f0db593..3da1384 100644
--- 
a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
+++ 
b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.DistributedLogConstants;
 import org.apache.distributedlog.api.DistributedLogManager;
@@ -82,8 +83,18 @@ public class BookKeeperPackagesStorage implements 
PackagesStorage {
     }
 
     private URI initializeDlogNamespace() throws IOException {
-        BKDLConfig bkdlConfig = new 
BKDLConfig(configuration.getZookeeperServers(),
-            configuration.getPackagesManagementLedgerRootPath());
+        String bookkeeperMetadataServiceUri = 
configuration.getProperty("bookkeeperMetadataServiceUri");
+        String ledgersRootPath;
+        String ledgersStoreServers;
+        if (StringUtils.isNotBlank(bookkeeperMetadataServiceUri)) {
+            URI metadataServiceUri = URI.create(bookkeeperMetadataServiceUri);
+            ledgersStoreServers = 
metadataServiceUri.getAuthority().replace(";", ",");
+            ledgersRootPath = metadataServiceUri.getPath();
+        } else {
+            ledgersRootPath = 
configuration.getPackagesManagementLedgerRootPath();
+            ledgersStoreServers = configuration.getZookeeperServers();
+        }
+        BKDLConfig bkdlConfig = new BKDLConfig(ledgersStoreServers, 
ledgersRootPath);
         DLMetadata dlMetadata = DLMetadata.create(bkdlConfig);
         URI dlogURI = 
URI.create(String.format("distributedlog://%s/pulsar/packages",
             configuration.getZookeeperServers()));
diff --git 
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
 
b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
index 339bafc..6931207 100644
--- 
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
+++ 
b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
@@ -192,4 +192,58 @@ public class BookKeeperPackagesStorageTest extends 
BookKeeperClusterTestCase {
         assertTrue(exist);
     }
 
+    @Test(timeOut = 60000)
+    public void testReadWriteOperationsWithSeparatedBkCluster() throws 
Exception {
+        PackagesStorageProvider provider = PackagesStorageProvider
+                
.newProvider(BookKeeperPackagesStorageProvider.class.getName());
+        DefaultPackagesStorageConfiguration configuration = new 
DefaultPackagesStorageConfiguration();
+        // set the unavailable bk cluster with mock zookeeper path
+        configuration.setProperty("zookeeperServers", 
zkUtil.getZooKeeperConnectString() + "/mock");
+        configuration.setProperty("packagesReplicas", "1");
+        configuration.setProperty("packagesManagementLedgerRootPath", 
"/ledgers");
+        PackagesStorage storage1 = provider.getStorage(configuration);
+        storage1.initialize();
+
+        String mockData = "mock-data";
+        ByteArrayInputStream mockDataStream = new 
ByteArrayInputStream(mockData.getBytes(StandardCharsets.UTF_8));
+        String mockPath = "mock-path";
+
+        // write some data to the dlog will fail
+        try {
+            storage1.writeAsync(mockPath, mockDataStream).get();
+        } catch (Exception e) {
+            String errMsg = e.getCause().getMessage();
+            assertTrue(errMsg.contains("Error on allocating ledger") || 
errMsg.contains("Write rejected"));
+        } finally {
+            storage1.closeAsync().get();
+        }
+
+        // set the available bk cluster with bookkeeperMetadataServiceUri 
using actual zookeeper path
+        String bookkeeperMetadataServiceUri = 
String.format("zk+null://%s/ledgers", zkUtil.getZooKeeperConnectString());
+        DefaultPackagesStorageConfiguration configuration2 = new 
DefaultPackagesStorageConfiguration();
+        configuration2.setProperty("zookeeperServers", 
zkUtil.getZooKeeperConnectString());
+        configuration2.setProperty("bookkeeperMetadataServiceUri", 
bookkeeperMetadataServiceUri);
+        configuration2.setProperty("packagesReplicas", "1");
+        PackagesStorage storage2 = provider.getStorage(configuration2);
+        storage2.initialize();
+
+        String testData = "test-data";
+        ByteArrayInputStream testDataStream = new 
ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8));
+        String testPath = "test-path";
+
+        // write some data to the dlog will success
+        try {
+            storage2.writeAsync(testPath, testDataStream).get();
+
+            // read the data from the dlog
+            ByteArrayOutputStream readData = new ByteArrayOutputStream();
+            storage2.readAsync(testPath, readData).get();
+            String readResult = new String(readData.toByteArray(), 
StandardCharsets.UTF_8);
+
+            assertEquals(testData, readResult);
+        } finally {
+            storage2.closeAsync().get();
+        }
+    }
+
 }

Reply via email to