This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c69584a888 [feat][broker][PIP-278] Support pluggable topic compaction service - part2 (#20718)
4c69584a888 is described below

commit 4c69584a888f1365cd1bd78e0dbd74d10a0aa413
Author: Cong Zhao <[email protected]>
AuthorDate: Mon Jul 17 10:12:50 2023 +0800

    [feat][broker][PIP-278] Support pluggable topic compaction service - part2 (#20718)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  57 ++++++++----
 .../apache/pulsar/broker/service/ServerCnx.java    | 103 +++++++++++----------
 .../PersistentDispatcherSingleActiveConsumer.java  |   7 +-
 .../service/persistent/PersistentSubscription.java |  31 +++++--
 .../broker/service/persistent/PersistentTopic.java |  76 +++++++++------
 .../apache/pulsar/compaction/CompactedTopic.java   |   8 ++
 .../pulsar/compaction/CompactedTopicImpl.java      |   1 +
 .../pulsar/compaction/CompactedTopicUtils.java     |  98 ++++++++++++++++++++
 .../compaction/CompactionServiceFactory.java       |   4 +-
 .../compaction/PulsarCompactionServiceFactory.java |   3 +
 .../compaction/PulsarTopicCompactionService.java   |   6 ++
 .../pulsar/compaction/TopicCompactionService.java  |   6 +-
 .../pulsar/compaction/TwoPhaseCompactor.java       |  20 +++-
 .../apache/pulsar/broker/admin/AdminApiTest.java   |   7 +-
 .../pulsar/broker/admin/v1/V1_AdminApiTest.java    |   5 +-
 .../service/PersistentTopicConcurrentTest.java     |   4 +
 .../pulsar/broker/service/PersistentTopicTest.java |  19 +++-
 .../service/persistent/MessageDuplicationTest.java |   2 +
 .../pulsar/broker/stats/PrometheusMetricsTest.java |   3 +-
 .../testcontext/AbstractTestPulsarService.java     |  25 +----
 .../MockPulsarCompactionServiceFactory.java        |  42 +++++++++
 .../testcontext/NonStartableTestPulsarService.java |   9 +-
 .../broker/testcontext/PulsarTestContext.java      |  26 +++++-
 .../pulsar/broker/testcontext/SpyConfig.java       |   9 +-
 .../testcontext/StartableTestPulsarService.java    |   8 +-
 .../pulsar/broker/transaction/TransactionTest.java |  17 ++--
 .../pulsar/compaction/CompactedTopicTest.java      |   7 +-
 27 files changed, 434 insertions(+), 169 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 40c5a2d6528..4ffb5b77d54 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -147,9 +147,11 @@ import 
org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
 import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.common.util.ThreadDumpUtil;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.apache.pulsar.compaction.CompactionServiceFactory;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
 import org.apache.pulsar.compaction.StrategicTwoPhaseCompactor;
-import org.apache.pulsar.compaction.TwoPhaseCompactor;
+import org.apache.pulsar.compaction.TopicCompactionService;
 import org.apache.pulsar.functions.worker.ErrorNotifier;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
@@ -198,7 +200,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     private WebSocketService webSocketService = null;
     private TopicPoliciesService topicPoliciesService = 
TopicPoliciesService.DISABLED;
     private BookKeeperClientFactory bkClientFactory;
-    private Compactor compactor;
+    protected CompactionServiceFactory compactionServiceFactory;
     private StrategicTwoPhaseCompactor strategicCompactor;
     private ResourceUsageTransportManager resourceUsageTransportManager;
     private ResourceGroupService resourceGroupServiceManager;
@@ -452,6 +454,15 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
 
             resetMetricsServlet();
 
+            if (this.compactionServiceFactory != null) {
+                try {
+                    this.compactionServiceFactory.close();
+                } catch (Exception e) {
+                    LOG.warn("CompactionServiceFactory closing failed {}", 
e.getMessage());
+                }
+                this.compactionServiceFactory = null;
+            }
+
             if (this.webSocketService != null) {
                 this.webSocketService.close();
             }
@@ -813,6 +824,9 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
             this.brokerServiceUrl = brokerUrl(config);
             this.brokerServiceUrlTls = brokerUrlTls(config);
 
+            if (this.compactionServiceFactory == null) {
+                this.compactionServiceFactory = loadCompactionServiceFactory();
+            }
 
             if (null != this.webSocketService) {
                 ClusterDataImpl clusterData = ClusterDataImpl.builder()
@@ -1475,25 +1489,16 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         return this.compactorExecutor;
     }
 
-    // only public so mockito can mock it
-    public Compactor newCompactor() throws PulsarServerException {
-        return new TwoPhaseCompactor(this.getConfiguration(),
-                getClient(), getBookKeeperClient(),
-                getCompactorExecutor());
-    }
-
-    public synchronized Compactor getCompactor() throws PulsarServerException {
-        if (this.compactor == null) {
-            this.compactor = newCompactor();
-        }
-        return this.compactor;
-    }
-
     // This method is used for metrics, which is allowed to as null
     // Because it's no operation on the compactor, so let's remove the  
synchronized on this method
     // to avoid unnecessary lock competition.
+    // Only the pulsar's compaction service provides the compaction stats. The 
compaction service plugin,
+    // it should be done by the plugin itself to expose the compaction metrics.
     public Compactor getNullableCompactor() {
-        return this.compactor;
+        if (this.compactionServiceFactory instanceof 
PulsarCompactionServiceFactory pulsarCompactedServiceFactory) {
+            return pulsarCompactedServiceFactory.getNullableCompactor();
+        }
+        return null;
     }
 
     public StrategicTwoPhaseCompactor newStrategicCompactor() throws 
PulsarServerException {
@@ -1911,4 +1916,22 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     protected BrokerService newBrokerService(PulsarService pulsar) throws 
Exception {
         return new BrokerService(pulsar, ioEventLoopGroup);
     }
+
+    private CompactionServiceFactory loadCompactionServiceFactory() {
+        String compactionServiceFactoryClassName = 
config.getCompactionServiceFactoryClassName();
+        var compactionServiceFactory =
+                Reflections.createInstance(compactionServiceFactoryClassName, 
CompactionServiceFactory.class,
+                        Thread.currentThread().getContextClassLoader());
+        compactionServiceFactory.initialize(this).join();
+        return compactionServiceFactory;
+    }
+
+    public CompletableFuture<TopicCompactionService> 
newTopicCompactionService(String topic) {
+        try {
+            CompactionServiceFactory compactionServiceFactory = 
this.getCompactionServiceFactory();
+            return compactionServiceFactory.newTopicCompactionService(topic);
+        } catch (Throwable e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index f91793cadfa..bf55dda10be 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2077,64 +2077,73 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
         // If it's not pointing to a valid entry, respond messageId of the 
current position.
         // If the compaction cursor reach the end of the topic, respond 
messageId from compacted ledger
-        Optional<Position> compactionHorizon = 
persistentTopic.getCompactedTopic().getCompactionHorizon();
-        if (lastPosition.getEntryId() == -1 || (compactionHorizon.isPresent()
-                        && lastPosition.compareTo((PositionImpl) 
compactionHorizon.get()) <= 0)) {
-            handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, 
partitionIndex,
-                    markDeletePosition);
-            return;
-        }
+        CompletableFuture<Position> compactionHorizonFuture =
+                
persistentTopic.getTopicCompactionService().getLastCompactedPosition();
 
-        // For a valid position, we read the entry out and parse the batch 
size from its metadata.
-        CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
-        ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() 
{
-            @Override
-            public void readEntryComplete(Entry entry, Object ctx) {
-                entryFuture.complete(entry);
+        compactionHorizonFuture.whenComplete((compactionHorizon, ex) -> {
+            if (ex != null) {
+                log.error("Failed to get compactionHorizon.", ex);
+                writeAndFlush(Commands.newError(requestId, 
ServerError.MetadataError, ex.getMessage()));
+                return;
             }
 
-            @Override
-            public void readEntryFailed(ManagedLedgerException exception, 
Object ctx) {
-                entryFuture.completeExceptionally(exception);
+            if (lastPosition.getEntryId() == -1 || (compactionHorizon != null
+                    && lastPosition.compareTo((PositionImpl) 
compactionHorizon) <= 0)) {
+                handleLastMessageIdFromCompactionService(persistentTopic, 
requestId, partitionIndex,
+                        markDeletePosition);
+                return;
             }
-        }, null);
 
-        CompletableFuture<Integer> batchSizeFuture = 
entryFuture.thenApply(entry -> {
-            MessageMetadata metadata = 
Commands.parseMessageMetadata(entry.getDataBuffer());
-            int batchSize = metadata.getNumMessagesInBatch();
-            entry.release();
-            return metadata.hasNumMessagesInBatch() ? batchSize : -1;
-        });
-
-        batchSizeFuture.whenComplete((batchSize, e) -> {
-            if (e != null) {
-                if (e.getCause() instanceof 
ManagedLedgerException.NonRecoverableLedgerException) {
-                    handleLastMessageIdFromCompactedLedger(persistentTopic, 
requestId, partitionIndex,
-                            markDeletePosition);
-                } else {
-                    writeAndFlush(Commands.newError(
-                            requestId, ServerError.MetadataError,
-                            "Failed to get batch size for entry " + 
e.getMessage()));
+            // For a valid position, we read the entry out and parse the batch 
size from its metadata.
+            CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
+            ml.asyncReadEntry(lastPosition, new 
AsyncCallbacks.ReadEntryCallback() {
+                @Override
+                public void readEntryComplete(Entry entry, Object ctx) {
+                    entryFuture.complete(entry);
                 }
-            } else {
-                int largestBatchIndex = batchSize > 0 ? batchSize - 1 : -1;
 
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] [{}][{}] Get LastMessageId {} 
partitionIndex {}", remoteAddress,
-                            topic.getName(), subscriptionName, lastPosition, 
partitionIndex);
+                @Override
+                public void readEntryFailed(ManagedLedgerException exception, 
Object ctx) {
+                    entryFuture.completeExceptionally(exception);
                 }
+            }, null);
 
-                writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, 
lastPosition.getLedgerId(),
-                        lastPosition.getEntryId(), partitionIndex, 
largestBatchIndex,
-                        markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1,
-                        markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1));
-            }
+            CompletableFuture<Integer> batchSizeFuture = 
entryFuture.thenApply(entry -> {
+                MessageMetadata metadata = 
Commands.parseMessageMetadata(entry.getDataBuffer());
+                int batchSize = metadata.getNumMessagesInBatch();
+                entry.release();
+                return metadata.hasNumMessagesInBatch() ? batchSize : -1;
+            });
+
+            batchSizeFuture.whenComplete((batchSize, e) -> {
+                if (e != null) {
+                    if (e.getCause() instanceof 
ManagedLedgerException.NonRecoverableLedgerException) {
+                        
handleLastMessageIdFromCompactionService(persistentTopic, requestId, 
partitionIndex,
+                                markDeletePosition);
+                    } else {
+                        writeAndFlush(Commands.newError(
+                                requestId, ServerError.MetadataError,
+                                "Failed to get batch size for entry " + 
e.getMessage()));
+                    }
+                } else {
+                    int largestBatchIndex = batchSize > 0 ? batchSize - 1 : -1;
+
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] [{}][{}] Get LastMessageId {} 
partitionIndex {}", remoteAddress,
+                                topic.getName(), subscriptionName, 
lastPosition, partitionIndex);
+                    }
+
+                    
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, 
lastPosition.getLedgerId(),
+                            lastPosition.getEntryId(), partitionIndex, 
largestBatchIndex,
+                            markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1,
+                            markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1));
+                }
+            });
         });
     }
