This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d3216d2f391 [fix][broker] Ignore individual acknowledgment for
CompactorSubscription when an entry has been filtered. (#21434)
d3216d2f391 is described below
commit d3216d2f391ff1f8fd3047315954a24795a6f460
Author: Cong Zhao <[email protected]>
AuthorDate: Wed Oct 25 21:27:37 2023 +0800
[fix][broker] Ignore individual acknowledgment for CompactorSubscription
when an entry has been filtered. (#21434)
---
.../broker/service/AbstractBaseDispatcher.java | 16 +++---
.../broker/service/plugin/FilterEntryTest.java | 61 +++++++++++++++++++++-
2 files changed, 68 insertions(+), 9 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index eb8b0151395..b36389ab2da 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -176,14 +176,16 @@ public abstract class AbstractBaseDispatcher extends
EntryFilterSupport implemen
if (Markers.isTxnMarker(msgMetadata)) {
// because consumer can receive message is smaller than
maxReadPosition,
// so this marker is useless for this subscription
- individualAcknowledgeMessageIfNeeded(entry.getPosition(),
Collections.emptyMap());
+
individualAcknowledgeMessageIfNeeded(Collections.singletonList(entry.getPosition()),
+ Collections.emptyMap());
entries.set(i, null);
entry.release();
continue;
} else if (((PersistentTopic) subscription.getTopic())
.isTxnAborted(new
TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()),
(PositionImpl) entry.getPosition())) {
- individualAcknowledgeMessageIfNeeded(entry.getPosition(),
Collections.emptyMap());
+
individualAcknowledgeMessageIfNeeded(Collections.singletonList(entry.getPosition()),
+ Collections.emptyMap());
entries.set(i, null);
entry.release();
continue;
@@ -200,7 +202,8 @@ public abstract class AbstractBaseDispatcher extends
EntryFilterSupport implemen
entries.set(i, null);
entry.release();
- individualAcknowledgeMessageIfNeeded(pos,
Collections.emptyMap());
+
individualAcknowledgeMessageIfNeeded(Collections.singletonList(pos),
+ Collections.emptyMap());
continue;
} else if (trackDelayedDelivery(entry.getLedgerId(),
entry.getEntryId(), msgMetadata)) {
// The message is marked for delayed delivery. Ignore for now.
@@ -271,8 +274,7 @@ public abstract class AbstractBaseDispatcher extends
EntryFilterSupport implemen
}
}
if (CollectionUtils.isNotEmpty(entriesToFiltered)) {
- subscription.acknowledgeMessage(entriesToFiltered,
AckType.Individual,
- Collections.emptyMap());
+ individualAcknowledgeMessageIfNeeded(entriesToFiltered,
Collections.emptyMap());
int filtered = entriesToFiltered.size();
Topic topic = subscription.getTopic();
@@ -301,9 +303,9 @@ public abstract class AbstractBaseDispatcher extends
EntryFilterSupport implemen
return totalEntries;
}
- private void individualAcknowledgeMessageIfNeeded(Position position,
Map<String, Long> properties) {
+ private void individualAcknowledgeMessageIfNeeded(List<Position>
positions, Map<String, Long> properties) {
if (!(subscription instanceof PulsarCompactorSubscription)) {
-
subscription.acknowledgeMessage(Collections.singletonList(position),
AckType.Individual, properties);
+ subscription.acknowledgeMessage(positions, AckType.Individual,
properties);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index b868858646c..1c4f88bc027 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -22,6 +22,7 @@ import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructor
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
import static
org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@@ -30,8 +31,9 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
-
+import io.netty.buffer.ByteBuf;
import java.lang.reflect.Field;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -39,7 +41,6 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -58,11 +59,15 @@ import
org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
+import org.apache.pulsar.compaction.Compactor;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -150,6 +155,58 @@ public class FilterEntryTest extends BrokerTestBase {
consumer.close();
}
+ @Test
+ public void testEntryFilterWithCompactor() throws Exception {
+ conf.setAllowOverrideEntryFilters(true);
+ String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
+
+ List<String> messages = new ArrayList<>();
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(false).topic(topic).create();
+ producer.newMessage().key("K1").value("V1").send();
+ producer.newMessage().key("K2").value("V2").send();
+ producer.newMessage().key("K3").value("V3").send();
+ producer.newMessage().key("K4").value("V4").send();
+ messages.add("V2");
+ messages.add("V4");
+
+ PersistentTopic topicRef = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topic).get();
+
+ // set topic level entry filters
+ EntryFilter mockFilter = mock(EntryFilter.class);
+ doAnswer(invocationOnMock -> {
+ FilterContext filterContext = invocationOnMock.getArgument(1);
+ String partitionKey =
filterContext.getMsgMetadata().getPartitionKey();
+ if (partitionKey.equals("K1") || partitionKey.equals("K3")) {
+ return EntryFilter.FilterResult.REJECT;
+ } else {
+ return EntryFilter.FilterResult.ACCEPT;
+ }
+ }).when(mockFilter).filterEntry(any(Entry.class),
any(FilterContext.class));
+ setMockFilterToTopic(topicRef, List.of(mockFilter));
+
+ List<String> results = new ArrayList<>();
+ RawReader rawReader = RawReader.create(pulsarClient, topic,
Compactor.COMPACTION_SUBSCRIPTION).get();
+ while (true) {
+ boolean hasMsg = rawReader.hasMessageAvailableAsync().get();
+ if (hasMsg) {
+ try (RawMessage m = rawReader.readNextAsync().get()) {
+ ByteBuf headersAndPayload = m.getHeadersAndPayload();
+ Commands.skipMessageMetadata(headersAndPayload);
+ byte[] bytes = new byte[headersAndPayload.readableBytes()];
+ headersAndPayload.readBytes(bytes);
+
+ results.add(new String(bytes));
+ }
+ } else {
+ break;
+ }
+ }
+ rawReader.closeAsync().get();
+
+ Assert.assertEquals(messages, results);
+ }
+
@SneakyThrows
private void setMockFilterToTopic(PersistentTopic topicRef,
List<EntryFilter> mockFilter) {
FieldUtils.writeField(topicRef, "entryFilters", Pair.of(null,
mockFilter), true);