This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0cb1c780412 [fix][broker] Fix get outdated compactedTopicContext after
compactionHorizon has been updated (#20984)
0cb1c780412 is described below
commit 0cb1c780412ca3cc1d421fe6ebb5de95cf7392fa
Author: Cong Zhao <[email protected]>
AuthorDate: Thu Aug 17 00:09:39 2023 +0800
[fix][broker] Fix get outdated compactedTopicContext after
compactionHorizon has been updated (#20984)
---
.../pulsar/compaction/CompactedTopicImpl.java | 12 ++--
.../pulsar/compaction/CompactedTopicTest.java | 66 ++++++++++++++++++++++
2 files changed, 74 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 07082eec6b9..fe24a23b7cd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -53,6 +53,10 @@ import org.apache.pulsar.common.api.proto.MessageIdData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Note: If you want to guarantee that strong consistency between
`compactionHorizon` and `compactedTopicContext`,
+ * you need to call getting them method in "synchronized(CompactedTopicImpl){
... }" lock block.
+ */
public class CompactedTopicImpl implements CompactedTopic {
static final long NEWER_THAN_COMPACTED = -0xfeed0fbaL;
static final long COMPACT_LEDGER_EMPTY = -0xfeed0fbbL;
@@ -70,14 +74,14 @@ public class CompactedTopicImpl implements CompactedTopic {
@Override
public CompletableFuture<CompactedTopicContext>
newCompactedLedger(Position p, long compactedLedgerId) {
synchronized (this) {
- compactionHorizon = (PositionImpl) p;
-
CompletableFuture<CompactedTopicContext> previousContext =
compactedTopicContext;
compactedTopicContext = openCompactedLedger(bk, compactedLedgerId);
+ compactionHorizon = (PositionImpl) p;
+
// delete the ledger from the old context once the new one is open
- return compactedTopicContext.thenCompose(__ ->
- previousContext != null ? previousContext :
CompletableFuture.completedFuture(null));
+ return compactedTopicContext.thenCompose(
+ __ -> previousContext != null ? previousContext :
CompletableFuture.completedFuture(null));
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 96adc67a27b..4a098fd0eb9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -38,6 +38,7 @@ import lombok.Cleanup;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
@@ -62,6 +63,7 @@ import
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -840,4 +842,68 @@ public class CompactedTopicTest extends
MockedPulsarServiceBaseTest {
Assert.assertTrue(reader.hasMessageAvailable());
Assert.assertEquals(reader.readNext().getMessageId(),
lastMessage.get());
}
+
+ @Test
+ public void
testCompactWithConcurrentGetCompactionHorizonAndCompactedTopicContext() throws
Exception {
+ BookKeeper bk0 = pulsar.getBookKeeperClientFactory().create(
+ this.conf, null, null, Optional.empty(), null);
+
+ final BookKeeper bk = Mockito.spy(bk0);
+
+ Mockito.doAnswer(invocation -> {
+ Thread.sleep(1500);
+ invocation.callRealMethod();
+ return null;
+ }).when(bk).asyncOpenLedger(Mockito.anyLong(), Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.any());
+
+ LedgerHandle oldCompactedLedger = bk.createLedger(1, 1,
+ Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+ Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
+ oldCompactedLedger.close();
+ LedgerHandle newCompactedLedger = bk.createLedger(1, 1,
+ Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+ Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
+ newCompactedLedger.close();
+
+ CompactedTopicImpl compactedTopic = new CompactedTopicImpl(bk);
+
+ PositionImpl oldHorizon = new PositionImpl(1, 2);
+ var future = CompletableFuture.supplyAsync(() -> {
+ // set the compacted topic ledger
+ return compactedTopic.newCompactedLedger(oldHorizon,
oldCompactedLedger.getId());
+ });
+ Thread.sleep(500);
+
+ Optional<Position> compactionHorizon =
compactedTopic.getCompactionHorizon();
+ CompletableFuture<CompactedTopicContext> compactedTopicContext =
+ compactedTopic.getCompactedTopicContextFuture();
+
+ if (compactedTopicContext != null) {
+ Assert.assertEquals(compactionHorizon.get(), oldHorizon);
+ Assert.assertNotNull(compactedTopicContext);
+ Assert.assertEquals(compactedTopicContext.join().ledger.getId(),
oldCompactedLedger.getId());
+ } else {
+ Assert.assertTrue(compactionHorizon.isEmpty());
+ }
+
+ future.join();
+
+ PositionImpl newHorizon = new PositionImpl(1, 3);
+ var future2 = CompletableFuture.supplyAsync(() -> {
+ // update the compacted topic ledger
+ return compactedTopic.newCompactedLedger(newHorizon,
newCompactedLedger.getId());
+ });
+ Thread.sleep(500);
+
+ compactionHorizon = compactedTopic.getCompactionHorizon();
+ compactedTopicContext =
compactedTopic.getCompactedTopicContextFuture();
+
+ if (compactedTopicContext.join().ledger.getId() ==
newCompactedLedger.getId()) {
+ Assert.assertEquals(compactionHorizon.get(), newHorizon);
+ } else {
+ Assert.assertEquals(compactionHorizon.get(), oldHorizon);
+ }
+
+ future2.join();
+ }
}