-
-    private void handleLastMessageIdFromCompactedLedger(PersistentTopic 
persistentTopic, long requestId,
-            int partitionIndex, PositionImpl markDeletePosition) {
-        
persistentTopic.getCompactedTopic().readLastEntryOfCompactedLedger().thenAccept(entry
 -> {
+    private void handleLastMessageIdFromCompactionService(PersistentTopic 
persistentTopic, long requestId,
+                                                          int partitionIndex, 
PositionImpl markDeletePosition) {
+        
persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenAccept(entry
 -> {
             if (entry != null) {
                 try {
                     // in this case, all the data has been compacted, so 
return the last position
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index d9d0f6adc87..6de113d6db9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -50,9 +50,11 @@ import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
 import 
org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.compaction.CompactedTopicUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -347,8 +349,9 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
                 }
                 havePendingRead = true;
                 if (consumer.readCompacted()) {
-                    topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, 
messagesToRead, isFirstRead,
-                            this, consumer);
+                    boolean readFromEarliest = isFirstRead && 
MessageId.earliest.equals(consumer.getStartMessageId());
+                    
CompactedTopicUtils.readCompactedEntries(topic.getTopicCompactionService(), 
cursor, messagesToRead,
+                            readFromEarliest, this, consumer);
                 } else {
                     ReadEntriesCtx readEntriesCtx =
                             ReadEntriesCtx.create(consumer, 
consumer.getConsumerEpoch());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 09a99618062..1b8b47da78f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -772,16 +772,26 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
             log.info("[{}][{}] Successfully disconnected consumers from 
subscription, proceeding with cursor reset",
                     topicName, subName);
 
-            try {
-                boolean forceReset = false;
-                if (topic.getCompactedTopic() != null && 
topic.getCompactedTopic().getCompactionHorizon().isPresent()) {
-                    PositionImpl horizon = (PositionImpl) 
topic.getCompactedTopic().getCompactionHorizon().get();
+            CompletableFuture<Boolean> forceReset = new CompletableFuture<>();
+            if (topic.getTopicCompactionService() == null) {
+                forceReset.complete(false);
+            } else {
+                
topic.getTopicCompactionService().getLastCompactedPosition().thenAccept(lastCompactedPosition
 -> {
                     PositionImpl resetTo = (PositionImpl) finalPosition;
-                    if (horizon.compareTo(resetTo) >= 0) {
-                        forceReset = true;
+                    if (lastCompactedPosition != null && 
resetTo.compareTo(lastCompactedPosition.getLedgerId(),
+                            lastCompactedPosition.getEntryId()) <= 0) {
+                        forceReset.complete(true);
+                    } else {
+                        forceReset.complete(false);
                     }
-                }
-                cursor.asyncResetCursor(finalPosition, forceReset, new 
AsyncCallbacks.ResetCursorCallback() {
+                }).exceptionally(ex -> {
+                    forceReset.completeExceptionally(ex);
+                    return null;
+                });
+            }
+
+            forceReset.thenAccept(forceResetValue -> {
+                cursor.asyncResetCursor(finalPosition, forceResetValue, new 
AsyncCallbacks.ResetCursorCallback() {
                     @Override
                     public void resetComplete(Object ctx) {
                         if (log.isDebugEnabled()) {
@@ -811,11 +821,12 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
                         }
                     }
                 });
-            } catch (Exception e) {
+            }).exceptionally((e) -> {
                 log.error("[{}][{}] Error while resetting cursor", topicName, 
subName, e);
                 IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
                 future.completeExceptionally(new BrokerServiceException(e));
-            }
+                return null;
+            });
         });
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 12691d1c677..1e055eccc42 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -167,11 +167,12 @@ import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.CompactedTopicContext;
 import org.apache.pulsar.compaction.CompactedTopicImpl;
 import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.compaction.CompactorMXBean;
+import org.apache.pulsar.compaction.PulsarTopicCompactionService;
+import org.apache.pulsar.compaction.TopicCompactionService;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.utils.StatsOutputStream;
@@ -210,9 +211,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
     protected final MessageDeduplication messageDeduplication;
 
-    private static final long COMPACTION_NEVER_RUN = -0xfebecffeL;
+    private static final Long COMPACTION_NEVER_RUN = -0xfebecffeL;
     private CompletableFuture<Long> currentCompaction = 
CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
-    private final CompactedTopic compactedTopic;
+    private TopicCompactionService topicCompactionService;
 
     // TODO: Create compaction strategy from topic policy when exposing 
strategic compaction to users.
     private static Map<String, TopicCompactionStrategy> strategicCompactionMap 
= Map.of(
@@ -296,29 +297,11 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
         registerTopicPolicyListener();
 
-        this.compactedTopic = new 
CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
-
-        for (ManagedCursor cursor : ledger.getCursors()) {
-            if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME)
-                    || cursor.getName().startsWith(replicatorPrefix)) {
-                // This is not a regular subscription, we are going to
-                // ignore it for now and let the message dedup logic to take 
care of it
-            } else {
-                final String subscriptionName = Codec.decode(cursor.getName());
-                subscriptions.put(subscriptionName, 
createPersistentSubscription(subscriptionName, cursor,
-                        
PersistentSubscription.isCursorFromReplicatedSubscription(cursor),
-                        cursor.getCursorProperties()));
-                // subscription-cursor gets activated by default: deactivate 
as there is no active subscription right
-                // now
-                subscriptions.get(subscriptionName).deactivateCursor();
-            }
-        }
         this.messageDeduplication = new 
MessageDeduplication(brokerService.pulsar(), this, ledger);
         if (ledger.getProperties().containsKey(TOPIC_EPOCH_PROPERTY_NAME)) {
             topicEpoch = 
Optional.of(Long.parseLong(ledger.getProperties().get(TOPIC_EPOCH_PROPERTY_NAME)));
         }
 
-        checkReplicatedSubscriptionControllerState();
         TopicName topicName = TopicName.get(topic);
         if 
(brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
                 && !isEventSystemTopic(topicName)) {
@@ -338,6 +321,11 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     @Override
     public CompletableFuture<Void> initialize() {
         List<CompletableFuture<Void>> futures = new ArrayList<>();
+        
futures.add(brokerService.getPulsar().newTopicCompactionService(topic).thenAccept(service
 -> {
+            PersistentTopic.this.topicCompactionService = service;
+            this.createPersistentSubscriptions();
+        }));
+
         for (ManagedCursor cursor : ledger.getCursors()) {
             if (cursor.getName().startsWith(replicatorPrefix)) {
                 String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
@@ -406,7 +394,6 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 .expectedItems(16)
                 .concurrencyLevel(1)
                 .build();
-        this.compactedTopic = new 
CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
         this.backloggedCursorThresholdEntries =
                 
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
 
@@ -434,6 +421,25 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         return pendingWriteOps;
     }
 
+    private void createPersistentSubscriptions() {
+        for (ManagedCursor cursor : ledger.getCursors()) {
+                if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME)
+                        || cursor.getName().startsWith(replicatorPrefix)) {
+                    // This is not a regular subscription, we are going to
+                    // ignore it for now and let the message dedup logic to 
take care of it
+                } else {
+                    final String subscriptionName = 
Codec.decode(cursor.getName());
+                    subscriptions.put(subscriptionName, 
createPersistentSubscription(subscriptionName, cursor,
+                            
PersistentSubscription.isCursorFromReplicatedSubscription(cursor),
+                            cursor.getCursorProperties()));
+                    // subscription-cursor gets activated by default: 
deactivate as there is no active subscription
+                    // right now
+                    subscriptions.get(subscriptionName).deactivateCursor();
+                }
+        }
+        checkReplicatedSubscriptionControllerState();
+    }
+
     /**
      * Unload a subscriber.
      * @throws SubscriptionNotFoundException If subscription not founded.
@@ -481,8 +487,10 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
     private PersistentSubscription createPersistentSubscription(String 
subscriptionName, ManagedCursor cursor,
             boolean replicated, Map<String, String> subscriptionProperties) {
-        Objects.requireNonNull(compactedTopic);
-        if (isCompactionSubscription(subscriptionName)) {
+        Objects.requireNonNull(topicCompactionService);
+        if (isCompactionSubscription(subscriptionName)
+                && topicCompactionService instanceof 
PulsarTopicCompactionService pulsarTopicCompactionService) {
+            CompactedTopicImpl compactedTopic = 
pulsarTopicCompactionService.getCompactedTopic();
             return new PulsarCompactorSubscription(this, compactedTopic, 
subscriptionName, cursor);
         } else {
             return new PersistentSubscription(this, subscriptionName, cursor, 
replicated, subscriptionProperties);
@@ -1477,6 +1485,14 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             });
         }
 
+        if (topicCompactionService != null) {
+            try {
+                topicCompactionService.close();
+            } catch (Exception e) {
+                log.warn("Error close topicCompactionService ", e);
+            }
+        }
+
         CompletableFuture<Void> clientCloseFuture = 
closeWithoutWaitingClientDisconnect
                 ? CompletableFuture.completedFuture(null)
                 : FutureUtil.waitForAll(futures);
@@ -2517,7 +2533,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
     public Optional<CompactedTopicContext> getCompactedTopicContext() {
         try {
-            return ((CompactedTopicImpl) 
compactedTopic).getCompactedTopicContext();
+            if (topicCompactionService instanceof PulsarTopicCompactionService 
pulsarCompactedService) {
+                return 
pulsarCompactedService.getCompactedTopic().getCompactedTopicContext();
+            }
         } catch (ExecutionException | InterruptedException e) {
             log.warn("[{}]Fail to get ledger information for compacted 
topic.", topic);
         }
@@ -3167,7 +3185,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 currentCompaction = 
brokerService.pulsar().getStrategicCompactor()
                         .compact(topic, strategicCompactionMap.get(topic));
             } else {
-                currentCompaction = 
brokerService.pulsar().getCompactor().compact(topic);
+                currentCompaction = 
topicCompactionService.compact().thenApply(x -> null);
             }
             currentCompaction.whenComplete((ignore, ex) -> {
                if (ex != null){
@@ -3188,7 +3206,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             return 
LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
         } else {
             try {
-                if (current.join() == COMPACTION_NEVER_RUN) {
+                if (Objects.equals(current.join(), COMPACTION_NEVER_RUN)) {
                     return 
LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
                 } else {
                     return 
LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS);
@@ -3311,8 +3329,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         return replicatedSubscriptionsController;
     }
 
-    public CompactedTopic getCompactedTopic() {
-        return compactedTopic;
+    public TopicCompactionService getTopicCompactionService() {
+        return this.topicCompactionService;
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index e1a10b3bbb2..99e2f8a9624 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -29,6 +29,14 @@ import org.apache.pulsar.broker.service.Consumer;
 public interface CompactedTopic {
     CompletableFuture<CompactedTopicContext> newCompactedLedger(Position p, 
long compactedLedgerId);
     CompletableFuture<Void> deleteCompactedLedger(long compactedLedgerId);
+
+    /**
+     * Read entries from compacted topic.
+     *
+     * @deprecated Use {@link 
CompactedTopicUtils#readCompactedEntries(TopicCompactionService, ManagedCursor,
+     * int, boolean, ReadEntriesCallback, Consumer)} instead.
+     */
+    @Deprecated
     void asyncReadEntriesOrWait(ManagedCursor cursor,
                                 int numberOfEntriesToRead,
                                 boolean isFirstRead,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 1f117338ea0..fd20c31d32f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -87,6 +87,7 @@ public class CompactedTopicImpl implements CompactedTopic {
     }
 
     @Override
+    @Deprecated
     public void asyncReadEntriesOrWait(ManagedCursor cursor,
                                        int numberOfEntriesToRead,
                                        boolean isFirstRead,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
new file mode 100644
index 00000000000..4cd21cbb03e
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
+import com.google.common.annotations.Beta;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import javax.annotation.Nullable;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.pulsar.broker.service.Consumer;
+import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
+import org.apache.pulsar.common.util.FutureUtil;
+
+/**
+ * Static helpers for reading entries through a {@link TopicCompactionService}.
+ */
+public class CompactedTopicUtils {
+
+    /**
+     * Reads entries for a cursor, serving them from the compacted data when the read position
+     * is at or before the last compacted position, and from the cursor itself otherwise.
+     *
+     * <p>The outcome is always reported through {@code callback}:
+     * <ul>
+     *   <li>If there is no last compacted position, or the read position is already past it, the
+     *       read is delegated to {@link ManagedCursor#asyncReadEntriesOrWait}.</li>
+     *   <li>If the compaction service returns no entries, the cursor is moved to the position after
+     *       the last compacted position (or kept at the read position when that is further ahead)
+     *       and the callback is completed with an empty list.</li>
+     *   <li>Otherwise the cursor is moved past the last returned entry and the callback is completed
+     *       with the entries.</li>
+     *   <li>Any failure is reported via {@code readEntriesFailed}, wrapped in a
+     *       {@link ManagedLedgerException} when it is not one already.</li>
+     * </ul>
+     *
+     * @param topicCompactionService the service that provides the compacted data, must not be null
+     * @param cursor                 the cursor whose read position is used and advanced, must not be null
+     * @param numberOfEntriesToRead  maximum number of entries to read, must be positive
+     * @param readFromEarliest       when true, read from {@link PositionImpl#EARLIEST} instead of the
+     *                               cursor's current read position
+     * @param callback               receives the entries or the failure, must not be null
+     * @param consumer               consumer placed in the read context handed to the callback; may be null
+     * @throws NullPointerException     if a required argument is null
+     * @throws IllegalArgumentException if {@code numberOfEntriesToRead} is not positive
+     */
+    @Beta
+    public static void readCompactedEntries(TopicCompactionService topicCompactionService, ManagedCursor cursor,
+                                            int numberOfEntriesToRead, boolean readFromEarliest,
+                                            AsyncCallbacks.ReadEntriesCallback callback, @Nullable Consumer consumer) {
+        Objects.requireNonNull(topicCompactionService);
+        Objects.requireNonNull(cursor);
+        checkArgument(numberOfEntriesToRead > 0);
+        Objects.requireNonNull(callback);
+
+        final PositionImpl readPosition;
+        if (readFromEarliest) {
+            readPosition = PositionImpl.EARLIEST;
+        } else {
+            readPosition = (PositionImpl) cursor.getReadPosition();
+        }
+
+        // TODO: redeliver epoch link https://github.com/apache/pulsar/issues/13690
+        PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx readEntriesCtx =
+                PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx.create(consumer, DEFAULT_CONSUMER_EPOCH);
+
+        CompletableFuture<Position> lastCompactedPositionFuture = topicCompactionService.getLastCompactedPosition();
+
+        lastCompactedPositionFuture.thenCompose(lastCompactedPosition -> {
+            // Nothing compacted yet, or the reader is already beyond the compacted range:
+            // read straight from the cursor.
+            if (lastCompactedPosition == null
+                    || readPosition.compareTo(
+                    lastCompactedPosition.getLedgerId(), lastCompactedPosition.getEntryId()) > 0) {
+                cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, readEntriesCtx, PositionImpl.LATEST);
+                return CompletableFuture.completedFuture(null);
+            }
+
+            return topicCompactionService.readCompactedEntries(readPosition, numberOfEntriesToRead)
+                    .thenAccept(entries -> {
+                        if (CollectionUtils.isEmpty(entries)) {
+                            Position seekToPosition = lastCompactedPosition.getNext();
+                            if (readPosition.compareTo(seekToPosition.getLedgerId(), seekToPosition.getEntryId()) > 0) {
+                                seekToPosition = readPosition;
+                            }
+                            cursor.seek(seekToPosition);
+                            callback.readEntriesComplete(Collections.emptyList(), readEntriesCtx);
+                            // The callback has been completed; stop here. Falling through would index into
+                            // the empty (or null) list, throw, and make the exceptionally stage below
+                            // invoke readEntriesFailed on a callback that already completed.
+                            return;
+                        }
+
+                        Entry lastEntry = entries.get(entries.size() - 1);
+                        cursor.seek(lastEntry.getPosition().getNext(), true);
+                        callback.readEntriesComplete(entries, readEntriesCtx);
+                    });
+        }).exceptionally((exception) -> {
+            exception = FutureUtil.unwrapCompletionException(exception);
+            ManagedLedgerException managedLedgerException;
+            if (exception instanceof ManagedLedgerException) {
+                managedLedgerException = (ManagedLedgerException) exception;
+            } else {
+                managedLedgerException = new ManagedLedgerException(exception);
+            }
+            callback.readEntriesFailed(managedLedgerException, readEntriesCtx);
+            return null;
+        });
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionServiceFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionServiceFactory.java
index de1abfbea95..7bb30372e45 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionServiceFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionServiceFactory.java
@@ -18,14 +18,14 @@
  */
 package org.apache.pulsar.compaction;
 
-import com.google.common.annotations.Beta;
 import java.util.concurrent.CompletableFuture;
 import javax.annotation.Nonnull;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
 
-@Beta
 @InterfaceAudience.Public
+@InterfaceStability.Evolving
 public interface CompactionServiceFactory extends AutoCloseable {
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
index dd817ca35f1..424733ad581 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
@@ -24,11 +24,14 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import lombok.AccessLevel;
+import lombok.Getter;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 
 public class PulsarCompactionServiceFactory implements 
CompactionServiceFactory {
 
+    @Getter(AccessLevel.PROTECTED)
     private PulsarService pulsarService;
 
     private volatile Compactor compactor;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
index 0a8bf9d69a2..dd218c9be51 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
@@ -22,6 +22,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.pulsar.compaction.CompactedTopicImpl.COMPACT_LEDGER_EMPTY;
 import static 
org.apache.pulsar.compaction.CompactedTopicImpl.NEWER_THAN_COMPACTED;
 import static org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -108,4 +109,9 @@ public class PulsarTopicCompactionService implements 
TopicCompactionService {
     public CompactedTopicImpl getCompactedTopic() {
         return compactedTopic;
     }
+
+    /**
+     * Closes this service. This implementation is intentionally a no-op.
+     */
+    @Override
+    public void close() throws IOException {
+        // noop
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
index 6b64b9ce0fd..74df0dafabd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
@@ -18,17 +18,17 @@
  */
 package org.apache.pulsar.compaction;
 
-import com.google.common.annotations.Beta;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import javax.annotation.Nonnull;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
 
-@Beta
 @InterfaceAudience.Public
-public interface TopicCompactionService {
+@InterfaceStability.Evolving
+public interface TopicCompactionService extends AutoCloseable {
     /**
      * Compact the topic.
      * Topic Compaction is a key-based retention mechanism. It keeps the most 
recent value for a given key and
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 821dd9c0c9d..7d3b5863cb6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -22,12 +22,14 @@ import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -59,7 +61,6 @@ import org.slf4j.LoggerFactory;
 public class TwoPhaseCompactor extends Compactor {
     private static final Logger log = 
LoggerFactory.getLogger(TwoPhaseCompactor.class);
     private static final int MAX_OUTSTANDING = 500;
-    protected static final String COMPACTED_TOPIC_LEDGER_PROPERTY = 
"CompactedTopicLedger";
     private final Duration phaseOneLoopReadTimeout;
 
     public TwoPhaseCompactor(ServiceConfiguration conf,
@@ -128,8 +129,7 @@ public class TwoPhaseCompactor extends Compactor {
                 mxBean.addCompactionReadOp(reader.getTopic(), 
m.getHeadersAndPayload().readableBytes());
                 if (RawBatchConverter.isReadableBatch(m)) {
                     try {
-                        for (ImmutableTriple<MessageId, String, Integer> e : 
RawBatchConverter
-                                .extractIdsAndKeysAndSize(m)) {
+                        for (ImmutableTriple<MessageId, String, Integer> e : 
extractIdsAndKeysAndSizeFromBatch(m)) {
                             if (e != null) {
                                 if (e.getRight() > 0) {
                                     MessageId old = 
latestForKey.put(e.getMiddle(), e.getLeft());
@@ -238,7 +238,7 @@ public class TwoPhaseCompactor extends Compactor {
                 mxBean.addCompactionReadOp(reader.getTopic(), 
m.getHeadersAndPayload().readableBytes());
                 if (RawBatchConverter.isReadableBatch(m)) {
                     try {
-                        messageToAdd = RawBatchConverter.rebatchMessage(
+                        messageToAdd = rebatchMessage(
                                 m, (key, subid) -> 
subid.equals(latestForKey.get(key)));
                     } catch (IOException ioe) {
                         log.info("Error decoding batch for message {}. Whole 
batch will be included in output",
@@ -386,7 +386,7 @@ public class TwoPhaseCompactor extends Compactor {
         return bkf;
     }
 
-    private static Pair<String, Integer> extractKeyAndSize(RawMessage m) {
+    protected Pair<String, Integer> extractKeyAndSize(RawMessage m) {
         ByteBuf headersAndPayload = m.getHeadersAndPayload();
         MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
         if (msgMetadata.hasPartitionKey()) {
@@ -400,6 +400,16 @@ public class TwoPhaseCompactor extends Compactor {
         }
     }
 
+    /**
+     * Extracts the (message id, key, size) triples from a batch message by delegating to
+     * {@link RawBatchConverter#extractIdsAndKeysAndSize(RawMessage)}.
+     *
+     * <p>NOTE(review): this is a protected instance method, presumably so a subclass can supply
+     * its own extraction logic - confirm against the intended extension points.
+     *
+     * @param msg the raw batch message to inspect
+     * @return whatever the delegate returns for {@code msg}
+     * @throws IOException declared because the delegate call may throw it
+     */
+    protected List<ImmutableTriple<MessageId, String, Integer>> 
extractIdsAndKeysAndSizeFromBatch(RawMessage msg)
+            throws IOException {
+        return RawBatchConverter.extractIdsAndKeysAndSize(msg);
+    }
+
+    /**
+     * Rebuilds a batch message using the given filter by delegating to
+     * {@code RawBatchConverter.rebatchMessage(msg, filter)}.
+     *
+     * <p>NOTE(review): this is a protected instance method, presumably so a subclass can supply
+     * its own rebatching logic - confirm against the intended extension points.
+     *
+     * @param msg    the raw batch message to rebuild
+     * @param filter predicate over (key, message id) passed through to the delegate
+     * @return whatever the delegate returns for {@code msg}
+     * @throws IOException declared because the delegate call may throw it
+     */
+    protected Optional<RawMessage> rebatchMessage(RawMessage msg, 
BiPredicate<String, MessageId> filter)
+            throws IOException {
+        return RawBatchConverter.rebatchMessage(msg, filter);
+    }
+
     private static class PhaseOneResult {
         final MessageId from;
         final MessageId to; // last undeleted messageId
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 7b047179ee4..342a409c4ae 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -140,6 +140,7 @@ import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -3123,7 +3124,7 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
         // mock actual compaction, we don't need to really run it
         CompletableFuture<Long> promise = new CompletableFuture<Long>();
-        Compactor compactor = pulsar.getCompactor();
+        Compactor compactor = 
((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
         doReturn(promise).when(compactor).compact(topicName);
         admin.topics().triggerCompaction(topicName);
 
@@ -3159,7 +3160,7 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
         // mock actual compaction, we don't need to really run it
         CompletableFuture<Long> promise = new CompletableFuture<>();
-        Compactor compactor = pulsar.getCompactor();
+        Compactor compactor = 
((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
         doReturn(promise).when(compactor).compact(topicName + "-partition-0");
 
         CompletableFuture<Long> promise1 = new CompletableFuture<>();
@@ -3203,7 +3204,7 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
         // mock actual compaction, we don't need to really run it
         CompletableFuture<Long> promise = new CompletableFuture<Long>();
-        Compactor compactor = pulsar.getCompactor();
+        Compactor compactor = 
((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
         doReturn(promise).when(compactor).compact(topicName);
         admin.topics().triggerCompaction(topicName);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index e720c9b7613..ab83c8fec03 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -111,6 +111,7 @@ import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
 import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -2075,7 +2076,7 @@ public class V1_AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
         // mock actual compaction, we don't need to really run it
         CompletableFuture<Long> promise = new CompletableFuture<>();
-        Compactor compactor = pulsar.getCompactor();
+        Compactor compactor = 
((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
         doReturn(promise).when(compactor).compact(topicName);
         admin.topics().triggerCompaction(topicName);
 
@@ -2112,7 +2113,7 @@ public class V1_AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
         // mock actual compaction, we don't need to really run it
         CompletableFuture<Long> promise = new CompletableFuture<>();
-        Compactor compactor = pulsar.getCompactor();
+        Compactor compactor = 
((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
         doReturn(promise).when(compactor).compact(topicName);
         admin.topics().triggerCompaction(topicName);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index 6ce6de1916f..cbbb8808f3d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -119,6 +119,7 @@ public class PersistentTopicConcurrentTest extends 
MockedBookKeeperTestCase {
     public void testConcurrentTopicAndSubscriptionDelete() throws Exception {
         // create topic
         final PersistentTopic topic = (PersistentTopic) 
brokerService.getOrCreateTopic(successTopicName).get();
+        topic.initialize().join();
         CommandSubscribe cmd = new CommandSubscribe()
                 .setConsumerId(1)
                 .setTopic(successTopicName)
@@ -177,6 +178,7 @@ public class PersistentTopicConcurrentTest extends 
MockedBookKeeperTestCase {
     public void testConcurrentTopicGCAndSubscriptionDelete() throws Exception {
         // create topic
         final PersistentTopic topic = (PersistentTopic) 
brokerService.getOrCreateTopic(successTopicName).get();
+        topic.initialize().join();
         CommandSubscribe cmd = new CommandSubscribe()
                 .setConsumerId(1)
                 .setTopic(successTopicName)
@@ -241,6 +243,7 @@ public class PersistentTopicConcurrentTest extends 
MockedBookKeeperTestCase {
     public void testConcurrentTopicDeleteAndUnsubscribe() throws Exception {
         // create topic
         final PersistentTopic topic = (PersistentTopic) 
brokerService.getOrCreateTopic(successTopicName).get();
+        topic.initialize().join();
         CommandSubscribe cmd = new CommandSubscribe()
                 .setConsumerId(1)
                 .setTopic(successTopicName)
@@ -299,6 +302,7 @@ public class PersistentTopicConcurrentTest extends 
MockedBookKeeperTestCase {
     public void testConcurrentTopicDeleteAndSubsUnsubscribe() throws Exception 
{
         // create topic
         final PersistentTopic topic = (PersistentTopic) 
brokerService.getOrCreateTopic(successTopicName).get();
+        topic.initialize().join();
         CommandSubscribe cmd = new CommandSubscribe()
                 .setConsumerId(1)
                 .setTopic(successTopicName)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index c49df3e85ce..078208f7e44 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -93,13 +93,13 @@ import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription;
 import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -132,6 +132,7 @@ import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.CompactedTopicContext;
 import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.compaction.CompactorMXBean;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.awaitility.Awaitility;
@@ -630,6 +631,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testSubscribeFail() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        topic.initialize().join();
 
         // Empty subscription name
         CommandSubscribe cmd = new CommandSubscribe()
@@ -666,6 +668,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testSubscribeUnsubscribe() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        topic.initialize().join();
 
         CommandSubscribe cmd = new CommandSubscribe()
                 .setConsumerId(1)
@@ -1238,6 +1241,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     public void testDeleteAndUnsubscribeTopic() throws Exception {
         // create topic
         final PersistentTopic topic = (PersistentTopic) 
brokerService.getOrCreateTopic(successTopicName).get();
+        topic.initialize().join();
         CommandSubscribe cmd = new CommandSubscribe()
                 .setConsumerId(1)
                 .setTopic(successTopicName)
@@ -1347,6 +1351,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testDeleteTopicRaceConditions() throws Exception {
         PersistentTopic topic = (PersistentTopic) 
brokerService.getOrCreateTopic(successTopicName).get();
+        topic.initialize().join();
 
         // override ledger deletion callback to slow down deletion
         doAnswer(invocationOnMock -> {
@@ -1537,6 +1542,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
                 .setSubType(SubType.Failover);
 
         // 1. Subscribe with non partition topic
+        topic1.initialize().join();
         Future<Consumer> f1 = topic1.subscribe(getSubscriptionOption(cmd1));
         f1.get();
 
@@ -1552,6 +1558,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
                 .setRequestId(1)
                 .setSubType(SubType.Failover);
 
+        topic2.initialize();
         Future<Consumer> f2 = topic2.subscribe(getSubscriptionOption(cmd2));
         f2.get();
 
@@ -1826,7 +1833,8 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testCompactionTriggeredAfterThresholdFirstInvocation() throws 
Exception {
         CompletableFuture<Long> compactPromise = new CompletableFuture<>();
-        Compactor compactor = 
pulsarTestContext.getPulsarService().getCompactor();
+        Compactor compactor = ((PulsarCompactionServiceFactory) 
pulsarTestContext.getPulsarService()
+                .getCompactionServiceFactory()).getCompactor();
         doReturn(compactPromise).when(compactor).compact(anyString());
 
         Policies policies = new Policies();
@@ -1857,7 +1865,8 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testCompactionTriggeredAfterThresholdSecondInvocation() throws 
Exception {
         CompletableFuture<Long> compactPromise = new CompletableFuture<>();
-        Compactor compactor = 
pulsarTestContext.getPulsarService().getCompactor();
+        Compactor compactor = ((PulsarCompactionServiceFactory) 
pulsarTestContext.getPulsarService()
+                .getCompactionServiceFactory()).getCompactor();
         doReturn(compactPromise).when(compactor).compact(anyString());
 
         ManagedCursor subCursor = mock(ManagedCursor.class);
@@ -1891,7 +1900,8 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testCompactionDisabledWithZeroThreshold() throws Exception {
         CompletableFuture<Long> compactPromise = new CompletableFuture<>();
-        Compactor compactor = 
pulsarTestContext.getPulsarService().getCompactor();
+        Compactor compactor = ((PulsarCompactionServiceFactory) 
pulsarTestContext.getPulsarService()
+                .getCompactionServiceFactory()).getCompactor();
         doReturn(compactPromise).when(compactor).compact(anyString());
 
         Policies policies = new Policies();
@@ -2161,6 +2171,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
             return null;
         }).when(mockLedger).asyncOpenCursor(any(), any(), any(), any(), any(), 
any());
         PersistentTopic topic = new PersistentTopic(successTopicName, 
mockLedger, brokerService);
+        topic.initialize().join();
 
         CommandSubscribe cmd = new CommandSubscribe()
                 .setConsumerId(1)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index 19583a4455e..a6656815281 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -51,6 +51,7 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.compaction.CompactionServiceFactory;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -235,6 +236,7 @@ public class MessageDuplicationTest {
 
         doReturn(serviceConfiguration).when(pulsarService).getConfiguration();
         
doReturn(mock(PulsarResources.class)).when(pulsarService).getPulsarResources();
+        
doReturn(mock(CompactionServiceFactory.class)).when(pulsarService).getCompactionServiceFactory();
 
         ManagedLedger managedLedger = mock(ManagedLedger.class);
         MessageDeduplication messageDeduplication = spy(new 
MessageDeduplication(pulsarService, mock(PersistentTopic.class), 
managedLedger));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index c4e41074a1a..31d468394ff 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -81,6 +81,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
 import org.apache.zookeeper.CreateMode;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
@@ -1739,7 +1740,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
                     .value(data)
                     .send();
         }
-        Compactor compactor = pulsar.getCompactor();
+        Compactor compactor = 
((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
         compactor.compact(topicName).get();
         statsOut = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
index 517d57d0042..fcea99c7259 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
@@ -26,7 +26,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
-import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.CompactionServiceFactory;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -38,11 +38,11 @@ import 
org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
  */
 abstract class AbstractTestPulsarService extends PulsarService {
     protected final SpyConfig spyConfig;
-    private boolean compactorExists;
 
     public AbstractTestPulsarService(SpyConfig spyConfig, ServiceConfiguration 
config,
                                      MetadataStoreExtended localMetadataStore,
-                                     MetadataStoreExtended 
configurationMetadataStore, Compactor compactor,
+                                     MetadataStoreExtended 
configurationMetadataStore,
+                                     CompactionServiceFactory 
compactionServiceFactory,
                                      BrokerInterceptor brokerInterceptor,
                                      BookKeeperClientFactory 
bookKeeperClientFactory) {
         super(config);
@@ -51,7 +51,7 @@ abstract class AbstractTestPulsarService extends 
PulsarService {
                 
NonClosingProxyHandler.createNonClosingProxy(localMetadataStore, 
MetadataStoreExtended.class));
         setConfigurationMetadataStore(
                 
NonClosingProxyHandler.createNonClosingProxy(configurationMetadataStore, 
MetadataStoreExtended.class));
-        setCompactor(compactor);
+        super.setCompactionServiceFactory(compactionServiceFactory);
         setBrokerInterceptor(brokerInterceptor);
         setBkClientFactory(bookKeeperClientFactory);
     }
@@ -76,23 +76,6 @@ abstract class AbstractTestPulsarService extends 
PulsarService {
         return getLocalMetadataStore();
     }
 
-    @Override
-    protected void setCompactor(Compactor compactor) {
-        if (compactor != null) {
-            compactorExists = true;
-        }
-        super.setCompactor(compactor);
-    }
-
-    @Override
-    public Compactor newCompactor() throws PulsarServerException {
-        if (compactorExists) {
-            return getCompactor();
-        } else {
-            return spyConfig.getCompactor().spy(super.newCompactor());
-        }
-    }
-
     @Override
     public BookKeeperClientFactory newBookKeeperClientFactory() {
         return getBkClientFactory();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockPulsarCompactionServiceFactory.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockPulsarCompactionServiceFactory.java
new file mode 100644
index 00000000000..77959fe6ce9
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockPulsarCompactionServiceFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.testcontext;
+
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
+
+public class MockPulsarCompactionServiceFactory extends 
PulsarCompactionServiceFactory {
+    private final Compactor compactor;
+    private final SpyConfig spyConfig;
+
+    public MockPulsarCompactionServiceFactory(SpyConfig spyConfig, Compactor 
compactor) {
+        this.compactor = compactor;
+        this.spyConfig = spyConfig;
+    }
+
+    @Override
+    protected Compactor newCompactor() throws PulsarServerException {
+        if (this.compactor != null) {
+            return this.compactor;
+        } else {
+            return spyConfig.getCompactor().spy(super.newCompactor());
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
index af365ed3193..2896f338e4a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
@@ -45,7 +45,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.CompactionServiceFactory;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
@@ -60,13 +60,14 @@ class NonStartableTestPulsarService extends 
AbstractTestPulsarService {
     public NonStartableTestPulsarService(SpyConfig spyConfig, 
ServiceConfiguration config,
                                          MetadataStoreExtended 
localMetadataStore,
                                          MetadataStoreExtended 
configurationMetadataStore,
-                                         Compactor compactor, 
BrokerInterceptor brokerInterceptor,
+                                         CompactionServiceFactory 
compactionServiceFactory,
+                                         BrokerInterceptor brokerInterceptor,
                                          BookKeeperClientFactory 
bookKeeperClientFactory,
                                          PulsarResources pulsarResources,
                                          ManagedLedgerStorage 
managedLedgerClientFactory,
                                          Function<BrokerService, 
BrokerService> brokerServiceCustomizer) {
-        super(spyConfig, config, localMetadataStore, 
configurationMetadataStore, compactor, brokerInterceptor,
-                bookKeeperClientFactory);
+        super(spyConfig, config, localMetadataStore, 
configurationMetadataStore, compactionServiceFactory,
+                brokerInterceptor, bookKeeperClientFactory);
         setPulsarResources(pulsarResources);
         setManagedLedgerClientFactory(managedLedgerClientFactory);
         try {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index f490ebd5ed2..40a42286fda 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -54,7 +54,9 @@ import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
 import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
+import org.apache.pulsar.compaction.CompactionServiceFactory;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -135,6 +137,8 @@ public class PulsarTestContext implements AutoCloseable {
 
     private final Compactor compactor;
 
+    private final CompactionServiceFactory compactionServiceFactory;
+
     private final BrokerService brokerService;
 
     @Getter(AccessLevel.NONE)
@@ -659,10 +663,19 @@ public class PulsarTestContext implements AutoCloseable {
         protected void initializePulsarServices(SpyConfig spyConfig, Builder 
builder) {
             BookKeeperClientFactory bookKeeperClientFactory =
                     new MockBookKeeperClientFactory(builder.bookKeeperClient);
+            CompactionServiceFactory compactionServiceFactory = 
builder.compactionServiceFactory;
+            if (builder.compactionServiceFactory == null && 
builder.config.getCompactionServiceFactoryClassName()
+                    .equals(PulsarCompactionServiceFactory.class.getName())) {
+                compactionServiceFactory = new 
MockPulsarCompactionServiceFactory(spyConfig, builder.compactor);
+            }
             PulsarService pulsarService = spyConfig.getPulsarService()
                     .spy(StartableTestPulsarService.class, spyConfig, 
builder.config, builder.localMetadataStore,
-                            builder.configurationMetadataStore, 
builder.compactor, builder.brokerInterceptor,
+                            builder.configurationMetadataStore, 
compactionServiceFactory,
+                            builder.brokerInterceptor,
                             bookKeeperClientFactory, 
builder.brokerServiceCustomizer);
+            if (compactionServiceFactory != null) {
+                compactionServiceFactory.initialize(pulsarService);
+            }
             registerCloseable(() -> {
                 pulsarService.close();
                 resetSpyOrMock(pulsarService);
@@ -717,11 +730,20 @@ public class PulsarTestContext implements AutoCloseable {
             }
             BookKeeperClientFactory bookKeeperClientFactory =
                     new MockBookKeeperClientFactory(builder.bookKeeperClient);
+            CompactionServiceFactory compactionServiceFactory = 
builder.compactionServiceFactory;
+            if (builder.compactionServiceFactory == null && 
builder.config.getCompactionServiceFactoryClassName()
+                    .equals(PulsarCompactionServiceFactory.class.getName())) {
+                compactionServiceFactory = new 
MockPulsarCompactionServiceFactory(spyConfig, builder.compactor);
+            }
             PulsarService pulsarService = spyConfig.getPulsarService()
                     .spy(NonStartableTestPulsarService.class, spyConfig, 
builder.config, builder.localMetadataStore,
-                            builder.configurationMetadataStore, 
builder.compactor, builder.brokerInterceptor,
+                            builder.configurationMetadataStore, 
compactionServiceFactory,
+                            builder.brokerInterceptor,
                             bookKeeperClientFactory, builder.pulsarResources,
                             builder.managedLedgerClientFactory, 
builder.brokerServiceCustomizer);
+            if (compactionServiceFactory != null) {
+                compactionServiceFactory.initialize(pulsarService);
+            }
             registerCloseable(() -> {
                 pulsarService.close();
                 resetSpyOrMock(pulsarService);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
index de51cee8f24..8c42998ab0b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
@@ -98,9 +98,15 @@ public class SpyConfig {
      */
     private final SpyType bookKeeperClient;
     /**
-     * Spy configuration for {@link PulsarService#getCompactor()}.
+     * Spy configuration for the compactor returned by {@link 
org.apache.pulsar.compaction.PulsarCompactionServiceFactory#getCompactor()}.
      */
     private final SpyType compactor;
+
+    /**
+     * Spy configuration for {@link 
PulsarService#getCompactionServiceFactory()}.
+     */
+
+    private final SpyType compactedServiceFactory;
     /**
      * Spy configuration for {@link PulsarService#getNamespaceService()}.
      */
@@ -128,6 +134,7 @@ public class SpyConfig {
         spyConfigBuilder.brokerService(defaultSpyType);
         spyConfigBuilder.bookKeeperClient(defaultSpyType);
         spyConfigBuilder.compactor(defaultSpyType);
+        spyConfigBuilder.compactedServiceFactory(defaultSpyType);
         spyConfigBuilder.namespaceService(defaultSpyType);
         return spyConfigBuilder;
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
index 8a485a07496..a5964c4a55d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
@@ -28,7 +28,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.CompactionServiceFactory;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
 /**
@@ -41,12 +41,12 @@ class StartableTestPulsarService extends 
AbstractTestPulsarService {
     public StartableTestPulsarService(SpyConfig spyConfig, 
ServiceConfiguration config,
                                       MetadataStoreExtended localMetadataStore,
                                       MetadataStoreExtended 
configurationMetadataStore,
-                                      Compactor compactor,
+                                      CompactionServiceFactory 
compactionServiceFactory,
                                       BrokerInterceptor brokerInterceptor,
                                       BookKeeperClientFactory 
bookKeeperClientFactory,
                                       Function<BrokerService, BrokerService> 
brokerServiceCustomizer) {
-        super(spyConfig, config, localMetadataStore, 
configurationMetadataStore, compactor, brokerInterceptor,
-                bookKeeperClientFactory);
+        super(spyConfig, config, localMetadataStore, 
configurationMetadataStore, compactionServiceFactory,
+                brokerInterceptor, bookKeeperClientFactory);
         this.brokerServiceCustomizer = brokerServiceCustomizer;
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index c4ec2ec766e..4e50401fc11 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -21,8 +21,8 @@ package org.apache.pulsar.broker.transaction;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.pulsar.common.naming.SystemTopicNames.PENDING_ACK_STORE_CURSOR_NAME;
 import static 
org.apache.pulsar.common.naming.SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
-import static 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
 import static 
org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
+import static 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.doAnswer;
@@ -42,12 +42,12 @@ import io.netty.util.Timeout;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
-import java.util.ArrayList;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Map;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -99,8 +99,8 @@ import 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferProvider;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferRecoverCallBack;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
-import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import 
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
 import 
org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckReplyCallBack;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
@@ -124,9 +124,9 @@ import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConsumerBase;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessagesImpl;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
-import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
@@ -138,14 +138,16 @@ import 
org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.compaction.CompactionServiceFactory;
+import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
 import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
 import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
-import 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
 import 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
+import 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
 import 
org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
 import org.awaitility.Awaitility;
 import org.mockito.invocation.InvocationOnMock;
@@ -1571,6 +1573,9 @@ public class TransactionTest extends TransactionTestBase {
         
when(pulsar.getTransactionBufferSnapshotServiceFactory()).thenReturn(transactionBufferSnapshotServiceFactory);
         TopicTransactionBufferProvider topicTransactionBufferProvider = new 
TopicTransactionBufferProvider();
         
when(pulsar.getTransactionBufferProvider()).thenReturn(topicTransactionBufferProvider);
+        CompactionServiceFactory compactionServiceFactory = new 
PulsarCompactionServiceFactory();
+        compactionServiceFactory.initialize(pulsar);
+        
when(pulsar.getCompactionServiceFactory()).thenReturn(compactionServiceFactory);
         // Mock BacklogQuotaManager
         BacklogQuotaManager backlogQuotaManager = 
mock(BacklogQuotaManager.class);
         // Mock brokerService.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 957671b7f8d..96adc67a27b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -21,10 +21,8 @@ package org.apache.pulsar.compaction;
 import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.google.common.collect.Sets;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -36,9 +34,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.IntStream;
-
 import lombok.Cleanup;
-
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -47,7 +43,6 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
@@ -665,7 +660,7 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
 
         producer.newMessage().key("k").value(("value").getBytes()).send();
         producer.newMessage().key("k").value(null).send();
-        pulsar.getCompactor().compact(topic).get();
+        
((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor().compact(topic).get();
 
         Awaitility.await()
                 .pollInterval(3, TimeUnit.SECONDS)

Reply via email to