This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit adc04f6fe481980644b26131b9ba956a769d1f3b Author: li jinquan <[email protected]> AuthorDate: Wed Mar 24 16:55:48 2021 +0800 [Pulsar-sql]Using pulsar SQL query messages will appear `NoSuchLedger… (#9910) * [Pulsar-sql]Using pulsar SQL query messages will appear `NoSuchLedger` when zk root directory changed (#2258) Fixes #2258 *Motivation* When zookeeper ledgers root path is changed, using pulsar-sql to query messages will cause `BKNoSuchLedgerExistsException`. *Modifications* Use new DefaultBkFactory(clientConfiguration), so that zk will be null in the BookKeeper constructor (BookKeeper.java row 113). When metadataDriver is initialized (BookKeeper.java row 167), the zookeeper connection is null, so we can jump to another branch. If we have done the above steps, finally, zkServers will be localhost:2181 rather than localhost:2181/pulsar in row 168 (ZKMetadataDriverBase.java); the path that we use to get the ledger is localhost:2181/pulsar/ledger/00/0000/L0001 rather than localhost:2181/pulsar/pulsar/ledger/00/0000/L0001. * [Pulsar-sql]Using pulsar SQL query messages will appear `NoSuchLedger` when zk root directory changed (#2258) Fixes #2258 *Motivation* When zookeeper ledgers root path is changed, using pulsar-sql to query messages will cause `BKNoSuchLedgerExistsException`. 
*Modifications* Use new DefaultBkFactory(clientConfiguration), so that zk will be null in the BookKeeper constructor (BookKeeper.java row 113). When metadataDriver is initialized (BookKeeper.java row 167), the zookeeper connection is null, so we can jump to another branch. If we have done the above steps, finally, zkServers will be localhost:2181 rather than localhost:2181/pulsar in row 168 (ZKMetadataDriverBase.java); the path that we use to get the ledger is localhost:2181/pulsar/ledger/00/0000/L0001 rather than localhost:2181/pulsar/pulsar/ledger/00/0000/L0001. * [Pulsar-sql]Using pulsar SQL query messages will appear `NoSuchLedger` when zk root directory changed (#2258) Fixes #2258 *Motivation* When zookeeper ledgers root path is changed, using pulsar-sql to query messages will cause `BKNoSuchLedgerExistsException`. *Modifications* Use new DefaultBkFactory(clientConfiguration), so that zk will be null in the BookKeeper constructor (BookKeeper.java row 113). When metadataDriver is initialized (BookKeeper.java row 167), the zookeeper connection is null, so we can jump to another branch. If we have done the above steps, finally, zkServers will be localhost:2181 rather than localhost:2181/pulsar in row 168 (ZKMetadataDriverBase.java); the path that we use to get the ledger is localhost:2181/pulsar/ledger/00/0000/L0001 rather than localhost:2181/pulsar/pulsar/ledger/00/0000/L0001. Co-authored-by: [email protected] <1314520Ljq--> (cherry picked from commit b1ba8e5c87e86ab00bf9b8e854cca002e7e0fe0f) --- .../mledger/impl/ManagedLedgerFactoryImpl.java | 2 +- .../ManagedLedgerFactoryChangeLedgerPathTest.java | 36 ++++++++++++++++++++++ .../pulsar/sql/presto/PulsarConnectorCache.java | 2 +- 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 7efb471..3354fab 100644 --- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -145,7 +145,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { zkc, config, NullStatsLogger.INSTANCE); } - private ManagedLedgerFactoryImpl(ClientConfiguration clientConfiguration, String zkConnection, ManagedLedgerFactoryConfig config) throws Exception { + public ManagedLedgerFactoryImpl(ClientConfiguration clientConfiguration, String zkConnection, ManagedLedgerFactoryConfig config) throws Exception { this(new DefaultBkFactory(clientConfiguration), true, ZooKeeperClient.newBuilder() diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryChangeLedgerPathTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryChangeLedgerPathTest.java index 1b596d7..c70f5bb 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryChangeLedgerPathTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryChangeLedgerPathTest.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.bookkeeper.common.allocator.PoolingPolicy; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.*; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; @@ -75,4 +76,39 @@ public class ManagedLedgerFactoryChangeLedgerPathTest extends BookKeeperClusterT } factory.shutdown(); } + @Test() + public void testChangeZKPath2() throws Exception { + ClientConfiguration configuration = new ClientConfiguration(); + String zkConnectString = zkUtil.getZooKeeperConnectString() + "/test"; + configuration.setMetadataServiceUri("zk://" + 
zkConnectString + "/ledgers"); + configuration.setUseV2WireProtocol(true); + configuration.setEnableDigestTypeAutodetection(true); + configuration.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap); + + ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig(); + ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(configuration, zkConnectString,managedLedgerFactoryConfig); + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setEnsembleSize(1) + .setWriteQuorumSize(1) + .setAckQuorumSize(1) + .setMetadataAckQuorumSize(1) + .setMetadataAckQuorumSize(1); + ManagedLedger ledger = factory.open("test-ledger", config); + ManagedCursor cursor = ledger.openCursor("test-c1"); + + for (int i = 0; i < 10; i++) { + String entry = "entry" + i; + ledger.addEntry(entry.getBytes("UTF8")); + } + + List<Entry> entryList = cursor.readEntries(10); + Assert.assertEquals(10, entryList.size()); + + for (int i = 0; i < 10; i++) { + Entry entry = entryList.get(i); + Assert.assertEquals(("entry" + i).getBytes("UTF8"), entry.getData()); + } + factory.shutdown(); + } } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java index ec0e3b3..1530c79 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java @@ -112,7 +112,7 @@ public class PulsarConnectorCache { pulsarConnectorConfig.getManagedLedgerNumWorkerThreads()); managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads( pulsarConnectorConfig.getManagedLedgerNumSchedulerThreads()); - return new ManagedLedgerFactoryImpl(bkClientConfiguration, managedLedgerFactoryConfig); + return new ManagedLedgerFactoryImpl(bkClientConfiguration, 
pulsarConnectorConfig.getZookeeperUri(),managedLedgerFactoryConfig); } public ManagedLedgerConfig getManagedLedgerConfig(NamespaceName namespaceName, OffloadPolicies offloadPolicies,
