merlimat closed pull request #1450: Added multiple position delete in 
ManagedLedger
URL: https://github.com/apache/incubator-pulsar/pull/1450
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index db8e3699c..f6793b405 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -265,6 +265,43 @@ void markDelete(Position position, Map<String, Long> 
properties)
      */
     void asyncDelete(Position position, DeleteCallback callback, Object ctx);
 
+
+    /**
+     * Delete a group of entries.
+     *
+     * <p/>
+     * Mark multiple single messages for deletion. When all the previous 
messages are all deleted, then markDelete()
+     * will be called internally to advance the persistent acknowledged 
position.
+     *
+     * <p/>
+     * The deletion of the message is not persisted into the durable storage 
and cannot be recovered upon the reopening
+     * of the ManagedLedger
+     *
+     * @param positions
+     *            positions of the messages to be deleted
+     */
+    void delete(Iterable<Position> positions) throws InterruptedException, 
ManagedLedgerException;
+
+    /**
+     * Delete a group of messages asynchronously
+     *
+     * <p/>
+     * Mark a group of messages for deletion. When all the previous messages 
are all deleted, then markDelete() will be
+     * called internally to advance the persistent acknowledged position.
+     *
+     * <p/>
+     * The deletion of the messages is not persisted into the durable storage 
and cannot be recovered upon the reopening
+     * of the ManagedLedger
+     *
+     * @param positions
+     *            the positions of the messages to be deleted
+     * @param callback
+     *            callback object
+     * @param ctx
+     *            opaque context
+     */
+    void asyncDelete(Iterable<Position> position, DeleteCallback callback, 
Object ctx);
+
     /**
      * Get the read position. This points to the next message to be read from 
the cursor.
      *
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index c2fbbab8b..9b3d9cd7a 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1447,8 +1447,17 @@ public void operationFailed(ManagedLedgerException 
exception) {
 
     @Override
     public void delete(final Position position) throws InterruptedException, 
ManagedLedgerException {
-        checkNotNull(position);
-        checkArgument(position instanceof PositionImpl);
+        delete(Collections.singletonList(position));
+    }
+
+    @Override
+    public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback 
callback, Object ctx) {
+        asyncDelete(Collections.singletonList(pos), callback, ctx);
+    }
+
+    @Override
+    public void delete(Iterable<Position> positions) throws 
InterruptedException, ManagedLedgerException {
+        checkNotNull(positions);
 
         class Result {
             ManagedLedgerException exception = null;
@@ -1458,12 +1467,12 @@ public void delete(final Position position) throws 
InterruptedException, Managed
         final CountDownLatch counter = new CountDownLatch(1);
         final AtomicBoolean timeout = new AtomicBoolean(false);
 
-        asyncDelete(position, new AsyncCallbacks.DeleteCallback() {
+        asyncDelete(positions, new AsyncCallbacks.DeleteCallback() {
             @Override
             public void deleteComplete(Object ctx) {
                 if (timeout.get()) {
                     log.warn("[{}] [{}] Delete operation timeout. Callback 
deleteComplete at position {}",
-                            ledger.getName(), name, position);
+                            ledger.getName(), name, positions);
                 }
 
                 counter.countDown();
@@ -1475,7 +1484,7 @@ public void deleteFailed(ManagedLedgerException 
exception, Object ctx) {
 
                 if (timeout.get()) {
                     log.warn("[{}] [{}] Delete operation timeout. Callback 
deleteFailed at position {}",
-                            ledger.getName(), name, position);
+                            ledger.getName(), name, positions);
                 }
 
                 counter.countDown();
@@ -1485,7 +1494,7 @@ public void deleteFailed(ManagedLedgerException 
exception, Object ctx) {
         if (!counter.await(ManagedLedgerImpl.AsyncOperationTimeoutSeconds, 
TimeUnit.SECONDS)) {
             timeout.set(true);
             log.warn("[{}] [{}] Delete operation timeout. No callback was 
triggered at position {}", ledger.getName(),
-                    name, position);
+                    name, positions);
             throw new ManagedLedgerException("Timeout during delete 
operation");
         }
 
@@ -1494,46 +1503,37 @@ public void deleteFailed(ManagedLedgerException 
exception, Object ctx) {
         }
     }
 
-    @Override
-    public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback 
callback, Object ctx) {
-        checkArgument(pos instanceof PositionImpl);
 
-        if (STATE_UPDATER.get(this) == State.Closed) {
+    @Override
+    public void asyncDelete(Iterable<Position> positions, 
AsyncCallbacks.DeleteCallback callback, Object ctx) {
+        if (state == State.Closed) {
             callback.deleteFailed(new ManagedLedgerException("Cursor was 
already closed"), ctx);
             return;
         }
 
-        PositionImpl position = (PositionImpl) pos;
-
-        PositionImpl previousPosition = ledger.getPreviousPosition(position);
         PositionImpl newMarkDeletePosition = null;
 
         lock.writeLock().lock();
 
         try {
             if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] Deleting single message at {}. "
-                        + "Current status: {} - md-position: {}  - 
previous-position: {}",
-                        ledger.getName(), name, pos, 
individualDeletedMessages, markDeletePosition, previousPosition);
+                log.debug("[{}] [{}] Deleting individual messages at {}. 
Current status: {} - md-position: {}",
+                        ledger.getName(), name, positions, 
individualDeletedMessages, markDeletePosition);
             }
 
-            if (individualDeletedMessages.contains(position) || 
position.compareTo(markDeletePosition) <= 0) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] [{}] Position was already deleted {}", 
ledger.getName(), name, position);
-                }
-                callback.deleteComplete(ctx);
-                return;
-            }
+            for (Position pos : positions) {
+                PositionImpl position  = (PositionImpl) checkNotNull(pos);
 
-            if (previousPosition.compareTo(markDeletePosition) == 0 && 
individualDeletedMessages.isEmpty()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}][{}] Immediately mark-delete to position 
{}", ledger.getName(), name, position);
+                if (individualDeletedMessages.contains(position) || 
position.compareTo(markDeletePosition) <= 0) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] [{}] Position was already deleted {}", 
ledger.getName(), name, position);
+                    }
+                    continue;
                 }
 
-                newMarkDeletePosition = position;
-            } else {
                 // Add a range (prev, pos] to the set. Adding the previous 
entry as an open limit to the range will make
                 // the RangeSet recognize the "continuity" between adjacent 
Positions
+                PositionImpl previousPosition = 
ledger.getPreviousPosition(position);
                 
individualDeletedMessages.add(Range.openClosed(previousPosition, position));
                 ++messagesConsumedCounter;
 
@@ -1541,24 +1541,28 @@ public void asyncDelete(Position pos, final 
AsyncCallbacks.DeleteCallback callba
                     log.debug("[{}] [{}] Individually deleted messages: {}", 
ledger.getName(), name,
                             individualDeletedMessages);
                 }
+            }
 
-                // If the lower bound of the range set is the current mark 
delete position, then we can trigger a new
-                // mark
-                // delete to the upper bound of the first range segment
-                Range<PositionImpl> range = 
individualDeletedMessages.asRanges().iterator().next();
+            if (individualDeletedMessages.isEmpty()) {
+                // No changes to individually deleted messages, so nothing to 
do at this point
+                callback.deleteComplete(ctx);
+                return;
+            }
 
-                // Bug:7062188 - markDeletePosition can sometimes be stuck at 
the beginning of an empty ledger.
-                // If the lowerBound is ahead of MarkDelete, verify if there 
are any entries in-between
-                if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 
|| ledger
-                        
.getNumberOfEntries(Range.openClosed(markDeletePosition, 
range.lowerEndpoint())) <= 0) {
+            // If the lower bound of the range set is the current mark delete 
position, then we can trigger a new
+            // mark-delete to the upper bound of the first range segment
+            Range<PositionImpl> range = 
individualDeletedMessages.asRanges().iterator().next();
 
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] Found a position range to mark delete 
for cursor {}: {} ", ledger.getName(),
-                                name, range);
-                    }
+            // If the lowerBound is ahead of MarkDelete, verify if there are 
any entries in-between
+            if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || 
ledger
+                    .getNumberOfEntries(Range.openClosed(markDeletePosition, 
range.lowerEndpoint())) <= 0) {
 
-                    newMarkDeletePosition = range.upperEndpoint();
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Found a position range to mark delete for 
cursor {}: {} ", ledger.getName(),
+                            name, range);
                 }
+
+                newMarkDeletePosition = range.upperEndpoint();
             }
 
             if (newMarkDeletePosition != null) {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 5ad25f5af..de98b601a 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -155,6 +155,14 @@ public void delete(Position position) throws 
InterruptedException, ManagedLedger
         public void asyncDelete(Position position, DeleteCallback callback, 
Object ctx) {
         }
 
+        @Override
+        public void delete(Iterable<Position> positions) throws 
InterruptedException, ManagedLedgerException {
+        }
+
+        @Override
+        public void asyncDelete(Iterable<Position> position, DeleteCallback 
callback, Object ctx) {
+        }
+
         @Override
         public void clearBacklog() throws InterruptedException, 
ManagedLedgerException {
         }
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
new file mode 100644
index 000000000..aaf7cd1c6
--- /dev/null
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import static org.testng.Assert.assertEquals;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+
+import java.nio.charset.Charset;
+
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.testng.annotations.Test;
+
+public class ManagedCursorListAckTest extends MockedBookKeeperTestCase {
+
+    private static final Charset Encoding = Charsets.UTF_8;
+
+    @Test(timeOut = 20000)
+    void testMultiPositionDelete() throws Exception {
+        ManagedLedger ledger = factory.open("my_test_ledger", new 
ManagedLedgerConfig().setMaxEntriesPerLedger(2));
+
+        ManagedCursor c1 = ledger.openCursor("c1");
+        Position p0 = c1.getMarkDeletedPosition();
+        Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+        Position p2 = ledger.addEntry("dummy-entry-2".getBytes(Encoding));
+        Position p3 = ledger.addEntry("dummy-entry-3".getBytes(Encoding));
+        Position p4 = ledger.addEntry("dummy-entry-4".getBytes(Encoding));
+        Position p5 = ledger.addEntry("dummy-entry-5".getBytes(Encoding));
+        Position p6 = ledger.addEntry("dummy-entry-6".getBytes(Encoding));
+        Position p7 = ledger.addEntry("dummy-entry-7".getBytes(Encoding));
+
+        assertEquals(c1.getNumberOfEntries(), 7);
+        assertEquals(c1.getNumberOfEntriesInBacklog(), 7);
+
+        c1.delete(Lists.newArrayList(p2, p3, p5, p7));
+
+        assertEquals(c1.getNumberOfEntries(), 3);
+        assertEquals(c1.getNumberOfEntriesInBacklog(), 3);
+        assertEquals(c1.getMarkDeletedPosition(), p0);
+
+        c1.delete(Lists.newArrayList(p1));
+
+        assertEquals(c1.getNumberOfEntries(), 2);
+        assertEquals(c1.getNumberOfEntriesInBacklog(), 2);
+        assertEquals(c1.getMarkDeletedPosition(), p3);
+
+        c1.delete(Lists.newArrayList(p4, p6, p7));
+
+        assertEquals(c1.getNumberOfEntries(), 0);
+        assertEquals(c1.getNumberOfEntriesInBacklog(), 0);
+        assertEquals(c1.getMarkDeletedPosition(), p7);
+    }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to