This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9b7e97bf8faae5ec62565c8dc6f5ddf5f5a1c737 Author: Marvin Cai <[email protected]> AuthorDate: Mon Mar 15 18:19:25 2021 -0700 Fix message not dispatch for key_shared sub type in non-persistent subscription (#9826) Fixes #9703 ### Motivation With a non-persistent topic, I see messages published in the topic stats, but consumers do not consume them if they use Key_Shared. Other consumer modes work fine. ### Verifying this change Covered by existing test case, verified manually. (cherry picked from commit 960a79e69fd562fde36c0cccd0cb39783da11937) --- ...istentStickyKeyDispatcherMultipleConsumers.java | 2 +- ...ntStickyKeyDispatcherMultipleConsumersTest.java | 163 +++++++++++++++++++++ .../client/api/KeySharedSubscriptionTest.java | 38 +++-- 3 files changed, 189 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java index 736252e..c1c0227 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java @@ -76,7 +76,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis @Override public void sendMessages(List<Entry> entries) { - if (!entries.isEmpty()) { + if (entries.isEmpty()) { return; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java new file mode 100644 index 0000000..49dac2a --- /dev/null +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.nonpersistent; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelPromise; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.impl.EntryImpl; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.EntryBatchSizes; +import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector; +import org.apache.pulsar.broker.service.RedeliveryTracker; +import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.protocol.Commands; +import org.powermock.api.mockito.PowerMockito; +import 
org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.testng.IObjectFactory; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.ObjectFactory; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +@PrepareForTest({ DispatchRateLimiter.class }) +@PowerMockIgnore({"org.apache.logging.log4j.*"}) +public class NonPersistentStickyKeyDispatcherMultipleConsumersTest { + + private PulsarService pulsarMock; + private BrokerService brokerMock; + private NonPersistentTopic topicMock; + private NonPersistentSubscription subscriptionMock; + private ServiceConfiguration configMock; + private ChannelPromise channelMock; + + private NonPersistentStickyKeyDispatcherMultipleConsumers nonpersistentDispatcher; + + final String topicName = "non-persistent://public/default/testTopic"; + + @ObjectFactory + public IObjectFactory getObjectFactory() { + return new org.powermock.modules.testng.PowerMockObjectFactory(); + } + + @BeforeMethod + public void setup() throws Exception { + configMock = mock(ServiceConfiguration.class); + doReturn(true).when(configMock).isSubscriptionRedeliveryTrackerEnabled(); + 
doReturn(100).when(configMock).getDispatcherMaxReadBatchSize(); + doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing(); + doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints(); + + pulsarMock = mock(PulsarService.class); + doReturn(configMock).when(pulsarMock).getConfiguration(); + + brokerMock = mock(BrokerService.class); + doReturn(pulsarMock).when(brokerMock).pulsar(); + + topicMock = mock(NonPersistentTopic.class); + doReturn(brokerMock).when(topicMock).getBrokerService(); + doReturn(topicName).when(topicMock).getName(); + + channelMock = mock(ChannelPromise.class); + subscriptionMock = mock(NonPersistentSubscription.class); + + PowerMockito.mockStatic(DispatchRateLimiter.class); + PowerMockito.when(DispatchRateLimiter.isDispatchRateNeeded( + any(BrokerService.class), + any(Optional.class), + anyString(), + any(DispatchRateLimiter.Type.class)) + ).thenReturn(false); + + nonpersistentDispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers( + topicMock, subscriptionMock, + new HashRangeAutoSplitStickyKeyConsumerSelector()); + } + + @Test(timeOut = 10000) + public void testSendMessage() throws BrokerServiceException { + Consumer consumerMock = mock(Consumer.class); + nonpersistentDispatcher.addConsumer(consumerMock); + + List<Entry> entries = new ArrayList<>(); + entries.add(EntryImpl.create(1, 1, createMessage("message1", 1))); + entries.add(EntryImpl.create(1, 2, createMessage("message2", 2))); + doAnswer(invocationOnMock -> { + ChannelPromise mockPromise = mock(ChannelPromise.class); + List<Entry> receivedEntries = invocationOnMock.getArgument(0, List.class); + for (int index = 1; index <= receivedEntries.size(); index++) { + Entry entry = receivedEntries.get(index - 1); + assertEquals(entry.getLedgerId(), 1); + assertEquals(entry.getEntryId(), index); + ByteBuf byteBuf = entry.getDataBuffer(); + MessageMetadata messageMetadata = Commands.parseMessageMetadata(byteBuf); + 
assertEquals(byteBuf.toString(UTF_8), "message" + index); + }; + return mockPromise; + }).when(consumerMock).sendMessages(any(List.class), any(EntryBatchSizes.class), any(), + anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class)); + try { + nonpersistentDispatcher.sendMessages(entries); + } catch (Exception e) { + fail("Failed to sendMessages.", e); + } + verify(consumerMock, times(1)).sendMessages(any(List.class), any(EntryBatchSizes.class), + eq(null), anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class)); + } + + private ByteBuf createMessage(String message, int sequenceId) { + return createMessage(message, sequenceId, "testKey"); + } + + private ByteBuf createMessage(String message, int sequenceId, String key) { + MessageMetadata messageMetadata = new MessageMetadata() + .setSequenceId(sequenceId) + .setProducerName("testProducer") + .setPartitionKey(key) + .setPartitionKeyB64Encoded(false) + .setPublishTime(System.currentTimeMillis()); + return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, Unpooled.copiedBuffer(message.getBytes(UTF_8))); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java index e9b30b2..d87c257 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java @@ -35,6 +35,7 @@ import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -130,12 +131,12 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { .send(); } - receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3)); + 
receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3), 1000); } @Test(dataProvider = "data") public void testSendAndReceiveWithBatching(String topicType, boolean enableBatch) - throws PulsarClientException { + throws Exception { this.conf.setSubscriptionKeySharedEnable(true); String topic = topicType + "://public/default/key_shared-" + UUID.randomUUID(); @@ -151,23 +152,33 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { @Cleanup Producer<Integer> producer = createProducer(topic, enableBatch); + CompletableFuture<MessageId> future; + for (int i = 0; i < 1000; i++) { // Send the same key twice so that we'll have a batch message String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS)); - producer.newMessage() + future = producer.newMessage() .key(key) .value(i) .sendAsync(); - producer.newMessage() + // If not batching, need to wait for message to be persisted + if (!enableBatch) { + future.get(); + } + + future = producer.newMessage() .key(key) .value(i) .sendAsync(); + if (!enableBatch) { + future.get(); + } } - + // If batching, flush buffered messages producer.flush(); - receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3)); + receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3), 1000 * 2); } @Test(dataProvider = "batch") @@ -247,7 +258,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { .send(); } - receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3)); + receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3), 1000); // wait for consumer grouping acking send. 
Thread.sleep(1000); @@ -262,7 +273,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { .send(); } - receiveAndCheckDistribution(Lists.newArrayList(consumer3)); + receiveAndCheckDistribution(Lists.newArrayList(consumer3), 10); } @Test(dataProvider = "data") @@ -354,7 +365,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { .send(); } - receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3)); + receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3), 1000); } @Test(dataProvider = "batch") @@ -783,7 +794,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { .send(); } - receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3)); + receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3), 1000); } @Test @@ -940,11 +951,13 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { producer = pulsarClient.newProducer(Schema.INT32) .topic(topic) .enableBatching(true) + .maxPendingMessages(2001) .batcherBuilder(BatcherBuilder.KEY_BASED) .create(); } else { producer = pulsarClient.newProducer(Schema.INT32) .topic(topic) + .maxPendingMessages(2001) .enableBatching(false) .create(); } @@ -998,7 +1011,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { /** * Check that every consumer receives a fair number of messages and that same key is delivered to only 1 consumer */ - private void receiveAndCheckDistribution(List<Consumer<?>> consumers) throws PulsarClientException { + private void receiveAndCheckDistribution(List<Consumer<?>> consumers, int expectedTotalMessage) throws PulsarClientException { // Add a key so that we know this key was already assigned to one consumer Map<String, Consumer<?>> keyToConsumer = new HashMap<>(); Map<Consumer<?>, Integer> messagesPerConsumer = new HashMap<>(); @@ -1036,8 +1049,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { 
final double PERCENT_ERROR = 0.40; // 40 % double expectedMessagesPerConsumer = totalMessages / consumers.size(); - - System.err.println(messagesPerConsumer); + Assert.assertEquals(expectedTotalMessage, totalMessages); for (int count : messagesPerConsumer.values()) { Assert.assertEquals(count, expectedMessagesPerConsumer, expectedMessagesPerConsumer * PERCENT_ERROR); }
