This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5543857db99717930e94cf60cd80db2b5929df3a Author: Lari Hotari <[email protected]> AuthorDate: Sat Jan 25 10:59:01 2025 +0200 [improve][test] Support decorating topic, subscription, dispatcher, ManagedLedger and ManagedCursors instances in tests (#23892) (cherry picked from commit 2a9d4ac85d8d786979afaa0b965cdb27375ae969) --- .../mledger/impl/ManagedLedgerFactoryImpl.java | 15 +- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 +- .../pulsar/broker/ManagedLedgerClientFactory.java | 14 +- .../apache/pulsar/broker/service/TopicFactory.java | 5 + .../service/persistent/PersistentSubscription.java | 139 ++++++++------ .../broker/service/persistent/PersistentTopic.java | 12 +- .../testinterceptor/BrokerTestInterceptor.java | 212 +++++++++++++++++++++ 7 files changed, 333 insertions(+), 74 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index f546a487f84..12c3ea12df5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -410,11 +410,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(), config.getBookKeeperEnsemblePlacementPolicyProperties())) .thenAccept(bk -> { - final ManagedLedgerImpl newledger = config.getShadowSource() == null - ? 
new ManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, - mlOwnershipChecker) - : new ShadowManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, - mlOwnershipChecker); + final ManagedLedgerImpl newledger = + createManagedLedger(bk, store, name, config, mlOwnershipChecker); PendingInitializeManagedLedger pendingLedger = new PendingInitializeManagedLedger(newledger); pendingInitializeLedgers.put(name, pendingLedger); newledger.initialize(new ManagedLedgerInitializeLedgerCallback() { @@ -472,6 +469,14 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { }); } + protected ManagedLedgerImpl createManagedLedger(BookKeeper bk, MetaStore store, String name, + ManagedLedgerConfig config, + Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) { + return config.getShadowSource() == null + ? new ManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, mlOwnershipChecker) : + new ShadowManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, mlOwnershipChecker); + } + @Override public void asyncOpenReadOnlyManagedLedger(String managedLedgerName, AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback, diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 966aa068f2f..4f45fc67b63 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -632,7 +632,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { for (final String cursorName : consumers) { log.info("[{}] Loading cursor {}", name, cursorName); final ManagedCursorImpl cursor; - cursor = new ManagedCursorImpl(bookKeeper, ManagedLedgerImpl.this, cursorName); + cursor = createCursor(ManagedLedgerImpl.this.bookKeeper, cursorName); cursor.recover(new 
VoidCallback() { @Override @@ -663,7 +663,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { log.debug("[{}] Recovering cursor {} lazily", name, cursorName); } final ManagedCursorImpl cursor; - cursor = new ManagedCursorImpl(bookKeeper, ManagedLedgerImpl.this, cursorName); + cursor = createCursor(ManagedLedgerImpl.this.bookKeeper, cursorName); CompletableFuture<ManagedCursor> cursorRecoveryFuture = new CompletableFuture<>(); uninitializedCursors.put(cursorName, cursorRecoveryFuture); @@ -1007,7 +1007,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (log.isDebugEnabled()) { log.debug("[{}] Creating new cursor: {}", name, cursorName); } - final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper, this, cursorName); + final ManagedCursorImpl cursor = createCursor(bookKeeper, cursorName); CompletableFuture<ManagedCursor> cursorFuture = new CompletableFuture<>(); uninitializedCursors.put(cursorName, cursorFuture); Position position = InitialPosition.Earliest == initialPosition ? 
getFirstPosition() : getLastPosition(); @@ -1039,6 +1039,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { }); } + protected ManagedCursorImpl createCursor(BookKeeper bookKeeper, String cursorName) { + return new ManagedCursorImpl(bookKeeper, this, cursorName); + } + @Override public synchronized void asyncDeleteCursor(final String consumerName, final DeleteCursorCallback callback, final Object ctx) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index 737bc69bf24..3d945afe4c1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -116,8 +116,8 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage { try { this.managedLedgerFactory = - new ManagedLedgerFactoryImpl(metadataStore, bkFactory, managedLedgerFactoryConfig, statsLogger, - openTelemetry); + createManagedLedgerFactory(metadataStore, openTelemetry, bkFactory, managedLedgerFactoryConfig, + statsLogger); } catch (Exception e) { statsProvider.stop(); defaultBkClient.close(); @@ -147,6 +147,16 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage { }; } + protected ManagedLedgerFactoryImpl createManagedLedgerFactory(MetadataStoreExtended metadataStore, + OpenTelemetry openTelemetry, + BookkeeperFactoryForCustomEnsemblePlacementPolicy + bkFactory, + ManagedLedgerFactoryConfig managedLedgerFactoryConfig, + StatsLogger statsLogger) throws Exception { + return new ManagedLedgerFactoryImpl(metadataStore, bkFactory, managedLedgerFactoryConfig, statsLogger, + openTelemetry); + } + @Override public Collection<ManagedLedgerStorageClass> getStorageClasses() { return List.of(getDefaultStorageClass()); diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java index f8bac9d8134..523f995cc5d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import java.io.Closeable; +import java.io.IOException; import org.apache.bookkeeper.mledger.ManagedLedger; /** @@ -28,4 +29,8 @@ import org.apache.bookkeeper.mledger.ManagedLedger; public interface TopicFactory extends Closeable { <T extends Topic> T create(String topic, ManagedLedger ledger, BrokerService brokerService, Class<T> topicClazz); + + default void close() throws IOException { + // default implementation + } } \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 609bbd0d734..d5be2a5a8b7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -246,70 +246,10 @@ public class PersistentSubscription extends AbstractSubscription { } if (dispatcher == null || !dispatcher.isConsumerConnected()) { - Dispatcher previousDispatcher = null; - switch (consumer.subType()) { - case Exclusive: - if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) { - previousDispatcher = dispatcher; - dispatcher = new PersistentDispatcherSingleActiveConsumer( - cursor, SubType.Exclusive, 0, topic, this); - } - break; - case Shared: - if (dispatcher == null || dispatcher.getType() != SubType.Shared) { - previousDispatcher = dispatcher; - if 
(config.isSubscriptionSharedUseClassicPersistentImplementation()) { - dispatcher = new PersistentDispatcherMultipleConsumersClassic(topic, cursor, this); - } else { - dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, this); - } - } - break; - case Failover: - int partitionIndex = TopicName.getPartitionIndex(topicName); - if (partitionIndex < 0) { - // For non partition topics, use a negative index so - // dispatcher won't sort consumers before picking - // an active consumer for the topic. - partitionIndex = -1; - } - - if (dispatcher == null || dispatcher.getType() != SubType.Failover) { - previousDispatcher = dispatcher; - dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover, - partitionIndex, topic, this); - } - break; - case Key_Shared: - KeySharedMeta ksm = consumer.getKeySharedMeta(); - if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared - || !((StickyKeyDispatcher) dispatcher) - .hasSameKeySharedPolicy(ksm)) { - previousDispatcher = dispatcher; - if (config.isSubscriptionKeySharedUseClassicPersistentImplementation()) { - dispatcher = - new PersistentStickyKeyDispatcherMultipleConsumersClassic(topic, cursor, - this, - topic.getBrokerService().getPulsar().getConfiguration(), ksm); - } else { - dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this, - topic.getBrokerService().getPulsar().getConfiguration(), ksm); - } - } - break; - default: - return FutureUtil.failedFuture( - new ServerMetadataException("Unsupported subscription type")); - } - - if (previousDispatcher != null) { - previousDispatcher.close().thenRun(() -> { - log.info("[{}][{}] Successfully closed previous dispatcher", topicName, subName); - }).exceptionally(ex -> { - log.error("[{}][{}] Failed to close previous dispatcher", topicName, subName, ex); - return null; - }); + if (consumer.subType() == null) { + return FutureUtil.failedFuture(new ServerMetadataException("Unsupported subscription type")); } 
+ dispatcher = reuseOrCreateDispatcher(dispatcher, consumer); } else { Optional<CompletableFuture<Void>> compatibilityError = checkForConsumerCompatibilityErrorWithDispatcher(dispatcher, consumer); @@ -323,6 +263,79 @@ public class PersistentSubscription extends AbstractSubscription { }); } + /** + * Create a new dispatcher or reuse the existing one when it's compatible with the new consumer. + * This protected method can be overridden for testing purposes for injecting test dispatcher instances with + * special behaviors. + * @param dispatcher the existing dispatcher + * @param consumer the new consumer + * @return the dispatcher to use, either the existing one or a new one + */ + protected Dispatcher reuseOrCreateDispatcher(Dispatcher dispatcher, Consumer consumer) { + Dispatcher previousDispatcher = null; + switch (consumer.subType()) { + case Exclusive: + if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) { + previousDispatcher = dispatcher; + dispatcher = new PersistentDispatcherSingleActiveConsumer( + cursor, SubType.Exclusive, 0, topic, this); + } + break; + case Shared: + if (dispatcher == null || dispatcher.getType() != SubType.Shared) { + previousDispatcher = dispatcher; + if (config.isSubscriptionSharedUseClassicPersistentImplementation()) { + dispatcher = new PersistentDispatcherMultipleConsumersClassic(topic, cursor, this); + } else { + dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, this); + } + } + break; + case Failover: + int partitionIndex = TopicName.getPartitionIndex(topicName); + if (partitionIndex < 0) { + // For non partition topics, use a negative index so + // dispatcher won't sort consumers before picking + // an active consumer for the topic. 
+ partitionIndex = -1; + } + + if (dispatcher == null || dispatcher.getType() != SubType.Failover) { + previousDispatcher = dispatcher; + dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover, + partitionIndex, topic, this); + } + break; + case Key_Shared: + KeySharedMeta ksm = consumer.getKeySharedMeta(); + if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared + || !((StickyKeyDispatcher) dispatcher) + .hasSameKeySharedPolicy(ksm)) { + previousDispatcher = dispatcher; + if (config.isSubscriptionKeySharedUseClassicPersistentImplementation()) { + dispatcher = + new PersistentStickyKeyDispatcherMultipleConsumersClassic(topic, cursor, + this, config, ksm); + } else { + dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this, + config, ksm); + } + } + break; + } + + if (previousDispatcher != null) { + previousDispatcher.close().thenRun(() -> { + log.info("[{}][{}] Successfully closed previous dispatcher", topicName, subName); + }).exceptionally(ex -> { + log.error("[{}][{}] Failed to close previous dispatcher", topicName, subName, ex); + return null; + }); + } + + return dispatcher; + } + @Override public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException { cursor.updateLastActive(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 0f508445958..b0113e75e22 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -583,7 +583,17 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal }); } - private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor, + + /** + * Create a 
new subscription instance for the topic. + * This protected method can be overridden in tests to return a special test implementation instance. + * @param subscriptionName the name of the subscription + * @param cursor the cursor to use for the subscription + * @param replicated the subscription replication flag + * @param subscriptionProperties the subscription properties + * @return the subscription instance + */ + protected PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor, Boolean replicated, Map<String, String> subscriptionProperties) { requireNonNull(topicCompactionService); if (isCompactionSubscription(subscriptionName) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testinterceptor/BrokerTestInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testinterceptor/BrokerTestInterceptor.java new file mode 100644 index 00000000000..a1549b2cb86 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testinterceptor/BrokerTestInterceptor.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.testinterceptor; + +import io.opentelemetry.api.OpenTelemetry; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.function.Supplier; +import lombok.Getter; +import lombok.Setter; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.MetaStore; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.ManagedLedgerClientFactory; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.TopicFactory; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; + +/** + * A test interceptor for broker tests that allows decorating persistent topics, subscriptions, dispatchers, + * managed ledger factory, managed ledger and managed cursor instances. 
+ */ +public class BrokerTestInterceptor { + public static final BrokerTestInterceptor INSTANCE = new BrokerTestInterceptor(); + + // Suppress default constructor for noninstantiability + private BrokerTestInterceptor() { + + } + + public static class TestTopicFactory implements TopicFactory { + @Override + public <T extends Topic> T create(String topic, ManagedLedger ledger, BrokerService brokerService, + Class<T> topicClazz) { + if (!topicClazz.isAssignableFrom(PersistentTopic.class)) { + throw new UnsupportedOperationException("Unsupported topic class"); + } + return topicClazz.cast( + INSTANCE.getPersistentTopicDecorator().apply(new TestTopic(topic, ledger, brokerService))); + } + } + + static class TestTopic extends PersistentTopic { + + public TestTopic(String topic, ManagedLedger ledger, BrokerService brokerService) { + super(topic, ledger, brokerService); + } + + @Override + protected PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor, + Boolean replicated, + Map<String, String> subscriptionProperties) { + return INSTANCE.getPersistentSubscriptionDecorator() + .apply(new TestSubscription(this, subscriptionName, cursor, replicated, subscriptionProperties)); + } + } + + static class TestSubscription extends PersistentSubscription { + public TestSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, + Boolean replicated, + Map<String, String> subscriptionProperties) { + super(topic, subscriptionName, cursor, replicated, subscriptionProperties); + } + + @Override + protected Dispatcher reuseOrCreateDispatcher(Dispatcher dispatcher, + Consumer consumer) { + Dispatcher previousInstance = dispatcher; + dispatcher = super.reuseOrCreateDispatcher(dispatcher, consumer); + if (dispatcher != previousInstance) { + dispatcher = INSTANCE.getDispatcherDecorator().apply(dispatcher); + } + return dispatcher; + } + } + + public static class TestManagedLedgerStorage extends ManagedLedgerClientFactory { 
+ @Override + protected ManagedLedgerFactoryImpl createManagedLedgerFactory(MetadataStoreExtended metadataStore, + OpenTelemetry openTelemetry, + ManagedLedgerFactoryImpl.BookkeeperFactoryForCustomEnsemblePlacementPolicy bkFactory, + ManagedLedgerFactoryConfig managedLedgerFactoryConfig, + StatsLogger statsLogger) throws Exception { + return INSTANCE.managedLedgerFactoryDecorator.apply( + new TestManagedLedgerFactoryImpl(metadataStore, bkFactory, managedLedgerFactoryConfig, statsLogger, + openTelemetry)); + } + } + + static class TestManagedLedgerFactoryImpl extends ManagedLedgerFactoryImpl { + public TestManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, + BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory, + ManagedLedgerFactoryConfig config, StatsLogger statsLogger, + OpenTelemetry openTelemetry) throws Exception { + super(metadataStore, bookKeeperGroupFactory, config, statsLogger, openTelemetry); + } + + @Override + protected ManagedLedgerImpl createManagedLedger(BookKeeper bk, MetaStore store, String name, + ManagedLedgerConfig config, + Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) { + return INSTANCE.managedLedgerDecorator.apply( + new TestManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, mlOwnershipChecker)); + } + } + + static class TestManagedLedgerImpl extends ManagedLedgerImpl { + public TestManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store, + ManagedLedgerConfig config, + OrderedScheduler scheduledExecutor, String name, + Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) { + super(factory, bookKeeper, store, config, scheduledExecutor, name, mlOwnershipChecker); + } + + @Override + protected ManagedCursorImpl createCursor(BookKeeper bookKeeper, String cursorName) { + return INSTANCE.managedCursorDecorator.apply(super.createCursor(bookKeeper, cursorName)); + } + } + + @Getter + @Setter + private Function<PersistentTopic, PersistentTopic> 
persistentTopicDecorator = Function.identity(); + + @Getter + @Setter + private Function<PersistentSubscription, PersistentSubscription> persistentSubscriptionDecorator = Function.identity(); + + @Getter + @Setter + private Function<Dispatcher, Dispatcher> dispatcherDecorator = Function.identity(); + + @Getter + @Setter + private Function<ManagedLedgerFactoryImpl, ManagedLedgerFactoryImpl> managedLedgerFactoryDecorator = Function.identity(); + + @Getter + @Setter + private Function<ManagedLedgerImpl, ManagedLedgerImpl> managedLedgerDecorator = Function.identity(); + + @Getter + @Setter + private Function<ManagedCursorImpl, ManagedCursorImpl> managedCursorDecorator = Function.identity(); + + public void reset() { + persistentTopicDecorator = Function.identity(); + persistentSubscriptionDecorator = Function.identity(); + dispatcherDecorator = Function.identity(); + managedLedgerFactoryDecorator = Function.identity(); + managedLedgerDecorator = Function.identity(); + managedCursorDecorator = Function.identity(); + } + + public void configure(ServiceConfiguration conf) { + conf.setTopicFactoryClassName(TestTopicFactory.class.getName()); + conf.setManagedLedgerStorageClassName(TestManagedLedgerStorage.class.getName()); + } + + public <T extends Dispatcher> void applyDispatcherSpyDecorator(Class<T> dispatcherClass, + java.util.function.Consumer<T> spyCustomizer) { + setDispatcherDecorator(createDispatcherSpyDecorator(dispatcherClass, spyCustomizer)); + } + + public static <T extends Dispatcher> Function<Dispatcher, Dispatcher> createDispatcherSpyDecorator( + Class<T> dispatcherClass, java.util.function.Consumer<T> spyCustomizer) { + return dispatcher -> { + Dispatcher spy = BrokerTestUtil.spyWithoutRecordingInvocations(dispatcher); + spyCustomizer.accept(dispatcherClass.cast(spy)); + return spy; + }; + } + + public void applyCursorSpyDecorator(java.util.function.Consumer<ManagedCursorImpl> spyCustomizer) { + setManagedCursorDecorator(cursor -> { + ManagedCursorImpl spy = 
BrokerTestUtil.spyWithoutRecordingInvocations(cursor); + spyCustomizer.accept(spy); + return spy; + }); + } +}
