merlimat commented on a change in pull request #1450: Added multiple position 
delete in ManagedLedger
URL: https://github.com/apache/incubator-pulsar/pull/1450#discussion_r178202293
 
 

 ##########
 File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 ##########
 @@ -1494,71 +1503,67 @@ public void deleteFailed(ManagedLedgerException 
exception, Object ctx) {
         }
     }
 
-    @Override
-    public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback 
callback, Object ctx) {
-        checkArgument(pos instanceof PositionImpl);
 
-        if (STATE_UPDATER.get(this) == State.Closed) {
+    @Override
+    public void asyncDelete(Iterable<Position> positions, 
AsyncCallbacks.DeleteCallback callback, Object ctx) {
+        if (state == State.Closed) {
             callback.deleteFailed(new ManagedLedgerException("Cursor was 
already closed"), ctx);
             return;
         }
 
-        PositionImpl position = (PositionImpl) pos;
-
-        PositionImpl previousPosition = ledger.getPreviousPosition(position);
         PositionImpl newMarkDeletePosition = null;
 
         lock.writeLock().lock();
 
         try {
             if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] Deleting single message at {}. "
-                        + "Current status: {} - md-position: {}  - 
previous-position: {}",
-                        ledger.getName(), name, pos, 
individualDeletedMessages, markDeletePosition, previousPosition);
+                log.debug("[{}] [{}] Deleting individual messages at {}. 
Current status: {} - md-position: {}",
+                        ledger.getName(), name, positions, 
individualDeletedMessages, markDeletePosition);
             }
 
-            if (individualDeletedMessages.contains(position) || 
position.compareTo(markDeletePosition) <= 0) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] [{}] Position was already deleted {}", 
ledger.getName(), name, position);
-                }
-                callback.deleteComplete(ctx);
-                return;
-            }
+            for (Position pos : positions) {
+                PositionImpl position  = (PositionImpl) pos;
 
-            if (previousPosition.compareTo(markDeletePosition) == 0 && 
individualDeletedMessages.isEmpty()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}][{}] Immediately mark-delete to position 
{}", ledger.getName(), name, position);
+                if (individualDeletedMessages.contains(position) || 
position.compareTo(markDeletePosition) <= 0) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] [{}] Position was already deleted {}", 
ledger.getName(), name, position);
+                    }
+                    continue;
                 }
 
-                newMarkDeletePosition = position;
-            } else {
                 // Add a range (prev, pos] to the set. Adding the previous 
entry as an open limit to the range will make
                 // the RangeSet recognize the "continuity" between adjacent 
Positions
+                PositionImpl previousPosition = 
ledger.getPreviousPosition(position);
                 
individualDeletedMessages.add(Range.openClosed(previousPosition, position));
                 ++messagesConsumedCounter;
 
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] [{}] Individually deleted messages: {}", 
ledger.getName(), name,
                             individualDeletedMessages);
                 }
+            }
 
-                // If the lower bound of the range set is the current mark 
delete position, then we can trigger a new
-                // mark
-                // delete to the upper bound of the first range segment
-                Range<PositionImpl> range = 
individualDeletedMessages.asRanges().iterator().next();
+            if (individualDeletedMessages.isEmpty()) {
+                // No changes to individually deleted messages, so nothing to 
do at this point
+                callback.deleteComplete(ctx);
+                return;
+            }
 
-                // Bug:7062188 - markDeletePosition can sometimes be stuck at 
the beginning of an empty ledger.
-                // If the lowerBound is ahead of MarkDelete, verify if there 
are any entries in-between
-                if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 
|| ledger
-                        
.getNumberOfEntries(Range.openClosed(markDeletePosition, 
range.lowerEndpoint())) <= 0) {
+            // If the lower bound of the range set is the current mark delete 
position, then we can trigger a new
+            // mark-delete to the upper bound of the first range segment
+            Range<PositionImpl> range = 
individualDeletedMessages.asRanges().iterator().next();
 
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] Found a position range to mark delete 
for cursor {}: {} ", ledger.getName(),
-                                name, range);
-                    }
+            // Bug:7062188 - markDeletePosition can sometimes be stuck at the 
beginning of an empty ledger.
 
 Review comment:
   Yes, this is not related to this change, but I will remove it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to