This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cd1aef3c74 [improve][broker] PIP-192 Added ServiceUnitStateCompactionStrategy (#19045)
1cd1aef3c74 is described below

commit 1cd1aef3c74fac0a2ded99da05b658578d8481e7
Author: Heesung Sohn <[email protected]>
AuthorDate: Tue Jan 31 00:37:11 2023 -0800

    [improve][broker] PIP-192 Added ServiceUnitStateCompactionStrategy (#19045)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  15 +
 .../extensions/channel/ServiceUnitState.java       |   4 +-
 .../channel/ServiceUnitStateChannelImpl.java       |  81 +-
 .../ServiceUnitStateCompactionStrategy.java        |  89 +++
 .../broker/service/persistent/PersistentTopic.java |  21 +-
 .../compaction/StrategicTwoPhaseCompactor.java     |   1 +
 .../channel/ServiceUnitStateChannelTest.java       |  84 ++-
 .../ServiceUnitStateCompactionStrategyTest.java    |  90 +++
 .../extensions/channel/ServiceUnitStateTest.java   |   4 +-
 .../compaction/ServiceUnitStateCompactionTest.java | 831 +++++++++++++++++++++
 .../apache/pulsar/client/impl/TableViewImpl.java   |   2 +-
 11 files changed, 1168 insertions(+), 54 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 1532b28343c..06b41e46636 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -146,6 +146,7 @@ import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.common.util.ThreadDumpUtil;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.StrategicTwoPhaseCompactor;
 import org.apache.pulsar.compaction.TwoPhaseCompactor;
 import org.apache.pulsar.functions.worker.ErrorNotifier;
 import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -198,6 +199,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     private TopicPoliciesService topicPoliciesService = 
TopicPoliciesService.DISABLED;
     private BookKeeperClientFactory bkClientFactory;
     private Compactor compactor;
+    private StrategicTwoPhaseCompactor strategicCompactor;
     private ResourceUsageTransportManager resourceUsageTransportManager;
     private ResourceGroupService resourceGroupServiceManager;
 
@@ -1473,6 +1475,19 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         return this.compactor;
     }
 
+    public StrategicTwoPhaseCompactor newStrategicCompactor() throws 
PulsarServerException {
+        return new StrategicTwoPhaseCompactor(this.getConfiguration(),
+                getClient(), getBookKeeperClient(),
+                getCompactorExecutor());
+    }
+
+    public synchronized StrategicTwoPhaseCompactor getStrategicCompactor() 
throws PulsarServerException {
+        if (this.strategicCompactor == null) {
+            this.strategicCompactor = newStrategicCompactor();
+        }
+        return this.strategicCompactor;
+    }
+
     protected synchronized OrderedScheduler 
getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) {
         if (this.offloaderScheduler == null) {
             this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java
index cd1092a26ea..3225c0ba7bb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java
@@ -66,7 +66,9 @@ public enum ServiceUnitState {
     Splitting; // the service unit(e.g. bundle) is in the process of splitting.
 
     private static Map<ServiceUnitState, Set<ServiceUnitState>> 
validTransitions = Map.of(
-            Free, Set.of(Owned, Assigned),
+            // (Free -> Released | Splitting) transitions are required
+            // when the topic is compacted in the middle of transfer or split.
+            Free, Set.of(Owned, Assigned, Released, Splitting),
             Owned, Set.of(Assigned, Splitting, Free),
             Assigned, Set.of(Owned, Released, Free),
             Released, Set.of(Owned, Free),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index a476be974a3..38e8afa50f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -43,7 +43,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.mutable.MutableInt;
@@ -101,7 +101,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     private long totalCleanupCnt = 0;
     private long totalBrokerCleanupTombstoneCnt = 0;
     private long totalServiceUnitCleanupTombstoneCnt = 0;
-    private long totalServiceUnitCleanupErrorCnt = 0;
+    private AtomicLong totalCleanupErrorCnt = new AtomicLong();
     private long totalCleanupScheduledCnt = 0;
     private long totalCleanupIgnoredCnt = 0;
     private long totalCleanupCancelledCnt = 0;
@@ -175,10 +175,11 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
             }
             tableview = pulsar.getClient().newTableViewBuilder(schema)
                     .topic(TOPIC)
-                    // TODO: enable CompactionStrategy
+                    .loadConf(Map.of(
+                            "topicCompactionStrategyClassName",
+                            
ServiceUnitStateCompactionStrategy.class.getName()))
                     .create();
-            // TODO: schedule listen instead of foreachAndListen
-            tableview.forEachAndListen((key, value) -> handle(key, value));
+            tableview.listen((key, value) -> handle(key, value));
             log.debug("Successfully started the channel tableview.");
 
             
pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);
@@ -332,8 +333,6 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         }
 
         ServiceUnitState state = data == null ? Free : data.state();
-
-        // TODO : Add state validation in tableview by the compaction strategy
         switch (state) {
             case Owned -> handleOwnEvent(serviceUnit, data);
             case Assigned -> handleAssignEvent(serviceUnit, data);
@@ -599,7 +598,16 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                     .delayedExecutor(delayInSecs, TimeUnit.SECONDS, 
pulsar.getLoadManagerExecutor());
             totalCleanupScheduledCnt++;
             return CompletableFuture
-                    .runAsync(() -> doCleanup(broker), delayed);
+                    .runAsync(() -> {
+                                try {
+                                    doCleanup(broker);
+                                } catch (Throwable e) {
+                                    log.error("Failed to run the cleanup job 
for the broker {}, "
+                                                    + 
"totalCleanupErrorCnt:{}.",
+                                            broker, 
totalCleanupErrorCnt.incrementAndGet(), e);
+                                }
+                            }
+                            , delayed);
         });
 
         log.info("Scheduled ownership cleanup for broker:{} with delay:{} 
secs. Pending clean jobs:{}.",
@@ -610,8 +618,8 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     private void doCleanup(String broker) {
         long startTime = System.nanoTime();
         log.info("Started ownership cleanup for the inactive broker:{}", 
broker);
-        AtomicInteger serviceUnitTombstoneCnt = new AtomicInteger();
-        AtomicInteger serviceUnitTombstoneErrorCnt = new AtomicInteger();
+        int serviceUnitTombstoneCnt = 0;
+        long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
         for (Map.Entry<String, ServiceUnitStateData> etr : 
tableview.entrySet()) {
             ServiceUnitStateData stateData = etr.getValue();
             String serviceUnit = etr.getKey();
@@ -619,14 +627,14 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                     || StringUtils.equals(broker, stateData.sourceBroker())) {
                 log.info("Cleaning ownership serviceUnit:{}, stateData:{}.", 
serviceUnit, stateData);
                 tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
-                    if (e == null) {
-                        serviceUnitTombstoneCnt.incrementAndGet();
-                    } else {
-                        log.error("Failed cleaning the ownership 
serviceUnit:{}, stateData:{}.",
-                                serviceUnit, stateData);
-                        serviceUnitTombstoneErrorCnt.incrementAndGet();
+                    if (e != null) {
+                        log.error("Failed cleaning the ownership 
serviceUnit:{}, stateData:{}, "
+                                        + "cleanupErrorCnt:{}.",
+                                serviceUnit, stateData,
+                                totalCleanupErrorCnt.incrementAndGet() - 
totalCleanupErrorCntStart);
                     }
                 });
+                serviceUnitTombstoneCnt++;
             }
         }
 
@@ -636,26 +644,22 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
             log.error("Failed to flush the in-flight messages.", e);
         }
 
-        if (serviceUnitTombstoneCnt.get() > 0) {
+        if (serviceUnitTombstoneCnt > 0) {
             this.totalCleanupCnt++;
-            this.totalServiceUnitCleanupTombstoneCnt += 
serviceUnitTombstoneCnt.get();
+            this.totalServiceUnitCleanupTombstoneCnt += 
serviceUnitTombstoneCnt;
             this.totalBrokerCleanupTombstoneCnt++;
         }
 
-        if (serviceUnitTombstoneErrorCnt.get() > 0) {
-            this.totalServiceUnitCleanupErrorCnt += 
serviceUnitTombstoneErrorCnt.get();
-        }
-
         double cleanupTime = TimeUnit.NANOSECONDS
                 .toMillis((System.nanoTime() - startTime));
         // TODO: clean load data stores
         log.info("Completed a cleanup for the inactive broker:{} in {} ms. "
                         + "Published tombstone for orphan service units: 
serviceUnitTombstoneCnt:{}, "
-                        + "serviceUnitTombstoneErrorCnt:{}, metrics:{} ",
+                        + "approximate cleanupErrorCnt:{}, metrics:{} ",
                 broker,
                 cleanupTime,
                 serviceUnitTombstoneCnt,
-                serviceUnitTombstoneErrorCnt,
+                totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
                 printCleanupMetrics());
         cleanupJobs.remove(broker);
     }
