This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 257ca7f6286dd39009b675f794e324adafcde985 Author: 道君 <[email protected]> AuthorDate: Wed Jul 24 14:40:03 2024 +0800 [fix][broker] Handle BucketDelayedDeliveryTracker recover failed (#22735) (cherry picked from commit 1c53841cc7f585bdd8ff6702d74f37491d8cc9c6) --- .../BucketDelayedDeliveryTrackerFactory.java | 30 ++- .../broker/delayed/DelayedDeliveryTracker.java | 47 ++++ .../InMemoryDelayedDeliveryTrackerFactory.java | 19 ++ .../bucket/BucketDelayedDeliveryTracker.java | 35 +-- .../RecoverDelayedDeliveryTrackerException.java | 25 +++ .../pulsar/broker/service/BrokerService.java | 28 ++- .../PersistentDispatcherMultipleConsumers.java | 11 +- .../delayed/DelayedDeliveryTrackerFactoryTest.java | 242 +++++++++++++++++++++ .../bucket/BucketDelayedDeliveryTrackerTest.java | 6 +- 9 files changed, 420 insertions(+), 23 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java index 33076fd51a8..69a08bd2be4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.delayed; +import com.google.common.annotations.VisibleForTesting; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; @@ -33,10 +34,15 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage; import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker; import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage; +import org.apache.pulsar.broker.delayed.bucket.RecoverDelayedDeliveryTrackerException; +import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.common.util.FutureUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class BucketDelayedDeliveryTrackerFactory implements DelayedDeliveryTrackerFactory { + private static final Logger log = LoggerFactory.getLogger(BucketDelayedDeliveryTrackerFactory.class); BucketSnapshotStorage bucketSnapshotStorage; @@ -73,8 +79,28 @@ public class BucketDelayedDeliveryTrackerFactory implements DelayedDeliveryTrack @Override public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) { - return new BucketDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict, - bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket, + String topicName = dispatcher.getTopic().getName(); + String subscriptionName = dispatcher.getSubscription().getName(); + BrokerService brokerService = dispatcher.getTopic().getBrokerService(); + DelayedDeliveryTracker tracker; + + try { + tracker = newTracker0(dispatcher); + } catch (RecoverDelayedDeliveryTrackerException ex) { + log.warn("Failed to recover BucketDelayedDeliveryTracker, fallback to InMemoryDelayedDeliveryTracker." + + " topic {}, subscription {}", topicName, subscriptionName, ex); + // If failed to create BucketDelayedDeliveryTracker, fallback to InMemoryDelayedDeliveryTracker + brokerService.initializeFallbackDelayedDeliveryTrackerFactory(); + tracker = brokerService.getFallbackDelayedDeliveryTrackerFactory().newTracker(dispatcher); + } + return tracker; + } + + @VisibleForTesting + BucketDelayedDeliveryTracker newTracker0(PersistentDispatcherMultipleConsumers dispatcher) + throws RecoverDelayedDeliveryTrackerException { + return new BucketDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis, + isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket, TimeUnit.SECONDS.toMillis(delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds), delayedDeliveryMaxIndexesPerBucketSnapshotSegment, delayedDeliveryMaxNumBuckets); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java index 78229fef25a..893a4e59c3c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java @@ -85,4 +85,51 @@ public interface DelayedDeliveryTracker extends AutoCloseable { * Close the subscription tracker and release all resources. */ void close(); + + DelayedDeliveryTracker DISABLE = new DelayedDeliveryTracker() { + @Override + public boolean addMessage(long ledgerId, long entryId, long deliveryAt) { + return false; + } + + @Override + public boolean hasMessageAvailable() { + return false; + } + + @Override + public long getNumberOfDelayedMessages() { + return 0; + } + + @Override + public long getBufferMemoryUsage() { + return 0; + } + + @Override + public NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) { + return null; + } + + @Override + public boolean shouldPauseAllDeliveries() { + return false; + } + + @Override + public void resetTickTime(long tickTime) { + + } + + @Override + public CompletableFuture<Void> clear() { + return null; + } + + @Override + public void close() { + + } + }; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java index e7dc3f18f46..179cf74db41 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.delayed; +import com.google.common.annotations.VisibleForTesting; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; @@ -25,8 +26,11 @@ import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class InMemoryDelayedDeliveryTrackerFactory implements DelayedDeliveryTrackerFactory { + private static final Logger log = LoggerFactory.getLogger(InMemoryDelayedDeliveryTrackerFactory.class); private Timer timer; @@ -48,6 +52,21 @@ public class InMemoryDelayedDeliveryTrackerFactory implements DelayedDeliveryTra @Override public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) { + String topicName = dispatcher.getTopic().getName(); + String subscriptionName = dispatcher.getSubscription().getName(); + DelayedDeliveryTracker tracker = DelayedDeliveryTracker.DISABLE; + try { + tracker = newTracker0(dispatcher); + } catch (Exception e) { + // it should never go here + log.warn("Failed to create InMemoryDelayedDeliveryTracker, topic {}, subscription {}", + topicName, subscriptionName, e); + } + return tracker; + } + + @VisibleForTesting + InMemoryDelayedDeliveryTracker newTracker0(PersistentDispatcherMultipleConsumers dispatcher) { return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index f98c9e000f1..1cbae674bd5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -105,22 +105,24 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker private CompletableFuture<Void> pendingLoad = null; public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, - Timer timer, long tickTimeMillis, - boolean isDelayedDeliveryDeliverAtTimeStrict, - BucketSnapshotStorage bucketSnapshotStorage, - long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis, - int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets) { + Timer timer, long tickTimeMillis, + boolean isDelayedDeliveryDeliverAtTimeStrict, + BucketSnapshotStorage bucketSnapshotStorage, + long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis, + int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets) + throws RecoverDelayedDeliveryTrackerException { this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage, minIndexCountPerBucket, timeStepPerBucketSnapshotSegmentInMillis, maxIndexesPerBucketSnapshotSegment, maxNumBuckets); } public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, - Timer timer, long tickTimeMillis, Clock clock, - boolean isDelayedDeliveryDeliverAtTimeStrict, - BucketSnapshotStorage bucketSnapshotStorage, - long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis, - int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets) { + Timer timer, long tickTimeMillis, Clock clock, + boolean isDelayedDeliveryDeliverAtTimeStrict, + BucketSnapshotStorage bucketSnapshotStorage, + long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis, + int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets) + throws RecoverDelayedDeliveryTrackerException { super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict); this.minIndexCountPerBucket = minIndexCountPerBucket; this.timeStepPerBucketSnapshotSegmentInMillis = timeStepPerBucketSnapshotSegmentInMillis; @@ -133,10 +135,17 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker new MutableBucket(dispatcher.getName(), dispatcher.getCursor(), FutureUtil.Sequencer.create(), bucketSnapshotStorage); this.stats = new BucketDelayedMessageIndexStats(); - this.numberDelayedMessages = recoverBucketSnapshot(); + + // Close the tracker if failed to recover. + try { + this.numberDelayedMessages = recoverBucketSnapshot(); + } catch (RecoverDelayedDeliveryTrackerException e) { + close(); + throw e; + } } - private synchronized long recoverBucketSnapshot() throws RuntimeException { + private synchronized long recoverBucketSnapshot() throws RecoverDelayedDeliveryTrackerException { ManagedCursor cursor = this.lastMutableBucket.getCursor(); Map<String, String> cursorProperties = cursor.getCursorProperties(); if (MapUtils.isEmpty(cursorProperties)) { @@ -181,7 +190,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } - throw new RuntimeException(e); + throw new RecoverDelayedDeliveryTrackerException(e); } for (Map.Entry<Range<Long>, CompletableFuture<List<DelayedIndex>>> entry : futures.entrySet()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/RecoverDelayedDeliveryTrackerException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/RecoverDelayedDeliveryTrackerException.java new file mode 100644 index 00000000000..71a851100fe --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/RecoverDelayedDeliveryTrackerException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed.bucket; + +public class RecoverDelayedDeliveryTrackerException extends Exception { + public RecoverDelayedDeliveryTrackerException(Throwable cause) { + super(cause); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index ca3c9834be0..0254d8087d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -104,6 +104,7 @@ import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.cache.BundlesQuotas; import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory; import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader; +import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory; import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl; import org.apache.pulsar.broker.loadbalance.LoadManager; @@ -272,10 +273,11 @@ public class BrokerService implements Closeable { private static final AtomicBoolean blockedDispatcherOnHighUnackedMsgs = new AtomicBoolean(false); private final ConcurrentOpenHashSet<PersistentDispatcherMultipleConsumers> blockedDispatchers; private final ReadWriteLock lock = new ReentrantReadWriteLock(); - - @Getter @VisibleForTesting private final DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory; + // InMemoryDelayedDeliveryTrackerFactory is for the purpose of + // fallback if recover BucketDelayedDeliveryTracker failed. + private volatile DelayedDeliveryTrackerFactory fallbackDelayedDeliveryTrackerFactory; private final ServerBootstrap defaultServerBootstrap; private final List<EventLoopGroup> protocolHandlersWorkerGroups = new ArrayList<>(); @@ -884,6 +886,9 @@ public class BrokerService implements Closeable { pulsarStats.close(); try { delayedDeliveryTrackerFactory.close(); + if (fallbackDelayedDeliveryTrackerFactory != null) { + fallbackDelayedDeliveryTrackerFactory.close(); + } } catch (Exception e) { log.warn("Error in closing delayedDeliveryTrackerFactory", e); } @@ -3457,6 +3462,25 @@ public class BrokerService implements Closeable { } } + /** + * Initializes the in-memory delayed delivery tracker factory when + * BucketDelayedDeliveryTrackerFactory.newTracker failed. + */ + public synchronized void initializeFallbackDelayedDeliveryTrackerFactory() { + if (fallbackDelayedDeliveryTrackerFactory != null) { + return; + } + + DelayedDeliveryTrackerFactory factory = new InMemoryDelayedDeliveryTrackerFactory(); + try { + factory.initialize(pulsar); + this.fallbackDelayedDeliveryTrackerFactory = factory; + } catch (Exception e) { + // it should never go here + log.error("Failed to initialize InMemoryDelayedDeliveryTrackerFactory", e); + } + } + private static class ConfigField { final Field field; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 2f99d57aa45..a04b1fc3ad1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -1083,15 +1083,15 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } synchronized (this) { - if (!delayedDeliveryTracker.isPresent()) { + if (delayedDeliveryTracker.isEmpty()) { if (!msgMetadata.hasDeliverAtTime()) { // No need to initialize the tracker here return false; } // Initialize the tracker the first time we need to use it - delayedDeliveryTracker = Optional - .of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this)); + delayedDeliveryTracker = Optional.of( + topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this)); } delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis()); @@ -1245,5 +1245,10 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul return StickyKeyConsumerSelector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer())); } + + public Subscription getSubscription() { + return subscription; + } + private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java new file mode 100644 index 00000000000..bb6ef9d3636 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import lombok.Cleanup; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.delayed.bucket.RecoverDelayedDeliveryTrackerException; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.awaitility.Awaitility; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class DelayedDeliveryTrackerFactoryTest extends ProducerConsumerBase { + @BeforeClass + @Override + public void setup() throws Exception { + conf.setDelayedDeliveryTrackerFactoryClassName(BucketDelayedDeliveryTrackerFactory.class.getName()); + conf.setDelayedDeliveryMaxNumBuckets(10); + conf.setDelayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds(1); + conf.setDelayedDeliveryMaxIndexesPerBucketSnapshotSegment(10); + conf.setDelayedDeliveryMinIndexCountPerBucket(50); + conf.setDelayedDeliveryTickTimeMillis(1024); + conf.setDispatcherReadFailureBackoffInitialTimeInMs(1000); + super.internalSetup(); + super.producerBaseSetup(); + } + + @Override + @AfterClass(alwaysRun = true) + public void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testFallbackToInMemoryTracker() throws Exception { + Pair<BrokerService, PersistentDispatcherMultipleConsumers> pair = + mockDelayedDeliveryTrackerFactoryAndDispatcher(); + BrokerService brokerService = pair.getLeft(); + PersistentDispatcherMultipleConsumers dispatcher = pair.getRight(); + + // Since Mocked BucketDelayedDeliveryTrackerFactory.newTracker0() throws RecoverDelayedDeliveryTrackerException, + // the factory should be fallback to InMemoryDelayedDeliveryTrackerFactory + @Cleanup + DelayedDeliveryTracker tracker = brokerService.getDelayedDeliveryTrackerFactory().newTracker(dispatcher); + Assert.assertTrue(tracker instanceof InMemoryDelayedDeliveryTracker); + + DelayedDeliveryTrackerFactory fallbackFactory = brokerService.getFallbackDelayedDeliveryTrackerFactory(); + Assert.assertTrue(fallbackFactory instanceof InMemoryDelayedDeliveryTrackerFactory); + } + + + private Pair<BrokerService, PersistentDispatcherMultipleConsumers> mockDelayedDeliveryTrackerFactoryAndDispatcher() + throws Exception { + BrokerService brokerService = Mockito.spy(pulsar.getBrokerService()); + + // Mock dispatcher + PersistentDispatcherMultipleConsumers dispatcher = Mockito.mock(PersistentDispatcherMultipleConsumers.class); + Mockito.doReturn("test").when(dispatcher).getName(); + // Mock BucketDelayedDeliveryTrackerFactory + @Cleanup + BucketDelayedDeliveryTrackerFactory factory = new BucketDelayedDeliveryTrackerFactory(); + factory = Mockito.spy(factory); + factory.initialize(pulsar); + Mockito.doThrow(new RecoverDelayedDeliveryTrackerException(new RuntimeException())) + .when(factory).newTracker0(Mockito.eq(dispatcher)); + // Mock brokerService + Mockito.doReturn(factory).when(brokerService).getDelayedDeliveryTrackerFactory(); + // Mock topic and subscription + PersistentTopic topic = Mockito.mock(PersistentTopic.class); + Mockito.doReturn(brokerService).when(topic).getBrokerService(); + Subscription subscription = Mockito.mock(Subscription.class); + Mockito.doReturn("topic").when(topic).getName(); + Mockito.doReturn("sub").when(subscription).getName(); + Mockito.doReturn(topic).when(dispatcher).getTopic(); + Mockito.doReturn(subscription).when(dispatcher).getSubscription(); + + return Pair.of(brokerService, dispatcher); + } + + @Test + public void testFallbackToInMemoryTrackerFactoryFailed() throws Exception { + Pair<BrokerService, PersistentDispatcherMultipleConsumers> pair = + mockDelayedDeliveryTrackerFactoryAndDispatcher(); + BrokerService brokerService = pair.getLeft(); + PersistentDispatcherMultipleConsumers dispatcher = pair.getRight(); + + // Mock InMemoryDelayedDeliveryTrackerFactory + @Cleanup + InMemoryDelayedDeliveryTrackerFactory factory = new InMemoryDelayedDeliveryTrackerFactory(); + factory = Mockito.spy(factory); + factory.initialize(pulsar); + // Mock InMemoryDelayedDeliveryTrackerFactory.newTracker0() throws RuntimeException + Mockito.doThrow(new RuntimeException()).when(factory).newTracker0(Mockito.eq(dispatcher)); + + // Mock brokerService to return mocked InMemoryDelayedDeliveryTrackerFactory + Mockito.doAnswer(inv -> null).when(brokerService).initializeFallbackDelayedDeliveryTrackerFactory(); + Mockito.doReturn(factory).when(brokerService).getFallbackDelayedDeliveryTrackerFactory(); + + // Since Mocked BucketDelayedDeliveryTrackerFactory.newTracker0() throws RecoverDelayedDeliveryTrackerException, + // and Mocked InMemoryDelayedDeliveryTrackerFactory.newTracker0() throws RuntimeException, + // the tracker instance should be DelayedDeliveryTracker.DISABLE + @Cleanup + DelayedDeliveryTracker tracker = brokerService.getDelayedDeliveryTrackerFactory().newTracker(dispatcher); + Assert.assertEquals(tracker, DelayedDeliveryTracker.DISABLE); + } + + // 1. Create BucketDelayedDeliveryTracker failed, fallback to InMemoryDelayedDeliveryTracker, + // 2. Publish delay messages + @Test(timeOut = 60_000) + public void testPublishDelayMessagesAndCreateBucketDelayDeliveryTrackerFailed() throws Exception { + String topicName = "persistent://public/default/" + UUID.randomUUID(); + + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .enableBatching(false) + .create(); + + // Mock BucketDelayedDeliveryTrackerFactory.newTracker0() throws RecoverDelayedDeliveryTrackerException + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); + topic = Mockito.spy(topic); + BrokerService brokerService = Mockito.spy(pulsar.getBrokerService()); + BucketDelayedDeliveryTrackerFactory factory = + (BucketDelayedDeliveryTrackerFactory) Mockito.spy(brokerService.getDelayedDeliveryTrackerFactory()); + Mockito.doThrow(new RecoverDelayedDeliveryTrackerException(new RuntimeException())) + .when(factory).newTracker0(Mockito.any()); + Mockito.doReturn(factory).when(brokerService).getDelayedDeliveryTrackerFactory(); + + // Return mocked BrokerService + Mockito.doReturn(brokerService).when(topic).getBrokerService(); + + // Set Mocked topic to BrokerService + Field topics = BrokerService.class.getDeclaredField("topics"); + topics.setAccessible(true); + @SuppressWarnings("unchecked") + ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topicMap = + (ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>) topics.get(brokerService); + topicMap.put(topicName, CompletableFuture.completedFuture(Optional.of(topic))); + + // Create consumer + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionName("sub") + .subscriptionType(SubscriptionType.Shared) + .messageListener((c, msg) -> { + try { + c.acknowledge(msg); + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } + }) + .subscribe(); + + PersistentSubscription subscription = topic.getSubscription("sub"); + Dispatcher dispatcher = subscription.getDispatcher(); + Assert.assertTrue(dispatcher instanceof PersistentDispatcherMultipleConsumers); + + // Publish a delay message to initialize DelayedDeliveryTracker + producer.newMessage().value("test").deliverAfter(10_000, TimeUnit.MILLISECONDS).send(); + + // Get DelayedDeliveryTracker from Dispatcher + PersistentDispatcherMultipleConsumers dispatcher0 = (PersistentDispatcherMultipleConsumers) dispatcher; + Field trackerField = + PersistentDispatcherMultipleConsumers.class.getDeclaredField("delayedDeliveryTracker"); + trackerField.setAccessible(true); + + AtomicReference<Optional<DelayedDeliveryTracker>> reference = new AtomicReference<>(); + // Wait until DelayedDeliveryTracker is initialized + Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> { + @SuppressWarnings("unchecked") + Optional<DelayedDeliveryTracker> optional = + (Optional<DelayedDeliveryTracker>) trackerField.get(dispatcher0); + if (optional.isPresent()) { + reference.set(optional); + return true; + } + return false; + }); + + Optional<DelayedDeliveryTracker> optional = reference.get(); + Assert.assertTrue(optional.get() instanceof InMemoryDelayedDeliveryTracker); + + // Mock DelayedDeliveryTracker and Count the number of addMessage() calls + AtomicInteger counter = new AtomicInteger(0); + InMemoryDelayedDeliveryTracker tracker = (InMemoryDelayedDeliveryTracker) optional.get(); + tracker = Mockito.spy(tracker); + Mockito.doAnswer(inv -> { + counter.incrementAndGet(); + return inv.callRealMethod(); + }).when(tracker).addMessage(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyLong()); + // Set Mocked InMemoryDelayedDeliveryTracker back to Dispatcher + trackerField.set(dispatcher0, Optional.of(tracker)); + + // Publish 10 delay messages, so the counter should be 10 + for (int i = 0; i < 10; i++) { + producer.newMessage().value("test") + .deliverAfter(10_000, TimeUnit.MILLISECONDS).send(); + } + + try { + Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> counter.get() == 10); + } finally { + consumer.close(); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java index 39b3992fbd1..2b2a3a39b87 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java @@ -182,7 +182,7 @@ public class BucketDelayedDeliveryTrackerTest extends AbstractDeliveryTrackerTes } @Test(dataProvider = "delayedTracker", invocationCount = 10) - public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) { + public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) throws Exception { for (int i = 1; i <= 100; i++) { tracker.addMessage(i, i, i * 10); } @@ -265,7 +265,7 @@ public class BucketDelayedDeliveryTrackerTest extends AbstractDeliveryTrackerTes } @Test(dataProvider = "delayedTracker") - public void testMergeSnapshot(final BucketDelayedDeliveryTracker tracker) { + public void testMergeSnapshot(final BucketDelayedDeliveryTracker tracker) throws Exception { for (int i = 1; i <= 110; i++) { tracker.addMessage(i, i, i * 10); Awaitility.await().untilAsserted(() -> { @@ -318,7 +318,7 @@ public class BucketDelayedDeliveryTrackerTest extends AbstractDeliveryTrackerTes } @Test(dataProvider = "delayedTracker") - public void testWithBkException(final BucketDelayedDeliveryTracker tracker) { + public void testWithBkException(final BucketDelayedDeliveryTracker tracker) throws Exception { MockBucketSnapshotStorage mockBucketSnapshotStorage = (MockBucketSnapshotStorage) bucketSnapshotStorage; mockBucketSnapshotStorage.injectCreateException( new BucketSnapshotPersistenceException("Bookie operation timeout, op: Create entry"));
