This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9b7e97bf8faae5ec62565c8dc6f5ddf5f5a1c737
Author: Marvin Cai <[email protected]>
AuthorDate: Mon Mar 15 18:19:25 2021 -0700

    Fix message not dispatch for key_shared sub type in non-persistent subscription (#9826)
    
    Fixes #9703
    
    ### Motivation
    
    With a non-persistent topic, I see messages published in the topic stats, but consumers do not consume them if they use Key_Shared. Other consumer modes work fine.
    
    ### Verifying this change
    
    Covered by existing test case, verified manually.
    
    (cherry picked from commit 960a79e69fd562fde36c0cccd0cb39783da11937)
---
 ...istentStickyKeyDispatcherMultipleConsumers.java |   2 +-
 ...ntStickyKeyDispatcherMultipleConsumersTest.java | 163 +++++++++++++++++++++
 .../client/api/KeySharedSubscriptionTest.java      |  38 +++--
 3 files changed, 189 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 736252e..c1c0227 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -76,7 +76,7 @@ public class 
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
 
     @Override
     public void sendMessages(List<Entry> entries) {
-        if (!entries.isEmpty()) {
+        if (entries.isEmpty()) {
             return;
         }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
new file mode 100644
index 0000000..49dac2a
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.nonpersistent;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelPromise;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.EntryBatchSizes;
+import 
org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.RedeliveryTracker;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.IObjectFactory;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+@PrepareForTest({ DispatchRateLimiter.class })
+@PowerMockIgnore({"org.apache.logging.log4j.*"})
+public class NonPersistentStickyKeyDispatcherMultipleConsumersTest {
+
+    private PulsarService pulsarMock;
+    private BrokerService brokerMock;
+    private NonPersistentTopic topicMock;
+    private NonPersistentSubscription subscriptionMock;
+    private ServiceConfiguration configMock;
+    private ChannelPromise channelMock;
+
+    private NonPersistentStickyKeyDispatcherMultipleConsumers 
nonpersistentDispatcher;
+
+    final String topicName = "non-persistent://public/default/testTopic";
+
+    @ObjectFactory
+    public IObjectFactory getObjectFactory() {
+        return new org.powermock.modules.testng.PowerMockObjectFactory();
+    }
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        configMock = mock(ServiceConfiguration.class);
+        
doReturn(true).when(configMock).isSubscriptionRedeliveryTrackerEnabled();
+        doReturn(100).when(configMock).getDispatcherMaxReadBatchSize();
+        
doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing();
+        
doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();
+
+        pulsarMock = mock(PulsarService.class);
+        doReturn(configMock).when(pulsarMock).getConfiguration();
+
+        brokerMock = mock(BrokerService.class);
+        doReturn(pulsarMock).when(brokerMock).pulsar();
+
+        topicMock = mock(NonPersistentTopic.class);
+        doReturn(brokerMock).when(topicMock).getBrokerService();
+        doReturn(topicName).when(topicMock).getName();
+
+        channelMock = mock(ChannelPromise.class);
+        subscriptionMock = mock(NonPersistentSubscription.class);
+
+        PowerMockito.mockStatic(DispatchRateLimiter.class);
+        PowerMockito.when(DispatchRateLimiter.isDispatchRateNeeded(
+                any(BrokerService.class),
+                any(Optional.class),
+                anyString(),
+                any(DispatchRateLimiter.Type.class))
+        ).thenReturn(false);
+
+        nonpersistentDispatcher = new 
NonPersistentStickyKeyDispatcherMultipleConsumers(
+                topicMock, subscriptionMock,
+                new HashRangeAutoSplitStickyKeyConsumerSelector());
+    }
+
+    @Test(timeOut = 10000)
+    public void testSendMessage() throws BrokerServiceException {
+        Consumer consumerMock = mock(Consumer.class);
+        nonpersistentDispatcher.addConsumer(consumerMock);
+
+        List<Entry> entries = new ArrayList<>();
+        entries.add(EntryImpl.create(1, 1, createMessage("message1", 1)));
+        entries.add(EntryImpl.create(1, 2, createMessage("message2", 2)));
+        doAnswer(invocationOnMock -> {
+            ChannelPromise mockPromise = mock(ChannelPromise.class);
+            List<Entry> receivedEntries = invocationOnMock.getArgument(0, 
List.class);
+            for (int index = 1; index <= receivedEntries.size(); index++) {
+                Entry entry = receivedEntries.get(index - 1);
+                assertEquals(entry.getLedgerId(), 1);
+                assertEquals(entry.getEntryId(), index);
+                ByteBuf byteBuf = entry.getDataBuffer();
+                MessageMetadata messageMetadata = 
Commands.parseMessageMetadata(byteBuf);
+                assertEquals(byteBuf.toString(UTF_8), "message" + index);
+            };
+            return mockPromise;
+        }).when(consumerMock).sendMessages(any(List.class), 
any(EntryBatchSizes.class), any(),
+                anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class));
+        try {
+            nonpersistentDispatcher.sendMessages(entries);
+        } catch (Exception e) {
+            fail("Failed to sendMessages.", e);
+        }
+        verify(consumerMock, times(1)).sendMessages(any(List.class), 
any(EntryBatchSizes.class),
+                eq(null), anyInt(), anyLong(), anyLong(), 
any(RedeliveryTracker.class));
+    }
+
+    private ByteBuf createMessage(String message, int sequenceId) {
+        return createMessage(message, sequenceId, "testKey");
+    }
+
+    private ByteBuf createMessage(String message, int sequenceId, String key) {
+        MessageMetadata messageMetadata = new MessageMetadata()
+                .setSequenceId(sequenceId)
+                .setProducerName("testProducer")
+                .setPartitionKey(key)
+                .setPartitionKeyB64Encoded(false)
+                .setPublishTime(System.currentTimeMillis());
+        return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, 
messageMetadata, Unpooled.copiedBuffer(message.getBytes(UTF_8)));
+    }
+
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index e9b30b2..d87c257 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -35,6 +35,7 @@ import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -130,12 +131,12 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
                     .send();
         }
 
