This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cb1c780412 [fix][broker] Fix get outdated compactedTopicContext after 
compactionHorizon has been updated (#20984)
0cb1c780412 is described below

commit 0cb1c780412ca3cc1d421fe6ebb5de95cf7392fa
Author: Cong Zhao <[email protected]>
AuthorDate: Thu Aug 17 00:09:39 2023 +0800

    [fix][broker] Fix get outdated compactedTopicContext after 
compactionHorizon has been updated (#20984)
---
 .../pulsar/compaction/CompactedTopicImpl.java      | 12 ++--
 .../pulsar/compaction/CompactedTopicTest.java      | 66 ++++++++++++++++++++++
 2 files changed, 74 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 07082eec6b9..fe24a23b7cd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -53,6 +53,10 @@ import org.apache.pulsar.common.api.proto.MessageIdData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Note: If you want to guarantee that strong consistency between 
`compactionHorizon` and `compactedTopicContext`,
+ * you need to call getting them method in "synchronized(CompactedTopicImpl){ 
... }" lock block.
+ */
 public class CompactedTopicImpl implements CompactedTopic {
     static final long NEWER_THAN_COMPACTED = -0xfeed0fbaL;
     static final long COMPACT_LEDGER_EMPTY = -0xfeed0fbbL;
@@ -70,14 +74,14 @@ public class CompactedTopicImpl implements CompactedTopic {
     @Override
     public CompletableFuture<CompactedTopicContext> 
newCompactedLedger(Position p, long compactedLedgerId) {
         synchronized (this) {
-            compactionHorizon = (PositionImpl) p;
-
             CompletableFuture<CompactedTopicContext> previousContext = 
compactedTopicContext;
             compactedTopicContext = openCompactedLedger(bk, compactedLedgerId);
 
+            compactionHorizon = (PositionImpl) p;
+
             // delete the ledger from the old context once the new one is open
-            return compactedTopicContext.thenCompose(__ ->
-                    previousContext != null ? previousContext : 
CompletableFuture.completedFuture(null));
+            return compactedTopicContext.thenCompose(
+                    __ -> previousContext != null ? previousContext : 
CompletableFuture.completedFuture(null));
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 96adc67a27b..4a098fd0eb9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -38,6 +38,7 @@ import lombok.Cleanup;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.Pair;
@@ -62,6 +63,7 @@ import 
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -840,4 +842,68 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
         Assert.assertTrue(reader.hasMessageAvailable());
         Assert.assertEquals(reader.readNext().getMessageId(), 
lastMessage.get());
     }
+
+    @Test
+    public void 
testCompactWithConcurrentGetCompactionHorizonAndCompactedTopicContext() throws 
Exception {
+        BookKeeper bk0 = pulsar.getBookKeeperClientFactory().create(
+                this.conf, null, null, Optional.empty(), null);
+
+        final BookKeeper bk = Mockito.spy(bk0);
+
+        Mockito.doAnswer(invocation -> {
+            Thread.sleep(1500);
+            invocation.callRealMethod();
+            return null;
+        }).when(bk).asyncOpenLedger(Mockito.anyLong(), Mockito.any(), 
Mockito.any(), Mockito.any(), Mockito.any());
+
+        LedgerHandle oldCompactedLedger = bk.createLedger(1, 1,
+                Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
+        oldCompactedLedger.close();
+        LedgerHandle newCompactedLedger = bk.createLedger(1, 1,
+                Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
+        newCompactedLedger.close();
+
+        CompactedTopicImpl compactedTopic = new CompactedTopicImpl(bk);
+
+        PositionImpl oldHorizon = new PositionImpl(1, 2);
+        var future = CompletableFuture.supplyAsync(() -> {
+            // set the compacted topic ledger
+            return compactedTopic.newCompactedLedger(oldHorizon, 
oldCompactedLedger.getId());
+        });
+        Thread.sleep(500);
+
+        Optional<Position> compactionHorizon = 
compactedTopic.getCompactionHorizon();
+        CompletableFuture<CompactedTopicContext> compactedTopicContext =
+                compactedTopic.getCompactedTopicContextFuture();
+
+        if (compactedTopicContext != null) {
+            Assert.assertEquals(compactionHorizon.get(), oldHorizon);
+            Assert.assertNotNull(compactedTopicContext);
+            Assert.assertEquals(compactedTopicContext.join().ledger.getId(), 
oldCompactedLedger.getId());
+        } else {
+            Assert.assertTrue(compactionHorizon.isEmpty());
+        }
+
+        future.join();
+
+        PositionImpl newHorizon = new PositionImpl(1, 3);
+        var future2 = CompletableFuture.supplyAsync(() -> {
+            // update the compacted topic ledger
+            return compactedTopic.newCompactedLedger(newHorizon, 
newCompactedLedger.getId());
+        });
+        Thread.sleep(500);
+
+        compactionHorizon = compactedTopic.getCompactionHorizon();
+        compactedTopicContext = 
compactedTopic.getCompactedTopicContextFuture();
+
+        if (compactedTopicContext.join().ledger.getId() == 
newCompactedLedger.getId()) {
+            Assert.assertEquals(compactionHorizon.get(), newHorizon);
+        } else {
+            Assert.assertEquals(compactionHorizon.get(), oldHorizon);
+        }
+
+        future2.join();
+    }
 }

Reply via email to