This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5cec5edc4a5 [fix] [broker] Fix items in 
dispatcher.recentlyJoinedConsumers are out-of-order, which may cause a delivery 
stuck (#23802)
5cec5edc4a5 is described below

commit 5cec5edc4a56befe2c30b20ecaf144ae999db335
Author: fengyubiao <[email protected]>
AuthorDate: Fri Jan 3 16:11:42 2025 +0800

    [fix] [broker] Fix items in dispatcher.recentlyJoinedConsumers are 
out-of-order, which may cause a delivery stuck (#23802)
    
    (cherry picked from commit 3d71c8794e5ccc560c431d2066a442d5816e3464)
---
 ...istentStickyKeyDispatcherMultipleConsumers.java |  37 ++-
 .../NonEntryCacheKeySharedSubscriptionV30Test.java | 287 +++++++++++++++++++++
 2 files changed, 316 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 27c9eed9709..14af67b4573 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -34,6 +34,7 @@ import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import lombok.Setter;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
@@ -74,6 +75,15 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
      */
     private final LinkedHashMap<Consumer, PositionImpl> 
recentlyJoinedConsumers;
 
+    /**
+     * The method {@link #sortRecentlyJoinedConsumersIfNeeded} is not a regular 
method; it is used to fix the issue that
+     * was described at https://github.com/apache/pulsar/pull/23795.
+     * To cover the case that does not contain the hot fix that 
https://github.com/apache/pulsar/pull/23795 provided,
+     * we add this flag so that tests can disable the sorting and reproduce the issue.
+     **/
+    @Setter
+    public boolean sortRecentlyJoinedConsumersIfNeeded = true;
+
     PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor,
             Subscription subscription, ServiceConfiguration conf, 
KeySharedMeta ksm) {
         super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery());
@@ -132,7 +142,11 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                 if (!allowOutOfOrderDelivery
                         && recentlyJoinedConsumers != null
                         && consumerList.size() > 1
-                        && 
cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
+                        && 
cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1
+                        // If there is a delayed "cursor.rewind" after the 
pending read, the consumers that are
+                        // added before the "cursor.rewind" will all have the same 
"recent joined position", which is the
+                        // same as "mark deleted position + 1", so we can skip 
adding them.
+                        && !shouldRewindBeforeReadingOrReplaying) {
                     recentlyJoinedConsumers.put(consumer, 
readPositionWhenJoining);
                     sortRecentlyJoinedConsumersIfNeeded();
                 }
@@ -141,9 +155,13 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
     }
 
     private void sortRecentlyJoinedConsumersIfNeeded() {
+        if (!sortRecentlyJoinedConsumersIfNeeded) {
+            return;
+        }
         if (recentlyJoinedConsumers.size() == 1) {
             return;
         }
+        // Since we check the order of the queue after each consumer joins, we 
only need to check the last two items.
         boolean sortNeeded = false;
         PositionImpl posPre = null;
         PositionImpl posAfter = null;
@@ -151,18 +169,21 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             if (posPre == null) {
                 posPre = entry.getValue();
             } else {
+                posPre = posAfter;
                 posAfter = entry.getValue();
             }
-            if (posPre != null && posAfter != null) {
-                if (posPre.compareTo(posAfter) > 0) {
-                    sortNeeded = true;
-                    break;
-                }
-                posPre = posAfter;
+        }
+        if (posPre != null && posAfter != null) {
+            if (posPre.compareTo(posAfter) > 0) {
+                sortNeeded = true;
             }
         }
-
+        // Something went wrong; sort the collection.
         if (sortNeeded) {
+            log.error("[{}] [{}] The items in recentlyJoinedConsumers are 
out-of-order. {}",
+                    topic.getName(), name, 
recentlyJoinedConsumers.entrySet().stream().map(entry ->
+                            String.format("%s-%s:%s", 
entry.getKey().consumerName(), entry.getValue().getLedgerId(),
+                                    
entry.getValue().getEntryId())).collect(Collectors.toList()));
             List<Map.Entry<Consumer, PositionImpl>> sortedList = new 
ArrayList<>(recentlyJoinedConsumers.entrySet());
             Collections.sort(sortedList, Map.Entry.comparingByValue());
             recentlyJoinedConsumers.clear();
diff --git 
a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java
 
b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java
new file mode 100644
index 00000000000..4edcd65ac20
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertTrue;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import 
org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.awaitility.Awaitility;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-api")
+public class NonEntryCacheKeySharedSubscriptionV30Test extends 
ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        this.conf.setManagedLedgerCacheSizeMB(0);
+        this.conf.setManagedLedgerMaxEntriesPerLedger(50000);
+    }
+
+
+    @Test(timeOut = 180 * 1000, invocationCount = 1)
+    public void testRecentJoinQueueIsInOrderAfterRewind() throws Exception {
+        int msgCount = 300;
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String subName = "my-sub";
+        final DefaultThreadFactory threadFactory =
+                new 
DefaultThreadFactory(BrokerTestUtil.newUniqueName("thread"));
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().createSubscription(topic, subName, MessageId.earliest);
+
+        // Send messages.
+        @Cleanup
+        Producer<Integer> producer = 
pulsarClient.newProducer(Schema.INT32).topic(topic).enableBatching(false).create();
+        AtomicInteger msgGenerator = new AtomicInteger();
+        for (int i = 0; i < msgCount; i++) {
+            int v = msgGenerator.getAndIncrement();
+            producer.newMessage().key(String.valueOf(v)).value(v).send();
+        }
+
+        // Create the consumers that will be used to make ack holes.
+        Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(subName)
+                .receiverQueueSize(100)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .consumerName("c1")
+                .subscribe();
+        Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(subName)
+                .receiverQueueSize(100)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .consumerName("c2")
+                .subscribe();
+        Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(subName)
+                .receiverQueueSize(100)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .consumerName("c3")
+                .subscribe();
+        final PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, 
false).join().get();
+        final ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+        final PersistentSubscription persistentSubscription = 
persistentTopic.getSubscription(subName);
+        final PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
+                (PersistentStickyKeyDispatcherMultipleConsumers) 
persistentSubscription.getDispatcher();
+        final ManagedCursorImpl cursor = (ManagedCursorImpl) 
ml.getCursors().get(subName);
+        dispatcher.setSortRecentlyJoinedConsumersIfNeeded(false);
+
+        // Make ack holes.
+        //  - ack all messages that consumer1 or consumer2 received.
+        //  - do not ack messages that consumer3 received.
+        ackAllMessages(consumer1, consumer2);
+        PositionImpl mdPosition = (PositionImpl) 
cursor.getMarkDeletedPosition();
+        PositionImpl readPosition = (PositionImpl) cursor.getReadPosition();
+        PositionImpl LAC = (PositionImpl) ml.getLastConfirmedEntry();
+        assertTrue(readPosition.compareTo(LAC) >= 0);
+        PositionImpl firstWaitingAckPos = ml.getNextValidPosition(mdPosition);
+        log.info("md-pos {}:{}", mdPosition.getLedgerId(), 
mdPosition.getEntryId());
+        log.info("rd-pos {}:{}", readPosition.getLedgerId(), 
readPosition.getEntryId());
+        log.info("lac-pos {}:{}", LAC.getLedgerId(), LAC.getEntryId());
+        log.info("first-waiting-ack-pos {}:{}", 
firstWaitingAckPos.getLedgerId(), firstWaitingAckPos.getEntryId());
+
+        // Inject a delay for the next replay read.
+        LedgerHandle firstLedger = ml.currentLedger;
+        Assert.assertEquals(firstWaitingAckPos.getLedgerId(), 
firstLedger.getId());
+        LedgerHandle spyFirstLedger = spy(firstLedger);
+        CountDownLatch replyReadSignal = new CountDownLatch(1);
+        AtomicBoolean replayReadWasTriggered = new AtomicBoolean();
+        Answer answer = invocation -> {
+            long firstEntry = (long) invocation.getArguments()[0];
+            if (firstEntry == firstWaitingAckPos.getEntryId()) {
+                replayReadWasTriggered.set(true);
+                final CompletableFuture res = new CompletableFuture<>();
+                threadFactory.newThread(() -> {
+                    try {
+                        replyReadSignal.await();
+                        CompletableFuture<LedgerEntries> future =
+                                (CompletableFuture<LedgerEntries>) 
invocation.callRealMethod();
+                        future.thenAccept(v -> {
+                            res.complete(v);
+                        }).exceptionally(ex -> {
+                            res.completeExceptionally(ex);
+                            return null;
+                        });
+                    } catch (Throwable ex) {
+                        res.completeExceptionally(ex);
+                    }
+                }).start();
+                return res;
+            } else {
+                return invocation.callRealMethod();
+            }
+        };
+        doAnswer(answer).when(spyFirstLedger).readAsync(anyLong(), anyLong());
+        doAnswer(answer).when(spyFirstLedger).readUnconfirmedAsync(anyLong(), 
anyLong());
+        ml.currentLedger = spyFirstLedger;
+
+        // Keep publishing to avoid a pending normal read.
+        AtomicBoolean keepPublishing = new AtomicBoolean(true);
+        new Thread(() -> {
+            while (keepPublishing.get()) {
+                int v = msgGenerator.getAndIncrement();
+                
producer.newMessage().key(String.valueOf(v)).value(v).sendAsync();
+                sleep(100);
+            }
+        }).start();
+
+        // Trigger a message redelivery.
+        consumer3.close();
+        Awaitility.await().until(() -> replayReadWasTriggered.get());
+
+        // Close all consumers to trigger a cursor.rewind.
+        consumer1.close();
+        consumer2.close();
+
+        // Start 40 consumers.
+        List<CompletableFuture<Consumer<Integer>>> consumerList = new 
ArrayList<>();
+        for (int i = 0; i < 40; i++) {
+            consumerList.add(pulsarClient.newConsumer(Schema.INT32)
+                    .topic(topic)
+                    .subscriptionName(subName)
+                    .subscriptionType(SubscriptionType.Key_Shared)
+                    .subscribeAsync());
+            if (i == 10) {
+                for (int j = 0; j < msgCount; j++) {
+                    int v = msgGenerator.getAndIncrement();
+                    
producer.newMessage().key(String.valueOf(v)).value(v).send();
+                }
+                final Consumer<Integer> firstConsumer = 
consumerList.get(0).join();
+                ackAllMessages(firstConsumer);
+                new Thread(() -> {
+                    while (keepPublishing.get()) {
+                        try {
+                            ackAllMessages(firstConsumer);
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                }).start();
+            }
+            log.info("recent-joined-consumers {} {}", i, 
dispatcher.getRecentlyJoinedConsumers().size());
+            if (dispatcher.getRecentlyJoinedConsumers().size() > 0) {
+                PositionImpl mdPosition2 = (PositionImpl) 
cursor.getMarkDeletedPosition();
+                PositionImpl readPosition2 = (PositionImpl) 
cursor.getReadPosition();
+                PositionImpl LAC2 = (PositionImpl) ml.getLastConfirmedEntry();
+                assertTrue(readPosition.compareTo(LAC) >= 0);
+                PositionImpl firstWaitingAckPos2 = 
ml.getNextValidPosition(mdPosition);
+                if(readPosition2.compareTo(firstWaitingAckPos) > 0) {
+                    keepPublishing.set(false);
+                    log.info("consumer-index: {}", i);
+                    log.info("md-pos-2 {}:{}", mdPosition2.getLedgerId(), 
mdPosition2.getEntryId());
+                    log.info("rd-pos-2 {}:{}", readPosition2.getLedgerId(), 
readPosition2.getEntryId());
+                    log.info("lac-pos-2 {}:{}", LAC2.getLedgerId(), 
LAC2.getEntryId());
+                    log.info("first-waiting-ack-pos-2 {}:{}", 
firstWaitingAckPos2.getLedgerId(),
+                            firstWaitingAckPos2.getEntryId());
+                    // finish the replay read here.
+                    replyReadSignal.countDown();
+                } else {
+                    sleep(1000);
+                }
+            }
+        }
+        consumerList.get(consumerList.size() - 1).join();
+
+        synchronized (dispatcher) {
+            LinkedHashMap recentJoinedConsumers = 
dispatcher.getRecentlyJoinedConsumers();
+            assertTrue(verifyMapItemsAreInOrder(recentJoinedConsumers));
+        }
+
+        // cleanup.
+        producer.close();
+        for (CompletableFuture<Consumer<Integer>> c : consumerList) {
+            c.join().close();
+        }
+        admin.topics().delete(topic, false);
+    }
+
+    private void sleep(int millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private boolean 
verifyMapItemsAreInOrder(LinkedHashMap<org.apache.pulsar.broker.service.Consumer,
+            PositionImpl> map) {
+        boolean outOfOrder = false;
+        PositionImpl posPre = null;
+        PositionImpl posAfter = null;
+        for (Map.Entry<org.apache.pulsar.broker.service.Consumer, 
PositionImpl> entry : map.entrySet()) {
+            if (posPre == null) {
+                posPre = (PositionImpl) entry.getValue();
+            } else {
+                posAfter = (PositionImpl) entry.getValue();
+            }
+            if (posPre != null && posAfter != null) {
+                if (posPre.compareTo(posAfter) > 0) {
+                    outOfOrder = true;
+                    break;
+                }
+                posPre = posAfter;
+            }
+        }
+        return !outOfOrder;
+    }
+}

Reply via email to