This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 32a0fd5d01af0776e4245610ab8fddcb538148f2 Author: Lari Hotari <[email protected]> AuthorDate: Mon Jan 27 08:10:54 2025 +0200 [fix][broker] Apply dispatcherMaxReadSizeBytes also for replay reads for Shared and Key_Shared subscriptions (#23894) (cherry picked from commit 11a615e7cecb57c8c240cc565c5ac62d938c490d) --- .../PersistentDispatcherMultipleConsumers.java | 13 +- ...tDispatcherMultipleConsumersReadLimitsTest.java | 144 +++++++++++++++++++++ ...ntStickyKeyDispatcherMultipleConsumersTest.java | 4 + 3 files changed, 157 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 82b96c36507..3ceb703c26c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -384,7 +384,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractPersistentDis } Set<Position> messagesToReplayNow = - canReplayMessages() ? getMessagesToReplayNow(messagesToRead) : Collections.emptySet(); + canReplayMessages() ? getMessagesToReplayNow(messagesToRead, bytesToRead) : Collections.emptySet(); if (!messagesToReplayNow.isEmpty()) { if (log.isDebugEnabled()) { log.debug("[{}] Schedule replay of {} messages for {} consumers", name, @@ -1343,15 +1343,20 @@ public class PersistentDispatcherMultipleConsumers extends AbstractPersistentDis } } - protected synchronized NavigableSet<Position> getMessagesToReplayNow(int maxMessagesToRead) { + protected synchronized NavigableSet<Position> getMessagesToReplayNow(int maxMessagesToRead, long bytesToRead) { + int cappedMaxMessagesToRead = cursor.applyMaxSizeCap(maxMessagesToRead, bytesToRead); + if (cappedMaxMessagesToRead < maxMessagesToRead && log.isDebugEnabled()) { + log.debug("[{}] Capped max messages to read from redelivery list to {} (max was {})", + name, cappedMaxMessagesToRead, maxMessagesToRead); + } if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) { delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis()); NavigableSet<Position> messagesAvailableNow = - delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead); + delayedDeliveryTracker.get().getScheduledMessages(cappedMaxMessagesToRead); messagesAvailableNow.forEach(p -> redeliveryMessages.add(p.getLedgerId(), p.getEntryId())); } if (!redeliveryMessages.isEmpty()) { - return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead, createFilterForReplay()); + return redeliveryMessages.getMessagesToReplayNow(cappedMaxMessagesToRead, createFilterForReplay()); } else { return Collections.emptyNavigableSet(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersReadLimitsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersReadLimitsTest.java new file mode 100644 index 00000000000..a5dc4523298 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersReadLimitsTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.commons.lang3.RandomUtils; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.testinterceptor.BrokerTestInterceptor; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.SubscriptionType; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class PersistentDispatcherMultipleConsumersReadLimitsTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + // start at max batch size to reproduce the issue more easily + conf.setDispatcherMinReadBatchSize(conf.getDispatcherMaxReadBatchSize()); + BrokerTestInterceptor.INSTANCE.configure(conf); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @AfterMethod(alwaysRun = true) + protected void resetInterceptors() throws Exception { + BrokerTestInterceptor.INSTANCE.reset(); + } + + @Test(timeOut = 30 * 1000) + public void testDispatcherMaxReadSizeBytes() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName( + "persistent://public/default/testDispatcherMaxReadSizeBytes"); + final String subscription = "sub"; + + AtomicInteger entriesReadMax = new AtomicInteger(0); + BrokerTestInterceptor.INSTANCE.applyDispatcherSpyDecorator(PersistentDispatcherMultipleConsumers.class, + spy -> { + doAnswer(invocation -> { + List<Entry> entries = invocation.getArgument(0); + PersistentDispatcherMultipleConsumers.ReadType readType = invocation.getArgument(1); + int numberOfEntries = entries.size(); + log.info("intercepted readEntriesComplete with {} entries, read type {}", numberOfEntries, + readType); + entriesReadMax.updateAndGet(current -> Math.max(current, numberOfEntries)); + return invocation.callRealMethod(); + }).when(spy).readEntriesComplete(any(), any()); + } + ); + + // Create two consumers on a shared subscription + @Cleanup + Consumer<byte[]> consumer1 = pulsarClient.newConsumer() + .consumerName("c1") + .topic(topicName) + .subscriptionName(subscription) + .subscriptionType(SubscriptionType.Shared) + .receiverQueueSize(10000) + .subscribe(); + + @Cleanup + Consumer<byte[]> consumer2 = pulsarClient.newConsumer() + .consumerName("c2") + .topic(topicName) + .subscriptionName(subscription) + .subscriptionType(SubscriptionType.Shared) + .startPaused(true) + .receiverQueueSize(10000) + .subscribe(); + + @Cleanup + Producer<byte[]> producer = + pulsarClient.newProducer().enableBatching(false).topic(topicName).create(); + int numberOfMessages = 200; + int payLoadSizeBytes = 1025 * 1024; // 1025kB + byte[] payload = RandomUtils.nextBytes(payLoadSizeBytes); + for (int i = 0; i < numberOfMessages; i++) { + producer.send(payload); + } + + // Consume messages with consumer1 but don't ack + for (int i = 0; i < numberOfMessages; i++) { + consumer1.receive(); + } + + // Close consumer1 and resume consumer2 to replay the messages + consumer1.close(); + consumer2.resume(); + + // Verify that consumer2 can receive the messages + for (int i = 0; i < numberOfMessages; i++) { + Message<byte[]> msg = consumer2.receive(1, TimeUnit.SECONDS); + Assert.assertNotNull(msg, "Consumer2 should receive the message"); + consumer2.acknowledge(msg); + } + + int expectedMaxEntriesInRead = conf.getDispatcherMaxReadSizeBytes() / payLoadSizeBytes; + assertThat(entriesReadMax.get()).isLessThanOrEqualTo(expectedMaxEntriesInRead); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index 7234f0caefc..aec161f237b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -191,6 +191,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { doReturn(null).when(cursorMock).getLastIndividualDeletedRange(); doReturn(subscriptionName).when(cursorMock).getName(); doReturn(ledgerMock).when(cursorMock).getManagedLedger(); + doAnswer(invocation -> { + int max = invocation.getArgument(0); + return max; + }).when(cursorMock).applyMaxSizeCap(anyInt(), anyLong()); consumerMock = createMockConsumer(); channelMock = mock(ChannelPromise.class);
