This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2eda972 Fix race condition in skipEntries that moves readPosition and
markDeletePosition incorrectly (#1478)
2eda972 is described below
commit 2eda9725ecb0ec494512bf3725839f0d24bf1d0b
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Sat Mar 31 07:05:36 2018 -0700
Fix race condition in skipEntries that moves readPosition and
markDeletePosition incorrectly (#1478)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 25 +++++++++++-----------
1 file changed, 13 insertions(+), 12 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 9b3d9cd..29fe0a7 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1245,18 +1245,6 @@ public class ManagedCursorImpl implements ManagedCursor {
throw new IllegalArgumentException("Mark deleting an already
mark-deleted position");
}
- if (readPosition.compareTo(newMarkDeletePosition) <= 0) {
- // If the position that is mark-deleted is past the read position,
it
- // means that the client has skipped some entries. We need to move
- // read position forward
- PositionImpl oldReadPosition = readPosition;
- readPosition = ledger.getNextValidPosition(newMarkDeletePosition);
-
- if (log.isDebugEnabled()) {
- log.debug("[{}] Moved read position from: {} to: {}",
ledger.getName(), oldReadPosition, readPosition);
- }
- }
-
PositionImpl oldMarkDeletePosition = markDeletePosition;
if (!newMarkDeletePosition.equals(oldMarkDeletePosition)) {
@@ -1285,6 +1273,19 @@ public class ManagedCursorImpl implements ManagedCursor {
// markDelete-position and clear out deletedMsgSet
markDeletePosition = PositionImpl.get(newMarkDeletePosition);
individualDeletedMessages.remove(Range.atMost(markDeletePosition));
+
+ if (readPosition.compareTo(newMarkDeletePosition) <= 0) {
+ // If the position that is mark-deleted is past the read position,
it
+ // means that the client has skipped some entries. We need to move
+ // read position forward
+ PositionImpl oldReadPosition = readPosition;
+ readPosition = ledger.getNextValidPosition(newMarkDeletePosition);
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Moved read position from: {} to: {}, and new
mark-delete position {}", ledger.getName(),
+ oldReadPosition, readPosition, markDeletePosition);
+ }
+ }
return newMarkDeletePosition;
}
--
To stop receiving notification emails like this one, please contact
[email protected].