This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9445fa7f9d8 [fix][broker] Fix pulsarLedgerIdGenerator can't delete
index path when zk metadata store config rootPath. (#17192)
9445fa7f9d8 is described below
commit 9445fa7f9d8e76a6d6435a65b42763b5d8eb310c
Author: Yan Zhao <[email protected]>
AuthorDate: Wed Aug 24 09:51:21 2022 +0800
[fix][broker] Fix pulsarLedgerIdGenerator can't delete index path when zk
metadata store config rootPath. (#17192)
---
.../bookkeeper/PulsarLedgerIdGenerator.java | 17 +++-
.../pulsar/metadata/impl/ZKMetadataStore.java | 9 ++
.../bookkeeper/PulsarLedgerIdGeneratorTest.java | 112 +++++++++++++++++++--
3 files changed, 130 insertions(+), 8 deletions(-)
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGenerator.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGenerator.java
index 29a65b84d88..0e1703cf343 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGenerator.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGenerator.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@Slf4j
public class PulsarLedgerIdGenerator implements LedgerIdGenerator {
@@ -104,7 +105,7 @@ public class PulsarLedgerIdGenerator implements
LedgerIdGenerator {
EnumSet.of(CreateOption.Ephemeral, CreateOption.Sequential))
.thenCompose(stat -> {
// delete the znode for id generation
- store.delete(stat.getPath(), Optional.empty()).
+ store.delete(handleTheDeletePath(stat.getPath()),
Optional.empty()).
exceptionally(ex -> {
log.warn("Exception during deleting node for
id generation: ", ex);
return null;
@@ -235,7 +236,7 @@ public class PulsarLedgerIdGenerator implements
LedgerIdGenerator {
.put(prefix, new byte[0], Optional.of(-1L),
EnumSet.of(CreateOption.Ephemeral, CreateOption.Sequential))
.thenCompose(stat -> {
// delete the znode for id generation
- store.delete(stat.getPath(), Optional.empty()).
+ store.delete(handleTheDeletePath(stat.getPath()),
Optional.empty()).
exceptionally(ex -> {
log.warn("Exception during deleting node for
id generation: ", ex);
return null;
@@ -287,4 +288,16 @@ public class PulsarLedgerIdGenerator implements
LedgerIdGenerator {
return ledgerIdGenPath + "/" + "ID-";
}
+ //If the config rootPath when use zk metadata store, it will append
rootPath as the prefix of the path.
+ //So when we get the path from the stat, we should truncate the rootPath.
+ private String handleTheDeletePath(String path) {
+ if (store instanceof ZKMetadataStore) {
+ String rootPath = ((ZKMetadataStore) store).getRootPath();
+ if (rootPath == null) {
+ return path;
+ }
+ return path.replaceFirst(rootPath, "");
+ }
+ return path;
+ }
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 87b6be283fc..ad23faea25e 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -63,6 +63,7 @@ import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ConnectStringParser;
@Slf4j
public class ZKMetadataStore extends AbstractBatchedMetadataStore
@@ -71,6 +72,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
public static final String ZK_SCHEME_IDENTIFIER = "zk:";
private final String zkConnectString;
+ private final String rootPath;
private final MetadataStoreConfig metadataStoreConfig;
private final boolean isZkManaged;
private final ZooKeeper zkc;
@@ -87,6 +89,8 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
this.zkConnectString = metadataURL;
}
this.metadataStoreConfig = metadataStoreConfig;
+ this.rootPath = new
ConnectStringParser(zkConnectString).getChrootPath();
+
isZkManaged = true;
zkc =
PulsarZooKeeperClient.newBuilder().connectString(zkConnectString)
.connectRetryPolicy(new
BoundExponentialBackoffRetryPolicy(100, 60_000, Integer.MAX_VALUE))
@@ -121,6 +125,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
super(config);
this.zkConnectString = null;
+ this.rootPath = null;
this.metadataStoreConfig = null;
this.isZkManaged = false;
this.zkc = zkc;
@@ -580,4 +585,8 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
throw KeeperException.create(Code.get(rc.get()));
}
}
+
+ public String getRootPath() {
+ return rootPath;
+ }
}
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java
index 2238bcf6160..c67daec93bb 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java
@@ -62,7 +62,8 @@ public class PulsarLedgerIdGeneratorTest extends
BaseMetadataStoreTest {
CountDownLatch countDownLatch1 = new CountDownLatch(nThread *
nLedgers);
final AtomicInteger errCount = new AtomicInteger(0);
- final ConcurrentLinkedQueue<Long> ledgerIds = new
ConcurrentLinkedQueue<Long>();
+ final ConcurrentLinkedQueue<Long> shortLedgerIds = new
ConcurrentLinkedQueue<Long>();
+ final ConcurrentLinkedQueue<Long> longLedgerIds = new
ConcurrentLinkedQueue<Long>();
long start = System.currentTimeMillis();
@@ -74,7 +75,7 @@ public class PulsarLedgerIdGeneratorTest extends
BaseMetadataStoreTest {
for (int j = 0; j < nLedgers; j++) {
ledgerIdGenerator.generateLedgerId((rc, result) -> {
if (KeeperException.Code.OK.intValue() == rc) {
- ledgerIds.add(result);
+ shortLedgerIds.add(result);
} else {
errCount.incrementAndGet();
}
@@ -96,7 +97,7 @@ public class PulsarLedgerIdGeneratorTest extends
BaseMetadataStoreTest {
for (int j = 0; j < nLedgers; j++) {
ledgerIdGenerator.generateLedgerId((rc, result) -> {
if (KeeperException.Code.OK.intValue() == rc) {
- ledgerIds.add(result);
+ longLedgerIds.add(result);
} else {
errCount.incrementAndGet();
}
@@ -108,19 +109,118 @@ public class PulsarLedgerIdGeneratorTest extends
BaseMetadataStoreTest {
assertTrue(countDownLatch2.await(120, TimeUnit.SECONDS),
"Wait ledger id generation threads to stop timeout : ");
- log.info("Number of generated ledger id: {}, time used: {}",
ledgerIds.size(),
+ log.info("Number of generated ledger id: {}, time used: {}",
shortLedgerIds.size() + longLedgerIds.size(),
System.currentTimeMillis() - start);
assertEquals(errCount.get(), 0, "Error occur during ledger id
generation : ");
Set<Long> ledgers = new HashSet<>();
- while (!ledgerIds.isEmpty()) {
- Long ledger = ledgerIds.poll();
+ while (!shortLedgerIds.isEmpty()) {
+ Long ledger = shortLedgerIds.poll();
+ assertNotNull(ledger, "Generated ledger id is null");
+ assertFalse(ledgers.contains(ledger), "Ledger id [" + ledger + "]
conflict : ");
+ ledgers.add(ledger);
+ }
+ while (!longLedgerIds.isEmpty()) {
+ Long ledger = longLedgerIds.poll();
+ assertNotNull(ledger, "Generated ledger id is null");
+ assertFalse(ledgers.contains(ledger), "Ledger id [" + ledger + "]
conflict : ");
+ ledgers.add(ledger);
+ }
+ }
+
+ @Test
+ public void testGenerateLedgerIdWithZkPrefix() throws Exception {
+ @Cleanup
+ MetadataStoreExtended store =
+ MetadataStoreExtended.create(zks.getConnectionString() +
"/test", MetadataStoreConfig.builder().build());
+
+ @Cleanup
+ PulsarLedgerIdGenerator ledgerIdGenerator = new
PulsarLedgerIdGenerator(store, "/ledgers");
+ // Create *nThread* threads each generate *nLedgers* ledger id,
+ // and then check there is no identical ledger id.
+ final int nThread = 2;
+ final int nLedgers = 2000;
+ // Multiply by two. We're going to do half in the old legacy space and
half in the new.
+ CountDownLatch countDownLatch1 = new CountDownLatch(nThread *
nLedgers);
+
+ final AtomicInteger errCount = new AtomicInteger(0);
+ final ConcurrentLinkedQueue<Long> shortLedgerIds = new
ConcurrentLinkedQueue<Long>();
+ final ConcurrentLinkedQueue<Long> longLedgerIds = new
ConcurrentLinkedQueue<Long>();
+
+ long start = System.currentTimeMillis();
+
+ @Cleanup(value = "shutdownNow")
+ ExecutorService executor = Executors.newCachedThreadPool();
+
+ for (int i = 0; i < nThread; i++) {
+ executor.submit(() -> {
+ for (int j = 0; j < nLedgers; j++) {
+ ledgerIdGenerator.generateLedgerId((rc, result) -> {
+ if (KeeperException.Code.OK.intValue() == rc) {
+ shortLedgerIds.add(result);
+ } else {
+ errCount.incrementAndGet();
+ }
+ countDownLatch1.countDown();
+ });
+ }
+ });
+ }
+
+ countDownLatch1.await();
+ for (Long ledgerId : shortLedgerIds) {
+ assertFalse(store.exists("/ledgers/idgen/ID-" +
String.format("%010d", ledgerId)).get(),
+ "Exception during deleting node for id generation : ");
+ }
+ CountDownLatch countDownLatch2 = new CountDownLatch(nThread *
nLedgers);
+
+ // Go and create the long-id directory in zookeeper. This should cause
the id generator to generate ids with the
+ // new algo once we clear it's stored status.
+ store.put("/ledgers/idgen-long", new byte[0], Optional.empty()).join();
+
+ for (int i = 0; i < nThread; i++) {
+ executor.submit(() -> {
+ for (int j = 0; j < nLedgers; j++) {
+ ledgerIdGenerator.generateLedgerId((rc, result) -> {
+ if (KeeperException.Code.OK.intValue() == rc) {
+ longLedgerIds.add(result);
+ } else {
+ errCount.incrementAndGet();
+ }
+ countDownLatch2.countDown();
+ });
+ }
+ });
+ }
+
+ assertTrue(countDownLatch2.await(120, TimeUnit.SECONDS),
+ "Wait ledger id generation threads to stop timeout : ");
+ ///test/ledgers/idgen-long/HOB-0000000001/ID-0000000000
+ for (Long ledgerId : longLedgerIds) {
+ assertFalse(store.exists("/ledgers/idgen-long/HOB-0000000001/ID-"
+ String.format("%010d", ledgerId >> 32)).get(),
+ "Exception during deleting node for id generation : ");
+ }
+
+ log.info("Number of generated ledger id: {}, time used: {}",
shortLedgerIds.size() + longLedgerIds.size(),
+ System.currentTimeMillis() - start);
+ assertEquals(errCount.get(), 0, "Error occur during ledger id
generation : ");
+
+ Set<Long> ledgers = new HashSet<>();
+ while (!shortLedgerIds.isEmpty()) {
+ Long ledger = shortLedgerIds.poll();
+ assertNotNull(ledger, "Generated ledger id is null");
+ assertFalse(ledgers.contains(ledger), "Ledger id [" + ledger + "]
conflict : ");
+ ledgers.add(ledger);
+ }
+ while (!longLedgerIds.isEmpty()) {
+ Long ledger = longLedgerIds.poll();
assertNotNull(ledger, "Generated ledger id is null");
assertFalse(ledgers.contains(ledger), "Ledger id [" + ledger + "]
conflict : ");
ledgers.add(ledger);
}
}
+
@Test(dataProvider = "impl")
public void testEnsureCounterIsNotResetWithContainerNodes(String provider,
Supplier<String> urlSupplier)
throws Exception {