This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8856606d8f46082773799770e201731b8207d658
Author: Jiwei Guo <[email protected]>
AuthorDate: Sun Jun 26 16:28:24 2022 +0800

    [fix][broker] Fix compaction subscription acknowledge Marker msg issue. (#16205)

    (cherry picked from commit 8e0cd9c954a7d3bf00fc1bc790e811443c411c32)
---
 .../broker/service/AbstractBaseDispatcher.java     | 17 ++++--
 .../apache/pulsar/compaction/CompactionTest.java   | 70 ++++++++++++++++++++++
 2 files changed, 81 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 8e7dfc14aa7..c9ea4a56d6c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -24,6 +24,7 @@ import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
@@ -36,6 +37,7 @@ import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.plugin.EntryFilter;
@@ -170,15 +172,13 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
                 if (Markers.isTxnMarker(msgMetadata)) {
                     // because consumer can receive message is smaller than maxReadPosition,
                     // so this marker is useless for this subscription
-                    subscription.acknowledgeMessage(Collections.singletonList(entry.getPosition()), AckType.Individual,
-                            Collections.emptyMap());
+                    individualAcknowledgeMessageIfNeeded(entry.getPosition(), Collections.emptyMap());
                     entries.set(i, null);
                     entry.release();
                     continue;
                 } else if (((PersistentTopic) subscription.getTopic())
                         .isTxnAborted(new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()))) {
-                    subscription.acknowledgeMessage(Collections.singletonList(entry.getPosition()), AckType.Individual,
-                            Collections.emptyMap());
+                    individualAcknowledgeMessageIfNeeded(entry.getPosition(), Collections.emptyMap());
                     entries.set(i, null);
                     entry.release();
                     continue;
@@ -193,8 +193,7 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
 
                 entries.set(i, null);
                 entry.release();
-                subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual,
-                        Collections.emptyMap());
+                individualAcknowledgeMessageIfNeeded(pos, Collections.emptyMap());
                 continue;
             } else if (msgMetadata.hasDeliverAtTime()
                     && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
@@ -252,6 +251,12 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
         return totalEntries;
     }
 
+    private void individualAcknowledgeMessageIfNeeded(Position position, Map<String, Long> properties) {
+        if (!(subscription instanceof CompactorSubscription)) {
+            subscription.acknowledgeMessage(Collections.singletonList(position), AckType.Individual, properties);
+        }
+    }
+
     private static EntryFilter.FilterResult getFilterResult(FilterContext filterContext, Entry entry,
                                                             ImmutableList<EntryFilterWithClassLoader> entryFilters) {
         for (EntryFilter entryFilter : entryFilters) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 8c9df66dac4..ddad53fbc82 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -29,6 +29,7 @@ import static org.testng.Assert.assertFalse;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -44,11 +45,17 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import io.netty.buffer.ByteBuf;
+import lombok.Cleanup;
 import lombok.SneakyThrows;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.api.OpenBuilder;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.Topic;
@@ -66,13 +73,17 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -1678,4 +1689,63 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         producer1.close();
         producer2.close();
     }
+
+    @Test(timeOut = 60000)
+    public void testCompactionWithMarker() throws Exception {
+        String namespace = "my-property/use/my-ns";
+        final TopicName dest = TopicName.get(
+                BrokerTestUtil.newUniqueName("persistent://" + namespace + "/testWriteMarker"));
+        admin.topics().createNonPartitionedTopic(dest.toString());
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(dest.toString())
+                .subscriptionName("test-compaction-sub")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .readCompacted(true)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
+                .subscribe();
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(dest.toString())
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        producer.send("msg-1".getBytes(StandardCharsets.UTF_8));
+        Optional<Topic> topic = pulsar.getBrokerService().getTopic(dest.toString(), true).join();
+        Assert.assertTrue(topic.isPresent());
+        PersistentTopic persistentTopic = (PersistentTopic) topic.get();
+        Random random = new Random();
+        for (int i = 0; i < 100; i++) {
+            int rad = random.nextInt(3);
+            ByteBuf marker;
+            if (rad == 0) {
+                marker = Markers.newTxnCommitMarker(-1L, 0, i);
+            } else if (rad == 1) {
+                marker = Markers.newTxnAbortMarker(-1L, 0, i);
+            } else {
+                marker = Markers.newReplicatedSubscriptionsSnapshotRequest(UUID.randomUUID().toString(), "r1");
+            }
+            persistentTopic.getManagedLedger().asyncAddEntry(marker, new AsyncCallbacks.AddEntryCallback() {
+                @Override
+                public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+                    //
+                }
+
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object ctx) {
+                    //
+                }
+            }, null);
+            marker.release();
+        }
+        producer.send("msg-2".getBytes(StandardCharsets.UTF_8));
+        admin.topics().triggerCompaction(dest.toString());
+        Awaitility.await()
+                .atMost(50, TimeUnit.SECONDS)
+                .pollInterval(1, TimeUnit.SECONDS)
+                .untilAsserted(() -> {
+                    long ledgerId = admin.topics().getInternalStats(dest.toString()).compactedLedger.ledgerId;
+                    Assert.assertNotEquals(ledgerId, -1L);
+                });
+    }
 }
