This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 1b4f35db22e [fix] [ml] make the result of delete cursor is success if
cursor is deleted (#19825)
1b4f35db22e is described below
commit 1b4f35db22e9bf03550db4bc110007eee02f8418
Author: fengyubiao <[email protected]>
AuthorDate: Wed Mar 22 15:30:00 2023 +0800
[fix] [ml] make the result of delete cursor is success if cursor is deleted
(#19825)
When deleting the zk node of the cursor, if the exception
`MetadataStoreException.NotFoundException` occurs, the deletion is considered
successful.
---
.../bookkeeper/mledger/impl/MetaStoreImpl.java | 16 +++++---
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 44 ++++++++++++++++++++++
2 files changed, 55 insertions(+), 5 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index 2a47cfdc537..43d734b28a6 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -43,6 +43,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Stat;
@@ -248,7 +249,7 @@ public class MetaStoreImpl implements MetaStore {
@Override
public void asyncRemoveCursor(String ledgerName, String cursorName,
MetaStoreCallback<Void> callback) {
String path = PREFIX + ledgerName + "/" + cursorName;
- log.info("[{}] Remove consumer={}", ledgerName, cursorName);
+ log.info("[{}] Remove cursor={}", ledgerName, cursorName);
store.delete(path, Optional.empty())
.thenAcceptAsync(v -> {
@@ -257,11 +258,16 @@ public class MetaStoreImpl implements MetaStore {
}
callback.operationComplete(null, null);
}, executor.chooseThread(ledgerName))
- .exceptionally(ex -> {
- executor.executeOrdered(ledgerName,
SafeRunnable.safeRun(() -> callback
- .operationFailed(getException(ex))));
+ .exceptionallyAsync(ex -> {
+ Throwable actEx = FutureUtil.unwrapCompletionException(ex);
+ if (actEx instanceof
MetadataStoreException.NotFoundException){
+ log.info("[{}] [{}] cursor delete done because it did
not exist.", ledgerName, cursorName);
+ callback.operationComplete(null, null);
+ return null;
+ }
+ SafeRunnable.safeRun(() ->
callback.operationFailed(getException(ex)));
return null;
- });
+ }, executor.chooseThread(ledgerName));
}
@Override
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 7ad7e0f8d68..45fd275a412 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -128,6 +128,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
@@ -3958,4 +3959,47 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
managedLedger.getEnsemblesAsync(lastLedger).join();
Assert.assertFalse(managedLedger.ledgerCache.containsKey(lastLedger));
}
+
+ @Test
+ public void testGetEstimatedBacklogSize() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(2);
+ config.setRetentionTime(-1, TimeUnit.SECONDS);
+ config.setRetentionSizeInMB(-1);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("testGetEstimatedBacklogSize", config);
+ List<Position> positions = new ArrayList<>(10);
+ for (int i = 0; i < 10; i++) {
+ positions.add(ledger.addEntry(new byte[1]));
+ }
+
+ Assert.assertEquals(ledger.getEstimatedBacklogSize(new
PositionImpl(-1, -1)), 10);
+ Assert.assertEquals(ledger.getEstimatedBacklogSize(((PositionImpl)
positions.get(1))), 8);
+ Assert.assertEquals(ledger.getEstimatedBacklogSize(((PositionImpl)
positions.get(9)).getNext()), 0);
+ ledger.close();
+ }
+
+ @Test
+ public void testDeleteCursorTwice() throws Exception {
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("ml");
+ String cursorName = "cursor_1";
+ ml.openCursor(cursorName);
+ syncRemoveCursor(ml, cursorName);
+ syncRemoveCursor(ml, cursorName);
+ }
+
+ private void syncRemoveCursor(ManagedLedgerImpl ml, String cursorName){
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ ml.getStore().asyncRemoveCursor(ml.name, cursorName, new
MetaStoreCallback<Void>() {
+ @Override
+ public void operationComplete(Void result, Stat stat) {
+ future.complete(null);
+ }
+
+ @Override
+ public void operationFailed(MetaStoreException e) {
+
future.completeExceptionally(FutureUtil.unwrapCompletionException(e));
+ }
+ });
+ future.join();
+ }
}