This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fc393f69043 [fix][broker] Avoid compaction task stuck when the last
message to compact is a marker (#21718)
fc393f69043 is described below
commit fc393f69043be6eb1b2572a27f131656a2cbc7f6
Author: Cong Zhao <[email protected]>
AuthorDate: Wed Dec 20 09:39:00 2023 +0800
[fix][broker] Avoid compaction task stuck when the last message to compact
is a marker (#21718)
---
.../broker/service/AbstractBaseDispatcher.java | 36 ++++++++-----
.../pulsar/compaction/TwoPhaseCompactor.java | 11 +++-
.../broker/service/ReplicatorSubscriptionTest.java | 61 ++++++++++++++++++++++
.../pulsar/broker/transaction/TransactionTest.java | 59 +++++++++++++++++++++
4 files changed, 152 insertions(+), 15 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index ea68d588896..2f38ad67d4f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.compaction.Compactor;
import org.checkerframework.checker.nullness.qual.Nullable;
@Slf4j
@@ -174,13 +175,15 @@ public abstract class AbstractBaseDispatcher extends
EntryFilterSupport implemen
if (msgMetadata != null && msgMetadata.hasTxnidMostBits()
&& msgMetadata.hasTxnidLeastBits()) {
if (Markers.isTxnMarker(msgMetadata)) {
- // because consumer can receive message is smaller than
maxReadPosition,
- // so this marker is useless for this subscription
-
individualAcknowledgeMessageIfNeeded(Collections.singletonList(entry.getPosition()),
- Collections.emptyMap());
- entries.set(i, null);
- entry.release();
- continue;
+ if (cursor == null ||
!cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)) {
+ // because consumer can receive message is smaller
than maxReadPosition,
+ // so this marker is useless for this subscription
+
individualAcknowledgeMessageIfNeeded(Collections.singletonList(entry.getPosition()),
+ Collections.emptyMap());
+ entries.set(i, null);
+ entry.release();
+ continue;
+ }
} else if (((PersistentTopic) subscription.getTopic())
.isTxnAborted(new
TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()),
(PositionImpl) entry.getPosition())) {
@@ -192,19 +195,26 @@ public abstract class AbstractBaseDispatcher extends
EntryFilterSupport implemen
}
}
- if (msgMetadata == null ||
Markers.isServerOnlyMarker(msgMetadata)) {
+ if (msgMetadata == null ||
(Markers.isServerOnlyMarker(msgMetadata))) {
PositionImpl pos = (PositionImpl) entry.getPosition();
// Message metadata was corrupted or the messages was a
server-only marker
if
(Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
+ final int readerIndex = metadataAndPayload.readerIndex();
processReplicatedSubscriptionSnapshot(pos,
metadataAndPayload);
+ metadataAndPayload.readerIndex(readerIndex);
}
- entries.set(i, null);
- entry.release();
-
individualAcknowledgeMessageIfNeeded(Collections.singletonList(pos),
- Collections.emptyMap());
- continue;
+ // Deliver marker to __compaction cursor to avoid compaction
task stuck,
+ // and filter out them when doing topic compaction.
+ if (msgMetadata == null || cursor == null
+ ||
!cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)) {
+ entries.set(i, null);
+ entry.release();
+
individualAcknowledgeMessageIfNeeded(Collections.singletonList(pos),
+ Collections.emptyMap());
+ continue;
+ }
} else if (trackDelayedDelivery(entry.getLedgerId(),
entry.getEntryId(), msgMetadata)) {
// The message is marked for delayed delivery. Ignore for now.
entries.set(i, null);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index a78323a9cfe..647c34a94ad 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.RawBatchConverter;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -130,7 +131,10 @@ public class TwoPhaseCompactor extends Compactor {
boolean replaceMessage = false;
mxBean.addCompactionReadOp(reader.getTopic(),
m.getHeadersAndPayload().readableBytes());
MessageMetadata metadata =
Commands.parseMessageMetadata(m.getHeadersAndPayload());
- if (RawBatchConverter.isReadableBatch(metadata)) {
+ if (Markers.isServerOnlyMarker(metadata)) {
+ mxBean.addCompactionRemovedEvent(reader.getTopic());
+ deletedMessage = true;
+ } else if (RawBatchConverter.isReadableBatch(metadata)) {
try {
int numMessagesInBatch =
metadata.getNumMessagesInBatch();
int deleteCnt = 0;
@@ -262,7 +266,10 @@ public class TwoPhaseCompactor extends Compactor {
MessageId id = m.getMessageId();
Optional<RawMessage> messageToAdd = Optional.empty();
mxBean.addCompactionReadOp(reader.getTopic(),
m.getHeadersAndPayload().readableBytes());
- if (RawBatchConverter.isReadableBatch(m)) {
+ MessageMetadata metadata =
Commands.parseMessageMetadata(m.getHeadersAndPayload());
+ if (Markers.isServerOnlyMarker(metadata)) {
+ messageToAdd = Optional.empty();
+ } else if (RawBatchConverter.isReadableBatch(metadata)) {
try {
messageToAdd = rebatchMessage(reader.getTopic(),
m, (key, subid) ->
subid.equals(latestForKey.get(key)), topicCompactionRetainNullKey);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
index 529fb923f59..fe519827be7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -42,6 +42,7 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import
org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -822,6 +823,66 @@ public class ReplicatorSubscriptionTest extends
ReplicatorTestBase {
pulsar1.getConfiguration().setForceDeleteNamespaceAllowed(false);
}
+ @Test
+ public void testReplicatedSubscriptionWithCompaction() throws Exception {
+ final String namespace =
BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
+ final String topicName = "persistent://" + namespace +
"/testReplicatedSubscriptionWithCompaction";
+ final String subName = "sub";
+
+ admin1.namespaces().createNamespace(namespace);
+ admin1.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet("r1", "r2"));
+ admin1.topics().createNonPartitionedTopic(topicName);
+ admin1.topicPolicies().setCompactionThreshold(topicName, 100 * 1024 *
1024L);
+
+ @Cleanup final PulsarClient client =
PulsarClient.builder().serviceUrl(url1.toString())
+ .statsInterval(0, TimeUnit.SECONDS).build();
+
+ Producer<String> producer =
client.newProducer(Schema.STRING).topic(topicName).create();
+ producer.newMessage().key("K1").value("V1").send();
+ producer.newMessage().key("K1").value("V2").send();
+ producer.close();
+
+ createReplicatedSubscription(client, topicName, subName, true);
+ Awaitility.await().untilAsserted(() -> {
+ Map<String, Boolean> status =
admin1.topics().getReplicatedSubscriptionStatus(topicName, subName);
+ assertTrue(status.get(topicName));
+ });
+
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopic t1 = (PersistentTopic) pulsar1.getBrokerService()
+ .getTopic(topicName, false).get().get();
+ ReplicatedSubscriptionsController rsc1 =
t1.getReplicatedSubscriptionController().get();
+ Assert.assertTrue(rsc1.getLastCompletedSnapshotId().isPresent());
+ assertEquals(t1.getPendingWriteOps().get(), 0L);
+ });
+
+ admin1.topics().triggerCompaction(topicName);
+
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(admin1.topics().compactionStatus(topicName).status,
+ LongRunningProcessStatus.Status.SUCCESS);
+ });
+
+ @Cleanup
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("sub2")
+ .subscriptionType(SubscriptionType.Exclusive)
+ .readCompacted(true)
+ .subscribe();
+ List<String> result = new ArrayList<>();
+ while (true) {
+ Message<String> receive = consumer.receive(2, TimeUnit.SECONDS);
+ if (receive == null) {
+ break;
+ }
+
+ result.add(receive.getValue());
+ }
+
+ Assert.assertEquals(result, List.of("V2"));
+ }
+
/**
* Disable replication subscription.
* Test scheduled task case.
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 86b58839903..eba7f1e8c73 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -106,6 +106,7 @@ import
org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckReplyCal
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import
org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
import
org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -1849,4 +1850,62 @@ public class TransactionTest extends TransactionTestBase
{
Assert.assertEquals(messages, List.of("V2", "V3"));
}
+
+ @Test
+ public void testReadCommittedWithCompaction() throws Exception{
+ final String namespace = "tnx/ns-prechecks";
+ final String topic = "persistent://" + namespace +
"/test_transaction_topic" + UUID.randomUUID();
+ admin.namespaces().createNamespace(namespace);
+ admin.topics().createNonPartitionedTopic(topic);
+
+ admin.topicPolicies().setCompactionThreshold(topic, 100 * 1024 * 1024);
+
+ @Cleanup
+ Producer<String> producer =
this.pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ producer.newMessage().key("K1").value("V1").send();
+
+ Transaction txn = pulsarClient.newTransaction()
+ .withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+ producer.newMessage(txn).key("K2").value("V2").send();
+ producer.newMessage(txn).key("K3").value("V3").send();
+ txn.commit().get();
+
+ producer.newMessage().key("K1").value("V4").send();
+
+ Transaction txn2 = pulsarClient.newTransaction()
+ .withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+ producer.newMessage(txn2).key("K2").value("V5").send();
+ producer.newMessage(txn2).key("K3").value("V6").send();
+ txn2.commit().get();
+
+ admin.topics().triggerCompaction(topic);
+
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(admin.topics().compactionStatus(topic).status,
+ LongRunningProcessStatus.Status.SUCCESS);
+ });
+
+ @Cleanup
+ Consumer<String> consumer =
this.pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub")
+ .subscriptionType(SubscriptionType.Exclusive)
+ .readCompacted(true)
+ .subscribe();
+ List<String> result = new ArrayList<>();
+ while (true) {
+ Message<String> receive = consumer.receive(2, TimeUnit.SECONDS);
+ if (receive == null) {
+ break;
+ }
+
+ result.add(receive.getValue());
+ }
+
+ Assert.assertEquals(result, List.of("V4", "V5", "V6"));
+ }
+
}