This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fc393f69043 [fix][broker] Avoid compaction task stuck when the last message to compact is a marker (#21718)
fc393f69043 is described below

commit fc393f69043be6eb1b2572a27f131656a2cbc7f6
Author: Cong Zhao <[email protected]>
AuthorDate: Wed Dec 20 09:39:00 2023 +0800

    [fix][broker] Avoid compaction task stuck when the last message to compact is a marker (#21718)
---
 .../broker/service/AbstractBaseDispatcher.java     | 36 ++++++++-----
 .../pulsar/compaction/TwoPhaseCompactor.java       | 11 +++-
 .../broker/service/ReplicatorSubscriptionTest.java | 61 ++++++++++++++++++++++
 .../pulsar/broker/transaction/TransactionTest.java | 59 +++++++++++++++++++++
 4 files changed, 152 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index ea68d588896..2f38ad67d4f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.compaction.Compactor;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 @Slf4j
@@ -174,13 +175,15 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
             if (msgMetadata != null && msgMetadata.hasTxnidMostBits()
                     && msgMetadata.hasTxnidLeastBits()) {
                 if (Markers.isTxnMarker(msgMetadata)) {
-                    // because consumer can receive message is smaller than maxReadPosition,
-                    // so this marker is useless for this subscription
-                    individualAcknowledgeMessageIfNeeded(Collections.singletonList(entry.getPosition()),
-                            Collections.emptyMap());
-                    entries.set(i, null);
-                    entry.release();
-                    continue;
+                    if (cursor == null || !cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)) {
+                        // because consumer can receive message is smaller than maxReadPosition,
+                        // so this marker is useless for this subscription
+                        individualAcknowledgeMessageIfNeeded(Collections.singletonList(entry.getPosition()),
+                                Collections.emptyMap());
+                        entries.set(i, null);
+                        entry.release();
+                        continue;
+                    }
                 } else if (((PersistentTopic) subscription.getTopic())
                         .isTxnAborted(new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()),
                                 (PositionImpl) entry.getPosition())) {
@@ -192,19 +195,26 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
                 }
             }
 
-            if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
+            if (msgMetadata == null || (Markers.isServerOnlyMarker(msgMetadata))) {
                 PositionImpl pos = (PositionImpl) entry.getPosition();
                 // Message metadata was corrupted or the messages was a server-only marker
 
                 if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
+                    final int readerIndex = metadataAndPayload.readerIndex();
                     processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
+                    metadataAndPayload.readerIndex(readerIndex);
                 }
 
