This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2a9d4ac85d8 [improve][test] Support decorating topic, subscription,
dispatcher, ManagedLedger and ManagedCursors instances in tests (#23892)
2a9d4ac85d8 is described below
commit 2a9d4ac85d8d786979afaa0b965cdb27375ae969
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Jan 25 10:59:01 2025 +0200
[improve][test] Support decorating topic, subscription, dispatcher,
ManagedLedger and ManagedCursors instances in tests (#23892)
---
.../mledger/impl/ManagedLedgerFactoryImpl.java | 15 +-
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 +-
.../pulsar/broker/ManagedLedgerClientFactory.java | 14 +-
.../apache/pulsar/broker/service/TopicFactory.java | 5 +
.../service/persistent/PersistentSubscription.java | 139 ++++++++------
.../broker/service/persistent/PersistentTopic.java | 12 +-
.../testinterceptor/BrokerTestInterceptor.java | 212 +++++++++++++++++++++
7 files changed, 333 insertions(+), 74 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index f546a487f84..12c3ea12df5 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -410,11 +410,8 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
new
EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties()))
.thenAccept(bk -> {
- final ManagedLedgerImpl newledger =
config.getShadowSource() == null
- ? new ManagedLedgerImpl(this, bk, store,
config, scheduledExecutor, name,
- mlOwnershipChecker)
- : new ShadowManagedLedgerImpl(this, bk, store,
config, scheduledExecutor, name,
- mlOwnershipChecker);
+ final ManagedLedgerImpl newledger =
+ createManagedLedger(bk, store, name, config,
mlOwnershipChecker);
PendingInitializeManagedLedger pendingLedger = new
PendingInitializeManagedLedger(newledger);
pendingInitializeLedgers.put(name, pendingLedger);
newledger.initialize(new
ManagedLedgerInitializeLedgerCallback() {
@@ -472,6 +469,14 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
});
}
+ protected ManagedLedgerImpl createManagedLedger(BookKeeper bk, MetaStore
store, String name,
+ ManagedLedgerConfig config,
+
Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
+ return config.getShadowSource() == null
+ ? new ManagedLedgerImpl(this, bk, store, config,
scheduledExecutor, name, mlOwnershipChecker) :
+ new ShadowManagedLedgerImpl(this, bk, store, config,
scheduledExecutor, name, mlOwnershipChecker);
+ }
+
@Override
public void asyncOpenReadOnlyManagedLedger(String managedLedgerName,
AsyncCallbacks.OpenReadOnlyManagedLedgerCallback
callback,
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 966aa068f2f..4f45fc67b63 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -632,7 +632,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
for (final String cursorName : consumers) {
log.info("[{}] Loading cursor {}", name, cursorName);
final ManagedCursorImpl cursor;
- cursor = new ManagedCursorImpl(bookKeeper,
ManagedLedgerImpl.this, cursorName);
+ cursor =
createCursor(ManagedLedgerImpl.this.bookKeeper, cursorName);
cursor.recover(new VoidCallback() {
@Override
@@ -663,7 +663,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
log.debug("[{}] Recovering cursor {} lazily",
name, cursorName);
}
final ManagedCursorImpl cursor;
- cursor = new ManagedCursorImpl(bookKeeper,
ManagedLedgerImpl.this, cursorName);
+ cursor =
createCursor(ManagedLedgerImpl.this.bookKeeper, cursorName);
CompletableFuture<ManagedCursor> cursorRecoveryFuture
= new CompletableFuture<>();
uninitializedCursors.put(cursorName,
cursorRecoveryFuture);
@@ -1007,7 +1007,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
if (log.isDebugEnabled()) {
log.debug("[{}] Creating new cursor: {}", name, cursorName);
}
- final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper,
this, cursorName);
+ final ManagedCursorImpl cursor = createCursor(bookKeeper, cursorName);
CompletableFuture<ManagedCursor> cursorFuture = new
CompletableFuture<>();
uninitializedCursors.put(cursorName, cursorFuture);
Position position = InitialPosition.Earliest == initialPosition ?
getFirstPosition() : getLastPosition();
@@ -1039,6 +1039,10 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
});
}
+ protected ManagedCursorImpl createCursor(BookKeeper bookKeeper, String
cursorName) {
+ return new ManagedCursorImpl(bookKeeper, this, cursorName);
+ }
+
@Override
public synchronized void asyncDeleteCursor(final String consumerName,
final DeleteCursorCallback callback,
final Object ctx) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 737bc69bf24..3d945afe4c1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -116,8 +116,8 @@ public class ManagedLedgerClientFactory implements
ManagedLedgerStorage {
try {
this.managedLedgerFactory =
- new ManagedLedgerFactoryImpl(metadataStore, bkFactory,
managedLedgerFactoryConfig, statsLogger,
- openTelemetry);
+ createManagedLedgerFactory(metadataStore, openTelemetry,
bkFactory, managedLedgerFactoryConfig,
+ statsLogger);
} catch (Exception e) {
statsProvider.stop();
defaultBkClient.close();
@@ -147,6 +147,16 @@ public class ManagedLedgerClientFactory implements
ManagedLedgerStorage {
};
}
+ protected ManagedLedgerFactoryImpl
createManagedLedgerFactory(MetadataStoreExtended metadataStore,
+
OpenTelemetry openTelemetry,
+
BookkeeperFactoryForCustomEnsemblePlacementPolicy
+
bkFactory,
+
ManagedLedgerFactoryConfig managedLedgerFactoryConfig,
+ StatsLogger
statsLogger) throws Exception {
+ return new ManagedLedgerFactoryImpl(metadataStore, bkFactory,
managedLedgerFactoryConfig, statsLogger,
+ openTelemetry);
+ }
+
@Override
public Collection<ManagedLedgerStorageClass> getStorageClasses() {
return List.of(getDefaultStorageClass());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java
index f8bac9d8134..523f995cc5d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;
import java.io.Closeable;
+import java.io.IOException;
import org.apache.bookkeeper.mledger.ManagedLedger;
/**
@@ -28,4 +29,8 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
public interface TopicFactory extends Closeable {
<T extends Topic> T create(String topic, ManagedLedger ledger,
BrokerService brokerService, Class<T> topicClazz);
+
+ default void close() throws IOException {
+ // default implementation
+ }
}
\ No newline at end of file
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index a96a7e75506..275d1ae5818 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -250,70 +250,10 @@ public class PersistentSubscription extends
AbstractSubscription {
}
if (dispatcher == null || !dispatcher.isConsumerConnected()) {
- Dispatcher previousDispatcher = null;
- switch (consumer.subType()) {
- case Exclusive:
- if (dispatcher == null || dispatcher.getType() !=
SubType.Exclusive) {
- previousDispatcher = dispatcher;
- dispatcher = new
PersistentDispatcherSingleActiveConsumer(
- cursor, SubType.Exclusive, 0, topic,
this);
- }
- break;
- case Shared:
- if (dispatcher == null || dispatcher.getType() !=
SubType.Shared) {
- previousDispatcher = dispatcher;
- if
(config.isSubscriptionSharedUseClassicPersistentImplementation()) {
- dispatcher = new
PersistentDispatcherMultipleConsumersClassic(topic, cursor, this);
- } else {
- dispatcher = new
PersistentDispatcherMultipleConsumers(topic, cursor, this);
- }
- }
- break;
- case Failover:
- int partitionIndex =
TopicName.getPartitionIndex(topicName);
- if (partitionIndex < 0) {
- // For non partition topics, use a negative
index so
- // dispatcher won't sort consumers before
picking
- // an active consumer for the topic.
- partitionIndex = -1;
- }
-
- if (dispatcher == null || dispatcher.getType() !=
SubType.Failover) {
- previousDispatcher = dispatcher;
- dispatcher = new
PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover,
- partitionIndex, topic, this);
- }
- break;
- case Key_Shared:
- KeySharedMeta ksm = consumer.getKeySharedMeta();
- if (dispatcher == null || dispatcher.getType() !=
SubType.Key_Shared
- || !((StickyKeyDispatcher) dispatcher)
- .hasSameKeySharedPolicy(ksm)) {
- previousDispatcher = dispatcher;
- if
(config.isSubscriptionKeySharedUseClassicPersistentImplementation()) {
- dispatcher =
- new
PersistentStickyKeyDispatcherMultipleConsumersClassic(topic, cursor,
- this,
-
topic.getBrokerService().getPulsar().getConfiguration(), ksm);
- } else {
- dispatcher = new
PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
-
topic.getBrokerService().getPulsar().getConfiguration(), ksm);
- }
- }
- break;
- default:
- return FutureUtil.failedFuture(
- new ServerMetadataException("Unsupported
subscription type"));
- }
-
- if (previousDispatcher != null) {
- previousDispatcher.close().thenRun(() -> {
- log.info("[{}][{}] Successfully closed previous
dispatcher", topicName, subName);
- }).exceptionally(ex -> {
- log.error("[{}][{}] Failed to close previous
dispatcher", topicName, subName, ex);
- return null;
- });
+ if (consumer.subType() == null) {
+ return FutureUtil.failedFuture(new
ServerMetadataException("Unsupported subscription type"));
}
+ dispatcher = reuseOrCreateDispatcher(dispatcher, consumer);
} else {
Optional<CompletableFuture<Void>> compatibilityError =
checkForConsumerCompatibilityErrorWithDispatcher(dispatcher, consumer);
@@ -327,6 +267,79 @@ public class PersistentSubscription extends
AbstractSubscription {
});
}
+ /**
+ * Create a new dispatcher or reuse the existing one when it's compatible
with the new consumer.
+ * This protected method can be overridded for testing purpose for
injecting test dispatcher instances with
+ * special behaviors.
+ * @param dispatcher the existing dispatcher
+ * @param consumer the new consumer
+ * @return the dispatcher to use, either the existing one or a new one
+ */
+ protected Dispatcher reuseOrCreateDispatcher(Dispatcher dispatcher,
Consumer consumer) {
+ Dispatcher previousDispatcher = null;
+ switch (consumer.subType()) {
+ case Exclusive:
+ if (dispatcher == null || dispatcher.getType() !=
SubType.Exclusive) {
+ previousDispatcher = dispatcher;
+ dispatcher = new PersistentDispatcherSingleActiveConsumer(
+ cursor, SubType.Exclusive, 0, topic, this);
+ }
+ break;
+ case Shared:
+ if (dispatcher == null || dispatcher.getType() !=
SubType.Shared) {
+ previousDispatcher = dispatcher;
+ if
(config.isSubscriptionSharedUseClassicPersistentImplementation()) {
+ dispatcher = new
PersistentDispatcherMultipleConsumersClassic(topic, cursor, this);
+ } else {
+ dispatcher = new
PersistentDispatcherMultipleConsumers(topic, cursor, this);
+ }
+ }
+ break;
+ case Failover:
+ int partitionIndex = TopicName.getPartitionIndex(topicName);
+ if (partitionIndex < 0) {
+ // For non partition topics, use a negative index so
+ // dispatcher won't sort consumers before picking
+ // an active consumer for the topic.
+ partitionIndex = -1;
+ }
+
+ if (dispatcher == null || dispatcher.getType() !=
SubType.Failover) {
+ previousDispatcher = dispatcher;
+ dispatcher = new
PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover,
+ partitionIndex, topic, this);
+ }
+ break;
+ case Key_Shared:
+ KeySharedMeta ksm = consumer.getKeySharedMeta();
+ if (dispatcher == null || dispatcher.getType() !=
SubType.Key_Shared
+ || !((StickyKeyDispatcher) dispatcher)
+ .hasSameKeySharedPolicy(ksm)) {
+ previousDispatcher = dispatcher;
+ if
(config.isSubscriptionKeySharedUseClassicPersistentImplementation()) {
+ dispatcher =
+ new
PersistentStickyKeyDispatcherMultipleConsumersClassic(topic, cursor,
+ this, config, ksm);
+ } else {
+ dispatcher = new
PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
+ config, ksm);
+ }
+ }
+ break;
+ }
+
+ if (previousDispatcher != null) {
+ previousDispatcher.close().thenRun(() -> {
+ log.info("[{}][{}] Successfully closed previous dispatcher",
topicName, subName);
+ }).exceptionally(ex -> {
+ log.error("[{}][{}] Failed to close previous dispatcher",
topicName, subName, ex);
+ return null;
+ });
+ }
+
+ return dispatcher;
+ }
+
@Override
public synchronized void removeConsumer(Consumer consumer, boolean
isResetCursor) throws BrokerServiceException {
cursor.updateLastActive();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2325c8286a1..e920c483bb3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -586,7 +586,17 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
});
}
- private PersistentSubscription createPersistentSubscription(String
subscriptionName, ManagedCursor cursor,
+
+ /**
+ * Create a new subscription instance for the topic.
+ * This protected method can be overridden in tests to return a special
test implementation instance.
+ * @param subscriptionName the name of the subscription
+ * @param cursor the cursor to use for the subscription
+ * @param replicated the subscription replication flag
+ * @param subscriptionProperties the subscription properties
+ * @return the subscription instance
+ */
+ protected PersistentSubscription createPersistentSubscription(String
subscriptionName, ManagedCursor cursor,
Boolean replicated, Map<String, String> subscriptionProperties) {
requireNonNull(topicCompactionService);
if (isCompactionSubscription(subscriptionName)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testinterceptor/BrokerTestInterceptor.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testinterceptor/BrokerTestInterceptor.java
new file mode 100644
index 00000000000..a1549b2cb86
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testinterceptor/BrokerTestInterceptor.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.testinterceptor;
+
+import io.opentelemetry.api.OpenTelemetry;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.MetaStore;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.ManagedLedgerClientFactory;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.TopicFactory;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+/**
+ * A test interceptor for broker tests that allows to decorate persistent
topics, subscriptions, dispatchers
+ * managed ledger factory, managed ledger and managed cursor instances.
+ */
+public class BrokerTestInterceptor {
+ public static final BrokerTestInterceptor INSTANCE = new
BrokerTestInterceptor();
+
+ // Suppress default constructor for noninstantiability
+ private BrokerTestInterceptor() {
+
+ }
+
+ public static class TestTopicFactory implements TopicFactory {
+ @Override
+ public <T extends Topic> T create(String topic, ManagedLedger ledger,
BrokerService brokerService,
+ Class<T> topicClazz) {
+ if (!topicClazz.isAssignableFrom(PersistentTopic.class)) {
+ throw new UnsupportedOperationException("Unsupported topic
class");
+ }
+ return topicClazz.cast(
+ INSTANCE.getPersistentTopicDecorator().apply(new
TestTopic(topic, ledger, brokerService)));
+ }
+ }
+
+ static class TestTopic extends PersistentTopic {
+
+ public TestTopic(String topic, ManagedLedger ledger, BrokerService
brokerService) {
+ super(topic, ledger, brokerService);
+ }
+
+ @Override
+ protected PersistentSubscription createPersistentSubscription(String
subscriptionName, ManagedCursor cursor,
+ Boolean
replicated,
+
Map<String, String> subscriptionProperties) {
+ return INSTANCE.getPersistentSubscriptionDecorator()
+ .apply(new TestSubscription(this, subscriptionName,
cursor, replicated, subscriptionProperties));
+ }
+ }
+
+ static class TestSubscription extends PersistentSubscription {
+ public TestSubscription(PersistentTopic topic, String
subscriptionName, ManagedCursor cursor,
+ Boolean replicated,
+ Map<String, String> subscriptionProperties) {
+ super(topic, subscriptionName, cursor, replicated,
subscriptionProperties);
+ }
+
+ @Override
+ protected Dispatcher reuseOrCreateDispatcher(Dispatcher dispatcher,
+ Consumer consumer) {
+ Dispatcher previousInstance = dispatcher;
+ dispatcher = super.reuseOrCreateDispatcher(dispatcher, consumer);
+ if (dispatcher != previousInstance) {
+ dispatcher =
INSTANCE.getDispatcherDecorator().apply(dispatcher);
+ }
+ return dispatcher;
+ }
+ }
+
+ public static class TestManagedLedgerStorage extends
ManagedLedgerClientFactory {
+ @Override
+ protected ManagedLedgerFactoryImpl
createManagedLedgerFactory(MetadataStoreExtended metadataStore,
+
OpenTelemetry openTelemetry,
+
ManagedLedgerFactoryImpl.BookkeeperFactoryForCustomEnsemblePlacementPolicy
bkFactory,
+
ManagedLedgerFactoryConfig managedLedgerFactoryConfig,
+
StatsLogger statsLogger) throws Exception {
+ return INSTANCE.managedLedgerFactoryDecorator.apply(
+ new TestManagedLedgerFactoryImpl(metadataStore, bkFactory,
managedLedgerFactoryConfig, statsLogger,
+ openTelemetry));
+ }
+ }
+
+ static class TestManagedLedgerFactoryImpl extends ManagedLedgerFactoryImpl
{
+ public TestManagedLedgerFactoryImpl(MetadataStoreExtended
metadataStore,
+
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
+ ManagedLedgerFactoryConfig config,
StatsLogger statsLogger,
+ OpenTelemetry openTelemetry)
throws Exception {
+ super(metadataStore, bookKeeperGroupFactory, config, statsLogger,
openTelemetry);
+ }
+
+ @Override
+ protected ManagedLedgerImpl createManagedLedger(BookKeeper bk,
MetaStore store, String name,
+ ManagedLedgerConfig
config,
+
Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
+ return INSTANCE.managedLedgerDecorator.apply(
+ new TestManagedLedgerImpl(this, bk, store, config,
scheduledExecutor, name, mlOwnershipChecker));
+ }
+ }
+
+ static class TestManagedLedgerImpl extends ManagedLedgerImpl {
+ public TestManagedLedgerImpl(ManagedLedgerFactoryImpl factory,
BookKeeper bookKeeper, MetaStore store,
+ ManagedLedgerConfig config,
+ OrderedScheduler scheduledExecutor,
String name,
+ Supplier<CompletableFuture<Boolean>>
mlOwnershipChecker) {
+ super(factory, bookKeeper, store, config, scheduledExecutor, name,
mlOwnershipChecker);
+ }
+
+ @Override
+ protected ManagedCursorImpl createCursor(BookKeeper bookKeeper, String
cursorName) {
+ return
INSTANCE.managedCursorDecorator.apply(super.createCursor(bookKeeper,
cursorName));
+ }
+ }
+
+ @Getter
+ @Setter
+ private Function<PersistentTopic, PersistentTopic>
persistentTopicDecorator = Function.identity();
+
+ @Getter
+ @Setter
+ private Function<PersistentSubscription, PersistentSubscription>
persistentSubscriptionDecorator = Function.identity();
+
+ @Getter
+ @Setter
+ private Function<Dispatcher, Dispatcher> dispatcherDecorator =
Function.identity();
+
+ @Getter
+ @Setter
+ private Function<ManagedLedgerFactoryImpl, ManagedLedgerFactoryImpl>
managedLedgerFactoryDecorator = Function.identity();
+
+ @Getter
+ @Setter
+ private Function<ManagedLedgerImpl, ManagedLedgerImpl>
managedLedgerDecorator = Function.identity();
+
+ @Getter
+ @Setter
+ private Function<ManagedCursorImpl, ManagedCursorImpl>
managedCursorDecorator = Function.identity();
+
+ public void reset() {
+ persistentTopicDecorator = Function.identity();
+ persistentSubscriptionDecorator = Function.identity();
+ dispatcherDecorator = Function.identity();
+ managedLedgerFactoryDecorator = Function.identity();
+ managedLedgerDecorator = Function.identity();
+ managedCursorDecorator = Function.identity();
+ }
+
+ public void configure(ServiceConfiguration conf) {
+ conf.setTopicFactoryClassName(TestTopicFactory.class.getName());
+
conf.setManagedLedgerStorageClassName(TestManagedLedgerStorage.class.getName());
+ }
+
+ public <T extends Dispatcher> void applyDispatcherSpyDecorator(Class<T>
dispatcherClass,
+
java.util.function.Consumer<T> spyCustomizer) {
+ setDispatcherDecorator(createDispatcherSpyDecorator(dispatcherClass,
spyCustomizer));
+ }
+
+ public static <T extends Dispatcher> Function<Dispatcher, Dispatcher>
createDispatcherSpyDecorator(
+ Class<T> dispatcherClass, java.util.function.Consumer<T>
spyCustomizer) {
+ return dispatcher -> {
+ Dispatcher spy =
BrokerTestUtil.spyWithoutRecordingInvocations(dispatcher);
+ spyCustomizer.accept(dispatcherClass.cast(spy));
+ return spy;
+ };
+ }
+
+ public void
applyCursorSpyDecorator(java.util.function.Consumer<ManagedCursorImpl>
spyCustomizer) {
+ setManagedCursorDecorator(cursor -> {
+ ManagedCursorImpl spy =
BrokerTestUtil.spyWithoutRecordingInvocations(cursor);
+ spyCustomizer.accept(spy);
+ return spy;
+ });
+ }
+}