This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit adc04f6fe481980644b26131b9ba956a769d1f3b
Author: li jinquan <[email protected]>
AuthorDate: Wed Mar 24 16:55:48 2021 +0800

    [Pulsar-sql]Using pulsar SQL query messages will appear `NoSuchLedger… 
(#9910)
    
    * [Pulsar-sql]Using pulsar SQL query messages will appear `NoSuchLedger` 
when zk root directory changed (#2258)
    
    Fixes #2258
    
    *Motivation*
    
    When zookeeper ledgers root path is changed, using pulsar-sql to query 
messages will cause `BKNoSuchLedgerExistsException`.
    
    *Modifications*
    
    Use new DefaultBkFactory(clientConfiguration) so that zk is null in the 
BookKeeper constructor (BookKeeper.java row 113).
    When metadataDriver is initialized (BookKeeper.java row 167), the ZooKeeper 
connection is null, so the other branch is taken. With the above steps in 
place,
    zkServers will be localhost:2181 rather than localhost:2181/pulsar 
in row 168 (ZKMetadataDriverBase.java); the path used to get the ledger is then 
localhost:2181/pulsar/ledger/00/0000/L0001 rather than 
localhost:2181/pulsar/pulsar/ledger/00/0000/L0001.
    
    * [Pulsar-sql]Using pulsar SQL query messages will appear `NoSuchLedger` 
when zk root directory changed (#2258)
    
    Fixes #2258
    
    *Motivation*
    
    When zookeeper ledgers root path is changed, using pulsar-sql to query 
messages will cause `BKNoSuchLedgerExistsException`.
    
    *Modifications*
    
    Use new DefaultBkFactory(clientConfiguration) so that zk is null in the 
BookKeeper constructor (BookKeeper.java row 113).
    When metadataDriver is initialized (BookKeeper.java row 167), the ZooKeeper 
connection is null, so the other branch is taken. With the above steps in 
place,
    zkServers will be localhost:2181 rather than localhost:2181/pulsar 
in row 168 (ZKMetadataDriverBase.java); the path used to get the ledger is then 
localhost:2181/pulsar/ledger/00/0000/L0001 rather than 
localhost:2181/pulsar/pulsar/ledger/00/0000/L0001.
    
    * [Pulsar-sql]Using pulsar SQL query messages will appear `NoSuchLedger` 
when zk root directory changed (#2258)
    
    Fixes #2258
    
    *Motivation*
    
    When zookeeper ledgers root path is changed, using pulsar-sql to query 
messages will cause `BKNoSuchLedgerExistsException`.
    
    *Modifications*
    
    Use new DefaultBkFactory(clientConfiguration) so that zk is null in the 
BookKeeper constructor (BookKeeper.java row 113).
    When metadataDriver is initialized (BookKeeper.java row 167), the ZooKeeper 
connection is null, so the other branch is taken. With the above steps in 
place,
    zkServers will be localhost:2181 rather than localhost:2181/pulsar 
in row 168 (ZKMetadataDriverBase.java); the path used to get the ledger is then 
localhost:2181/pulsar/ledger/00/0000/L0001 rather than 
localhost:2181/pulsar/pulsar/ledger/00/0000/L0001.
    
    Co-authored-by: [email protected] <1314520Ljq-->
    (cherry picked from commit b1ba8e5c87e86ab00bf9b8e854cca002e7e0fe0f)
---
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  2 +-
 .../ManagedLedgerFactoryChangeLedgerPathTest.java  | 36 ++++++++++++++++++++++
 .../pulsar/sql/presto/PulsarConnectorCache.java    |  2 +-
 3 files changed, 38 insertions(+), 2 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 7efb471..3354fab 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -145,7 +145,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
                 zkc, config, NullStatsLogger.INSTANCE);
     }
 
-    private ManagedLedgerFactoryImpl(ClientConfiguration clientConfiguration, 
String zkConnection, ManagedLedgerFactoryConfig config) throws Exception {
+    public ManagedLedgerFactoryImpl(ClientConfiguration clientConfiguration, 
String zkConnection, ManagedLedgerFactoryConfig config) throws Exception {
         this(new DefaultBkFactory(clientConfiguration),
             true,
             ZooKeeperClient.newBuilder()
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryChangeLedgerPathTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryChangeLedgerPathTest.java
index 1b596d7..c70f5bb 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryChangeLedgerPathTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryChangeLedgerPathTest.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.bookkeeper.common.allocator.PoolingPolicy;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.*;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -75,4 +76,39 @@ public class ManagedLedgerFactoryChangeLedgerPathTest 
extends BookKeeperClusterT
         }
         factory.shutdown();
     }
+    @Test()
+    public void testChangeZKPath2() throws Exception {
+        ClientConfiguration configuration = new ClientConfiguration();
+        String zkConnectString = zkUtil.getZooKeeperConnectString() + "/test";
+        configuration.setMetadataServiceUri("zk://" + zkConnectString + 
"/ledgers");
+        configuration.setUseV2WireProtocol(true);
+        configuration.setEnableDigestTypeAutodetection(true);
+        configuration.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
+
+        ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new 
ManagedLedgerFactoryConfig();
+        ManagedLedgerFactory factory = new 
ManagedLedgerFactoryImpl(configuration, 
zkConnectString,managedLedgerFactoryConfig);
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setEnsembleSize(1)
+                .setWriteQuorumSize(1)
+                .setAckQuorumSize(1)
+                .setMetadataAckQuorumSize(1)
+                .setMetadataAckQuorumSize(1);
+        ManagedLedger ledger = factory.open("test-ledger", config);
+        ManagedCursor cursor = ledger.openCursor("test-c1");
+
+        for (int i = 0; i < 10; i++) {
+            String entry = "entry" + i;
+            ledger.addEntry(entry.getBytes("UTF8"));
+        }
+
+        List<Entry> entryList = cursor.readEntries(10);
+        Assert.assertEquals(10, entryList.size());
+
+        for (int i = 0; i < 10; i++) {
+            Entry entry = entryList.get(i);
+            Assert.assertEquals(("entry" + i).getBytes("UTF8"), 
entry.getData());
+        }
+        factory.shutdown();
+    }
 }
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index ec0e3b3..1530c79 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -112,7 +112,7 @@ public class PulsarConnectorCache {
                 pulsarConnectorConfig.getManagedLedgerNumWorkerThreads());
         managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads(
                 pulsarConnectorConfig.getManagedLedgerNumSchedulerThreads());
-        return new ManagedLedgerFactoryImpl(bkClientConfiguration, 
managedLedgerFactoryConfig);
+        return new ManagedLedgerFactoryImpl(bkClientConfiguration, 
pulsarConnectorConfig.getZookeeperUri(),managedLedgerFactoryConfig);
     }
 
     public ManagedLedgerConfig getManagedLedgerConfig(NamespaceName 
namespaceName, OffloadPolicies offloadPolicies,

Reply via email to