This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 12aaeb932b9 [fix][broker] Handle BucketDelayedDeliveryTracker recover
failed (#22735)
12aaeb932b9 is described below
commit 12aaeb932b9af6828f21e8a5b66e1380ce90726b
Author: 道君 <[email protected]>
AuthorDate: Wed Jul 24 14:40:03 2024 +0800
[fix][broker] Handle BucketDelayedDeliveryTracker recover failed (#22735)
---
.../BucketDelayedDeliveryTrackerFactory.java | 30 ++-
.../broker/delayed/DelayedDeliveryTracker.java | 47 ++++
.../InMemoryDelayedDeliveryTrackerFactory.java | 19 ++
.../bucket/BucketDelayedDeliveryTracker.java | 35 +--
.../RecoverDelayedDeliveryTrackerException.java | 25 +++
.../pulsar/broker/service/BrokerService.java | 28 ++-
.../PersistentDispatcherMultipleConsumers.java | 11 +-
.../delayed/DelayedDeliveryTrackerFactoryTest.java | 242 +++++++++++++++++++++
.../bucket/BucketDelayedDeliveryTrackerTest.java | 6 +-
9 files changed, 420 insertions(+), 23 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
index 17d9795dd90..11ad243e0c9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.delayed;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -33,10 +34,15 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage;
import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
+import
org.apache.pulsar.broker.delayed.bucket.RecoverDelayedDeliveryTrackerException;
+import org.apache.pulsar.broker.service.BrokerService;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class BucketDelayedDeliveryTrackerFactory implements
DelayedDeliveryTrackerFactory {
+ private static final Logger log =
LoggerFactory.getLogger(BucketDelayedDeliveryTrackerFactory.class);
BucketSnapshotStorage bucketSnapshotStorage;
@@ -73,8 +79,28 @@ public class BucketDelayedDeliveryTrackerFactory implements
DelayedDeliveryTrack
@Override
public DelayedDeliveryTracker
newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
- return new BucketDelayedDeliveryTracker(dispatcher, timer,
tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict,
- bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,
+ String topicName = dispatcher.getTopic().getName();
+ String subscriptionName = dispatcher.getSubscription().getName();
+ BrokerService brokerService = dispatcher.getTopic().getBrokerService();
+ DelayedDeliveryTracker tracker;
+
+ try {
+ tracker = newTracker0(dispatcher);
+ } catch (RecoverDelayedDeliveryTrackerException ex) {
+ log.warn("Failed to recover BucketDelayedDeliveryTracker, fallback
to InMemoryDelayedDeliveryTracker."
+ + " topic {}, subscription {}", topicName,
subscriptionName, ex);
+ // If failed to create BucketDelayedDeliveryTracker, fallback to
InMemoryDelayedDeliveryTracker
+ brokerService.initializeFallbackDelayedDeliveryTrackerFactory();
+ tracker =
brokerService.getFallbackDelayedDeliveryTrackerFactory().newTracker(dispatcher);
+ }
+ return tracker;
+ }
+
+ @VisibleForTesting
+ BucketDelayedDeliveryTracker
newTracker0(PersistentDispatcherMultipleConsumers dispatcher)
+ throws RecoverDelayedDeliveryTrackerException {
+ return new BucketDelayedDeliveryTracker(dispatcher, timer,
tickTimeMillis,
+ isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage,
delayedDeliveryMinIndexCountPerBucket,
TimeUnit.SECONDS.toMillis(delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds),
delayedDeliveryMaxIndexesPerBucketSnapshotSegment,
delayedDeliveryMaxNumBuckets);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
index 78229fef25a..893a4e59c3c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
@@ -85,4 +85,51 @@ public interface DelayedDeliveryTracker extends
AutoCloseable {
* Close the subscription tracker and release all resources.
*/
void close();
+
+ DelayedDeliveryTracker DISABLE = new DelayedDeliveryTracker() {
+ @Override
+ public boolean addMessage(long ledgerId, long entryId, long
deliveryAt) {
+ return false;
+ }
+
+ @Override
+ public boolean hasMessageAvailable() {
+ return false;
+ }
+
+ @Override
+ public long getNumberOfDelayedMessages() {
+ return 0;
+ }
+
+ @Override
+ public long getBufferMemoryUsage() {
+ return 0;
+ }
+
+ @Override
+ public NavigableSet<PositionImpl> getScheduledMessages(int
maxMessages) {
+ return null;
+ }
+
+ @Override
+ public boolean shouldPauseAllDeliveries() {
+ return false;
+ }
+
+ @Override
+ public void resetTickTime(long tickTime) {
+
+ }
+
+ @Override
+ public CompletableFuture<Void> clear() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+
+ }
+ };
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
index e7dc3f18f46..179cf74db41 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.delayed;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -25,8 +26,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class InMemoryDelayedDeliveryTrackerFactory implements
DelayedDeliveryTrackerFactory {
+ private static final Logger log =
LoggerFactory.getLogger(InMemoryDelayedDeliveryTrackerFactory.class);
private Timer timer;
@@ -48,6 +52,21 @@ public class InMemoryDelayedDeliveryTrackerFactory
implements DelayedDeliveryTra
@Override
public DelayedDeliveryTracker
newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
+ String topicName = dispatcher.getTopic().getName();
+ String subscriptionName = dispatcher.getSubscription().getName();
+ DelayedDeliveryTracker tracker = DelayedDeliveryTracker.DISABLE;
+ try {
+ tracker = newTracker0(dispatcher);
+ } catch (Exception e) {
+ // it should never go here
+ log.warn("Failed to create InMemoryDelayedDeliveryTracker, topic
{}, subscription {}",
+ topicName, subscriptionName, e);
+ }
+ return tracker;
+ }
+
+ @VisibleForTesting
+ InMemoryDelayedDeliveryTracker
newTracker0(PersistentDispatcherMultipleConsumers dispatcher) {
return new InMemoryDelayedDeliveryTracker(dispatcher, timer,
tickTimeMillis,
isDelayedDeliveryDeliverAtTimeStrict,
fixedDelayDetectionLookahead);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index f98c9e000f1..1cbae674bd5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -105,22 +105,24 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
private CompletableFuture<Void> pendingLoad = null;
public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers
dispatcher,
- Timer timer, long tickTimeMillis,
- boolean isDelayedDeliveryDeliverAtTimeStrict,
- BucketSnapshotStorage bucketSnapshotStorage,
- long minIndexCountPerBucket, long
timeStepPerBucketSnapshotSegmentInMillis,
- int maxIndexesPerBucketSnapshotSegment, int
maxNumBuckets) {
+ Timer timer, long tickTimeMillis,
+ boolean
isDelayedDeliveryDeliverAtTimeStrict,
+ BucketSnapshotStorage
bucketSnapshotStorage,
+ long minIndexCountPerBucket, long
timeStepPerBucketSnapshotSegmentInMillis,
+ int
maxIndexesPerBucketSnapshotSegment, int maxNumBuckets)
+ throws RecoverDelayedDeliveryTrackerException {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(),
isDelayedDeliveryDeliverAtTimeStrict,
bucketSnapshotStorage, minIndexCountPerBucket,
timeStepPerBucketSnapshotSegmentInMillis,
maxIndexesPerBucketSnapshotSegment, maxNumBuckets);
}
public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers
dispatcher,
- Timer timer, long tickTimeMillis, Clock clock,
- boolean isDelayedDeliveryDeliverAtTimeStrict,
- BucketSnapshotStorage bucketSnapshotStorage,
- long minIndexCountPerBucket, long
timeStepPerBucketSnapshotSegmentInMillis,
- int maxIndexesPerBucketSnapshotSegment, int
maxNumBuckets) {
+ Timer timer, long tickTimeMillis,
Clock clock,
+ boolean
isDelayedDeliveryDeliverAtTimeStrict,
+ BucketSnapshotStorage
bucketSnapshotStorage,
+ long minIndexCountPerBucket, long
timeStepPerBucketSnapshotSegmentInMillis,
+ int
maxIndexesPerBucketSnapshotSegment, int maxNumBuckets)
+ throws RecoverDelayedDeliveryTrackerException {
super(dispatcher, timer, tickTimeMillis, clock,
isDelayedDeliveryDeliverAtTimeStrict);
this.minIndexCountPerBucket = minIndexCountPerBucket;
this.timeStepPerBucketSnapshotSegmentInMillis =
timeStepPerBucketSnapshotSegmentInMillis;
@@ -133,10 +135,17 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
new MutableBucket(dispatcher.getName(),
dispatcher.getCursor(), FutureUtil.Sequencer.create(),
bucketSnapshotStorage);
this.stats = new BucketDelayedMessageIndexStats();
- this.numberDelayedMessages = recoverBucketSnapshot();
+
+ // Close the tracker if failed to recover.
+ try {
+ this.numberDelayedMessages = recoverBucketSnapshot();
+ } catch (RecoverDelayedDeliveryTrackerException e) {
+ close();
+ throw e;
+ }
}
- private synchronized long recoverBucketSnapshot() throws RuntimeException {
+ private synchronized long recoverBucketSnapshot() throws
RecoverDelayedDeliveryTrackerException {
ManagedCursor cursor = this.lastMutableBucket.getCursor();
Map<String, String> cursorProperties = cursor.getCursorProperties();
if (MapUtils.isEmpty(cursorProperties)) {
@@ -181,7 +190,7 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
- throw new RuntimeException(e);
+ throw new RecoverDelayedDeliveryTrackerException(e);
}
for (Map.Entry<Range<Long>, CompletableFuture<List<DelayedIndex>>>
entry : futures.entrySet()) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/RecoverDelayedDeliveryTrackerException.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/RecoverDelayedDeliveryTrackerException.java
new file mode 100644
index 00000000000..71a851100fe
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/RecoverDelayedDeliveryTrackerException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+public class RecoverDelayedDeliveryTrackerException extends Exception {
+ public RecoverDelayedDeliveryTrackerException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1f322926aa9..8981825b7af 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -104,6 +104,7 @@ import
org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.BundlesQuotas;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader;
+import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
import org.apache.pulsar.broker.loadbalance.LoadManager;
@@ -282,10 +283,11 @@ public class BrokerService implements Closeable {
private final AtomicBoolean blockedDispatcherOnHighUnackedMsgs = new
AtomicBoolean(false);
private final ConcurrentOpenHashSet<PersistentDispatcherMultipleConsumers>
blockedDispatchers;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- @Getter
@VisibleForTesting
private final DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory;
+ // InMemoryDelayedDeliveryTrackerFactory is for the purpose of
+ // fallback if recover BucketDelayedDeliveryTracker failed.
+ private volatile DelayedDeliveryTrackerFactory
fallbackDelayedDeliveryTrackerFactory;
private final ServerBootstrap defaultServerBootstrap;
private final List<EventLoopGroup> protocolHandlersWorkerGroups = new
ArrayList<>();
@@ -846,6 +848,9 @@ public class BrokerService implements Closeable {
pendingLookupOperationsCounter.close();
try {
delayedDeliveryTrackerFactory.close();
+ if (fallbackDelayedDeliveryTrackerFactory
!= null) {
+
fallbackDelayedDeliveryTrackerFactory.close();
+ }
} catch (Exception e) {
log.warn("Error in closing
delayedDeliveryTrackerFactory", e);
}
@@ -3399,6 +3404,25 @@ public class BrokerService implements Closeable {
}
}
+ /**
+ * Initializes the in-memory delayed delivery tracker factory when
+ * BucketDelayedDeliveryTrackerFactory.newTracker failed.
+ */
+ public synchronized void initializeFallbackDelayedDeliveryTrackerFactory()
{
+ if (fallbackDelayedDeliveryTrackerFactory != null) {
+ return;
+ }
+
+ DelayedDeliveryTrackerFactory factory = new
InMemoryDelayedDeliveryTrackerFactory();
+ try {
+ factory.initialize(pulsar);
+ this.fallbackDelayedDeliveryTrackerFactory = factory;
+ } catch (Exception e) {
+ // it should never go here
+ log.error("Failed to initialize
InMemoryDelayedDeliveryTrackerFactory", e);
+ }
+ }
+
private static class ConfigField {
// field holds the pulsar dynamic configuration.
final Field field;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index f20750fa0c2..01728f94be7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -1165,15 +1165,15 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
}
synchronized (this) {
- if (!delayedDeliveryTracker.isPresent()) {
+ if (delayedDeliveryTracker.isEmpty()) {
if (!msgMetadata.hasDeliverAtTime()) {
// No need to initialize the tracker here
return false;
}
// Initialize the tracker the first time we need to use it
- delayedDeliveryTracker = Optional
-
.of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
+ delayedDeliveryTracker = Optional.of(
+
topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
}
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
@@ -1327,5 +1327,10 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
return
StickyKeyConsumerSelector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer()));
}
+
+ public Subscription getSubscription() {
+ return subscription;
+ }
+
private static final Logger log =
LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java
new file mode 100644
index 00000000000..bb6ef9d3636
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import lombok.Cleanup;
+import org.apache.commons.lang3.tuple.Pair;
+import
org.apache.pulsar.broker.delayed.bucket.RecoverDelayedDeliveryTrackerException;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
+import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class DelayedDeliveryTrackerFactoryTest extends ProducerConsumerBase {
+ @BeforeClass
+ @Override
+ public void setup() throws Exception {
+
conf.setDelayedDeliveryTrackerFactoryClassName(BucketDelayedDeliveryTrackerFactory.class.getName());
+ conf.setDelayedDeliveryMaxNumBuckets(10);
+ conf.setDelayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds(1);
+ conf.setDelayedDeliveryMaxIndexesPerBucketSnapshotSegment(10);
+ conf.setDelayedDeliveryMinIndexCountPerBucket(50);
+ conf.setDelayedDeliveryTickTimeMillis(1024);
+ conf.setDispatcherReadFailureBackoffInitialTimeInMs(1000);
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @Override
+ @AfterClass(alwaysRun = true)
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testFallbackToInMemoryTracker() throws Exception {
+ Pair<BrokerService, PersistentDispatcherMultipleConsumers> pair =
+ mockDelayedDeliveryTrackerFactoryAndDispatcher();
+ BrokerService brokerService = pair.getLeft();
+ PersistentDispatcherMultipleConsumers dispatcher = pair.getRight();
+
+ // Since Mocked BucketDelayedDeliveryTrackerFactory.newTracker0()
throws RecoverDelayedDeliveryTrackerException,
+ // the factory should be fallback to
InMemoryDelayedDeliveryTrackerFactory
+ @Cleanup
+ DelayedDeliveryTracker tracker =
brokerService.getDelayedDeliveryTrackerFactory().newTracker(dispatcher);
+ Assert.assertTrue(tracker instanceof InMemoryDelayedDeliveryTracker);
+
+ DelayedDeliveryTrackerFactory fallbackFactory =
brokerService.getFallbackDelayedDeliveryTrackerFactory();
+ Assert.assertTrue(fallbackFactory instanceof
InMemoryDelayedDeliveryTrackerFactory);
+ }
+
+
+ private Pair<BrokerService, PersistentDispatcherMultipleConsumers>
mockDelayedDeliveryTrackerFactoryAndDispatcher()
+ throws Exception {
+ BrokerService brokerService = Mockito.spy(pulsar.getBrokerService());
+
+ // Mock dispatcher
+ PersistentDispatcherMultipleConsumers dispatcher =
Mockito.mock(PersistentDispatcherMultipleConsumers.class);
+ Mockito.doReturn("test").when(dispatcher).getName();
+ // Mock BucketDelayedDeliveryTrackerFactory
+ @Cleanup
+ BucketDelayedDeliveryTrackerFactory factory = new
BucketDelayedDeliveryTrackerFactory();
+ factory = Mockito.spy(factory);
+ factory.initialize(pulsar);
+ Mockito.doThrow(new RecoverDelayedDeliveryTrackerException(new
RuntimeException()))
+ .when(factory).newTracker0(Mockito.eq(dispatcher));
+ // Mock brokerService
+
Mockito.doReturn(factory).when(brokerService).getDelayedDeliveryTrackerFactory();
+ // Mock topic and subscription
+ PersistentTopic topic = Mockito.mock(PersistentTopic.class);
+ Mockito.doReturn(brokerService).when(topic).getBrokerService();
+ Subscription subscription = Mockito.mock(Subscription.class);
+ Mockito.doReturn("topic").when(topic).getName();
+ Mockito.doReturn("sub").when(subscription).getName();
+ Mockito.doReturn(topic).when(dispatcher).getTopic();
+ Mockito.doReturn(subscription).when(dispatcher).getSubscription();
+
+ return Pair.of(brokerService, dispatcher);
+ }
+
+ @Test
+ public void testFallbackToInMemoryTrackerFactoryFailed() throws Exception {
+ Pair<BrokerService, PersistentDispatcherMultipleConsumers> pair =
+ mockDelayedDeliveryTrackerFactoryAndDispatcher();
+ BrokerService brokerService = pair.getLeft();
+ PersistentDispatcherMultipleConsumers dispatcher = pair.getRight();
+
+ // Mock InMemoryDelayedDeliveryTrackerFactory
+ @Cleanup
+ InMemoryDelayedDeliveryTrackerFactory factory = new
InMemoryDelayedDeliveryTrackerFactory();
+ factory = Mockito.spy(factory);
+ factory.initialize(pulsar);
+ // Mock InMemoryDelayedDeliveryTrackerFactory.newTracker0() throws
RuntimeException
+ Mockito.doThrow(new
RuntimeException()).when(factory).newTracker0(Mockito.eq(dispatcher));
+
+ // Mock brokerService to return mocked
InMemoryDelayedDeliveryTrackerFactory
+ Mockito.doAnswer(inv ->
null).when(brokerService).initializeFallbackDelayedDeliveryTrackerFactory();
+
Mockito.doReturn(factory).when(brokerService).getFallbackDelayedDeliveryTrackerFactory();
+
+ // Since Mocked BucketDelayedDeliveryTrackerFactory.newTracker0()
throws RecoverDelayedDeliveryTrackerException,
+ // and Mocked InMemoryDelayedDeliveryTrackerFactory.newTracker0()
throws RuntimeException,
+ // the tracker instance should be DelayedDeliveryTracker.DISABLE
+ @Cleanup
+ DelayedDeliveryTracker tracker =
brokerService.getDelayedDeliveryTrackerFactory().newTracker(dispatcher);
+ Assert.assertEquals(tracker, DelayedDeliveryTracker.DISABLE);
+ }
+
+ // 1. Create BucketDelayedDeliveryTracker failed, fallback to
InMemoryDelayedDeliveryTracker,
+ // 2. Publish delay messages
+ @Test(timeOut = 60_000)
+ public void
testPublishDelayMessagesAndCreateBucketDelayDeliveryTrackerFailed() throws
Exception {
+ String topicName = "persistent://public/default/" + UUID.randomUUID();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .enableBatching(false)
+ .create();
+
+ // Mock BucketDelayedDeliveryTrackerFactory.newTracker0() throws
RecoverDelayedDeliveryTrackerException
+ PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
+ topic = Mockito.spy(topic);
+ BrokerService brokerService = Mockito.spy(pulsar.getBrokerService());
+ BucketDelayedDeliveryTrackerFactory factory =
+ (BucketDelayedDeliveryTrackerFactory)
Mockito.spy(brokerService.getDelayedDeliveryTrackerFactory());
+ Mockito.doThrow(new RecoverDelayedDeliveryTrackerException(new
RuntimeException()))
+ .when(factory).newTracker0(Mockito.any());
+
Mockito.doReturn(factory).when(brokerService).getDelayedDeliveryTrackerFactory();
+
+ // Return mocked BrokerService
+ Mockito.doReturn(brokerService).when(topic).getBrokerService();
+
+ // Set Mocked topic to BrokerService
+ Field topics = BrokerService.class.getDeclaredField("topics");
+ topics.setAccessible(true);
+ @SuppressWarnings("unchecked")
+ ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>
topicMap =
+ (ConcurrentOpenHashMap<String,
CompletableFuture<Optional<Topic>>>) topics.get(brokerService);
+ topicMap.put(topicName,
CompletableFuture.completedFuture(Optional.of(topic)));
+
+ // Create consumer
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("sub")
+ .subscriptionType(SubscriptionType.Shared)
+ .messageListener((c, msg) -> {
+ try {
+ c.acknowledge(msg);
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .subscribe();
+
+ PersistentSubscription subscription = topic.getSubscription("sub");
+ Dispatcher dispatcher = subscription.getDispatcher();
+ Assert.assertTrue(dispatcher instanceof
PersistentDispatcherMultipleConsumers);
+
+ // Publish a delay message to initialize DelayedDeliveryTracker
+ producer.newMessage().value("test").deliverAfter(10_000,
TimeUnit.MILLISECONDS).send();
+
+ // Get DelayedDeliveryTracker from Dispatcher
+ PersistentDispatcherMultipleConsumers dispatcher0 =
(PersistentDispatcherMultipleConsumers) dispatcher;
+ Field trackerField =
+
PersistentDispatcherMultipleConsumers.class.getDeclaredField("delayedDeliveryTracker");
+ trackerField.setAccessible(true);
+
+ AtomicReference<Optional<DelayedDeliveryTracker>> reference = new
AtomicReference<>();
+ // Wait until DelayedDeliveryTracker is initialized
+ Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {
+ @SuppressWarnings("unchecked")
+ Optional<DelayedDeliveryTracker> optional =
+ (Optional<DelayedDeliveryTracker>)
trackerField.get(dispatcher0);
+ if (optional.isPresent()) {
+ reference.set(optional);
+ return true;
+ }
+ return false;
+ });
+
+ Optional<DelayedDeliveryTracker> optional = reference.get();
+ Assert.assertTrue(optional.get() instanceof
InMemoryDelayedDeliveryTracker);
+
+ // Mock DelayedDeliveryTracker and Count the number of addMessage()
calls
+ AtomicInteger counter = new AtomicInteger(0);
+ InMemoryDelayedDeliveryTracker tracker =
(InMemoryDelayedDeliveryTracker) optional.get();
+ tracker = Mockito.spy(tracker);
+ Mockito.doAnswer(inv -> {
+ counter.incrementAndGet();
+ return inv.callRealMethod();
+ }).when(tracker).addMessage(Mockito.anyLong(), Mockito.anyLong(),
Mockito.anyLong());
+ // Set Mocked InMemoryDelayedDeliveryTracker back to Dispatcher
+ trackerField.set(dispatcher0, Optional.of(tracker));
+
+ // Publish 10 delay messages, so the counter should be 10
+ for (int i = 0; i < 10; i++) {
+ producer.newMessage().value("test")
+ .deliverAfter(10_000, TimeUnit.MILLISECONDS).send();
+ }
+
+ try {
+ Awaitility.await().atMost(Duration.ofSeconds(20)).until(() ->
counter.get() == 10);
+ } finally {
+ consumer.close();
+ }
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
index 1e3e72aa0ec..eef10e49cba 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
@@ -182,7 +182,7 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
}
@Test(dataProvider = "delayedTracker", invocationCount = 10)
- public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) {
+ public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker)
throws Exception {
for (int i = 1; i <= 100; i++) {
tracker.addMessage(i, i, i * 10);
}
@@ -265,7 +265,7 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
}
@Test(dataProvider = "delayedTracker")
- public void testMergeSnapshot(final BucketDelayedDeliveryTracker tracker) {
+ public void testMergeSnapshot(final BucketDelayedDeliveryTracker tracker)
throws Exception {
for (int i = 1; i <= 110; i++) {
tracker.addMessage(i, i, i * 10);
Awaitility.await().untilAsserted(() -> {
@@ -318,7 +318,7 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
}
@Test(dataProvider = "delayedTracker")
- public void testWithBkException(final BucketDelayedDeliveryTracker
tracker) {
+ public void testWithBkException(final BucketDelayedDeliveryTracker
tracker) throws Exception {
MockBucketSnapshotStorage mockBucketSnapshotStorage =
(MockBucketSnapshotStorage) bucketSnapshotStorage;
mockBucketSnapshotStorage.injectCreateException(
new BucketSnapshotPersistenceException("Bookie operation
timeout, op: Create entry"));