This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 257ca7f6286dd39009b675f794e324adafcde985
Author: 道君 <[email protected]>
AuthorDate: Wed Jul 24 14:40:03 2024 +0800

    [fix][broker] Handle BucketDelayedDeliveryTracker recover failed (#22735)
    
    (cherry picked from commit 1c53841cc7f585bdd8ff6702d74f37491d8cc9c6)
---
 .../BucketDelayedDeliveryTrackerFactory.java       |  30 ++-
 .../broker/delayed/DelayedDeliveryTracker.java     |  47 ++++
 .../InMemoryDelayedDeliveryTrackerFactory.java     |  19 ++
 .../bucket/BucketDelayedDeliveryTracker.java       |  35 +--
 .../RecoverDelayedDeliveryTrackerException.java    |  25 +++
 .../pulsar/broker/service/BrokerService.java       |  28 ++-
 .../PersistentDispatcherMultipleConsumers.java     |  11 +-
 .../delayed/DelayedDeliveryTrackerFactoryTest.java | 242 +++++++++++++++++++++
 .../bucket/BucketDelayedDeliveryTrackerTest.java   |   6 +-
 9 files changed, 420 insertions(+), 23 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
index 33076fd51a8..69a08bd2be4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.delayed;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -33,10 +34,15 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage;
 import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker;
 import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
+import 
org.apache.pulsar.broker.delayed.bucket.RecoverDelayedDeliveryTrackerException;
+import org.apache.pulsar.broker.service.BrokerService;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class BucketDelayedDeliveryTrackerFactory implements 
DelayedDeliveryTrackerFactory {
+    private static final Logger log = 
LoggerFactory.getLogger(BucketDelayedDeliveryTrackerFactory.class);
 
     BucketSnapshotStorage bucketSnapshotStorage;
 
@@ -73,8 +79,28 @@ public class BucketDelayedDeliveryTrackerFactory implements 
DelayedDeliveryTrack
 
     @Override
     public DelayedDeliveryTracker 
newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
-        return new BucketDelayedDeliveryTracker(dispatcher, timer, 
tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict,
-                bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,
+        String topicName = dispatcher.getTopic().getName();
+        String subscriptionName = dispatcher.getSubscription().getName();
+        BrokerService brokerService = dispatcher.getTopic().getBrokerService();
+        DelayedDeliveryTracker tracker;
+
+        try {
+            tracker = newTracker0(dispatcher);
+        } catch (RecoverDelayedDeliveryTrackerException ex) {
+            log.warn("Failed to recover BucketDelayedDeliveryTracker, fallback 
to InMemoryDelayedDeliveryTracker."
+                    + " topic {}, subscription {}", topicName, 
subscriptionName, ex);
+            // If failed to create BucketDelayedDeliveryTracker, fallback to 
InMemoryDelayedDeliveryTracker
+            brokerService.initializeFallbackDelayedDeliveryTrackerFactory();
+            tracker = 
brokerService.getFallbackDelayedDeliveryTrackerFactory().newTracker(dispatcher);
+        }
+        return tracker;
+    }
+
+    @VisibleForTesting
+    BucketDelayedDeliveryTracker 
newTracker0(PersistentDispatcherMultipleConsumers dispatcher)
+            throws RecoverDelayedDeliveryTrackerException {
+        return new BucketDelayedDeliveryTracker(dispatcher, timer, 
tickTimeMillis,
+                isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage, 
delayedDeliveryMinIndexCountPerBucket,
                 
TimeUnit.SECONDS.toMillis(delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds),
                 delayedDeliveryMaxIndexesPerBucketSnapshotSegment, 
delayedDeliveryMaxNumBuckets);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
index 78229fef25a..893a4e59c3c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
@@ -85,4 +85,51 @@ public interface DelayedDeliveryTracker extends 
AutoCloseable {
      * Close the subscription tracker and release all resources.
      */
     void close();
+
+    DelayedDeliveryTracker DISABLE = new DelayedDeliveryTracker() {
+        @Override
+        public boolean addMessage(long ledgerId, long entryId, long 
deliveryAt) {
+            return false;
+        }
+
+        @Override
+        public boolean hasMessageAvailable() {
+            return false;
+        }
+
+        @Override
+        public long getNumberOfDelayedMessages() {
+            return 0;
+        }
+
+        @Override
+        public long getBufferMemoryUsage() {
+            return 0;
+        }
+
+        @Override
+        public NavigableSet<PositionImpl> getScheduledMessages(int 
maxMessages) {
+            return null;
+        }
+
+        @Override
+        public boolean shouldPauseAllDeliveries() {
+            return false;
+        }
+
+        @Override
+        public void resetTickTime(long tickTime) {
+
+        }
+
+        @Override
+        public CompletableFuture<Void> clear() {
+            return null;
+        }
+
+        @Override
+        public void close() {
+
+        }
+    };
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
index e7dc3f18f46..179cf74db41 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.delayed;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -25,8 +26,11 @@ import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class InMemoryDelayedDeliveryTrackerFactory implements 
DelayedDeliveryTrackerFactory {
+    private static final Logger log = 
LoggerFactory.getLogger(InMemoryDelayedDeliveryTrackerFactory.class);
 
     private Timer timer;
 
@@ -48,6 +52,21 @@ public class InMemoryDelayedDeliveryTrackerFactory 
implements DelayedDeliveryTra
 
     @Override
     public DelayedDeliveryTracker 
newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
+        String topicName = dispatcher.getTopic().getName();
+        String subscriptionName = dispatcher.getSubscription().getName();
+        DelayedDeliveryTracker tracker =  DelayedDeliveryTracker.DISABLE;
+        try {
+            tracker = newTracker0(dispatcher);
+        } catch (Exception e) {
+            // it should never go here
+            log.warn("Failed to create InMemoryDelayedDeliveryTracker, topic 
{}, subscription {}",
+                    topicName, subscriptionName, e);
+        }
+        return tracker;
+    }
+
+    @VisibleForTesting
+    InMemoryDelayedDeliveryTracker 
newTracker0(PersistentDispatcherMultipleConsumers dispatcher) {
         return new InMemoryDelayedDeliveryTracker(dispatcher, timer, 
tickTimeMillis,
                 isDelayedDeliveryDeliverAtTimeStrict, 
fixedDelayDetectionLookahead);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index f98c9e000f1..1cbae674bd5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -105,22 +105,24 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
     private CompletableFuture<Void> pendingLoad = null;
 
     public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers 
dispatcher,
-                                 Timer timer, long tickTimeMillis,
-                                 boolean isDelayedDeliveryDeliverAtTimeStrict,
-                                 BucketSnapshotStorage bucketSnapshotStorage,
-                                 long minIndexCountPerBucket, long 
timeStepPerBucketSnapshotSegmentInMillis,
-                                 int maxIndexesPerBucketSnapshotSegment, int 
maxNumBuckets) {
+                                        Timer timer, long tickTimeMillis,
+                                        boolean 
isDelayedDeliveryDeliverAtTimeStrict,
+                                        BucketSnapshotStorage 
bucketSnapshotStorage,
+                                        long minIndexCountPerBucket, long 
timeStepPerBucketSnapshotSegmentInMillis,
+                                        int 
maxIndexesPerBucketSnapshotSegment, int maxNumBuckets)
+            throws RecoverDelayedDeliveryTrackerException {
         this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), 
isDelayedDeliveryDeliverAtTimeStrict,
                 bucketSnapshotStorage, minIndexCountPerBucket, 
timeStepPerBucketSnapshotSegmentInMillis,
                 maxIndexesPerBucketSnapshotSegment, maxNumBuckets);
     }
 
     public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers 
dispatcher,
-                                 Timer timer, long tickTimeMillis, Clock clock,
-                                 boolean isDelayedDeliveryDeliverAtTimeStrict,
-                                 BucketSnapshotStorage bucketSnapshotStorage,
-                                 long minIndexCountPerBucket, long 
timeStepPerBucketSnapshotSegmentInMillis,
-                                 int maxIndexesPerBucketSnapshotSegment, int 
maxNumBuckets) {
+                                        Timer timer, long tickTimeMillis, 
Clock clock,
+                                        boolean 
isDelayedDeliveryDeliverAtTimeStrict,
+                                        BucketSnapshotStorage 
bucketSnapshotStorage,
+                                        long minIndexCountPerBucket, long 
timeStepPerBucketSnapshotSegmentInMillis,
+                                        int 
maxIndexesPerBucketSnapshotSegment, int maxNumBuckets)
+            throws RecoverDelayedDeliveryTrackerException {
         super(dispatcher, timer, tickTimeMillis, clock, 
isDelayedDeliveryDeliverAtTimeStrict);
         this.minIndexCountPerBucket = minIndexCountPerBucket;
         this.timeStepPerBucketSnapshotSegmentInMillis = 
timeStepPerBucketSnapshotSegmentInMillis;
@@ -133,10 +135,17 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                 new MutableBucket(dispatcher.getName(), 
dispatcher.getCursor(), FutureUtil.Sequencer.create(),
                         bucketSnapshotStorage);
         this.stats = new BucketDelayedMessageIndexStats();
-        this.numberDelayedMessages = recoverBucketSnapshot();
+
+        // Close the tracker if failed to recover.
+        try {
+            this.numberDelayedMessages = recoverBucketSnapshot();
+        } catch (RecoverDelayedDeliveryTrackerException e) {
+            close();
+            throw e;
+        }
     }
 
-    private synchronized long recoverBucketSnapshot() throws RuntimeException {
+    private synchronized long recoverBucketSnapshot() throws 
RecoverDelayedDeliveryTrackerException {
         ManagedCursor cursor = this.lastMutableBucket.getCursor();
         Map<String, String> cursorProperties = cursor.getCursorProperties();
         if (MapUtils.isEmpty(cursorProperties)) {
@@ -181,7 +190,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
             if (e instanceof InterruptedException) {
                 Thread.currentThread().interrupt();
             }
-            throw new RuntimeException(e);
+            throw new RecoverDelayedDeliveryTrackerException(e);
         }
 
         for (Map.Entry<Range<Long>, CompletableFuture<List<DelayedIndex>>> 
entry : futures.entrySet()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/RecoverDelayedDeliveryTrackerException.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/RecoverDelayedDeliveryTrackerException.java
new file mode 100644
index 00000000000..71a851100fe
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/RecoverDelayedDeliveryTrackerException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+public class RecoverDelayedDeliveryTrackerException extends Exception {
+    public RecoverDelayedDeliveryTrackerException(Throwable cause) {
+        super(cause);
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ca3c9834be0..0254d8087d0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -104,6 +104,7 @@ import 
org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.BundlesQuotas;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader;
+import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
@@ -272,10 +273,11 @@ public class BrokerService implements Closeable {
     private static final AtomicBoolean blockedDispatcherOnHighUnackedMsgs = 
new AtomicBoolean(false);
     private final ConcurrentOpenHashSet<PersistentDispatcherMultipleConsumers> 
blockedDispatchers;
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-    @Getter
     @VisibleForTesting
     private final DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory;
+    // InMemoryDelayedDeliveryTrackerFactory is for the purpose of
+    // fallback if recover BucketDelayedDeliveryTracker failed.
+    private volatile DelayedDeliveryTrackerFactory 
fallbackDelayedDeliveryTrackerFactory;
     private final ServerBootstrap defaultServerBootstrap;
     private final List<EventLoopGroup> protocolHandlersWorkerGroups = new 
ArrayList<>();
 
@@ -884,6 +886,9 @@ public class BrokerService implements Closeable {
                                 pulsarStats.close();
                                 try {
                                     delayedDeliveryTrackerFactory.close();
+                                    if (fallbackDelayedDeliveryTrackerFactory 
!= null) {
+                                        
fallbackDelayedDeliveryTrackerFactory.close();
+                                    }
                                 } catch (Exception e) {
                                     log.warn("Error in closing 
delayedDeliveryTrackerFactory", e);
                                 }
@@ -3457,6 +3462,25 @@ public class BrokerService implements Closeable {
         }
     }
 
+    /**
+     * Initializes the in-memory delayed delivery tracker factory when
+     * BucketDelayedDeliveryTrackerFactory.newTracker failed.
+     */
+    public synchronized void initializeFallbackDelayedDeliveryTrackerFactory() 
{
+        if (fallbackDelayedDeliveryTrackerFactory != null) {
+            return;
+        }
+
+        DelayedDeliveryTrackerFactory factory = new 
InMemoryDelayedDeliveryTrackerFactory();
+        try {
+            factory.initialize(pulsar);
+            this.fallbackDelayedDeliveryTrackerFactory = factory;
+        } catch (Exception e) {
+            // it should never go here
+            log.error("Failed to initialize 
InMemoryDelayedDeliveryTrackerFactory", e);
+        }
+    }
+
     private static class ConfigField {
         final Field field;
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 2f99d57aa45..a04b1fc3ad1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -1083,15 +1083,15 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
         }
 
         synchronized (this) {
-            if (!delayedDeliveryTracker.isPresent()) {
+            if (delayedDeliveryTracker.isEmpty()) {
                 if (!msgMetadata.hasDeliverAtTime()) {
                     // No need to initialize the tracker here
                     return false;
                 }
 
                 // Initialize the tracker the first time we need to use it
-                delayedDeliveryTracker = Optional
-                        
.of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
+                delayedDeliveryTracker = Optional.of(
+                        
topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
             }
 
             
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
@@ -1245,5 +1245,10 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
         return 
StickyKeyConsumerSelector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer()));
     }
 
+
+    public Subscription getSubscription() {
+        return subscription;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java
new file mode 100644
index 00000000000..bb6ef9d3636
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import lombok.Cleanup;
+import org.apache.commons.lang3.tuple.Pair;
+import 
org.apache.pulsar.broker.delayed.bucket.RecoverDelayedDeliveryTrackerException;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
+import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class DelayedDeliveryTrackerFactoryTest extends ProducerConsumerBase {
+    @BeforeClass
+    @Override
+    public void setup() throws Exception {
+        
conf.setDelayedDeliveryTrackerFactoryClassName(BucketDelayedDeliveryTrackerFactory.class.getName());
+        conf.setDelayedDeliveryMaxNumBuckets(10);
+        conf.setDelayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds(1);
+        conf.setDelayedDeliveryMaxIndexesPerBucketSnapshotSegment(10);
+        conf.setDelayedDeliveryMinIndexCountPerBucket(50);
+        conf.setDelayedDeliveryTickTimeMillis(1024);
+        conf.setDispatcherReadFailureBackoffInitialTimeInMs(1000);
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true)
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testFallbackToInMemoryTracker() throws Exception {
+        Pair<BrokerService, PersistentDispatcherMultipleConsumers> pair =
+                mockDelayedDeliveryTrackerFactoryAndDispatcher();
+        BrokerService brokerService = pair.getLeft();
+        PersistentDispatcherMultipleConsumers dispatcher = pair.getRight();
+
+        // Since Mocked BucketDelayedDeliveryTrackerFactory.newTracker0() 
throws RecoverDelayedDeliveryTrackerException,
+        // the factory should be fallback to 
InMemoryDelayedDeliveryTrackerFactory
+        @Cleanup
+        DelayedDeliveryTracker tracker = 
brokerService.getDelayedDeliveryTrackerFactory().newTracker(dispatcher);
+        Assert.assertTrue(tracker instanceof InMemoryDelayedDeliveryTracker);
+
+        DelayedDeliveryTrackerFactory fallbackFactory = 
brokerService.getFallbackDelayedDeliveryTrackerFactory();
+        Assert.assertTrue(fallbackFactory instanceof 
InMemoryDelayedDeliveryTrackerFactory);
+    }
+
+
+    private Pair<BrokerService, PersistentDispatcherMultipleConsumers> 
mockDelayedDeliveryTrackerFactoryAndDispatcher()
+            throws Exception {
+        BrokerService brokerService = Mockito.spy(pulsar.getBrokerService());
+
+        // Mock dispatcher
+        PersistentDispatcherMultipleConsumers dispatcher = 
Mockito.mock(PersistentDispatcherMultipleConsumers.class);
+        Mockito.doReturn("test").when(dispatcher).getName();
+        // Mock BucketDelayedDeliveryTrackerFactory
+        @Cleanup
+        BucketDelayedDeliveryTrackerFactory factory = new 
BucketDelayedDeliveryTrackerFactory();
+        factory = Mockito.spy(factory);
+        factory.initialize(pulsar);
+        Mockito.doThrow(new RecoverDelayedDeliveryTrackerException(new 
RuntimeException()))
+                .when(factory).newTracker0(Mockito.eq(dispatcher));
+        // Mock brokerService
+        
Mockito.doReturn(factory).when(brokerService).getDelayedDeliveryTrackerFactory();
+        // Mock topic and subscription
+        PersistentTopic topic = Mockito.mock(PersistentTopic.class);
+        Mockito.doReturn(brokerService).when(topic).getBrokerService();
+        Subscription subscription = Mockito.mock(Subscription.class);
+        Mockito.doReturn("topic").when(topic).getName();
+        Mockito.doReturn("sub").when(subscription).getName();
+        Mockito.doReturn(topic).when(dispatcher).getTopic();
+        Mockito.doReturn(subscription).when(dispatcher).getSubscription();
+
+        return Pair.of(brokerService, dispatcher);
+    }
+
+    @Test
+    public void testFallbackToInMemoryTrackerFactoryFailed() throws Exception {
+        Pair<BrokerService, PersistentDispatcherMultipleConsumers> pair =
+                mockDelayedDeliveryTrackerFactoryAndDispatcher();
+        BrokerService brokerService = pair.getLeft();
+        PersistentDispatcherMultipleConsumers dispatcher = pair.getRight();
+
+        // Mock InMemoryDelayedDeliveryTrackerFactory
+        @Cleanup
+        InMemoryDelayedDeliveryTrackerFactory factory = new 
InMemoryDelayedDeliveryTrackerFactory();
+        factory = Mockito.spy(factory);
+        factory.initialize(pulsar);
+        // Mock InMemoryDelayedDeliveryTrackerFactory.newTracker0() throws 
RuntimeException
+        Mockito.doThrow(new 
RuntimeException()).when(factory).newTracker0(Mockito.eq(dispatcher));
+
+        // Mock brokerService to return mocked 
InMemoryDelayedDeliveryTrackerFactory
+        Mockito.doAnswer(inv -> 
null).when(brokerService).initializeFallbackDelayedDeliveryTrackerFactory();
+        
Mockito.doReturn(factory).when(brokerService).getFallbackDelayedDeliveryTrackerFactory();
+
+        // Since Mocked BucketDelayedDeliveryTrackerFactory.newTracker0() 
throws RecoverDelayedDeliveryTrackerException,
+        // and Mocked InMemoryDelayedDeliveryTrackerFactory.newTracker0() 
throws RuntimeException,
+        // the tracker instance should be DelayedDeliveryTracker.DISABLE
+        @Cleanup
+        DelayedDeliveryTracker tracker = 
brokerService.getDelayedDeliveryTrackerFactory().newTracker(dispatcher);
+        Assert.assertEquals(tracker, DelayedDeliveryTracker.DISABLE);
+    }
+
+    // 1. Create BucketDelayedDeliveryTracker failed, fallback to 
InMemoryDelayedDeliveryTracker,
+    // 2. Publish delay messages
+    @Test(timeOut = 60_000)
+    public void 
testPublishDelayMessagesAndCreateBucketDelayDeliveryTrackerFailed() throws 
Exception {
+        String topicName = "persistent://public/default/" + UUID.randomUUID();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(false)
+                .create();
+
+        // Mock BucketDelayedDeliveryTrackerFactory.newTracker0() throws 
RecoverDelayedDeliveryTrackerException
+        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
+        topic = Mockito.spy(topic);
+        BrokerService brokerService = Mockito.spy(pulsar.getBrokerService());
+        BucketDelayedDeliveryTrackerFactory factory =
+                (BucketDelayedDeliveryTrackerFactory) 
Mockito.spy(brokerService.getDelayedDeliveryTrackerFactory());
+        Mockito.doThrow(new RecoverDelayedDeliveryTrackerException(new 
RuntimeException()))
+                .when(factory).newTracker0(Mockito.any());
+        
Mockito.doReturn(factory).when(brokerService).getDelayedDeliveryTrackerFactory();
+
+        // Return mocked BrokerService
+        Mockito.doReturn(brokerService).when(topic).getBrokerService();
+
+        // Set Mocked topic to BrokerService
+        Field topics = BrokerService.class.getDeclaredField("topics");
+        topics.setAccessible(true);
+        @SuppressWarnings("unchecked")
+        ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> 
topicMap =
+                (ConcurrentOpenHashMap<String, 
CompletableFuture<Optional<Topic>>>) topics.get(brokerService);
+        topicMap.put(topicName, 
CompletableFuture.completedFuture(Optional.of(topic)));
+
+        // Create consumer
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .messageListener((c, msg) -> {
+                    try {
+                        c.acknowledge(msg);
+                    } catch (PulsarClientException e) {
+                        throw new RuntimeException(e);
+                    }
+                })
+                .subscribe();
+
+        PersistentSubscription subscription = topic.getSubscription("sub");
+        Dispatcher dispatcher = subscription.getDispatcher();
+        Assert.assertTrue(dispatcher instanceof 
PersistentDispatcherMultipleConsumers);
+
+        // Publish a delay message to initialize DelayedDeliveryTracker
+        producer.newMessage().value("test").deliverAfter(10_000, 
TimeUnit.MILLISECONDS).send();
+
+        // Get DelayedDeliveryTracker from Dispatcher
+        PersistentDispatcherMultipleConsumers dispatcher0 = 
(PersistentDispatcherMultipleConsumers) dispatcher;
+        Field trackerField =
+                
PersistentDispatcherMultipleConsumers.class.getDeclaredField("delayedDeliveryTracker");
+        trackerField.setAccessible(true);
+
+        AtomicReference<Optional<DelayedDeliveryTracker>> reference = new 
AtomicReference<>();
+        // Wait until DelayedDeliveryTracker is initialized
+        Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {
+            @SuppressWarnings("unchecked")
+            Optional<DelayedDeliveryTracker> optional =
+                    (Optional<DelayedDeliveryTracker>) 
trackerField.get(dispatcher0);
+            if (optional.isPresent()) {
+                reference.set(optional);
+                return true;
+            }
+            return false;
+        });
+
+        Optional<DelayedDeliveryTracker> optional = reference.get();
+        Assert.assertTrue(optional.get() instanceof 
InMemoryDelayedDeliveryTracker);
+
+        // Mock DelayedDeliveryTracker and Count the number of addMessage() 
calls
+        AtomicInteger counter = new AtomicInteger(0);
+        InMemoryDelayedDeliveryTracker tracker = 
(InMemoryDelayedDeliveryTracker) optional.get();
+        tracker =  Mockito.spy(tracker);
+        Mockito.doAnswer(inv -> {
+            counter.incrementAndGet();
+            return inv.callRealMethod();
+        }).when(tracker).addMessage(Mockito.anyLong(), Mockito.anyLong(), 
Mockito.anyLong());
+        // Set Mocked InMemoryDelayedDeliveryTracker back to Dispatcher
+        trackerField.set(dispatcher0, Optional.of(tracker));
+
+        // Publish 10 delay messages, so the counter should be 10
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage().value("test")
+                    .deliverAfter(10_000, TimeUnit.MILLISECONDS).send();
+        }
+
+        try {
+            Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> 
counter.get() == 10);
+        } finally {
+            consumer.close();
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
index 39b3992fbd1..2b2a3a39b87 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
@@ -182,7 +182,7 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
     }
 
     @Test(dataProvider = "delayedTracker", invocationCount = 10)
-    public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) {
+    public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) 
throws Exception {
         for (int i = 1; i <= 100; i++) {
             tracker.addMessage(i, i, i * 10);
         }
@@ -265,7 +265,7 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
     }
 
     @Test(dataProvider = "delayedTracker")
-    public void testMergeSnapshot(final BucketDelayedDeliveryTracker tracker) {
+    public void testMergeSnapshot(final BucketDelayedDeliveryTracker tracker) 
throws Exception {
         for (int i = 1; i <= 110; i++) {
             tracker.addMessage(i, i, i * 10);
             Awaitility.await().untilAsserted(() -> {
@@ -318,7 +318,7 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
     }
 
     @Test(dataProvider = "delayedTracker")
-    public void testWithBkException(final BucketDelayedDeliveryTracker 
tracker) {
+    public void testWithBkException(final BucketDelayedDeliveryTracker 
tracker) throws Exception {
         MockBucketSnapshotStorage mockBucketSnapshotStorage = 
(MockBucketSnapshotStorage) bucketSnapshotStorage;
         mockBucketSnapshotStorage.injectCreateException(
                 new BucketSnapshotPersistenceException("Bookie operation 
timeout, op: Create entry"));

Reply via email to