This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 33ab210356543dd641d263995b8614674a1636a3
Author: Cong Zhao <[email protected]>
AuthorDate: Tue Aug 22 18:07:09 2023 +0800

    [fix][broker] Fix can't stop phase-two of compaction even though messageId read reaches lastReadId (#20988)
    
    (cherry picked from commit 9e2195ce553ddbd6859b271a5ae0f2a23db02c44)
---
 .../pulsar/compaction/TwoPhaseCompactor.java       |  1 +
 .../apache/pulsar/compaction/CompactorTest.java    | 95 +++++++++++++++++-----
 2 files changed, 74 insertions(+), 22 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 21ccb281f05..d71227d94f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -283,6 +283,7 @@ public class TwoPhaseCompactor extends Compactor {
                                     promise.complete(null);
                                 }
                             });
+                            return;
                         }
                     } catch (InterruptedException ie) {
                         Thread.currentThread().interrupt();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index e86be6a4db8..5f83ef97d32 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -19,13 +19,10 @@
 package org.apache.pulsar.compaction;
 
 import static org.apache.pulsar.client.impl.RawReaderTest.extractKey;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import io.netty.buffer.ByteBuf;
-
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashMap;
@@ -33,23 +30,30 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-
+import lombok.Cleanup;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.RawMessageImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.protocol.Commands;
+import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -65,7 +69,6 @@ public class CompactorTest extends 
MockedPulsarServiceBaseTest {
     protected Compactor compactor;
 
 
-
     @BeforeMethod
     @Override
     public void setup() throws Exception {
@@ -101,16 +104,17 @@ public class CompactorTest extends 
MockedPulsarServiceBaseTest {
         return compactor;
     }
 
-    private List<String> compactAndVerify(String topic, Map<String, byte[]> 
expected, boolean checkMetrics) throws Exception {
+    private List<String> compactAndVerify(String topic, Map<String, byte[]> 
expected, boolean checkMetrics)
+            throws Exception {
 
         long compactedLedgerId = compact(topic);
 
         LedgerHandle ledger = bk.openLedger(compactedLedgerId,
-                                            
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
-                                            
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
+                Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
         Assert.assertEquals(ledger.getLastAddConfirmed() + 1, // 0..lac
-                            expected.size(),
-                            "Should have as many entries as there is keys");
+                expected.size(),
+                "Should have as many entries as there is keys");
 
         List<String> keys = new ArrayList<>();
         Enumeration<LedgerEntry> entries = ledger.readEntries(0, 
ledger.getLastAddConfirmed());
@@ -124,7 +128,7 @@ public class CompactorTest extends 
MockedPulsarServiceBaseTest {
             byte[] bytes = new byte[payload.readableBytes()];
             payload.readBytes(bytes);
             Assert.assertEquals(bytes, expected.remove(key),
-                                "Compacted version should match expected 
version");
+                    "Compacted version should match expected version");
             m.close();
         }
         if (checkMetrics) {
@@ -148,17 +152,18 @@ public class CompactorTest extends 
MockedPulsarServiceBaseTest {
         final int numMessages = 1000;
         final int maxKeys = 10;
 
+        @Cleanup
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
-            .enableBatching(false)
-            .messageRoutingMode(MessageRoutingMode.SinglePartition)
-            .create();
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
 
         Map<String, byte[]> expected = new HashMap<>();
         Random r = new Random(0);
 
         for (int j = 0; j < numMessages; j++) {
             int keyIndex = r.nextInt(maxKeys);
-            String key = "key"+keyIndex;
+            String key = "key" + keyIndex;
             byte[] data = ("my-message-" + key + "-" + j).getBytes();
             producer.newMessage()
                     .key(key)
@@ -173,10 +178,11 @@ public class CompactorTest extends 
MockedPulsarServiceBaseTest {
     public void testCompactAddCompact() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
+        @Cleanup
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
-            .enableBatching(false)
-            .messageRoutingMode(MessageRoutingMode.SinglePartition)
-            .create();
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
 
         Map<String, byte[]> expected = new HashMap<>();
 
@@ -210,10 +216,11 @@ public class CompactorTest extends 
MockedPulsarServiceBaseTest {
     public void testCompactedInOrder() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
+        @Cleanup
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
-            .enableBatching(false)
-            .messageRoutingMode(MessageRoutingMode.SinglePartition)
-            .create();
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
 
         producer.newMessage()
                 .key("c")
@@ -256,6 +263,50 @@ public class CompactorTest extends 
MockedPulsarServiceBaseTest {
         Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(), 
60);
     }
 
+    @Test
+    public void testCompactedWithConcurrentSend() throws Exception {
+        String topic = 
"persistent://my-property/use/my-ns/testCompactedWithConcurrentSend";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
+                this.conf, null, null, Optional.empty(), null);
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+
+        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+            for (int i = 0; i < 100; i++) {
+                try {
+                    
producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i).getBytes()).send();
+                } catch (PulsarClientException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topic).get();
+        CompactedTopic compactedTopic = persistentTopic.getCompactedTopic();
+
+        Awaitility.await().untilAsserted(() -> {
+            long compactedLedgerId = compactor.compact(topic).join();
+            Thread.sleep(300);
+            Optional<CompactedTopicContext> compactedTopicContext = 
persistentTopic.getCompactedTopicContext();
+            Assert.assertTrue(compactedTopicContext.isPresent());
+            Assert.assertEquals(compactedTopicContext.get().ledger.getId(), 
compactedLedgerId);
+        });
+
+        Position lastCompactedPosition = 
compactedTopic.getCompactionHorizon().get();
+        Entry lastCompactedEntry = 
compactedTopic.readLastEntryOfCompactedLedger().get();
+
+        
Assert.assertTrue(PositionImpl.get(lastCompactedPosition.getLedgerId(), 
lastCompactedPosition.getEntryId())
+                .compareTo(lastCompactedEntry.getLedgerId(), 
lastCompactedEntry.getEntryId()) >= 0);
+
+        future.join();
+    }
+
     public ByteBuf extractPayload(RawMessage m) throws Exception {
         ByteBuf payloadAndMetadata = m.getHeadersAndPayload();
         Commands.skipChecksumIfPresent(payloadAndMetadata);

Reply via email to