This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 75e6a204d86 [Branch-2.7][Cherry-pick] Fix skips compacted data for
reader/consumer (#16301)
75e6a204d86 is described below
commit 75e6a204d86d440196b13cc6c88358600b8257df
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue Jul 5 17:53:11 2022 +0800
[Branch-2.7][Cherry-pick] Fix skips compacted data for reader/consumer
(#16301)
---
.../src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java | 6 +++++-
.../java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 6 ++----
.../apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java | 2 +-
.../main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java | 7 ++++++-
4 files changed, 14 insertions(+), 7 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index a9724de2b39..763610f5740 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -398,7 +398,11 @@ public interface ManagedCursor {
* @param newReadPosition
* the position where to move the cursor
*/
- void seek(Position newReadPosition);
+ default void seek(Position newReadPosition) {
+ seek(newReadPosition, false);
+ }
+
+ void seek(Position newReadPosition, boolean force);
/**
* Clear the cursor backlog.
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index b932ebb78eb..77012b6db8d 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2124,18 +2124,16 @@ public class ManagedCursorImpl implements ManagedCursor
{
}
@Override
- public void seek(Position newReadPositionInt) {
+ public void seek(Position newReadPositionInt, boolean force) {
checkArgument(newReadPositionInt instanceof PositionImpl);
PositionImpl newReadPosition = (PositionImpl) newReadPositionInt;
lock.writeLock().lock();
try {
- if (newReadPosition.compareTo(markDeletePosition) <= 0) {
+ if (!force && newReadPosition.compareTo(markDeletePosition) <= 0) {
// Make sure the newReadPosition comes after the mark delete position
newReadPosition =
ledger.getNextValidPosition(markDeletePosition);
}
-
- PositionImpl oldReadPosition = readPosition;
readPosition = newReadPosition;
} finally {
lock.writeLock().unlock();
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index a43e6b4f922..3b64f3f0ee5 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -160,7 +160,7 @@ public class ManagedCursorContainerTest {
}
@Override
- public void seek(Position newReadPosition) {
+ public void seek(Position newReadPosition, boolean force) {
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 5d0ea33cc8a..5b95f1413ad 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -122,7 +122,12 @@ public class CompactedTopicImpl implements CompactedTopic {
return readEntries(context.ledger, startPoint,
endPoint)
.thenAccept((entries) -> {
Entry lastEntry =
entries.get(entries.size() - 1);
-
cursor.seek(lastEntry.getPosition().getNext());
+ // The compaction task depends on the last snapshot and the incremental
+ // entries to build the new snapshot. So for the compaction cursor, we
+ // need to force seek the read position to ensure the compactor can read
+ // the complete last snapshot, because the compactor will read the data
+ // before the compaction cursor's mark delete position.
+
cursor.seek(lastEntry.getPosition().getNext(), true);
callback.readEntriesComplete(entries,
consumer);
});
}