This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ede7ffe  Fix for cursorPersistenceAsyncMarkDeleteSameThread (#1544)
ede7ffe is described below

commit ede7ffe540b4ea8c715c3af9f30f739ea133da94
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Tue Apr 10 18:03:52 2018 -0700

    Fix for cursorPersistenceAsyncMarkDeleteSameThread (#1544)
---
 .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java    | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 2773090..34efb5c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -81,7 +81,9 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
 import org.apache.commons.lang3.tuple.Pair;
+import org.jctools.queues.MessagePassingQueue;
 import org.jctools.queues.MpmcArrayQueue;
+import org.jctools.queues.MpscLinkedQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -149,7 +151,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         }
     }
 
-    private final MpmcArrayQueue<MarkDeleteEntry> pendingMarkDeleteOps = new MpmcArrayQueue<>(16);
+    private final MessagePassingQueue<MarkDeleteEntry> pendingMarkDeleteOps = MpscLinkedQueue.newMpscLinkedQueue();
 
     private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER =
         AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingMarkDeletedSubmittedCount");
@@ -1358,7 +1360,10 @@ public class ManagedCursorImpl implements ManagedCursor {
             startCreatingNewMetadataLedger();
             // fall through
         case SwitchingLedger:
-            pendingMarkDeleteOps.add(mdEntry);
+            if (!pendingMarkDeleteOps.offer(mdEntry)) {
+                callback.markDeleteFailed(new ManagedLedgerException("Cursor queue of mark-delete operations full"), ctx);
+                return;
+            }
             if (state != State.SwitchingLedger) {
                 // If state changed since we checked. Trigger a flush since we could have missed the current entry
                 flushPendingMarkDeletes();

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to