This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 2df8853da43 [fix][broker] Fix can't stop phase-two of compaction even
though messageId read reaches lastReadId (#20988)
2df8853da43 is described below
commit 2df8853da4331391284f9efb1892708fa2df4646
Author: Cong Zhao <[email protected]>
AuthorDate: Tue Aug 22 18:07:09 2023 +0800
[fix][broker] Fix can't stop phase-two of compaction even though messageId
read reaches lastReadId (#20988)
(cherry picked from commit 9e2195ce553ddbd6859b271a5ae0f2a23db02c44)
---
.../pulsar/compaction/TwoPhaseCompactor.java | 1 +
.../apache/pulsar/compaction/CompactorTest.java | 93 +++++++++++++++++-----
.../compaction/TopicCompactionServiceTest.java | 32 +++++++-
3 files changed, 102 insertions(+), 24 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index d350ea08532..6396b72b7dd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -281,6 +281,7 @@ public class TwoPhaseCompactor extends Compactor {
promise.complete(null);
}
});
+ return;
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index e86be6a4db8..d97d22ae7f1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -19,13 +19,10 @@
package org.apache.pulsar.compaction;
import static org.apache.pulsar.client.impl.RawReaderTest.extractKey;
-
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
import io.netty.buffer.ByteBuf;
-
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
@@ -33,23 +30,30 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-
+import lombok.Cleanup;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.protocol.Commands;
+import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -65,7 +69,6 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
protected Compactor compactor;
-
@BeforeMethod
@Override
public void setup() throws Exception {
@@ -101,16 +104,17 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
return compactor;
}
- private List<String> compactAndVerify(String topic, Map<String, byte[]>
expected, boolean checkMetrics) throws Exception {
+ private List<String> compactAndVerify(String topic, Map<String, byte[]>
expected, boolean checkMetrics)
+ throws Exception {
long compactedLedgerId = compact(topic);
LedgerHandle ledger = bk.openLedger(compactedLedgerId,
-
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
-
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
+ Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+ Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
Assert.assertEquals(ledger.getLastAddConfirmed() + 1, // 0..lac
- expected.size(),
- "Should have as many entries as there is keys");
+ expected.size(),
+ "Should have as many entries as there is keys");
List<String> keys = new ArrayList<>();
Enumeration<LedgerEntry> entries = ledger.readEntries(0,
ledger.getLastAddConfirmed());
@@ -124,7 +128,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
byte[] bytes = new byte[payload.readableBytes()];
payload.readBytes(bytes);
Assert.assertEquals(bytes, expected.remove(key),
- "Compacted version should match expected
version");
+ "Compacted version should match expected version");
m.close();
}
if (checkMetrics) {
@@ -148,17 +152,18 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
final int numMessages = 1000;
final int maxKeys = 10;
+ @Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
- .enableBatching(false)
- .messageRoutingMode(MessageRoutingMode.SinglePartition)
- .create();
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
Map<String, byte[]> expected = new HashMap<>();
Random r = new Random(0);
for (int j = 0; j < numMessages; j++) {
int keyIndex = r.nextInt(maxKeys);
- String key = "key"+keyIndex;
+ String key = "key" + keyIndex;
byte[] data = ("my-message-" + key + "-" + j).getBytes();
producer.newMessage()
.key(key)
@@ -173,10 +178,11 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
public void testCompactAddCompact() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";
+ @Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
- .enableBatching(false)
- .messageRoutingMode(MessageRoutingMode.SinglePartition)
- .create();
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
Map<String, byte[]> expected = new HashMap<>();
@@ -210,10 +216,11 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
public void testCompactedInOrder() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";
+ @Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
- .enableBatching(false)
- .messageRoutingMode(MessageRoutingMode.SinglePartition)
- .create();
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
producer.newMessage()
.key("c")
@@ -256,6 +263,48 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(),
60);
}
+ @Test
+ public void testCompactedWithConcurrentSend() throws Exception {
+ String topic =
"persistent://my-property/use/my-ns/testCompactedWithConcurrentSend";
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ var future = CompletableFuture.runAsync(() -> {
+ for (int i = 0; i < 100; i++) {
+ try {
+
producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i).getBytes()).send();
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topic).get();
+ PulsarTopicCompactionService topicCompactionService =
+ (PulsarTopicCompactionService)
persistentTopic.getTopicCompactionService();
+
+ Awaitility.await().untilAsserted(() -> {
+ long compactedLedgerId = compact(topic);
+ Thread.sleep(300);
+ Optional<CompactedTopicContext> compactedTopicContext =
topicCompactionService.getCompactedTopic()
+ .getCompactedTopicContext();
+ Assert.assertTrue(compactedTopicContext.isPresent());
+ Assert.assertEquals(compactedTopicContext.get().ledger.getId(),
compactedLedgerId);
+ });
+
+ Position lastCompactedPosition =
topicCompactionService.getLastCompactedPosition().get();
+ Entry lastCompactedEntry =
topicCompactionService.readLastCompactedEntry().get();
+
+
Assert.assertTrue(PositionImpl.get(lastCompactedPosition.getLedgerId(),
lastCompactedPosition.getEntryId())
+ .compareTo(lastCompactedEntry.getLedgerId(),
lastCompactedEntry.getEntryId()) >= 0);
+
+ future.join();
+ }
+
public ByteBuf extractPayload(RawMessage m) throws Exception {
ByteBuf payloadAndMetadata = m.getHeadersAndPayload();
Commands.skipChecksumIfPresent(payloadAndMetadata);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
index b92e54a37bf..4abe00fb0c6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
@@ -21,14 +21,20 @@ package org.apache.pulsar.compaction;
import static
org.apache.pulsar.compaction.Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY;
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import static org.testng.Assert.assertEquals;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import lombok.Cleanup;
+import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
@@ -36,21 +42,43 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-public class TopicCompactionServiceTest extends CompactorTest {
+public class TopicCompactionServiceTest extends MockedPulsarServiceBaseTest {
+
+ protected ScheduledExecutorService compactionScheduler;
+ protected BookKeeper bk;
+ private TwoPhaseCompactor compactor;
@BeforeMethod
@Override
public void setup() throws Exception {
- super.setup();
+ super.internalSetup();
+
admin.clusters().createCluster("test",
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1",
"role2"), Set.of("test"));
String defaultTenant = "prop-xyz";
admin.tenants().createTenant(defaultTenant, tenantInfo);
String defaultNamespace = defaultTenant + "/ns1";
admin.namespaces().createNamespace(defaultNamespace, Set.of("test"));
+
+ compactionScheduler = Executors.newSingleThreadScheduledExecutor(
+ new
ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
+ bk = pulsar.getBookKeeperClientFactory().create(
+ this.conf, null, null, Optional.empty(), null);
+ compactor = new TwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ bk.close();
+ if (compactionScheduler != null) {
+ compactionScheduler.shutdownNow();
+ }
}
@Test