-                entries.set(i, null);
-                entry.release();
-                individualAcknowledgeMessageIfNeeded(Collections.singletonList(pos),
-                        Collections.emptyMap());
-                continue;
+                // Deliver marker to __compaction cursor to avoid compaction task stuck,
+                // and filter out them when doing topic compaction.
+                if (msgMetadata == null || cursor == null
+                        || !cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)) {
+                    entries.set(i, null);
+                    entry.release();
+                    individualAcknowledgeMessageIfNeeded(Collections.singletonList(pos),
+                            Collections.emptyMap());
+                    continue;
+                }
             } else if (trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
                 // The message is marked for delayed delivery. Ignore for now.
                 entries.set(i, null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index a78323a9cfe..647c34a94ad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.RawBatchConverter;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,7 +131,10 @@ public class TwoPhaseCompactor extends Compactor {
                 boolean replaceMessage = false;
                 mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
                 MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload());
-                if (RawBatchConverter.isReadableBatch(metadata)) {
+                if (Markers.isServerOnlyMarker(metadata)) {
+                    mxBean.addCompactionRemovedEvent(reader.getTopic());
+                    deletedMessage = true;
+                } else if (RawBatchConverter.isReadableBatch(metadata)) {
                     try {
                         int numMessagesInBatch = metadata.getNumMessagesInBatch();
                         int deleteCnt = 0;
@@ -262,7 +266,10 @@ public class TwoPhaseCompactor extends Compactor {
                 MessageId id = m.getMessageId();
                 Optional<RawMessage> messageToAdd = Optional.empty();
                 mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
-                if (RawBatchConverter.isReadableBatch(m)) {
+                MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload());
+                if (Markers.isServerOnlyMarker(metadata)) {
+                    messageToAdd = Optional.empty();
+                } else if (RawBatchConverter.isReadableBatch(metadata)) {
                     try {
                         messageToAdd = rebatchMessage(reader.getTopic(),
                                 m, (key, subid) -> subid.equals(latestForKey.get(key)), topicCompactionRetainNullKey);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
index 529fb923f59..fe519827be7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -42,6 +42,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -822,6 +823,66 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
         pulsar1.getConfiguration().setForceDeleteNamespaceAllowed(false);
     }
 
+    @Test
+    public void testReplicatedSubscriptionWithCompaction() throws Exception {
+        final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
+        final String topicName = "persistent://" + namespace + "/testReplicatedSubscriptionWithCompaction";
+        final String subName = "sub";
+
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+        admin1.topics().createNonPartitionedTopic(topicName);
+        admin1.topicPolicies().setCompactionThreshold(topicName, 100 * 1024 * 1024L);
+
+        @Cleanup final PulsarClient client = PulsarClient.builder().serviceUrl(url1.toString())
+                .statsInterval(0, TimeUnit.SECONDS).build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING).topic(topicName).create();
+        producer.newMessage().key("K1").value("V1").send();
+        producer.newMessage().key("K1").value("V2").send();
+        producer.close();
+
+        createReplicatedSubscription(client, topicName, subName, true);
+        Awaitility.await().untilAsserted(() -> {
+            Map<String, Boolean> status = admin1.topics().getReplicatedSubscriptionStatus(topicName, subName);
+            assertTrue(status.get(topicName));
+        });
+
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic t1 = (PersistentTopic) pulsar1.getBrokerService()
+                .getTopic(topicName, false).get().get();
+        ReplicatedSubscriptionsController rsc1 = t1.getReplicatedSubscriptionController().get();
+        Assert.assertTrue(rsc1.getLastCompletedSnapshotId().isPresent());
+        assertEquals(t1.getPendingWriteOps().get(), 0L);
+        });
+
+        admin1.topics().triggerCompaction(topicName);
+
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(admin1.topics().compactionStatus(topicName).status,
+                    LongRunningProcessStatus.Status.SUCCESS);
+        });
+
+        @Cleanup
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("sub2")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .readCompacted(true)
+                .subscribe();
+        List<String> result = new ArrayList<>();
+        while (true) {
+            Message<String> receive = consumer.receive(2, TimeUnit.SECONDS);
+            if (receive == null) {
+                break;
+            }
+
+            result.add(receive.getValue());
+        }
+
+        Assert.assertEquals(result, List.of("V2"));
+    }
+
     /**
      * Disable replication subscription.
      *    Test scheduled task case.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 86b58839903..eba7f1e8c73 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -106,6 +106,7 @@ import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckReplyCal
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
 import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -1849,4 +1850,62 @@ public class TransactionTest extends TransactionTestBase {
         Assert.assertEquals(messages, List.of("V2", "V3"));
     }
 
+
+    @Test
+    public void testReadCommittedWithCompaction() throws Exception{
+        final String namespace = "tnx/ns-prechecks";
+        final String topic = "persistent://" + namespace + "/test_transaction_topic" + UUID.randomUUID();
+        admin.namespaces().createNamespace(namespace);
+        admin.topics().createNonPartitionedTopic(topic);
+
+        admin.topicPolicies().setCompactionThreshold(topic, 100 * 1024 * 1024);
+
+        @Cleanup
+        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        producer.newMessage().key("K1").value("V1").send();
+
+        Transaction txn = pulsarClient.newTransaction()
+                .withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+        producer.newMessage(txn).key("K2").value("V2").send();
+        producer.newMessage(txn).key("K3").value("V3").send();
+        txn.commit().get();
+
+        producer.newMessage().key("K1").value("V4").send();
+
+        Transaction txn2 = pulsarClient.newTransaction()
+                .withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+        producer.newMessage(txn2).key("K2").value("V5").send();
+        producer.newMessage(txn2).key("K3").value("V6").send();
+        txn2.commit().get();
+
+        admin.topics().triggerCompaction(topic);
+
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(admin.topics().compactionStatus(topic).status,
+                    LongRunningProcessStatus.Status.SUCCESS);
+        });
+
+        @Cleanup
+        Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .readCompacted(true)
+                .subscribe();
+        List<String> result = new ArrayList<>();
+        while (true) {
+            Message<String> receive = consumer.receive(2, TimeUnit.SECONDS);
+            if (receive == null) {
+                break;
+            }
+
+            result.add(receive.getValue());
+        }
+
+        Assert.assertEquals(result, List.of("V4", "V5", "V6"));
+    }
+
 }

Reply via email to