-        receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, 
consumer3));
+        receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, 
consumer3), 1000);
     }
 
     @Test(dataProvider = "data")
     public void testSendAndReceiveWithBatching(String topicType, boolean 
enableBatch)
-            throws PulsarClientException {
+            throws Exception {
         this.conf.setSubscriptionKeySharedEnable(true);
         String topic = topicType + "://public/default/key_shared-" + 
UUID.randomUUID();
 
@@ -151,23 +152,33 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
         @Cleanup
         Producer<Integer> producer = createProducer(topic, enableBatch);
 
+        CompletableFuture<MessageId> future;
+
         for (int i = 0; i < 1000; i++) {
             // Send the same key twice so that we'll have a batch message
             String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS));
-            producer.newMessage()
+            future = producer.newMessage()
                     .key(key)
                     .value(i)
                     .sendAsync();
 
-            producer.newMessage()
+            // If not batching, need to wait for message to be persisted
+            if (!enableBatch) {
+                future.get();
+            }
+
+            future = producer.newMessage()
                     .key(key)
                     .value(i)
                     .sendAsync();
+            if (!enableBatch) {
+                future.get();
+            }
         }
-
+        // If batching, flush buffered messages
         producer.flush();
 
-        receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, 
consumer3));
+        receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, 
consumer3), 1000 * 2);
     }
 
     @Test(dataProvider = "batch")
@@ -247,7 +258,7 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
                     .send();
         }
 
-        receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, 
consumer3));
+        receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, 
consumer3), 1000);
 
         // wait for consumer grouping acking send.
         Thread.sleep(1000);
@@ -262,7 +273,7 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
                     .send();
         }
 
-        receiveAndCheckDistribution(Lists.newArrayList(consumer3));
+        receiveAndCheckDistribution(Lists.newArrayList(consumer3), 10);
     }
 
     @Test(dataProvider = "data")
@@ -354,7 +365,7 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
                     .send();
         }
 
-        receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, 
consumer3));
+        receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, 
consumer3), 1000);
     }
 
     @Test(dataProvider = "batch")
@@ -783,7 +794,7 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
                     .send();
         }
 
-        receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, 
consumer3));
+        receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, 
consumer3), 1000);
     }
 
     @Test
@@ -940,11 +951,13 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
             producer = pulsarClient.newProducer(Schema.INT32)
                     .topic(topic)
                     .enableBatching(true)
+                    .maxPendingMessages(2001)
                     .batcherBuilder(BatcherBuilder.KEY_BASED)
                     .create();
         } else {
             producer = pulsarClient.newProducer(Schema.INT32)
                     .topic(topic)
+                    .maxPendingMessages(2001)
                     .enableBatching(false)
                     .create();
         }
@@ -998,7 +1011,7 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
     /**
      * Check that every consumer receives a fair number of messages and that 
same key is delivered to only 1 consumer
      */
-    private void receiveAndCheckDistribution(List<Consumer<?>> consumers) 
throws PulsarClientException {
+    private void receiveAndCheckDistribution(List<Consumer<?>> consumers, int 
expectedTotalMessage) throws PulsarClientException {
         // Add a key so that we know this key was already assigned to one 
consumer
         Map<String, Consumer<?>> keyToConsumer = new HashMap<>();
         Map<Consumer<?>, Integer> messagesPerConsumer = new HashMap<>();
@@ -1036,8 +1049,7 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
         final double PERCENT_ERROR = 0.40; // 40 %
 
         double expectedMessagesPerConsumer = totalMessages / consumers.size();
-
-        System.err.println(messagesPerConsumer);
+        Assert.assertEquals(expectedTotalMessage, totalMessages);
         for (int count : messagesPerConsumer.values()) {
             Assert.assertEquals(count, expectedMessagesPerConsumer, 
expectedMessagesPerConsumer * PERCENT_ERROR);
         }

Reply via email to