This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d3216d2f391 [fix][broker] Ignore individual acknowledgment for CompactorSubscription when an entry has been filtered. (#21434)
d3216d2f391 is described below

commit d3216d2f391ff1f8fd3047315954a24795a6f460
Author: Cong Zhao <[email protected]>
AuthorDate: Wed Oct 25 21:27:37 2023 +0800

    [fix][broker] Ignore individual acknowledgment for CompactorSubscription when an entry has been filtered. (#21434)
---
 .../broker/service/AbstractBaseDispatcher.java     | 16 +++---
 .../broker/service/plugin/FilterEntryTest.java     | 61 +++++++++++++++++++++-
 2 files changed, 68 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index eb8b0151395..b36389ab2da 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -176,14 +176,16 @@ public abstract class AbstractBaseDispatcher extends 
EntryFilterSupport implemen
                 if (Markers.isTxnMarker(msgMetadata)) {
                     // because consumer can receive message is smaller than 
maxReadPosition,
                     // so this marker is useless for this subscription
-                    individualAcknowledgeMessageIfNeeded(entry.getPosition(), 
Collections.emptyMap());
+                    
individualAcknowledgeMessageIfNeeded(Collections.singletonList(entry.getPosition()),
+                            Collections.emptyMap());
                     entries.set(i, null);
                     entry.release();
                     continue;
                 } else if (((PersistentTopic) subscription.getTopic())
                         .isTxnAborted(new 
TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()),
                                 (PositionImpl) entry.getPosition())) {
-                    individualAcknowledgeMessageIfNeeded(entry.getPosition(), 
Collections.emptyMap());
+                    
individualAcknowledgeMessageIfNeeded(Collections.singletonList(entry.getPosition()),
+                            Collections.emptyMap());
                     entries.set(i, null);
                     entry.release();
                     continue;
@@ -200,7 +202,8 @@ public abstract class AbstractBaseDispatcher extends 
EntryFilterSupport implemen
 
                 entries.set(i, null);
                 entry.release();
-                individualAcknowledgeMessageIfNeeded(pos, 
Collections.emptyMap());
+                
individualAcknowledgeMessageIfNeeded(Collections.singletonList(pos),
+                        Collections.emptyMap());
                 continue;
             } else if (trackDelayedDelivery(entry.getLedgerId(), 
entry.getEntryId(), msgMetadata)) {
                 // The message is marked for delayed delivery. Ignore for now.
@@ -271,8 +274,7 @@ public abstract class AbstractBaseDispatcher extends 
EntryFilterSupport implemen
             }
         }
         if (CollectionUtils.isNotEmpty(entriesToFiltered)) {
-            subscription.acknowledgeMessage(entriesToFiltered, 
AckType.Individual,
-                    Collections.emptyMap());
+            individualAcknowledgeMessageIfNeeded(entriesToFiltered, 
Collections.emptyMap());
 
             int filtered = entriesToFiltered.size();
             Topic topic = subscription.getTopic();
@@ -301,9 +303,9 @@ public abstract class AbstractBaseDispatcher extends 
EntryFilterSupport implemen
         return totalEntries;
     }
 
-    private void individualAcknowledgeMessageIfNeeded(Position position, 
Map<String, Long> properties) {
+    private void individualAcknowledgeMessageIfNeeded(List<Position> 
positions, Map<String, Long> properties) {
         if (!(subscription instanceof PulsarCompactorSubscription)) {
-            
subscription.acknowledgeMessage(Collections.singletonList(position), 
AckType.Individual, properties);
+            subscription.acknowledgeMessage(positions, AckType.Individual, 
properties);
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index b868858646c..1c4f88bc027 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -22,6 +22,7 @@ import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructor
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
 import static 
org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -30,8 +31,9 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertTrue;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertNotNull;
-
+import io.netty.buffer.ByteBuf;
 import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -39,7 +41,6 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-
 import lombok.Cleanup;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -58,11 +59,15 @@ import 
org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
+import org.apache.pulsar.compaction.Compactor;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -150,6 +155,58 @@ public class FilterEntryTest extends BrokerTestBase {
         consumer.close();
     }
 
+    @Test
+    public void testEntryFilterWithCompactor() throws Exception {
+        conf.setAllowOverrideEntryFilters(true);
+        String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
+
+        List<String> messages = new ArrayList<>();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topic).create();
+        producer.newMessage().key("K1").value("V1").send();
+        producer.newMessage().key("K2").value("V2").send();
+        producer.newMessage().key("K3").value("V3").send();
+        producer.newMessage().key("K4").value("V4").send();
+        messages.add("V2");
+        messages.add("V4");
+
+        PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topic).get();
+
+        // set topic level entry filters
+        EntryFilter mockFilter = mock(EntryFilter.class);
+        doAnswer(invocationOnMock -> {
+            FilterContext filterContext = invocationOnMock.getArgument(1);
+            String partitionKey = 
filterContext.getMsgMetadata().getPartitionKey();
+            if (partitionKey.equals("K1") || partitionKey.equals("K3")) {
+                return EntryFilter.FilterResult.REJECT;
+            } else {
+                return EntryFilter.FilterResult.ACCEPT;
+            }
+        }).when(mockFilter).filterEntry(any(Entry.class), 
any(FilterContext.class));
+        setMockFilterToTopic(topicRef, List.of(mockFilter));
+
+        List<String> results = new ArrayList<>();
+        RawReader rawReader = RawReader.create(pulsarClient, topic, 
Compactor.COMPACTION_SUBSCRIPTION).get();
+        while (true) {
+            boolean hasMsg = rawReader.hasMessageAvailableAsync().get();
+            if (hasMsg) {
+                try (RawMessage m = rawReader.readNextAsync().get()) {
+                    ByteBuf headersAndPayload = m.getHeadersAndPayload();
+                    Commands.skipMessageMetadata(headersAndPayload);
+                    byte[] bytes = new byte[headersAndPayload.readableBytes()];
+                    headersAndPayload.readBytes(bytes);
+
+                    results.add(new String(bytes));
+                }
+            } else {
+                break;
+            }
+        }
+        rawReader.closeAsync().get();
+
+        Assert.assertEquals(messages, results);
+    }
+
     @SneakyThrows
     private void setMockFilterToTopic(PersistentTopic topicRef, 
List<EntryFilter> mockFilter) {
         FieldUtils.writeField(topicRef, "entryFilters", Pair.of(null, 
mockFilter), true);

Reply via email to