This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 77d5bb5ff1b Fix lost compaction data due to compaction properties missed during reset-cursor. (#16404)
77d5bb5ff1b is described below

commit 77d5bb5ff1bc68b7fff7ef869b35c071b550b670
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Jul 6 13:31:55 2022 +0800

    Fix lost compaction data due to compaction properties missed during reset-cursor. (#16404)
---
 .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java  | 10 ++++++++--
 .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java  |  3 ++-
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 77012b6db8d..5986334cb06 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -188,6 +188,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     private long entriesReadCount;
     private long entriesReadSize;
     private int individualDeletedMessagesSerializedSize;
+    private static final String COMPACTION_CURSOR_NAME = "__compaction";
 
     class MarkDeleteEntry {
         final PositionImpl newPosition;
@@ -1033,7 +1034,8 @@ public class ManagedCursorImpl implements ManagedCursor {
                                 Range.closedOpen(markDeletePosition, newMarkDeletePosition)));
                     }
                     markDeletePosition = newMarkDeletePosition;
-                    lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, Collections.emptyMap(),
+                    lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor() ?
+                            getProperties() : Collections.emptyMap(),
                             null, null);
                     individualDeletedMessages.clear();
                     if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
@@ -1084,7 +1086,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         };
 
         lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, getProperties(), null, null);
-        internalAsyncMarkDelete(newPosition, Collections.emptyMap(), new MarkDeleteCallback() {
+        internalAsyncMarkDelete(newPosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(), new MarkDeleteCallback() {
             @Override
             public void markDeleteComplete(Object ctx) {
                 finalCallback.operationComplete();
@@ -3016,5 +3018,9 @@ public class ManagedCursorImpl implements ManagedCursor {
         return Math.min(maxEntriesBasedOnSize, maxEntries);
     }
 
+    private boolean isCompactionCursor() {
+        return COMPACTION_CURSOR_NAME.equals(name);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index f8d7098d042..b66b9a8aa8d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2294,7 +2294,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0
                     && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0
                     && !(!cursor.isDurable() && cursor instanceof NonDurableCursorImpl && ((NonDurableCursorImpl) cursor).isReadCompacted())) {
-                cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() {
+                cursor.asyncMarkDelete(highestPositionToDelete, cursor.getProperties(), new MarkDeleteCallback() {
+
                     @Override
                     public void markDeleteComplete(Object ctx) {
                     }

Reply via email to