@@ -675,8 +679,8 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         long startTime = System.nanoTime();
         Set<String> inactiveBrokers = new HashSet<>();
         Set<String> activeBrokers = new HashSet<>(brokers);
-        AtomicInteger serviceUnitTombstoneCnt = new AtomicInteger();
-        AtomicInteger serviceUnitTombstoneErrorCnt = new AtomicInteger();
+        int serviceUnitTombstoneCnt = 0;
+        long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
         long now = System.currentTimeMillis();
         for (Map.Entry<String, ServiceUnitStateData> etr : 
tableview.entrySet()) {
             String serviceUnit = etr.getKey();
@@ -690,14 +694,14 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                         serviceUnit, stateData);
 
                 tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
-                    if (e == null) {
-                        serviceUnitTombstoneCnt.incrementAndGet();
-                    } else {
-                        log.error("Failed cleaning the ownership 
serviceUnit:{}, stateData:{}.",
-                                serviceUnit, stateData);
-                        serviceUnitTombstoneErrorCnt.incrementAndGet();
+                    if (e != null) {
+                        log.error("Failed cleaning the ownership 
serviceUnit:{}, stateData:{}, "
+                                        + "cleanupErrorCnt:{}.",
+                                serviceUnit, stateData,
+                                totalCleanupErrorCnt.incrementAndGet() - 
totalCleanupErrorCntStart);
                     }
                 });
+                serviceUnitTombstoneCnt++;
             }
         }
 
@@ -711,22 +715,21 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
             log.error("Failed to flush the in-flight messages.", e);
         }
 
-        if (serviceUnitTombstoneCnt.get() > 0) {
-            this.totalServiceUnitCleanupTombstoneCnt += 
serviceUnitTombstoneCnt.get();
+        if (serviceUnitTombstoneCnt > 0) {
+            this.totalServiceUnitCleanupTombstoneCnt += 
serviceUnitTombstoneCnt;
         }
-        this.totalServiceUnitCleanupErrorCnt += 
serviceUnitTombstoneErrorCnt.get();
 
         double monitorTime = TimeUnit.NANOSECONDS
                 .toMillis((System.nanoTime() - startTime));
         log.info("Completed the ownership monitor run in {} ms. "
                         + "Scheduled cleanups for inactiveBrokers:{}. 
inactiveBrokerCount:{}. "
                         + "Published tombstone for orphan service units: 
serviceUnitTombstoneCnt:{}, "
-                        + "serviceUnitTombstoneErrorCnt:{}, metrics:{} ",
+                        + "approximate cleanupErrorCnt:{}, metrics:{} ",
                 monitorTime,
                 inactiveBrokers,
                 inactiveBrokers.size(),
                 serviceUnitTombstoneCnt,
-                serviceUnitTombstoneErrorCnt,
+                totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
                 printCleanupMetrics());
 
     }
