This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new c7d990876b0 [Branch 2.9] Fix compaction subscription acknowledge Marker msg issue. (#16348)
c7d990876b0 is described below

commit c7d990876b0a0bb84a6d596e2a56d821faffa6bc
Author: Qiang Zhao <[email protected]>
AuthorDate: Sat Jul 2 21:46:52 2022 +0800

    [Branch 2.9] Fix compaction subscription acknowledge Marker msg issue. (#16348)
---
 .../broker/service/AbstractBaseDispatcher.java     | 18 +++--
 .../apache/pulsar/compaction/CompactionTest.java   | 85 ++++++++++++++++++----
 2 files changed, 84 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index ba757b529fe..95e89af8609 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -22,14 +22,17 @@ package org.apache.pulsar.broker.service;
 import io.netty.buffer.ByteBuf;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
@@ -128,15 +131,13 @@ public abstract class AbstractBaseDispatcher implements 
Dispatcher {
                 if (Markers.isTxnMarker(msgMetadata)) {
                     // because consumer can receive message is smaller than 
maxReadPosition,
                     // so this marker is useless for this subscription
-                    
subscription.acknowledgeMessage(Collections.singletonList(entry.getPosition()), 
AckType.Individual,
-                            Collections.emptyMap());
+                    individualAcknowledgeMessageIfNeeded(entry.getPosition(), 
Collections.emptyMap());
                     entries.set(i, null);
                     entry.release();
                     continue;
                 } else if (((PersistentTopic) subscription.getTopic())
                         .isTxnAborted(new 
TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()))) {
-                    
subscription.acknowledgeMessage(Collections.singletonList(entry.getPosition()), 
AckType.Individual,
-                            Collections.emptyMap());
+                    individualAcknowledgeMessageIfNeeded(entry.getPosition(), 
Collections.emptyMap());
                     entries.set(i, null);
                     entry.release();
                     continue;
@@ -151,8 +152,7 @@ public abstract class AbstractBaseDispatcher implements 
Dispatcher {
 
                 entries.set(i, null);
                 entry.release();
-                
subscription.acknowledgeMessage(Collections.singletonList(pos), 
AckType.Individual,
-                        Collections.emptyMap());
+                individualAcknowledgeMessageIfNeeded(pos, 
Collections.emptyMap());
                 continue;
             } else if (msgMetadata.hasDeliverAtTime()
                     && trackDelayedDelivery(entry.getLedgerId(), 
entry.getEntryId(), msgMetadata)) {
@@ -188,6 +188,12 @@ public abstract class AbstractBaseDispatcher implements 
Dispatcher {
         sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
     }
 
+    private void individualAcknowledgeMessageIfNeeded(Position position, 
Map<String, Long> properties) {
+        if (!(subscription instanceof CompactorSubscription)) {
+            
subscription.acknowledgeMessage(Collections.singletonList(position), 
AckType.Individual, properties);
+        }
+    }
+
     /**
      * Determine whether the number of consumers on the subscription reaches 
the threshold.
      * @return
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index c9e0d95f7ff..ab9e62c6fac 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -28,6 +28,7 @@ import static org.testng.Assert.assertTrue;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -43,31 +44,30 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+
+import io.netty.buffer.ByteBuf;
+import lombok.Cleanup;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.api.OpenBuilder;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.client.api.CompressionType;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.CryptoKeyReader;
-import org.apache.pulsar.client.api.EncryptionKeyInfo;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.MessageRoutingMode;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -1656,4 +1656,63 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
             assertNull(none);
         }
     }
+
+    @Test(timeOut = 60000)
+    public void testCompactionWithMarker() throws Exception {
+        String namespace = "my-property/use/my-ns";
+        final TopicName dest = TopicName.get(
+                BrokerTestUtil.newUniqueName("persistent://" + namespace + 
"/testWriteMarker"));
+        admin.topics().createNonPartitionedTopic(dest.toString());
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(dest.toString())
+                .subscriptionName("test-compaction-sub")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .readCompacted(true)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
+                .subscribe();
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                 .topic(dest.toString())
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        producer.send("msg-1".getBytes(StandardCharsets.UTF_8));
+        Optional<Topic> topic = 
pulsar.getBrokerService().getTopic(dest.toString(), true).join();
+        Assert.assertTrue(topic.isPresent());
+        PersistentTopic persistentTopic = (PersistentTopic) topic.get();
+        Random random = new Random();
+        for (int i = 0; i < 100; i++) {
+            int rad = random.nextInt(3);
+            ByteBuf marker;
+            if (rad == 0) {
+                marker = Markers.newTxnCommitMarker(-1L, 0, i);
+            } else if (rad == 1) {
+                marker = Markers.newTxnAbortMarker(-1L, 0, i);
+            } else {
+                marker = 
Markers.newReplicatedSubscriptionsSnapshotRequest(UUID.randomUUID().toString(), 
"r1");
+            }
+            persistentTopic.getManagedLedger().asyncAddEntry(marker, new 
AsyncCallbacks.AddEntryCallback() {
+                @Override
+                public void addComplete(Position position, ByteBuf entryData, 
Object ctx) {
+                    //
+                }
+
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object 
ctx) {
+                    //
+                }
+            }, null);
+            marker.release();
+        }
+        producer.send("msg-2".getBytes(StandardCharsets.UTF_8));
+        admin.topics().triggerCompaction(dest.toString());
+        Awaitility.await()
+                .atMost(50, TimeUnit.SECONDS)
+                .pollInterval(1, TimeUnit.SECONDS)
+                .untilAsserted(() -> {
+                    long ledgerId = 
admin.topics().getInternalStats(dest.toString()).compactedLedger.ledgerId;
+                    Assert.assertNotEquals(ledgerId, -1L);
+                });
+    }
 }

Reply via email to