This is an automated email from the ASF dual-hosted git repository. zhaocong pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a5d1ca00712d489d8a5985c0af9b851ecd5d138c Author: Cong Zhao <[email protected]> AuthorDate: Tue Aug 22 18:07:09 2023 +0800 [fix][broker] Fix can't stop phase-two of compaction even though messageId read reaches lastReadId (#20988) (cherry picked from commit 9e2195ce553ddbd6859b271a5ae0f2a23db02c44) --- .../pulsar/compaction/TwoPhaseCompactor.java | 1 + .../apache/pulsar/compaction/CompactorTest.java | 96 +++++++++++++++++----- 2 files changed, 75 insertions(+), 22 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 7dcbda5a829..b1e24cce7c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -284,6 +284,7 @@ public class TwoPhaseCompactor extends Compactor { promise.complete(null); } }); + return; } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java index bd875384aec..304aae3f57f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java @@ -19,13 +19,10 @@ package org.apache.pulsar.compaction; import static org.apache.pulsar.client.impl.RawReaderTest.extractKey; - import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; - import io.netty.buffer.ByteBuf; - import java.util.ArrayList; import java.util.Enumeration; import java.util.HashMap; @@ -33,22 +30,29 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; - +import lombok.Cleanup; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.RawMessageImpl; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.protocol.Commands; +import org.awaitility.Awaitility; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -59,7 +63,6 @@ import org.testng.annotations.Test; public class CompactorTest extends MockedPulsarServiceBaseTest { private ScheduledExecutorService compactionScheduler; - @BeforeMethod @Override public void setup() throws Exception { @@ -82,18 +85,19 @@ public class CompactorTest extends MockedPulsarServiceBaseTest { compactionScheduler.shutdownNow(); } - private List<String> compactAndVerify(String topic, Map<String, byte[]> expected, boolean checkMetrics) throws Exception { + private List<String> compactAndVerify(String topic, Map<String, byte[]> expected, boolean checkMetrics) + throws Exception { BookKeeper bk = pulsar.getBookKeeperClientFactory().create( this.conf, null, null, Optional.empty(), null); Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); long compactedLedgerId = compactor.compact(topic).get(); LedgerHandle ledger = bk.openLedger(compactedLedgerId, - Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, - Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD); + Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, + Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD); Assert.assertEquals(ledger.getLastAddConfirmed() + 1, // 0..lac - expected.size(), - "Should have as many entries as there is keys"); + expected.size(), + "Should have as many entries as there is keys"); List<String> keys = new ArrayList<>(); Enumeration<LedgerEntry> entries = ledger.readEntries(0, ledger.getLastAddConfirmed()); @@ -107,7 +111,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest { byte[] bytes = new byte[payload.readableBytes()]; payload.readBytes(bytes); Assert.assertEquals(bytes, expected.remove(key), - "Compacted version should match expected version"); + "Compacted version should match expected version"); m.close(); } if (checkMetrics) { @@ -131,17 +135,18 @@ public class CompactorTest extends MockedPulsarServiceBaseTest { final int numMessages = 1000; final int maxKeys = 10; + @Cleanup Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) - .enableBatching(false) - .messageRoutingMode(MessageRoutingMode.SinglePartition) - .create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); Map<String, byte[]> expected = new HashMap<>(); Random r = new Random(0); for (int j = 0; j < numMessages; j++) { int keyIndex = r.nextInt(maxKeys); - String key = "key"+keyIndex; + String key = "key" + keyIndex; byte[] data = ("my-message-" + key + "-" + j).getBytes(); producer.newMessage() .key(key) @@ -156,10 +161,11 @@ public class CompactorTest extends MockedPulsarServiceBaseTest { public void testCompactAddCompact() throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; + @Cleanup Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) - .enableBatching(false) - .messageRoutingMode(MessageRoutingMode.SinglePartition) - .create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); Map<String, byte[]> expected = new HashMap<>(); @@ -193,10 +199,11 @@ public class CompactorTest extends MockedPulsarServiceBaseTest { public void testCompactedInOrder() throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; + @Cleanup Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) - .enableBatching(false) - .messageRoutingMode(MessageRoutingMode.SinglePartition) - .create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.newMessage() .key("c") @@ -243,6 +250,51 @@ public class CompactorTest extends MockedPulsarServiceBaseTest { } + @Test + public void testCompactedWithConcurrentSend() throws Exception { + String topic = "persistent://my-property/use/my-ns/testCompactedWithConcurrentSend"; + + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + + + BookKeeper bk = pulsar.getBookKeeperClientFactory().create( + this.conf, null, null, Optional.empty(), null); + Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + + CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { + for (int i = 0; i < 100; i++) { + try { + producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i).getBytes()).send(); + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } + } + }); + + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); + CompactedTopic compactedTopic = persistentTopic.getCompactedTopic(); + + Awaitility.await().untilAsserted(() -> { + long compactedLedgerId = compactor.compact(topic).get(); + Thread.sleep(300); + Optional<CompactedTopicContext> compactedTopicContext = persistentTopic.getCompactedTopicContext(); + Assert.assertTrue(compactedTopicContext.isPresent()); + Assert.assertEquals(compactedTopicContext.get().ledger.getId(), compactedLedgerId); + }); + + Position lastCompactedPosition = compactedTopic.getCompactionHorizon().get(); + Entry lastCompactedEntry = compactedTopic.readLastEntryOfCompactedLedger().get(); + + Assert.assertTrue(PositionImpl.get(lastCompactedPosition.getLedgerId(), lastCompactedPosition.getEntryId()) + .compareTo(PositionImpl.get(lastCompactedEntry.getLedgerId(), lastCompactedEntry.getEntryId())) >= 0); + + future.join(); + } + public ByteBuf extractPayload(RawMessage m) throws Exception { ByteBuf payloadAndMetadata = m.getHeadersAndPayload(); Commands.skipChecksumIfPresent(payloadAndMetadata);