@@ -734,13 +737,13 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     private String printCleanupMetrics() {
         return String.format(
                 "{totalCleanupCnt:%d, totalBrokerCleanupTombstoneCnt:%d, "
-                        + "totalServiceUnitCleanupTombstoneCnt:%d, 
totalServiceUnitCleanupErrorCnt:%d, "
+                        + "totalServiceUnitCleanupTombstoneCnt:%d, 
totalCleanupErrorCnt:%d, "
                         + "totalCleanupScheduledCnt%d, 
totalCleanupIgnoredCnt:%d, totalCleanupCancelledCnt:%d, "
                         + "  activeCleanupJobs:%d}",
                 totalCleanupCnt,
                 totalBrokerCleanupTombstoneCnt,
                 totalServiceUnitCleanupTombstoneCnt,
-                totalServiceUnitCleanupErrorCnt,
+                totalCleanupErrorCnt.get(),
                 totalCleanupScheduledCnt,
                 totalCleanupIgnoredCnt,
                 totalCleanupCancelledCnt,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
new file mode 100644
index 00000000000..2b21f830dda
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.channel;
+
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Released;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+
+public class ServiceUnitStateCompactionStrategy implements 
TopicCompactionStrategy<ServiceUnitStateData> {
+
+    private final Schema<ServiceUnitStateData> schema;
+
+    private boolean checkBrokers = true;
+
+    public ServiceUnitStateCompactionStrategy() {
+        schema = Schema.JSON(ServiceUnitStateData.class);
+    }
+
+    @Override
+    public Schema<ServiceUnitStateData> getSchema() {
+        return schema;
+    }
+
+    @VisibleForTesting
+    public void checkBrokers(boolean check) {
+        this.checkBrokers = check;
+    }
+
+    @Override
+    public boolean shouldKeepLeft(ServiceUnitStateData from, 
ServiceUnitStateData to) {
+        ServiceUnitState prevState = from == null ? Free : from.state();
+        ServiceUnitState state = to == null ? Free : to.state();
+        if (!ServiceUnitState.isValidTransition(prevState, state)) {
+            return true;
+        }
+
+        if (checkBrokers) {
+            if (prevState == Free && (state == Assigned || state == Owned)) {
+                // Free -> Assigned || Owned broker check
+                return StringUtils.isBlank(to.broker());
+            } else if (prevState == Owned && state == Assigned) {
+                // Owned -> Assigned(transfer) broker check
+                return !StringUtils.equals(from.broker(), to.sourceBroker())
+                        || StringUtils.isBlank(to.broker())
+                        || StringUtils.equals(from.broker(), to.broker());
+            } else if (prevState == Assigned && state == Released) {
+                // Assigned -> Released(transfer) broker check
+                return !StringUtils.equals(from.broker(), to.broker())
+                        || !StringUtils.equals(from.sourceBroker(), 
to.sourceBroker());
+            } else if (prevState == Released && state == Owned) {
+                // Released -> Owned(transfer) broker check
+                return !StringUtils.equals(from.broker(), to.broker())
+                        || !StringUtils.equals(from.sourceBroker(), 
to.sourceBroker());
+            } else if (prevState == Assigned && state == Owned) {
+                // Assigned -> Owned broker check
+                return !StringUtils.equals(from.broker(), to.broker())
+                        || !StringUtils.equals(from.sourceBroker(), 
to.sourceBroker());
+            } else if (prevState == Owned && state == Splitting) {
+                // Owned -> Splitting broker check
+                return !StringUtils.equals(from.broker(), to.broker());
+            }
+        }
+
+        return false;
+    }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index dda9c89b726..d009d3778f2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -78,6 +78,8 @@ import org.apache.bookkeeper.net.BookieId;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import 
org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
 import org.apache.pulsar.broker.service.AbstractReplicator;
@@ -152,6 +154,7 @@ import 
org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -203,6 +206,11 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     private CompletableFuture<Long> currentCompaction = 
CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
     private final CompactedTopic compactedTopic;
 
+    // TODO: Create compaction strategy from topic policy when exposing 
strategic compaction to users.
+    private static Map<String, TopicCompactionStrategy> strategicCompactionMap 
= Map.of(
+            ServiceUnitStateChannelImpl.TOPIC,
+            new ServiceUnitStateCompactionStrategy());
+
     private CompletableFuture<MessageIdImpl> currentOffload = 
CompletableFuture.completedFuture(
             (MessageIdImpl) MessageId.earliest);
 
@@ -1571,6 +1579,11 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 }
 
                 if (backlogEstimate > compactionThreshold) {
+                    if (log.isDebugEnabled()) {
+                        log.debug(
+                                "topic:{} backlogEstimate:{} is bigger than 
compactionThreshold:{}. Triggering "
+                                        + "compaction", topic, 
backlogEstimate, compactionThreshold);
+                    }
                     try {
                         triggerCompaction();
                     } catch (AlreadyRunningException are) {
@@ -3000,7 +3013,13 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     public synchronized void triggerCompaction()
             throws PulsarServerException, AlreadyRunningException {
         if (currentCompaction.isDone()) {
-            currentCompaction = 
brokerService.pulsar().getCompactor().compact(topic);
+
+            if (strategicCompactionMap.containsKey(topic)) {
+                currentCompaction = 
brokerService.pulsar().getStrategicCompactor()
+                        .compact(topic, strategicCompactionMap.get(topic));
+            } else {
+                currentCompaction = 
brokerService.pulsar().getCompactor().compact(topic);
+            }
         } else {
             throw new AlreadyRunningException("Compaction already in 
progress");
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
index bb0850efab4..9dc4ec649b6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
@@ -385,6 +385,7 @@ public class StrategicTwoPhaseCompactor extends 
TwoPhaseCompactor {
                             promise.completeExceptionally(e);
                             return;
                         }
+                        outstanding.release(MAX_OUTSTANDING);
                         promise.complete(null);
                         return;
                     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index ad4d0cb2f0b..a16c2be6612 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions.channel;
 
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MAX_CLEAN_UP_DELAY_TIME_IN_SECS;
 import static 
org.apache.pulsar.metadata.api.extended.SessionEvent.ConnectionLost;
 import static org.apache.pulsar.metadata.api.extended.SessionEvent.Reconnected;
@@ -44,6 +45,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -51,6 +53,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -60,6 +63,7 @@ import 
org.apache.pulsar.broker.loadbalance.LeaderElectionService;
 import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
 import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.TableViewImpl;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -89,6 +93,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
     @Override
     protected void setup() throws Exception {
         conf.setAllowAutoTopicCreation(true);
+        conf.setBrokerServiceCompactionMonitorIntervalInSeconds(10);
         super.internalSetup(conf);
 
         admin.tenants().createTenant("pulsar", createDefaultTenantInfo());
@@ -289,8 +294,6 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
 
         assertEquals(ownerAddr1, ownerAddr2);
-        // TODO: check conflict resolution
-        // assertEquals(assignedAddr1, ownerAddr1);
         assertEquals(getOwnerRequests1.size(), 0);
         assertEquals(getOwnerRequests2.size(), 0);
     }
@@ -567,7 +570,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt"));
         assertEquals(1, getCleanupMetric(leaderChannel, 
"totalBrokerCleanupTombstoneCnt"));
         assertEquals(2, getCleanupMetric(leaderChannel, 
"totalServiceUnitCleanupTombstoneCnt"));
-        assertEquals(0, getCleanupMetric(leaderChannel, 
"totalServiceUnitCleanupErrorCnt"));
+        assertEquals(0, getCleanupMetric(leaderChannel, 
"totalCleanupErrorCnt"));
         assertEquals(1, getCleanupMetric(leaderChannel, 
"totalCleanupScheduledCnt"));
         assertEquals(0, getCleanupMetric(leaderChannel, 
"totalCleanupIgnoredCnt"));
         assertEquals(0, getCleanupMetric(leaderChannel, 
"totalCleanupCancelledCnt"));
@@ -592,7 +595,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt"));
         assertEquals(1, getCleanupMetric(leaderChannel, 
"totalBrokerCleanupTombstoneCnt"));
         assertEquals(2, getCleanupMetric(leaderChannel, 
"totalServiceUnitCleanupTombstoneCnt"));
-        assertEquals(0, getCleanupMetric(leaderChannel, 
"totalServiceUnitCleanupErrorCnt"));
+        assertEquals(0, getCleanupMetric(leaderChannel, 
"totalCleanupErrorCnt"));
         assertEquals(2, getCleanupMetric(leaderChannel, 
"totalCleanupScheduledCnt"));
         assertEquals(0, getCleanupMetric(leaderChannel, 
"totalCleanupIgnoredCnt"));
         assertEquals(0, getCleanupMetric(leaderChannel, 
"totalCleanupCancelledCnt"));
@@ -608,7 +611,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt"));
         assertEquals(1, getCleanupMetric(leaderChannel, 
"totalBrokerCleanupTombstoneCnt"));
         assertEquals(2, getCleanupMetric(leaderChannel, 
"totalServiceUnitCleanupTombstoneCnt"));
-        assertEquals(0, getCleanupMetric(leaderChannel, 
"totalServiceUnitCleanupErrorCnt"));
+        assertEquals(0, getCleanupMetric(leaderChannel, 
"totalCleanupErrorCnt"));
         assertEquals(2, getCleanupMetric(leaderChannel, 
"totalCleanupScheduledCnt"));
         assertEquals(0, getCleanupMetric(leaderChannel, 
"totalCleanupIgnoredCnt"));
         assertEquals(1, getCleanupMetric(leaderChannel, 
"totalCleanupCancelledCnt"));
@@ -626,7 +629,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt"));
         assertEquals(1, getCleanupMetric(leaderChannel, 
"totalBrokerCleanupTombstoneCnt"));
         assertEquals(2, getCleanupMetric(leaderChannel, 
"totalServiceUnitCleanupTombstoneCnt"));
-        assertEquals(0, getCleanupMetric(leaderChannel, 
"totalServiceUnitCleanupErrorCnt"));
+        assertEquals(0, getCleanupMetric(leaderChannel, 
"totalCleanupErrorCnt"));
         assertEquals(3, getCleanupMetric(leaderChannel, 
"totalCleanupScheduledCnt"));
         assertEquals(0, getCleanupMetric(leaderChannel, 
"totalCleanupIgnoredCnt"));
         assertEquals(1, getCleanupMetric(leaderChannel, 
"totalCleanupCancelledCnt"));
@@ -644,7 +647,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupCnt"));
         assertEquals(2, getCleanupMetric(leaderChannel, 
"totalBrokerCleanupTombstoneCnt"));
         assertEquals(4, getCleanupMetric(leaderChannel, 
"totalServiceUnitCleanupTombstoneCnt"));
-        assertEquals(0, getCleanupMetric(leaderChannel, 
"totalServiceUnitCleanupErrorCnt"));
+        assertEquals(0, getCleanupMetric(leaderChannel, 
"totalCleanupErrorCnt"));
         assertEquals(3, getCleanupMetric(leaderChannel, 
"totalCleanupScheduledCnt"));
         assertEquals(0, getCleanupMetric(leaderChannel, 
"totalCleanupIgnoredCnt"));
         assertEquals(1, getCleanupMetric(leaderChannel, 
"totalCleanupCancelledCnt"));
@@ -669,7 +672,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupCnt"));
         assertEquals(2, getCleanupMetric(leaderChannel, 
"totalBrokerCleanupTombstoneCnt"));
         assertEquals(4, getCleanupMetric(leaderChannel, 
"totalServiceUnitCleanupTombstoneCnt"));
-        assertEquals(0, getCleanupMetric(leaderChannel, 
"totalServiceUnitCleanupErrorCnt"));
+        assertEquals(0, getCleanupMetric(leaderChannel, 
"totalCleanupErrorCnt"));
         assertEquals(3, getCleanupMetric(leaderChannel, 
"totalCleanupScheduledCnt"));
         assertEquals(1, getCleanupMetric(leaderChannel, 
"totalCleanupIgnoredCnt"));
         assertEquals(1, getCleanupMetric(leaderChannel, 
"totalCleanupCancelledCnt"));
@@ -682,6 +685,62 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
                 true);
     }
 
+    /**
+     * Checks that a conflicting assignment published through {@code channel2} times out while the
+     * ownership established through {@code channel1} stays in place, and that a channel started after
+     * the strategic compactor has run still reports that owner.
+     *
+     * <p>The shortened in-flight wait time written into {@code channel2} is restored in a
+     * {@code finally} block, and the extra channel is closed in a {@code finally} block, so a failing
+     * assertion cannot leave either behind.
+     */
+    @Test(priority = 10)
+    public void conflictAndCompactionTest() throws ExecutionException, InterruptedException, TimeoutException,
+            IllegalAccessException, PulsarClientException, PulsarServerException {
+
+        // Send a key-only message for the bundle; both channels are then expected to report no owner.
+        var producer = (Producer<ServiceUnitStateData>) FieldUtils.readDeclaredField(channel1, "producer", true);
+        producer.newMessage().key(bundle).send();
+        var owner1 = channel1.getOwnerAsync(bundle);
+        var owner2 = channel2.getOwnerAsync(bundle);
+        assertNull(owner1.get());
+        assertNull(owner2.get());
+
+        var assigned1 = channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
+        assertNotNull(assigned1);
+
+        waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
+        waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
+        String assignedAddr1 = assigned1.get(5, TimeUnit.SECONDS);
+        assertEquals(lookupServiceAddress1, assignedAddr1);
+
+        FieldUtils.writeDeclaredField(channel2,
+                "inFlightStateWaitingTimeInMillis", 3 * 1000, true);
+        try {
+            // The second assignment targets a different broker and is expected to fail with a timeout.
+            var assigned2 = channel2.publishAssignEventAsync(bundle, lookupServiceAddress2);
+            assertNotNull(assigned2);
+            Exception ex = null;
+            try {
+                assigned2.join();
+            } catch (CompletionException e) {
+                ex = e;
+            }
+            assertNotNull(ex);
+            assertEquals(TimeoutException.class, ex.getCause().getClass());
+            assertEquals(lookupServiceAddress1, channel2.getOwnerAsync(bundle).get());
+            assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(bundle).get());
+
+            // Install a spy on both brokers so the compaction of the channel topic can be observed.
+            var compactor = spy(pulsar1.getStrategicCompactor());
+            FieldUtils.writeDeclaredField(pulsar1, "strategicCompactor", compactor, true);
+            FieldUtils.writeDeclaredField(pulsar2, "strategicCompactor", compactor, true);
+            Awaitility.await()
+                    .pollInterval(200, TimeUnit.MILLISECONDS)
+                    .atMost(140, TimeUnit.SECONDS)
+                    .untilAsserted(() -> verify(compactor, times(1))
+                            .compact(eq(ServiceUnitStateChannelImpl.TOPIC), any()));
+
+            // A channel started after compaction must still resolve the original owner.
+            var channel3 = new ServiceUnitStateChannelImpl(pulsar1);
+            channel3.start();
+            try {
+                Awaitility.await()
+                        .pollInterval(200, TimeUnit.MILLISECONDS)
+                        .atMost(5, TimeUnit.SECONDS)
+                        .untilAsserted(() -> assertEquals(
+                                channel3.getOwnerAsync(bundle).get(), lookupServiceAddress1));
+            } finally {
+                channel3.close();
+            }
+        } finally {
+            FieldUtils.writeDeclaredField(channel2,
+                    "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
+        }
+    }
+
+
     // TODO: add the channel recovery test when broker registry is added.
 
     private static ConcurrentOpenHashMap<String, 
CompletableFuture<Optional<String>>> getOwnerRequests(
@@ -768,7 +827,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
                     if (actual == null) {
                         return true;
                     } else {
-                        return actual.state() != ServiceUnitState.Owned;
+                        return actual.state() != Owned;
                     }
                 });
     }
@@ -784,6 +843,11 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
     private static long getCleanupMetric(ServiceUnitStateChannel channel, String metric)
             throws IllegalAccessException {
-        return (long) FieldUtils.readDeclaredField(channel, metric, true);
+        // The named field is read reflectively and may be either an AtomicLong or a plain long.
+        Object value = FieldUtils.readDeclaredField(channel, metric, true);
+        if (value instanceof AtomicLong counter) {
+            return counter.get();
+        }
+        // Anything else is cast to long, as the code did before AtomicLong counters were handled.
+        return (long) value;
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java
new file mode 100644
index 00000000000..49b55f7660a
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.channel;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned;
+import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free;
+import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
+import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Released;
+import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import org.testng.annotations.Test;
+
+/**
+ * Table-style test of {@link ServiceUnitStateCompactionStrategy#shouldKeepLeft} for pairs of
+ * previous ("left") and incoming ("right") service unit state data.
+ *
+ * <p>Both assertion helpers come from TestNG, the framework that runs this class.
+ */
+@Test(groups = "broker")
+public class ServiceUnitStateCompactionStrategyTest {
+    ServiceUnitStateCompactionStrategy strategy = new ServiceUnitStateCompactionStrategy();
+
+    /** Builds data for {@code state} with the literal {@code "broker"} as the second constructor argument. */
+    ServiceUnitStateData data(ServiceUnitState state) {
+        return new ServiceUnitStateData(state, "broker");
+    }
+
+    /** Builds data with {@code dst} as the second constructor argument and {@code "broker"} as the third. */
+    ServiceUnitStateData data(ServiceUnitState state, String dst) {
+        return new ServiceUnitStateData(state, dst, "broker");
+    }
+
+    /** Builds data with {@code dst} as the second constructor argument and {@code src} as the third. */
+    ServiceUnitStateData data(ServiceUnitState state, String src, String dst) {
+        return new ServiceUnitStateData(state, dst, src);
+    }
+
+    @Test
+    public void test() throws InterruptedException {
+        String dst = "dst";
+        // left = Free
+        assertTrue(strategy.shouldKeepLeft(data(Free), data(Free)));
+        assertFalse(strategy.shouldKeepLeft(data(Free), data(Assigned)));
+        assertTrue(strategy.shouldKeepLeft(data(Free), data(Assigned, "")));
+        assertFalse(strategy.shouldKeepLeft(data(Free), data(Owned)));
+        assertTrue(strategy.shouldKeepLeft(data(Free), data(Owned, "")));
+        assertFalse(strategy.shouldKeepLeft(data(Free), data(Released)));
+        assertFalse(strategy.shouldKeepLeft(data(Free), data(Splitting)));
+
+        // left = Assigned
+        assertFalse(strategy.shouldKeepLeft(data(Assigned), data(Free)));
+        assertTrue(strategy.shouldKeepLeft(data(Assigned), data(Assigned)));
+        assertTrue(strategy.shouldKeepLeft(data(Assigned, "dst2"), data(Owned, dst)));
+        assertTrue(strategy.shouldKeepLeft(data(Assigned, "src1", dst), data(Owned, "src2", dst)));
+        assertFalse(strategy.shouldKeepLeft(data(Assigned), data(Owned)));
+        assertTrue(strategy.shouldKeepLeft(data(Assigned, "dst2"), data(Released, dst)));
+        assertTrue(strategy.shouldKeepLeft(data(Assigned, "src1", dst), data(Released, "src2", dst)));
+        assertFalse(strategy.shouldKeepLeft(data(Assigned, dst), data(Released, dst)));
+        assertTrue(strategy.shouldKeepLeft(data(Assigned), data(Splitting)));
+
+        // left = Owned
+        assertFalse(strategy.shouldKeepLeft(data(Owned), data(Free)));
+        assertTrue(strategy.shouldKeepLeft(data(Owned), data(Assigned)));
+        assertTrue(strategy.shouldKeepLeft(data(Owned), data(Assigned, "")));
+        assertTrue(strategy.shouldKeepLeft(data(Owned), data(Assigned, "src", dst)));
+        assertFalse(strategy.shouldKeepLeft(data(Owned), data(Assigned, dst)));
+        assertTrue(strategy.shouldKeepLeft(data(Owned), data(Owned)));
+        assertTrue(strategy.shouldKeepLeft(data(Owned), data(Released)));
+        assertTrue(strategy.shouldKeepLeft(data(Owned, "dst2"), data(Splitting, dst)));
+        assertFalse(strategy.shouldKeepLeft(data(Owned), data(Splitting)));
+
+        // left = Released
+        assertFalse(strategy.shouldKeepLeft(data(Released), data(Free)));
+        assertTrue(strategy.shouldKeepLeft(data(Released), data(Assigned)));
+        assertTrue(strategy.shouldKeepLeft(data(Released, "dst2"), data(Owned, dst)));
+        assertTrue(strategy.shouldKeepLeft(data(Released, "src1", dst), data(Owned, "src2", dst)));
+        assertFalse(strategy.shouldKeepLeft(data(Released), data(Owned)));
+        assertTrue(strategy.shouldKeepLeft(data(Released), data(Released)));
+        assertTrue(strategy.shouldKeepLeft(data(Released), data(Splitting)));
+
+        // left = Splitting
+        assertFalse(strategy.shouldKeepLeft(data(Splitting), data(Free)));
+        assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Assigned)));
+        assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Owned)));
+        assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Released)));
+        assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Splitting)));
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTest.java
index 304d1df29c9..69e6a2d204c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTest.java
@@ -36,8 +36,8 @@ public class ServiceUnitStateTest {
         assertFalse(ServiceUnitState.isValidTransition(Free, Free));
         assertTrue(ServiceUnitState.isValidTransition(Free, Assigned));
         assertTrue(ServiceUnitState.isValidTransition(Free, Owned));
-        assertFalse(ServiceUnitState.isValidTransition(Free, Released));
-        assertFalse(ServiceUnitState.isValidTransition(Free, Splitting));
+        assertTrue(ServiceUnitState.isValidTransition(Free, Released));
+        assertTrue(ServiceUnitState.isValidTransition(Free, Splitting));
 
         assertTrue(ServiceUnitState.isValidTransition(Assigned, Free));
         assertFalse(ServiceUnitState.isValidTransition(Assigned, Assigned));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
new file mode 100644
index 00000000000..41eaa640d28
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
@@ -0,0 +1,831 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.isValidTransition;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-compaction")
+public class ServiceUnitStateCompactionTest extends 
MockedPulsarServiceBaseTest {
+    private ScheduledExecutorService compactionScheduler;
+    private BookKeeper bk;
+    private Schema<ServiceUnitStateData> schema;
+    private ServiceUnitStateCompactionStrategy strategy;
+
+    // Last state handed out by the matching testValueN helper; each one starts at Free.
+    private ServiceUnitState testState0 = Free;
+    private ServiceUnitState testState1 = Free;
+    private ServiceUnitState testState2 = Free;
+    private ServiceUnitState testState3 = Free;
+    private ServiceUnitState testState4 = Free;
+
+    // Shared generator used to pick among candidate states; never reassigned, hence final.
+    private static final Random RANDOM = new Random();
+
+
+    /**
+     * Wraps a state and broker into message data.
+     *
+     * @return {@code null} when {@code state} is {@code Free}, otherwise new data for the pair
+     */
+    private ServiceUnitStateData testValue(ServiceUnitState state, String broker) {
+        return state == Free ? null : new ServiceUnitStateData(state, broker);
+    }
+
+    /** Advances {@code testState0} to a randomly chosen valid next state and returns data for it. */
+    private ServiceUnitStateData testValue0(String broker) {
+        testState0 = nextValidState(testState0);
+        return testValue(testState0, broker);
+    }
+
+    /** Advances {@code testState1} to a randomly chosen valid next state and returns data for it. */
+    private ServiceUnitStateData testValue1(String broker) {
+        testState1 = nextValidState(testState1);
+        return testValue(testState1, broker);
+    }
+
+    /** Advances {@code testState2} to a randomly chosen valid next state and returns data for it. */
+    private ServiceUnitStateData testValue2(String broker) {
+        testState2 = nextValidState(testState2);
+        return testValue(testState2, broker);
+    }
+
+    /** Advances {@code testState3} to a randomly chosen valid next state and returns data for it. */
+    private ServiceUnitStateData testValue3(String broker) {
+        testState3 = nextValidState(testState3);
+        return testValue(testState3, broker);
+    }
+
+    /** Advances {@code testState4} to a randomly chosen valid next state and returns data for it. */
+    private ServiceUnitStateData testValue4(String broker) {
+        testState4 = nextValidState(testState4);
+        return testValue(testState4, broker);
+    }
+
+    /**
+     * Picks, at random, a state that {@code from} may validly transition to, never choosing
+     * {@code Free} or {@code Splitting}.
+     */
+    private ServiceUnitState nextValidState(ServiceUnitState from) {
+        List<ServiceUnitState> reachable = new ArrayList<>();
+        for (ServiceUnitState to : ServiceUnitState.values()) {
+            if (to != Free && to != Splitting && isValidTransition(from, to)) {
+                reachable.add(to);
+            }
+        }
+        return reachable.get(RANDOM.nextInt(reachable.size()));
+    }
+
+    /**
+     * Picks, at random, a state that is not a valid transition target of {@code from}.
+     *
+     * @return the chosen state, or {@code null} when every state is a valid target
+     */
+    private ServiceUnitState nextInvalidState(ServiceUnitState from) {
+        List<ServiceUnitState> unreachable = new ArrayList<>();
+        for (ServiceUnitState to : ServiceUnitState.values()) {
+            if (!isValidTransition(from, to)) {
+                unreachable.add(to);
+            }
+        }
+        if (unreachable.isEmpty()) {
+            return null;
+        }
+        return unreachable.get(RANDOM.nextInt(unreachable.size()));
+    }
+
+    /**
+     * Returns {@code [Owned]} when {@code from} is {@code Assigned}; every other input,
+     * including {@code null}, yields an empty list.
+     */
+    private List<ServiceUnitState> nextStatesToNull(ServiceUnitState from) {
+        if (from == Assigned) {
+            return List.of(Owned);
+        }
+        return List.of();
+    }
+
+    /**
+     * Starts the mocked broker, creates the cluster, tenant and namespace used by the tests, and
+     * builds the per-test compaction fixtures (scheduler, BookKeeper client, schema, strategy).
+     */
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.internalSetup();
+
+        var clusterData = ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
+        admin.clusters().createCluster("use", clusterData);
+        var tenantInfo = new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use"));
+        admin.tenants().createTenant("my-property", tenantInfo);
+        admin.namespaces().createNamespace("my-property/use/my-ns");
+
+        var threadFactory = new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build();
+        compactionScheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
+        bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null);
+        schema = Schema.JSON(ServiceUnitStateData.class);
+        strategy = new ServiceUnitStateCompactionStrategy();
+        strategy.checkBrokers(false);
+
+    }
+
+
+    /**
+     * Tears down the mocked broker and stops the compaction scheduler.
+     *
+     * <p>The scheduler is shut down in a {@code finally} block so its thread is stopped even when
+     * the base-class cleanup throws.
+     */
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        try {
+            super.internalCleanup();
+        } finally {
+            // setup() may have failed before the scheduler was created, hence the null check.
+            if (compactionScheduler != null) {
+                compactionScheduler.shutdownNow();
+            }
+        }
+    }
+
+
+    /**
+     * Result of {@code generateTestData()}.
+     *
+     * @param topic    topic the messages were published to
+     * @param expected per-key value that the compaction strategy is expected to keep
+     * @param all      every published key/value pair, in publish order
+     */
+    public record TestData(
+            String topic,
+            Map<String, ServiceUnitStateData> expected,
+            List<Pair<String, ServiceUnitStateData>> all) {
+
+    }
+    /**
+     * Publishes {@code numMessages} keyed state messages to the test topic and records, per key,
+     * the value the compaction strategy is expected to keep.
+     *
+     * <p>Keys are drawn from a generator seeded with 0; whether the next state is a valid or an
+     * invalid transition is decided by that same generator, while the concrete state is picked by
+     * the unseeded class-level generator. The producer is closed before returning.
+     *
+     * @return the topic name, the expected per-key survivors, and all published pairs in order
+     */
+    TestData generateTestData() throws PulsarAdminException, PulsarClientException {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        final int numMessages = 20;
+        final int maxKeys = 5;
+
+        // Configure retention to ensure data is retained for reader
+        admin.namespaces().setRetention("my-property/use/my-ns", new RetentionPolicies(-1, -1));
+
+        Map<String, ServiceUnitStateData> expected = new HashMap<>();
+        List<Pair<String, ServiceUnitStateData>> all = new ArrayList<>();
+        Random r = new Random(0);
+
+        try (Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
+                .topic(topic)
+                .enableBatching(true)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create()) {
+
+            // Create the subscription up front, before any message is published.
+            pulsarClient.newConsumer(schema)
+                    .topic(topic)
+                    .subscriptionName("sub1")
+                    .readCompacted(true)
+                    .subscribe().close();
+
+            for (int j = 0; j < numMessages; j++) {
+                int keyIndex = r.nextInt(maxKeys);
+                String key = "key" + keyIndex;
+                ServiceUnitStateData prev = expected.get(key);
+                ServiceUnitState prevState = prev == null ? Free : prev.state();
+                // NOTE(review): nextInvalidState can return null when every transition is valid.
+                ServiceUnitState state = r.nextBoolean() ? nextInvalidState(prevState)
+                        : nextValidState(prevState);
+                ServiceUnitStateData value = new ServiceUnitStateData(state, key + ":" + j);
+                producer.newMessage().key(key).value(value).send();
+                // Mirror the strategy: the new value only replaces the old one when it is accepted.
+                if (!strategy.shouldKeepLeft(prev, value)) {
+                    expected.put(key, value);
+                }
+                all.add(Pair.of(key, value));
+            }
+        }
+        return new TestData(topic, expected, all);
+    }
+
+    /**
+     * Compacts the generated topic with the service unit state strategy, then checks that a
+     * {@code readCompacted} consumer receives the expected per-key values and that a consumer
+     * with {@code readCompacted} disabled receives every published message in publish order.
+     *
+     * <p>Each timed receive is checked for {@code null} so a missing message fails with a clear
+     * assertion instead of a {@link NullPointerException}.
+     */
+    @Test
+    public void testCompaction() throws Exception {
+        TestData testData = generateTestData();
+        var topic = testData.topic;
+        var expected = testData.expected;
+        var all = testData.all;
+
+        StrategicTwoPhaseCompactor compactor
+                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic, strategy).get();
+
+        PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic, false);
+        // Compacted topic ledger should have same number of entry equals to number of unique key.
+        //Assert.assertEquals(internalStats.compactedLedger.entries, expected.size());
+        Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1);
+        Assert.assertFalse(internalStats.compactedLedger.offloaded);
+
+        // consumer with readCompacted enabled only get compacted entries
+        try (Consumer<ServiceUnitStateData> consumer =
+                     pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
+                             .readCompacted(true).subscribe()) {
+            do {
+                Message<ServiceUnitStateData> m = consumer.receive(2, TimeUnit.SECONDS);
+                assertNotNull(m, "timed out waiting for a compacted message; remaining keys: "
+                        + expected.keySet());
+                Assert.assertEquals(expected.remove(m.getKey()), m.getValue());
+            } while (!expected.isEmpty());
+            Assert.assertTrue(expected.isEmpty());
+        }
+
+        // can get full backlog if read compacted disabled
+        try (Consumer<ServiceUnitStateData> consumer =
+                     pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
+                             .readCompacted(false).subscribe()) {
+            do {
+                Message<ServiceUnitStateData> m = consumer.receive(2, TimeUnit.SECONDS);
+                assertNotNull(m, "timed out waiting for a backlog message; remaining: " + all.size());
+                Pair<String, ServiceUnitStateData> expectedMessage = all.remove(0);
+                Assert.assertEquals(expectedMessage.getLeft(), m.getKey());
+                Assert.assertEquals(expectedMessage.getRight(), m.getValue());
+            } while (!all.isEmpty());
+            Assert.assertTrue(all.isEmpty());
+        }
+    }
+
+    /**
+     * Same checks as the consumer-based compaction test, but through readers that start at the
+     * earliest message: a {@code readCompacted} reader must see the expected per-key values, and
+     * a reader with {@code readCompacted} disabled must see every published message in order.
+     *
+     * <p>Each timed read is checked for {@code null} so a missing message fails with a clear
+     * assertion instead of a {@link NullPointerException}.
+     */
+    @Test
+    public void testCompactionWithReader() throws Exception {
+        TestData testData = generateTestData();
+        var topic = testData.topic;
+        var expected = testData.expected;
+        var all = testData.all;
+
+        StrategicTwoPhaseCompactor compactor
+                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic, strategy).get();
+
+        // reader with readCompacted enabled only gets compacted entries
+        try (Reader<ServiceUnitStateData> reader = pulsarClient.newReader(schema).topic(topic).readCompacted(true)
+                .startMessageId(MessageId.earliest).create()) {
+            do {
+                Message<ServiceUnitStateData> m = reader.readNext(2, TimeUnit.SECONDS);
+                assertNotNull(m, "timed out waiting for a compacted message; remaining keys: "
+                        + expected.keySet());
+                Assert.assertEquals(expected.remove(m.getKey()), m.getValue());
+            } while (!expected.isEmpty());
+            Assert.assertTrue(expected.isEmpty());
+        }
+
+        // can get full backlog if read compacted disabled
+        try (Reader<ServiceUnitStateData> reader = pulsarClient.newReader(schema).topic(topic).readCompacted(false)
+                .startMessageId(MessageId.earliest).create()) {
+            do {
+                Message<ServiceUnitStateData> m = reader.readNext(2, TimeUnit.SECONDS);
+                assertNotNull(m, "timed out waiting for a backlog message; remaining: " + all.size());
+                Pair<String, ServiceUnitStateData> expectedMessage = all.remove(0);
+                Assert.assertEquals(expectedMessage.getLeft(), m.getKey());
+                Assert.assertEquals(expectedMessage.getRight(), m.getValue());
+            } while (!all.isEmpty());
+            Assert.assertTrue(all.isEmpty());
+        }
+    }
+
+
+    /**
+     * Checks that a table view configured with the service unit state strategy converges to the
+     * expected per-key values while messages are published, and that a second table view created
+     * after compaction holds those same values.
+     *
+     * <p>Both table views are closed in {@code finally} blocks so a failing assertion does not
+     * leave them open.
+     */
+    @Test
+    public void testCompactionWithTableview() throws Exception {
+        TestData testData;
+        var tv = pulsar.getClient().newTableViewBuilder(schema)
+                .topic("persistent://my-property/use/my-ns/my-topic1")
+                .loadConf(Map.of(
+                        "topicCompactionStrategyClassName",
+                        ServiceUnitStateCompactionStrategy.class.getName()))
+                .create();
+        try {
+            // Align the table view's own strategy instance with the one used by this test.
+            ((ServiceUnitStateCompactionStrategy)
+                    FieldUtils.readDeclaredField(tv, "compactionStrategy", true))
+                    .checkBrokers(false);
+            testData = generateTestData();
+            var expectedCopy = new HashMap<>(testData.expected);
+
+            Awaitility.await()
+                    .pollInterval(200, TimeUnit.MILLISECONDS)
+                    .atMost(10, TimeUnit.SECONDS)
+                    .untilAsserted(() -> assertEquals(expectedCopy.size(), tv.size()));
+
+            for (var etr : tv.entrySet()) {
+                Assert.assertEquals(expectedCopy.remove(etr.getKey()), etr.getValue());
+                if (expectedCopy.isEmpty()) {
+                    break;
+                }
+            }
+
+            Assert.assertTrue(expectedCopy.isEmpty());
+        } finally {
+            tv.close();
+        }
+
+        var topic = testData.topic;
+        var expected = testData.expected;
+
+        StrategicTwoPhaseCompactor compactor
+                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic, strategy).get();
+
+        // a table view created after compaction must hold the same per-key values
+        var tableview = pulsar.getClient().newTableViewBuilder(schema)
+                .topic(topic)
+                .loadConf(Map.of(
+                        "topicCompactionStrategyClassName",
+                        ServiceUnitStateCompactionStrategy.class.getName()))
+                .create();
+        try {
+            for (var etr : tableview.entrySet()) {
+                Assert.assertEquals(expected.remove(etr.getKey()), etr.getValue());
+                if (expected.isEmpty()) {
+                    break;
+                }
+            }
+            Assert.assertTrue(expected.isEmpty());
+        } finally {
+            tableview.close();
+        }
+
+    }
+
+
+    /**
+     * Publishes three values for one key and checks that a {@code readCompacted} consumer sees
+     * all three before compaction has run, and only the last one after compaction.
+     *
+     * <p>The producer is opened in try-with-resources so it is closed when the test ends.
+     */
+    @Test
+    public void testReadCompactedBeforeCompaction() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        try (Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
+                .topic(topic)
+                .enableBatching(true)
+                .create()) {
+
+            // Create the subscription up front, before any message is published.
+            pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
+                    .readCompacted(true).subscribe().close();
+
+            producer.newMessage().key("key0").value(testValue0("content0")).send();
+            producer.newMessage().key("key0").value(testValue0("content1")).send();
+            producer.newMessage().key("key0").value(testValue0("content2")).send();
+
+            try (Consumer<ServiceUnitStateData> consumer =
+                         pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
+                                 .readCompacted(true).subscribe()) {
+                Message<ServiceUnitStateData> m = consumer.receive();
+                Assert.assertEquals(m.getKey(), "key0");
+                Assert.assertEquals(m.getValue().broker(), "content0");
+
+                m = consumer.receive();
+                Assert.assertEquals(m.getKey(), "key0");
+                Assert.assertEquals(m.getValue().broker(), "content1");
+
+                m = consumer.receive();
+                Assert.assertEquals(m.getKey(), "key0");
+                Assert.assertEquals(m.getValue().broker(), "content2");
+            }
+
+            StrategicTwoPhaseCompactor compactor
+                    = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+            compactor.compact(topic, strategy).get();
+
+            try (Consumer<ServiceUnitStateData> consumer =
+                         pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
+                                 .readCompacted(true).subscribe()) {
+                Message<ServiceUnitStateData> m = consumer.receive();
+                Assert.assertEquals(m.getKey(), "key0");
+                Assert.assertEquals(m.getValue().broker(), "content2");
+            }
+        }
+    }
+
+    /**
+     * Publishes three values for one key, compacts, publishes a fourth, and checks that a
+     * {@code readCompacted} consumer then receives the last pre-compaction value followed by the
+     * value published after compaction.
+     *
+     * <p>The producer is opened in try-with-resources so it is closed when the test ends.
+     */
+    @Test
+    public void testReadEntriesAfterCompaction() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        try (Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
+                .topic(topic)
+                .enableBatching(true)
+                .create()) {
+
+            // Create the subscription up front, before any message is published.
+            pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
+                    .readCompacted(true).subscribe().close();
+
+            producer.newMessage().key("key0").value(testValue0("content0")).send();
+            producer.newMessage().key("key0").value(testValue0("content1")).send();
+            producer.newMessage().key("key0").value(testValue0("content2")).send();
+
+            StrategicTwoPhaseCompactor compactor
+                    = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+            compactor.compact(topic, strategy).get();
+
+            producer.newMessage().key("key0").value(testValue0("content3")).send();
+
+            try (Consumer<ServiceUnitStateData> consumer =
+                         pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
+                                 .readCompacted(true).subscribe()) {
+                Message<ServiceUnitStateData> m = consumer.receive();
+                Assert.assertEquals(m.getKey(), "key0");
+                Assert.assertEquals(m.getValue().broker(), "content2");
+
+                m = consumer.receive();
+                Assert.assertEquals(m.getKey(), "key0");
+                Assert.assertEquals(m.getValue().broker(), "content3");
+            }
+        }
+    }
+
+    /**
+     * Checks what a consumer receives after seeking to
+     * {@code MessageId.earliest} on a compacted topic: with
+     * {@code readCompacted(true)} the first message carries "content2", while
+     * with {@code readCompacted(false)} the three published messages arrive in
+     * publish order.
+     */
+    @Test
+    public void testSeekEarliestAfterCompaction() throws Exception {
+        final String topicName = "persistent://my-property/use/my-ns/my-topic1";
+        final String msgKey = "key0";
+        final String[] published = {"content0", "content1", "content2"};
+
+        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
+                .topic(topicName)
+                .enableBatching(true)
+                .create();
+
+        for (String content : published) {
+            producer.newMessage().key(msgKey).value(testValue0(content)).send();
+        }
+
+        new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler)
+                .compact(topicName, strategy)
+                .get();
+
+        // Compacted read: only the last published value is expected first.
+        try (Consumer<ServiceUnitStateData> compactedReader = pulsarClient.newConsumer(schema)
+                .topic(topicName)
+                .subscriptionName("sub1")
+                .readCompacted(true)
+                .subscribe()) {
+            compactedReader.seek(MessageId.earliest);
+            Message<ServiceUnitStateData> latest = compactedReader.receive();
+            Assert.assertEquals(latest.getKey(), msgKey);
+            Assert.assertEquals(latest.getValue().broker(), "content2");
+        }
+
+        // Non-compacted read: every published message is expected, in order.
+        try (Consumer<ServiceUnitStateData> rawReader = pulsarClient.newConsumer(schema)
+                .topic(topicName)
+                .subscriptionName("sub1")
+                .readCompacted(false)
+                .subscribe()) {
+            rawReader.seek(MessageId.earliest);
+
+            for (String expectedBroker : published) {
+                Message<ServiceUnitStateData> received = rawReader.receive();
+                Assert.assertEquals(received.getKey(), msgKey);
+                Assert.assertEquals(received.getValue().broker(), expectedBroker);
+            }
+        }
+    }
+
+    /**
+     * Reads the compacted value for a key, calls {@code stopBroker()} and
+     * expects subscribing/receiving to fail with a
+     * {@link PulsarClientException}, then calls {@code startBroker()} and
+     * expects the same compacted value to be readable again.
+     */
+    @Test
+    public void testBrokerRestartAfterCompaction() throws Exception {
+        final String topicName = "persistent://my-property/use/my-ns/my-topic1";
+        final String msgKey = "key0";
+
+        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
+                .topic(topicName)
+                .enableBatching(true)
+                .create();
+
+        // Create subscription "sub1" before publishing; the consumer is closed right away.
+        pulsarClient.newConsumer(schema)
+                .topic(topicName)
+                .subscriptionName("sub1")
+                .readCompacted(true)
+                .subscribe()
+                .close();
+
+        for (String content : new String[] {"content0", "content1", "content2"}) {
+            producer.newMessage().key(msgKey).value(testValue0(content)).send();
+        }
+
+        new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler)
+                .compact(topicName, strategy)
+                .get();
+
+        // Before the restart: the compacted value is delivered.
+        try (Consumer<ServiceUnitStateData> beforeRestart = pulsarClient.newConsumer(schema)
+                .topic(topicName)
+                .subscriptionName("sub1")
+                .readCompacted(true)
+                .subscribe()) {
+            Message<ServiceUnitStateData> received = beforeRestart.receive();
+            Assert.assertEquals(received.getKey(), msgKey);
+            Assert.assertEquals(received.getValue().broker(), "content2");
+        }
+
+        stopBroker();
+        try (Consumer<ServiceUnitStateData> whileStopped = pulsarClient.newConsumer(schema)
+                .topic(topicName)
+                .subscriptionName("sub1")
+                .readCompacted(true)
+                .subscribe()) {
+            whileStopped.receive();
+            Assert.fail("Shouldn't have been able to receive anything");
+        } catch (PulsarClientException expected) {
+            // correct behaviour: nothing can be consumed while the broker is stopped
+        }
+        startBroker();
+
+        // After the restart: the same compacted value is delivered again.
+        try (Consumer<ServiceUnitStateData> afterRestart = pulsarClient.newConsumer(schema)
+                .topic(topicName)
+                .subscriptionName("sub1")
+                .readCompacted(true)
+                .subscribe()) {
+            Message<ServiceUnitStateData> received = afterRestart.receive();
+            Assert.assertEquals(received.getKey(), msgKey);
+            Assert.assertEquals(received.getValue().broker(), "content2");
+        }
+    }
+
+    /**
+     * Runs compaction on a topic before any message has been published, then
+     * publishes one message and checks that a consumer with
+     * {@code readCompacted(true)} receives it.
+     */
+    @Test
+    public void testCompactEmptyTopic() throws Exception {
+        final String topicName = "persistent://my-property/use/my-ns/my-topic1";
+
+        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
+                .topic(topicName)
+                .enableBatching(true)
+                .create();
+
+        // Create subscription "sub1" up front; the consumer is closed right away.
+        pulsarClient.newConsumer(schema)
+                .topic(topicName)
+                .subscriptionName("sub1")
+                .readCompacted(true)
+                .subscribe()
+                .close();
+
+        // Nothing has been published yet when compaction runs.
+        new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler)
+                .compact(topicName, strategy)
+                .get();
+
+        producer.newMessage().key("key0").value(testValue0("content0")).send();
+
+        try (Consumer<ServiceUnitStateData> reader = pulsarClient.newConsumer(schema)
+                .topic(topicName)
+                .subscriptionName("sub1")
+                .readCompacted(true)
+                .subscribe()) {
+            Message<ServiceUnitStateData> first = reader.receive();
+            Assert.assertEquals(first.getKey(), "key0");
+            Assert.assertEquals(first.getValue().broker(), "content0");
+        }
+    }
+
+    /**
+     * Publishes three messages for "key1" through a producer configured with
+     * {@code batchingMaxMessages(3)}, then a fourth message for the same key
+     * through a second producer, compacts the topic, and checks that the first
+     * message a {@code readCompacted(true)} consumer receives is the fourth one.
+     */
+    @Test
+    public void testWholeBatchCompactedOut() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        // subscribe before sending anything, so that we get all messages
+        pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
+                .readCompacted(true).subscribe().close();
+
+        try (Producer<ServiceUnitStateData> producerNormal = pulsarClient.newProducer(schema).topic(topic)
+                .enableBatching(true)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+             Producer<ServiceUnitStateData> producerBatch = pulsarClient.newProducer(schema).topic(topic)
+                     .maxPendingMessages(3)
+                     .enableBatching(true)
+                     .batchingMaxMessages(3)
+                     .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                     .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                     .create()) {
+            producerBatch.newMessage().key("key1").value(testValue1("my-message-1")).sendAsync();
+            producerBatch.newMessage().key("key1").value(testValue1("my-message-2")).sendAsync();
+            producerBatch.newMessage().key("key1").value(testValue1("my-message-3")).sendAsync();
+            producerNormal.newMessage().key("key1").value(testValue1("my-message-4")).send();
+        }
+
+        // compact the topic
+        StrategicTwoPhaseCompactor compactor
+                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic, strategy).get();
+
+        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic)
+                .subscriptionName("sub1").readCompacted(true).subscribe()) {
+            Message<ServiceUnitStateData> message = consumer.receive();
+            Assert.assertEquals(message.getKey(), "key1");
+            // broker() is compared directly against String literals in the sibling
+            // tests, so wrapping it in new String(...) only made a needless copy.
+            Assert.assertEquals(message.getValue().broker(), "my-message-4");
+        }
+    }
+
+    /**
+     * Publishes values for keys "1", "2" and "3", then null-valued messages for
+     * keys "1" and "2", compacts the topic, and expects a consumer with
+     * {@code readCompacted(true)} to receive a message for key "3".
+     */
+    // Annotated explicitly, like every other test method in this class.
+    @Test
+    public void testCompactionWithLastDeletedKey() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema).topic(topic).enableBatching(true)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+
+        pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
+
+        producer.newMessage().key("1").value(testValue(Owned, "1")).send();
+        producer.newMessage().key("2").value(testValue(Owned, "3")).send();
+        producer.newMessage().key("3").value(testValue(Owned, "5")).send();
+        producer.newMessage().key("1").value(null).send();
+        producer.newMessage().key("2").value(null).send();
+
+        StrategicTwoPhaseCompactor compactor
+                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic, strategy).get();
+
+        Set<String> expected = Sets.newHashSet("3");
+        // consumer with readCompacted enabled only get compacted entries
+        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic)
+                .subscriptionName("sub1").readCompacted(true).subscribe()) {
+            Message<ServiceUnitStateData> m = consumer.receive(2, TimeUnit.SECONDS);
+            // The timed receive can come back empty (sibling tests assert null on it);
+            // fail with a clear assertion instead of a NullPointerException.
+            assertNotNull(m);
+            assertTrue(expected.remove(m.getKey()));
+        }
+    }
+
+    /**
+     * Publishes a value and then a null-valued message for each of keys "1" and
+     * "2", compacts the topic, and expects a consumer with
+     * {@code readCompacted(true)} to receive nothing within two seconds.
+     */
+    @Test(timeOut = 20000)
+    public void testEmptyCompactionLedger() throws Exception {
+        final String topicName = "persistent://my-property/use/my-ns/my-topic1";
+
+        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
+                .topic(topicName)
+                .enableBatching(true)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        // Create subscription "sub1" before publishing; the consumer is closed right away.
+        pulsarClient.newConsumer(schema)
+                .topic(topicName)
+                .subscriptionName("sub1")
+                .readCompacted(true)
+                .subscribe()
+                .close();
+
+        producer.newMessage().key("1").value(testValue(Owned, "1")).send();
+        producer.newMessage().key("2").value(testValue(Owned, "3")).send();
+        // Follow each key with a null-valued message.
+        for (String deletedKey : new String[] {"1", "2"}) {
+            producer.newMessage().key(deletedKey).value(null).send();
+        }
+
+        new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler)
+                .compact(topicName, strategy)
+                .get();
+
+        // A readCompacted consumer only gets compacted entries; none are expected here.
+        try (Consumer<ServiceUnitStateData> reader = pulsarClient.newConsumer(schema)
+                .topic(topicName)
+                .subscriptionName("sub1")
+                .readCompacted(true)
+                .subscribe()) {
+            assertNull(reader.receive(2, TimeUnit.SECONDS));
+        }
+    }
+
+    /**
+     * Publishes ten null-valued messages for key "1" to a uniquely named topic,
+     * compacts it, and expects a consumer with {@code readCompacted(true)}
+     * starting from the earliest position to receive nothing within two seconds.
+     */
+    @Test(timeOut = 20000)
+    public void testAllEmptyCompactionLedger() throws Exception {
+        final String topicName =
+                "persistent://my-property/use/my-ns/testAllEmptyCompactionLedger" + UUID.randomUUID();
+        final int total = 10;
+
+        // 1. Create the producer and publish only null-valued messages.
+        Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
+                .topic(topicName)
+                .batchingMaxMessages(total / 5)
+                .create();
+
+        List<CompletableFuture<MessageId>> pending = new ArrayList<>(total);
+        while (pending.size() < total) {
+            pending.add(producer.newMessage().key("1").value(null).sendAsync());
+        }
+        FutureUtil.waitForAll(pending).get();
+
+        // 2. Compact the topic.
+        new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler)
+                .compact(topicName, strategy)
+                .get();
+
+        // A readCompacted consumer only gets compacted entries; none are expected here.
+        try (Consumer<ServiceUnitStateData> reader = pulsarClient.newConsumer(schema)
+                .topic(topicName)
+                .subscriptionName("sub1")
+                .readCompacted(true)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe()) {
+            assertNull(reader.receive(2, TimeUnit.SECONDS));
+        }
+    }
+
+    /**
+     * Publishes values "0".."9" for one key, compacts, publishes "10".."19",
+     * compacts again, and expects a consumer with {@code readCompacted(true)}
+     * starting from the earliest position to receive exactly one message
+     * carrying "19".
+     */
+    @Test(timeOut = 20000)
+    public void testCompactMultipleTimesWithoutEmptyMessage()
+            throws PulsarClientException, ExecutionException, InterruptedException {
+        final String topicName =
+                "persistent://my-property/use/my-ns/testCompactMultipleTimesWithoutEmptyMessage" + UUID.randomUUID();
+        final int perRound = 10;
+        final String msgKey = "1";
+
+        // 1. Create the producer and publish the first round of messages.
+        ProducerBuilder<ServiceUnitStateData> producerBuilder = pulsarClient.newProducer(schema).topic(topicName);
+        producerBuilder.enableBatching(true);
+        Producer<ServiceUnitStateData> producer = producerBuilder.create();
+
+        List<CompletableFuture<MessageId>> pending = new ArrayList<>(perRound);
+        for (int seq = 0; seq < perRound; seq++) {
+            pending.add(producer.newMessage().key(msgKey).value(testValue0(String.valueOf(seq))).sendAsync());
+        }
+        FutureUtil.waitForAll(pending).get();
+
+        // 2. Compact the topic.
+        StrategicTwoPhaseCompactor compactor =
+                new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topicName, strategy).get();
+
+        // 3. Publish ten more messages ("10".."19").
+        pending.clear();
+        for (int seq = perRound; seq < 2 * perRound; seq++) {
+            pending.add(producer.newMessage().key(msgKey).value(testValue0(String.valueOf(seq))).sendAsync());
+        }
+        FutureUtil.waitForAll(pending).get();
+
+        // 4. Compact again.
+        compactor.compact(topicName, strategy).get();
+
+        try (Consumer<ServiceUnitStateData> reader = pulsarClient.newConsumer(schema)
+                .topic(topicName)
+                .subscriptionName("sub1")
+                .readCompacted(true)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe()) {
+            Message<ServiceUnitStateData> latest = reader.receive();
+            assertNotNull(latest);
+            assertEquals(latest.getKey(), msgKey);
+            assertEquals(latest.getValue().broker(), "19");
+
+            // Nothing else should follow the single compacted entry.
+            assertNull(reader.receive(2, TimeUnit.SECONDS));
+        }
+    }
+
+    /**
+     * Exercises a {@code readCompacted(true)} consumer across compacted and
+     * not-yet-compacted data:
+     * <ol>
+     *   <li>publish "0".."9", compact, publish "10".."19": subscription "sub1"
+     *       receives "9" followed by "10".."19" (eleven messages);</li>
+     *   <li>publish the states from {@code nextStatesToNull(testState0)} and a
+     *       null value, compact: subscription "sub2" receives nothing;</li>
+     *   <li>publish "20".."29" without compacting: subscription "sub3"
+     *       receives all ten.</li>
+     * </ol>
+     */
+    @Test(timeOut = 200000)
+    public void testReadUnCompacted()
+            throws PulsarClientException, ExecutionException, InterruptedException {
+        final String topicName = "persistent://my-property/use/my-ns/testReadUnCompacted" + UUID.randomUUID();
+        final int perRound = 10;
+        final String msgKey = "1";
+
+        // 1. Create the producer and publish the first round of messages.
+        ProducerBuilder<ServiceUnitStateData> producerBuilder = pulsarClient.newProducer(schema).topic(topicName);
+        producerBuilder.batchingMaxMessages(perRound / 5);
+        Producer<ServiceUnitStateData> producer = producerBuilder.create();
+
+        List<CompletableFuture<MessageId>> pending = new ArrayList<>(perRound);
+        for (int seq = 0; seq < perRound; seq++) {
+            pending.add(producer.newMessage().key(msgKey).value(testValue0(String.valueOf(seq))).sendAsync());
+        }
+        FutureUtil.waitForAll(pending).get();
+
+        // 2. Compact the topic.
+        StrategicTwoPhaseCompactor compactor =
+                new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topicName, strategy).get();
+
+        // 3. Publish ten more messages ("10".."19").
+        pending.clear();
+        for (int seq = 10; seq < 10 + perRound; seq++) {
+            pending.add(producer.newMessage().key(msgKey).value(testValue0(String.valueOf(seq))).sendAsync());
+        }
+        FutureUtil.waitForAll(pending).get();
+
+        try (Consumer<ServiceUnitStateData> reader = pulsarClient.newConsumer(schema)
+                .topic(topicName)
+                .subscriptionName("sub1")
+                .readCompacted(true)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe()) {
+            // Eleven messages are expected: "9" and then "10".."19".
+            for (int expectedValue = 9; expectedValue < 20; expectedValue++) {
+                Message<ServiceUnitStateData> received = reader.receive();
+                assertNotNull(received);
+                assertEquals(received.getKey(), msgKey);
+                assertEquals(received.getValue().broker(), String.valueOf(expectedValue));
+                reader.acknowledge(received);
+            }
+            assertNull(reader.receive(2, TimeUnit.SECONDS));
+        }
+
+        // 4. Publish the states returned by nextStatesToNull, then a null value for the key.
+        for (ServiceUnitState nextState : nextStatesToNull(testState0)) {
+            producer.newMessage().key(msgKey).value(new ServiceUnitStateData(nextState, "xx")).send();
+        }
+        producer.newMessage().key(msgKey).value(null).send();
+
+        // 5. Compact the topic again.
+        compactor.compact(topicName, strategy).get();
+
+        try (Consumer<ServiceUnitStateData> reader = pulsarClient.newConsumer(schema)
+                .topic(topicName)
+                .subscriptionName("sub2")
+                .readCompacted(true)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe()) {
+            assertNull(reader.receive(2, TimeUnit.SECONDS));
+        }
+
+        // Publish "20".."29"; no compaction is run after this round.
+        for (int seq = 20; seq < 20 + perRound; seq++) {
+            pending.add(producer.newMessage().key(msgKey).value(testValue0(String.valueOf(seq))).sendAsync());
+        }
+        FutureUtil.waitForAll(pending).get();
+
+        try (Consumer<ServiceUnitStateData> reader = pulsarClient.newConsumer(schema)
+                .topic(topicName)
+                .subscriptionName("sub3")
+                .readCompacted(true)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe()) {
+            for (int expectedValue = 20; expectedValue < 30; expectedValue++) {
+                Message<ServiceUnitStateData> received = reader.receive();
+                assertNotNull(received);
+                assertEquals(received.getKey(), msgKey);
+                assertEquals(received.getValue().broker(), String.valueOf(expectedValue));
+                reader.acknowledge(received);
+            }
+            assertNull(reader.receive(2, TimeUnit.SECONDS));
+        }
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index 21792ef3893..81771126f76 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -188,9 +188,9 @@ public class TableViewImpl<T> implements TableView<T> {
                             cur);
                 }
 
-                T prev = data.get(key);
                 boolean update = true;
                 if (compactionStrategy != null) {
+                    T prev = data.get(key);
                     update = !compactionStrategy.shouldKeepLeft(prev, cur);
                 }
 

Reply via email to