merlimat closed pull request #1450: Added multiple position delete in
ManagedLedger
URL: https://github.com/apache/incubator-pulsar/pull/1450
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork
once it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index db8e3699c..f6793b405 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -265,6 +265,43 @@ void markDelete(Position position, Map<String, Long>
properties)
*/
void asyncDelete(Position position, DeleteCallback callback, Object ctx);
+
+ /**
+ * Delete a group of entries.
+ *
+ * <p/>
+ * Mark multiple individual messages for deletion. Once all the previous
messages have been deleted, markDelete()
+ * will be called internally to advance the persistent acknowledged
position.
+ *
+ * <p/>
+ * The deletion of the messages is not persisted into the durable storage
and cannot be recovered upon the reopening
+ * of the ManagedLedger.
+ *
+ * @param positions
+ * positions of the messages to be deleted
+ */
+ void delete(Iterable<Position> positions) throws InterruptedException,
ManagedLedgerException;
+
+ /**
+ * Delete a group of messages asynchronously
+ *
+ * <p/>
+ * Mark a group of messages for deletion. Once all the previous messages
have been deleted, markDelete() will be
+ * called internally to advance the persistent acknowledged position.
+ *
+ * <p/>
+ * The deletion of the messages is not persisted into the durable storage
and cannot be recovered upon the reopening
+ * of the ManagedLedger
+ *
+ * @param positions
+ * the positions of the messages to be deleted
+ * @param callback
+ * callback object
+ * @param ctx
+ * opaque context
+ */
+ void asyncDelete(Iterable<Position> position, DeleteCallback callback,
Object ctx);
+
/**
* Get the read position. This points to the next message to be read from
the cursor.
*
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index c2fbbab8b..9b3d9cd7a 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1447,8 +1447,17 @@ public void operationFailed(ManagedLedgerException
exception) {
@Override
public void delete(final Position position) throws InterruptedException,
ManagedLedgerException {
- checkNotNull(position);
- checkArgument(position instanceof PositionImpl);
+ delete(Collections.singletonList(position));
+ }
+
+ @Override
+ public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback
callback, Object ctx) {
+ asyncDelete(Collections.singletonList(pos), callback, ctx);
+ }
+
+ @Override
+ public void delete(Iterable<Position> positions) throws
InterruptedException, ManagedLedgerException {
+ checkNotNull(positions);
class Result {
ManagedLedgerException exception = null;
@@ -1458,12 +1467,12 @@ public void delete(final Position position) throws
InterruptedException, Managed
final CountDownLatch counter = new CountDownLatch(1);
final AtomicBoolean timeout = new AtomicBoolean(false);
- asyncDelete(position, new AsyncCallbacks.DeleteCallback() {
+ asyncDelete(positions, new AsyncCallbacks.DeleteCallback() {
@Override
public void deleteComplete(Object ctx) {
if (timeout.get()) {
log.warn("[{}] [{}] Delete operation timeout. Callback
deleteComplete at position {}",
- ledger.getName(), name, position);
+ ledger.getName(), name, positions);
}
counter.countDown();
@@ -1475,7 +1484,7 @@ public void deleteFailed(ManagedLedgerException
exception, Object ctx) {
if (timeout.get()) {
log.warn("[{}] [{}] Delete operation timeout. Callback
deleteFailed at position {}",
- ledger.getName(), name, position);
+ ledger.getName(), name, positions);
}
counter.countDown();
@@ -1485,7 +1494,7 @@ public void deleteFailed(ManagedLedgerException
exception, Object ctx) {
if (!counter.await(ManagedLedgerImpl.AsyncOperationTimeoutSeconds,
TimeUnit.SECONDS)) {
timeout.set(true);
log.warn("[{}] [{}] Delete operation timeout. No callback was
triggered at position {}", ledger.getName(),
- name, position);
+ name, positions);
throw new ManagedLedgerException("Timeout during delete
operation");
}
@@ -1494,46 +1503,37 @@ public void deleteFailed(ManagedLedgerException
exception, Object ctx) {
}
}
- @Override
- public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback
callback, Object ctx) {
- checkArgument(pos instanceof PositionImpl);
- if (STATE_UPDATER.get(this) == State.Closed) {
+ @Override
+ public void asyncDelete(Iterable<Position> positions,
AsyncCallbacks.DeleteCallback callback, Object ctx) {
+ if (state == State.Closed) {
callback.deleteFailed(new ManagedLedgerException("Cursor was
already closed"), ctx);
return;
}
- PositionImpl position = (PositionImpl) pos;
-
- PositionImpl previousPosition = ledger.getPreviousPosition(position);
PositionImpl newMarkDeletePosition = null;
lock.writeLock().lock();
try {
if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] Deleting single message at {}. "
- + "Current status: {} - md-position: {} -
previous-position: {}",
- ledger.getName(), name, pos,
individualDeletedMessages, markDeletePosition, previousPosition);
+ log.debug("[{}] [{}] Deleting individual messages at {}.
Current status: {} - md-position: {}",
+ ledger.getName(), name, positions,
individualDeletedMessages, markDeletePosition);
}
- if (individualDeletedMessages.contains(position) ||
position.compareTo(markDeletePosition) <= 0) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] Position was already deleted {}",
ledger.getName(), name, position);
- }
- callback.deleteComplete(ctx);
- return;
- }
+ for (Position pos : positions) {
+ PositionImpl position = (PositionImpl) checkNotNull(pos);
- if (previousPosition.compareTo(markDeletePosition) == 0 &&
individualDeletedMessages.isEmpty()) {
- if (log.isDebugEnabled()) {
- log.debug("[{}][{}] Immediately mark-delete to position
{}", ledger.getName(), name, position);
+ if (individualDeletedMessages.contains(position) ||
position.compareTo(markDeletePosition) <= 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Position was already deleted {}",
ledger.getName(), name, position);
+ }
+ continue;
}
- newMarkDeletePosition = position;
- } else {
// Add a range (prev, pos] to the set. Adding the previous
entry as an open limit to the range will make
// the RangeSet recognize the "continuity" between adjacent
Positions
+ PositionImpl previousPosition =
ledger.getPreviousPosition(position);
individualDeletedMessages.add(Range.openClosed(previousPosition, position));
++messagesConsumedCounter;
@@ -1541,24 +1541,28 @@ public void asyncDelete(Position pos, final
AsyncCallbacks.DeleteCallback callba
log.debug("[{}] [{}] Individually deleted messages: {}",
ledger.getName(), name,
individualDeletedMessages);
}
+ }
- // If the lower bound of the range set is the current mark
delete position, then we can trigger a new
- // mark
- // delete to the upper bound of the first range segment
- Range<PositionImpl> range =
individualDeletedMessages.asRanges().iterator().next();
+ if (individualDeletedMessages.isEmpty()) {
+ // No changes to individually deleted messages, so nothing to
do at this point
+ callback.deleteComplete(ctx);
+ return;
+ }
- // Bug:7062188 - markDeletePosition can sometimes be stuck at
the beginning of an empty ledger.
- // If the lowerBound is ahead of MarkDelete, verify if there
are any entries in-between
- if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0
|| ledger
-
.getNumberOfEntries(Range.openClosed(markDeletePosition,
range.lowerEndpoint())) <= 0) {
+ // If the lower bound of the range set is the current mark delete
position, then we can trigger a new
+ // mark-delete to the upper bound of the first range segment
+ Range<PositionImpl> range =
individualDeletedMessages.asRanges().iterator().next();
- if (log.isDebugEnabled()) {
- log.debug("[{}] Found a position range to mark delete
for cursor {}: {} ", ledger.getName(),
- name, range);
- }
+ // If the lowerBound is ahead of MarkDelete, verify if there are
any entries in-between
+ if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 ||
ledger
+ .getNumberOfEntries(Range.openClosed(markDeletePosition,
range.lowerEndpoint())) <= 0) {
- newMarkDeletePosition = range.upperEndpoint();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Found a position range to mark delete for
cursor {}: {} ", ledger.getName(),
+ name, range);
}
+
+ newMarkDeletePosition = range.upperEndpoint();
}
if (newMarkDeletePosition != null) {
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 5ad25f5af..de98b601a 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -155,6 +155,14 @@ public void delete(Position position) throws
InterruptedException, ManagedLedger
public void asyncDelete(Position position, DeleteCallback callback,
Object ctx) {
}
+ @Override
+ public void delete(Iterable<Position> positions) throws
InterruptedException, ManagedLedgerException {
+ }
+
+ @Override
+ public void asyncDelete(Iterable<Position> position, DeleteCallback
callback, Object ctx) {
+ }
+
@Override
public void clearBacklog() throws InterruptedException,
ManagedLedgerException {
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
new file mode 100644
index 000000000..aaf7cd1c6
--- /dev/null
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import static org.testng.Assert.assertEquals;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+
+import java.nio.charset.Charset;
+
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.testng.annotations.Test;
+
+public class ManagedCursorListAckTest extends MockedBookKeeperTestCase {
+
+ private static final Charset Encoding = Charsets.UTF_8;
+
+ @Test(timeOut = 20000)
+ void testMultiPositionDelete() throws Exception {
+ ManagedLedger ledger = factory.open("my_test_ledger", new
ManagedLedgerConfig().setMaxEntriesPerLedger(2));
+
+ ManagedCursor c1 = ledger.openCursor("c1");
+ Position p0 = c1.getMarkDeletedPosition();
+ Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+ Position p2 = ledger.addEntry("dummy-entry-2".getBytes(Encoding));
+ Position p3 = ledger.addEntry("dummy-entry-3".getBytes(Encoding));
+ Position p4 = ledger.addEntry("dummy-entry-4".getBytes(Encoding));
+ Position p5 = ledger.addEntry("dummy-entry-5".getBytes(Encoding));
+ Position p6 = ledger.addEntry("dummy-entry-6".getBytes(Encoding));
+ Position p7 = ledger.addEntry("dummy-entry-7".getBytes(Encoding));
+
+ assertEquals(c1.getNumberOfEntries(), 7);
+ assertEquals(c1.getNumberOfEntriesInBacklog(), 7);
+
+ c1.delete(Lists.newArrayList(p2, p3, p5, p7));
+
+ assertEquals(c1.getNumberOfEntries(), 3);
+ assertEquals(c1.getNumberOfEntriesInBacklog(), 3);
+ assertEquals(c1.getMarkDeletedPosition(), p0);
+
+ c1.delete(Lists.newArrayList(p1));
+
+ assertEquals(c1.getNumberOfEntries(), 2);
+ assertEquals(c1.getNumberOfEntriesInBacklog(), 2);
+ assertEquals(c1.getMarkDeletedPosition(), p3);
+
+ c1.delete(Lists.newArrayList(p4, p6, p7));
+
+ assertEquals(c1.getNumberOfEntries(), 0);
+ assertEquals(c1.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(c1.getMarkDeletedPosition(), p7);
+ }
+
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services