This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit bd5f75f6f351bb0ac0dfeb50fec1dbac501fe748 Author: Matteo Merli <[email protected]> AuthorDate: Tue Aug 17 18:22:11 2021 -0700 KeyShared dispatcher on non-persistent topics was not respecting consumer flow-control (#11692) ### Motivation Fixes #10734 The KeyShared dispatcher for non-persistent topics does not honor the flow control advertised by consumers, so no back pressure is applied from the consumer side. As a result, the broker pushes messages to consumers without restriction, causing memory issues in consumers. (cherry picked from commit 5835fd23e5347dd73ff073c371366a6b4ba8d6c4) --- ...istentStickyKeyDispatcherMultipleConsumers.java | 25 +++++++++++++--- ...ntStickyKeyDispatcherMultipleConsumersTest.java | 34 ++++++++++++++++++++++ 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java index dc0d8a6..704fd93 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java @@ -31,6 +31,7 @@ import org.apache.pulsar.broker.service.SendMessageInfo; import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; +import org.apache.pulsar.common.protocol.Commands; public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersistentDispatcherMultipleConsumers { @@ -89,7 +90,11 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis for (Entry entry : entries) { Consumer consumer = selector.select(peekStickyKey(entry.getDataBuffer())); - 
groupedEntries.computeIfAbsent(consumer, k -> new ArrayList<>()).add(entry); + if (consumer != null) { + groupedEntries.computeIfAbsent(consumer, k -> new ArrayList<>()).add(entry); + } else { + entry.release(); + } } for (Map.Entry<Consumer, List<Entry>> entriesByConsumer : groupedEntries.entrySet()) { @@ -99,9 +104,21 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForConsumer.size()); filterEntriesForConsumer(entriesForConsumer, batchSizes, sendMessageInfo, null, null, false); - consumer.sendMessages(entriesForConsumer, batchSizes, null, sendMessageInfo.getTotalMessages(), - sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker()); - TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages()); + + if (consumer.getAvailablePermits() > 0 && consumer.isWritable()) { + consumer.sendMessages(entriesForConsumer, batchSizes, null, sendMessageInfo.getTotalMessages(), + sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), + getRedeliveryTracker()); + TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages()); + } else { + entriesForConsumer.forEach(e -> { + int totalMsgs = Commands.getNumberOfMessagesInBatch(e.getDataBuffer(), subscription.toString(), -1); + if (totalMsgs > 0) { + msgDrop.recordEvent(totalMsgs); + } + e.release(); + }); + } } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java index 49dac2a..990bd8f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -58,6 +58,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; @@ -118,6 +119,8 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest { @Test(timeOut = 10000) public void testSendMessage() throws BrokerServiceException { Consumer consumerMock = mock(Consumer.class); + when(consumerMock.getAvailablePermits()).thenReturn(1000); + when(consumerMock.isWritable()).thenReturn(true); nonpersistentDispatcher.addConsumer(consumerMock); List<Entry> entries = new ArrayList<>(); @@ -146,6 +149,37 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest { eq(null), anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class)); } + @Test(timeOut = 10000) + public void testSendMessageRespectFlowControl() throws BrokerServiceException { + Consumer consumerMock = mock(Consumer.class); + nonpersistentDispatcher.addConsumer(consumerMock); + + List<Entry> entries = new ArrayList<>(); + entries.add(EntryImpl.create(1, 1, createMessage("message1", 1))); + entries.add(EntryImpl.create(1, 2, createMessage("message2", 2))); + doAnswer(invocationOnMock -> { + ChannelPromise mockPromise = mock(ChannelPromise.class); + List<Entry> receivedEntries = invocationOnMock.getArgument(0, List.class); + for (int index = 1; index <= receivedEntries.size(); index++) { + Entry entry = receivedEntries.get(index - 1); + assertEquals(entry.getLedgerId(), 1); + assertEquals(entry.getEntryId(), index); + ByteBuf byteBuf = entry.getDataBuffer(); + MessageMetadata messageMetadata = Commands.parseMessageMetadata(byteBuf); + assertEquals(byteBuf.toString(UTF_8), "message" + index); + }; + return mockPromise; 
+ }).when(consumerMock).sendMessages(any(List.class), any(EntryBatchSizes.class), any(), + anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class)); + try { + nonpersistentDispatcher.sendMessages(entries); + } catch (Exception e) { + fail("Failed to sendMessages.", e); + } + verify(consumerMock, times(0)).sendMessages(any(List.class), any(EntryBatchSizes.class), + eq(null), anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class)); + } + private ByteBuf createMessage(String message, int sequenceId) { return createMessage(message, sequenceId, "testKey"); }
