This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a12f5541cee [improve][broker] PIP-192: Added VersionId in 
ServiceUnitStateData (#19620)
a12f5541cee is described below

commit a12f5541cee0d51fad654e97487edf1140de3286
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Mar 1 00:58:33 2023 -0800

    [improve][broker] PIP-192: Added VersionId in ServiceUnitStateData (#19620)
---
 .../channel/ServiceUnitStateChannelImpl.java       |  59 +++--
 .../ServiceUnitStateCompactionStrategy.java        |  11 +-
 .../extensions/channel/ServiceUnitStateData.java   |  15 +-
 .../channel/ServiceUnitStateChannelTest.java       |  12 +-
 .../ServiceUnitStateCompactionStrategyTest.java    | 241 +++++++++++++--------
 .../channel/ServiceUnitStateDataTest.java          |  20 +-
 .../compaction/ServiceUnitStateCompactionTest.java |  36 ++-
 7 files changed, 251 insertions(+), 143 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 9f205f85c54..4819a54fcad 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -103,7 +103,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
             NamespaceName.SYSTEM_NAMESPACE,
             "loadbalancer-service-unit-state").toString();
     private static final long MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS = 30 
* 1000; // 30sec
-
+    public static final long VERSION_ID_INIT = 1; // initial versionId
     private static final long OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS = 60;
     public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 
mins
     private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs 
to clean immediately
@@ -451,11 +451,25 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         }
     }
 
+    private long getNextVersionId(String serviceUnit) {
+        var data = tableview.get(serviceUnit);
+        return getNextVersionId(data);
+    }
+
+    private long getNextVersionId(ServiceUnitStateData data) {
+        return data == null ? VERSION_ID_INIT : data.versionId() + 1;
+    }
+
     public CompletableFuture<String> publishAssignEventAsync(String 
serviceUnit, String broker) {
+        if (!validateChannelState(Started, true)) {
+            return CompletableFuture.failedFuture(
+                    new IllegalStateException("Invalid channel state:" + 
channelState.name()));
+        }
         EventType eventType = Assign;
         eventCounters.get(eventType).getTotal().incrementAndGet();
         CompletableFuture<String> getOwnerRequest = 
deferGetOwnerRequest(serviceUnit);
-        pubAsync(serviceUnit, new ServiceUnitStateData(Assigning, broker))
+
+        pubAsync(serviceUnit, new ServiceUnitStateData(Assigning, broker, 
getNextVersionId(serviceUnit)))
                 .whenComplete((__, ex) -> {
                     if (ex != null) {
                         getOwnerRequests.remove(serviceUnit, getOwnerRequest);
@@ -469,16 +483,20 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     }
 
     public CompletableFuture<Void> publishUnloadEventAsync(Unload unload) {
+        if (!validateChannelState(Started, true)) {
+            return CompletableFuture.failedFuture(
+                    new IllegalStateException("Invalid channel state:" + 
channelState.name()));
+        }
         EventType eventType = Unload;
         eventCounters.get(eventType).getTotal().incrementAndGet();
         String serviceUnit = unload.serviceUnit();
         CompletableFuture<MessageId> future;
         if (isTransferCommand(unload)) {
             future = pubAsync(serviceUnit, new ServiceUnitStateData(
-                    Assigning, unload.destBroker().get(), 
unload.sourceBroker()));
+                    Assigning, unload.destBroker().get(), 
unload.sourceBroker(), getNextVersionId(serviceUnit)));
         } else {
             future = pubAsync(serviceUnit, new ServiceUnitStateData(
-                    Releasing, unload.sourceBroker()));
+                    Releasing, unload.sourceBroker(), 
getNextVersionId(serviceUnit)));
         }
 
         return future.whenComplete((__, ex) -> {
@@ -489,10 +507,15 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     }
 
     public CompletableFuture<Void> publishSplitEventAsync(Split split) {
+        if (!validateChannelState(Started, true)) {
+            return CompletableFuture.failedFuture(
+                    new IllegalStateException("Invalid channel state:" + 
channelState.name()));
+        }
         EventType eventType = Split;
         eventCounters.get(eventType).getTotal().incrementAndGet();
         String serviceUnit = split.serviceUnit();
-        ServiceUnitStateData next = new ServiceUnitStateData(Splitting, 
split.sourceBroker());
+        ServiceUnitStateData next =
+                new ServiceUnitStateData(Splitting, split.sourceBroker(), 
getNextVersionId(serviceUnit));
         return pubAsync(serviceUnit, next).whenComplete((__, ex) -> {
             if (ex != null) {
                 eventCounters.get(eventType).getFailure().incrementAndGet();
@@ -599,7 +622,8 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     private void handleAssignEvent(String serviceUnit, ServiceUnitStateData 
data) {
         if (isTargetBroker(data.broker())) {
             ServiceUnitStateData next = new ServiceUnitStateData(
-                    isTransferCommand(data) ? Releasing : Owned, 
data.broker(), data.sourceBroker());
+                    isTransferCommand(data) ? Releasing : Owned, 
data.broker(), data.sourceBroker(),
+                    getNextVersionId(data));
             pubAsync(serviceUnit, next)
                     .whenComplete((__, e) -> log(e, serviceUnit, data, next));
         }
@@ -608,7 +632,8 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData 
data) {
         if (isTransferCommand(data)) {
             if (isTargetBroker(data.sourceBroker())) {
-                ServiceUnitStateData next = new ServiceUnitStateData(Owned, 
data.broker(), data.sourceBroker());
+                ServiceUnitStateData next =
+                        new ServiceUnitStateData(Owned, data.broker(), 
data.sourceBroker(), getNextVersionId(data));
                 // TODO: when close, pass message to clients to connect to the 
new broker
                 closeServiceUnit(serviceUnit)
                         .thenCompose(__ -> pubAsync(serviceUnit, next))
@@ -616,7 +641,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
             }
         } else {
             if (isTargetBroker(data.broker())) {
-                ServiceUnitStateData next = new ServiceUnitStateData(Free, 
data.broker());
+                ServiceUnitStateData next = new ServiceUnitStateData(Free, 
data.broker(), getNextVersionId(data));
                 closeServiceUnit(serviceUnit)
                         .thenCompose(__ -> pubAsync(serviceUnit, next))
                         .whenComplete((__, e) -> log(e, serviceUnit, data, 
next));
@@ -660,10 +685,6 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     }
 
     private CompletableFuture<MessageId> pubAsync(String serviceUnit, 
ServiceUnitStateData data) {
-        if (!validateChannelState(Started, true)) {
-            return CompletableFuture.failedFuture(
-                    new IllegalStateException("Invalid channel state:" + 
channelState.name()));
-        }
         CompletableFuture<MessageId> future = new CompletableFuture<>();
         producer.newMessage()
                 .key(serviceUnit)
@@ -774,7 +795,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 updateFuture.completeExceptionally(new 
BrokerServiceException.ServiceUnitNotReadyException(msg));
                 return;
             }
-            ServiceUnitStateData next = new ServiceUnitStateData(Owned, 
data.broker());
+            ServiceUnitStateData next = new ServiceUnitStateData(Owned, 
data.broker(), VERSION_ID_INIT);
             NamespaceBundles targetNsBundle = splitBundlesPair.getLeft();
             List<NamespaceBundle> splitBundles = 
Collections.unmodifiableList(splitBundlesPair.getRight());
             List<NamespaceBundle> successPublishedBundles =
@@ -812,7 +833,8 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
 
         updateFuture.thenAccept(r -> {
             // Delete the old bundle
-            pubAsync(serviceUnit, new ServiceUnitStateData(Deleted, 
data.broker())).thenRun(() -> {
+            pubAsync(serviceUnit, new ServiceUnitStateData(Deleted, 
data.broker(), getNextVersionId(data)))
+                    .thenRun(() -> {
                 // Update bundled_topic cache for load-report-generation
                 pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
                 // TODO: Update the load data immediately if needed.
@@ -938,7 +960,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
 
         Optional<String> selectedBroker = 
brokerSelector.select(availableBrokers, null, getContext());
         if (selectedBroker.isPresent()) {
-            var override = new ServiceUnitStateData(Owned, 
selectedBroker.get(), true);
+            var override = new ServiceUnitStateData(Owned, 
selectedBroker.get(), true, getNextVersionId(orphanData));
             log.info("Overriding ownership serviceUnit:{} from orphanData:{} 
to overrideData:{}",
                     serviceUnit, orphanData, override);
             pubAsync(serviceUnit, override).whenComplete((__, e) -> {
@@ -1007,19 +1029,20 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     private Optional<ServiceUnitStateData> getOverrideStateData(String 
serviceUnit, ServiceUnitStateData orphanData,
                                                                 Set<String> 
availableBrokers,
                                                                 
LoadManagerContext context) {
+        long nextVersionId = getNextVersionId(orphanData);
         if (isTransferCommand(orphanData)) {
             // rollback to the src
-            return Optional.of(new ServiceUnitStateData(Owned, 
orphanData.sourceBroker(), true));
+            return Optional.of(new ServiceUnitStateData(Owned, 
orphanData.sourceBroker(), true, nextVersionId));
         } else if (orphanData.state() == Assigning) { // assign
             // roll-forward to another broker
             Optional<String> selectedBroker = 
brokerSelector.select(availableBrokers, null, context);
             if (selectedBroker.isEmpty()) {
                 return Optional.empty();
             }
-            return Optional.of(new ServiceUnitStateData(Owned, 
selectedBroker.get(), true));
+            return Optional.of(new ServiceUnitStateData(Owned, 
selectedBroker.get(), true, nextVersionId));
         } else if (orphanData.state() == Splitting || orphanData.state() == 
Releasing) {
             // rollback to the target broker for split and unload
-            return Optional.of(new ServiceUnitStateData(Owned, 
orphanData.broker(), true));
+            return Optional.of(new ServiceUnitStateData(Owned, 
orphanData.broker(), true, nextVersionId));
         } else {
             var msg = String.format("Failed to get the overrideStateData from 
serviceUnit=%s, orphanData=%s",
                     serviceUnit, orphanData);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
index d2a585af9d9..8af0f0c027d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
@@ -50,14 +50,19 @@ public class ServiceUnitStateCompactionStrategy implements 
TopicCompactionStrate
     public boolean shouldKeepLeft(ServiceUnitStateData from, 
ServiceUnitStateData to) {
         if (to == null) {
             return false;
-        } else if (to.force()) {
-            return false;
         }
 
+        // Skip the compaction case where from = null and to.versionId > 1
+        if (from != null && from.versionId() + 1 != to.versionId()) {
+            return true;
+        }
+
+        if (to.force()) {
+            return false;
+        }
 
         ServiceUnitState prevState = state(from);
         ServiceUnitState state = state(to);
-
         if (!ServiceUnitState.isValidTransition(prevState, state)) {
             return true;
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
index 6a04431de64..ef25acff10a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions.channel;
 
-
 import java.util.Objects;
 import org.apache.commons.lang3.StringUtils;
 
@@ -28,7 +27,7 @@ import org.apache.commons.lang3.StringUtils;
  */
 
 public record ServiceUnitStateData(
-        ServiceUnitState state, String broker, String sourceBroker, boolean 
force, long timestamp) {
+        ServiceUnitState state, String broker, String sourceBroker, boolean 
force, long timestamp, long versionId) {
 
     public ServiceUnitStateData {
         Objects.requireNonNull(state);
@@ -37,16 +36,16 @@ public record ServiceUnitStateData(
         }
     }
 
-    public ServiceUnitStateData(ServiceUnitState state, String broker, String 
sourceBroker) {
-        this(state, broker, sourceBroker, false, System.currentTimeMillis());
+    public ServiceUnitStateData(ServiceUnitState state, String broker, String 
sourceBroker, long versionId) {
+        this(state, broker, sourceBroker, false, System.currentTimeMillis(), 
versionId);
     }
 
-    public ServiceUnitStateData(ServiceUnitState state, String broker) {
-        this(state, broker, null, false, System.currentTimeMillis());
+    public ServiceUnitStateData(ServiceUnitState state, String broker, long 
versionId) {
+        this(state, broker, null, false, System.currentTimeMillis(), 
versionId);
     }
 
-    public ServiceUnitStateData(ServiceUnitState state, String broker, boolean 
force) {
-        this(state, broker, null, force, System.currentTimeMillis());
+    public ServiceUnitStateData(ServiceUnitState state, String broker, boolean 
force, long versionId) {
+        this(state, broker, null, force, System.currentTimeMillis(), 
versionId);
     }
 
     public static ServiceUnitState state(ServiceUnitStateData data) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 49eee6ecb7a..6aa8fe38760 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -924,26 +924,26 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
     @Test(priority = 11)
     public void ownerLookupCountTests() throws IllegalAccessException {
 
-        overrideTableView(channel1, bundle, new 
ServiceUnitStateData(Assigning, "b1"));
+        overrideTableView(channel1, bundle, new 
ServiceUnitStateData(Assigning, "b1", 1));
         channel1.getOwnerAsync(bundle);
         channel1.getOwnerAsync(bundle);
 
-        overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned, 
"b1"));
+        overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned, 
"b1", 1));
         channel1.getOwnerAsync(bundle);
         channel1.getOwnerAsync(bundle);
         channel1.getOwnerAsync(bundle);
 
-        overrideTableView(channel1, bundle, new 
ServiceUnitStateData(Releasing, "b1"));
+        overrideTableView(channel1, bundle, new 
ServiceUnitStateData(Releasing, "b1", 1));
         channel1.getOwnerAsync(bundle);
         channel1.getOwnerAsync(bundle);
 
-        overrideTableView(channel1, bundle, new 
ServiceUnitStateData(Splitting, "b1"));
+        overrideTableView(channel1, bundle, new 
ServiceUnitStateData(Splitting, "b1", 1));
         channel1.getOwnerAsync(bundle);
 
-        overrideTableView(channel1, bundle, new ServiceUnitStateData(Free, 
"b1"));
+        overrideTableView(channel1, bundle, new ServiceUnitStateData(Free, 
"b1", 1));
         channel1.getOwnerAsync(bundle);
 
-        overrideTableView(channel1, bundle, new ServiceUnitStateData(Deleted, 
"b1"));
+        overrideTableView(channel1, bundle, new ServiceUnitStateData(Deleted, 
"b1", 1));
         channel1.getOwnerAsync(bundle);
         channel1.getOwnerAsync(bundle);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java
index 1a4aba15f9e..0cd05d8bd75 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java
@@ -32,110 +32,173 @@ import org.testng.annotations.Test;
 @Test(groups = "broker")
 public class ServiceUnitStateCompactionStrategyTest {
     ServiceUnitStateCompactionStrategy strategy = new 
ServiceUnitStateCompactionStrategy();
+    String dst = "dst";
+    String src = "src";
 
     ServiceUnitStateData data(ServiceUnitState state) {
-        return new ServiceUnitStateData(state, "broker");
+        return new ServiceUnitStateData(state, "broker", 1);
     }
 
     ServiceUnitStateData data(ServiceUnitState state, String dst) {
-        return new ServiceUnitStateData(state, dst, null);
+        return new ServiceUnitStateData(state, dst, null, 1);
     }
+
     ServiceUnitStateData data(ServiceUnitState state, String src, String dst) {
-        return new ServiceUnitStateData(state, dst, src);
+        return new ServiceUnitStateData(state, dst, src, 1);
+    }
+
+    ServiceUnitStateData data2(ServiceUnitState state) {
+        return new ServiceUnitStateData(state, "broker", 2);
+    }
+
+    ServiceUnitStateData data2(ServiceUnitState state, String dst) {
+        return new ServiceUnitStateData(state, dst, null, 2);
+    }
+
+    ServiceUnitStateData data2(ServiceUnitState state, String src, String dst) 
{
+        return new ServiceUnitStateData(state, dst, src, 2);
     }
 
     @Test
-    public void test() throws InterruptedException {
-        String dst = "dst";
-        String src = "src";
+    public void testVersionId(){
+        assertTrue(strategy.shouldKeepLeft(
+                new ServiceUnitStateData(Assigning, dst, 1),
+                new ServiceUnitStateData(Assigning, dst, 1)));
+
+        assertTrue(strategy.shouldKeepLeft(
+                new ServiceUnitStateData(Assigning, dst, 1),
+                new ServiceUnitStateData(Assigning, dst, 2)));
+
+        assertFalse(strategy.shouldKeepLeft(
+                new ServiceUnitStateData(Owned, dst, src, 10),
+                new ServiceUnitStateData(Assigning, "broker2", dst, 11)));
+
+        assertFalse(strategy.shouldKeepLeft(
+                new ServiceUnitStateData(Owned, dst, src, Long.MAX_VALUE),
+                new ServiceUnitStateData(Assigning, "broker2", dst, 
Long.MAX_VALUE + 1)));
+
+        assertFalse(strategy.shouldKeepLeft(
+                new ServiceUnitStateData(Owned, dst, src, Long.MAX_VALUE + 1),
+                new ServiceUnitStateData(Assigning, "broker2", dst, 
Long.MAX_VALUE + 2)));
 
+        assertTrue(strategy.shouldKeepLeft(
+                new ServiceUnitStateData(Owned, dst, src, 10),
+                new ServiceUnitStateData(Assigning, "broker2", dst, 5)));
+
+    }
+
+    @Test
+    public void testForce(){
         assertFalse(strategy.shouldKeepLeft(
-                new ServiceUnitStateData(Init, dst),
-                new ServiceUnitStateData(Init, dst, true)));
+                new ServiceUnitStateData(Init, dst, 1),
+                new ServiceUnitStateData(Init, dst, true, 2)));
 
+        assertTrue(strategy.shouldKeepLeft(
+                new ServiceUnitStateData(Init, dst, 1),
+                new ServiceUnitStateData(Init, dst, true, 1)));
+    }
+
+    @Test
+    public void testTombstone() {
+        assertFalse(strategy.shouldKeepLeft(
+                data(Init), null));
+        assertFalse(strategy.shouldKeepLeft(
+                data(Assigning), null));
         assertFalse(strategy.shouldKeepLeft(
                 data(Owned), null));
+        assertFalse(strategy.shouldKeepLeft(
+                data(Releasing), null));
+        assertFalse(strategy.shouldKeepLeft(
+                data(Splitting), null));
+        assertFalse(strategy.shouldKeepLeft(
+                data(Free), null));
+        assertFalse(strategy.shouldKeepLeft(
+                data(Deleted), null));
+    }
+
+    @Test
+    public void testTransitionsAndBrokers() {
+
+        assertTrue(strategy.shouldKeepLeft(data(Init), data2(Init)));
+        assertFalse(strategy.shouldKeepLeft(data(Init), data2(Free)));
+        assertFalse(strategy.shouldKeepLeft(data(Init), data2(Assigning)));
+        assertFalse(strategy.shouldKeepLeft(data(Init), data2(Owned)));
+        assertFalse(strategy.shouldKeepLeft(data(Init), data2(Releasing)));
+        assertFalse(strategy.shouldKeepLeft(data(Init), data2(Splitting)));
+        assertFalse(strategy.shouldKeepLeft(data(Init), data2(Deleted)));
+
+        assertTrue(strategy.shouldKeepLeft(data(Assigning), data2(Init)));
+        assertTrue(strategy.shouldKeepLeft(data(Assigning), data2(Free)));
+        assertTrue(strategy.shouldKeepLeft(data(Assigning), data2(Assigning)));
+        assertTrue(strategy.shouldKeepLeft(data(Assigning, "dst1"), 
data2(Owned, "dst2")));
+        assertTrue(strategy.shouldKeepLeft(data(Assigning, dst), data2(Owned, 
src, dst)));
+        assertFalse(strategy.shouldKeepLeft(data(Assigning, dst), data2(Owned, 
dst)));
+        assertTrue(strategy.shouldKeepLeft(data(Assigning, src, dst), 
data2(Releasing, dst)));
+        assertTrue(strategy.shouldKeepLeft(data(Assigning, src, "dst1"), 
data2(Releasing, src, "dst2")));
+        assertTrue(strategy.shouldKeepLeft(data(Assigning, "src1", dst), 
data2(Releasing, "src2", dst)));
+        assertFalse(strategy.shouldKeepLeft(data(Assigning, src, dst), 
data2(Releasing, src, dst)));
+        assertTrue(strategy.shouldKeepLeft(data(Assigning), data2(Splitting, 
dst)));
+        assertTrue(strategy.shouldKeepLeft(data(Assigning), data2(Deleted, 
dst)));
+
+        assertTrue(strategy.shouldKeepLeft(data(Owned), data2(Init)));
+        assertTrue(strategy.shouldKeepLeft(data(Owned), data2(Free)));
+        assertTrue(strategy.shouldKeepLeft(data(Owned, src, "dst1"), 
data2(Assigning, src, "dst2")));
+        assertTrue(strategy.shouldKeepLeft(data(Owned, src, dst), 
data2(Assigning, dst)));
+        assertTrue(strategy.shouldKeepLeft(data(Owned, src, dst), 
data2(Assigning, src, dst)));
+        assertTrue(strategy.shouldKeepLeft(data(Owned, src, dst), 
data2(Assigning, dst, dst)));
+        assertFalse(strategy.shouldKeepLeft(data(Owned, src, dst), 
data2(Assigning, dst, "dst1")));
+        assertTrue(strategy.shouldKeepLeft(data(Owned), data2(Owned)));
+        assertTrue(strategy.shouldKeepLeft(data(Owned), data2(Releasing, 
dst)));
+        assertTrue(strategy.shouldKeepLeft(data(Owned, src, "dst1"), 
data2(Releasing, src, "dst2")));
+        assertTrue(strategy.shouldKeepLeft(data(Owned, "dst1"), 
data2(Releasing, "dst2")));
+        assertFalse(strategy.shouldKeepLeft(data(Owned, dst), data2(Releasing, 
dst)));
+        assertFalse(strategy.shouldKeepLeft(data(Owned, src, dst), 
data2(Releasing, dst)));
+        assertTrue(strategy.shouldKeepLeft(data(Owned, src, "dst1"), 
data2(Splitting, src, "dst2")));
+        assertTrue(strategy.shouldKeepLeft(data(Owned, "dst1"), 
data2(Splitting, "dst2")));
+        assertFalse(strategy.shouldKeepLeft(data(Owned, dst), data2(Splitting, 
dst)));
+        assertFalse(strategy.shouldKeepLeft(data(Owned, src, dst), 
data2(Splitting, dst)));
+        assertTrue(strategy.shouldKeepLeft(data(Owned), data2(Deleted, dst)));
+
+        assertTrue(strategy.shouldKeepLeft(data(Releasing), data2(Init)));
+        assertFalse(strategy.shouldKeepLeft(data(Releasing), data2(Free)));
+        assertTrue(strategy.shouldKeepLeft(data(Releasing, "dst1"), 
data2(Free, "dst2")));
+        assertTrue(strategy.shouldKeepLeft(data(Releasing, "src1", dst), 
data2(Free, "src2", dst)));
+        assertTrue(strategy.shouldKeepLeft(data(Releasing), data2(Assigning)));
+        assertTrue(strategy.shouldKeepLeft(data(Releasing, "dst1"), 
data2(Owned, "dst2")));
+        assertTrue(strategy.shouldKeepLeft(data(Releasing, src, "dst1"), 
data2(Owned, src, "dst2")));
+        assertTrue(strategy.shouldKeepLeft(data(Releasing, "src1", dst), 
data2(Owned, "src2", dst)));
+        assertFalse(strategy.shouldKeepLeft(data(Releasing, src, dst), 
data2(Owned, src, dst)));
+        assertTrue(strategy.shouldKeepLeft(data(Releasing), data2(Releasing)));
+        assertTrue(strategy.shouldKeepLeft(data(Releasing), data2(Splitting)));
+        assertTrue(strategy.shouldKeepLeft(data(Releasing), data2(Deleted, 
dst)));
+
+        assertTrue(strategy.shouldKeepLeft(data(Splitting), data2(Init)));
+        assertTrue(strategy.shouldKeepLeft(data(Splitting), data2(Free)));
+        assertTrue(strategy.shouldKeepLeft(data(Splitting), data2(Assigning)));
+        assertTrue(strategy.shouldKeepLeft(data(Splitting), data2(Owned)));
+        assertTrue(strategy.shouldKeepLeft(data(Splitting), data2(Releasing)));
+        assertTrue(strategy.shouldKeepLeft(data(Splitting), data2(Splitting)));
+        assertTrue(strategy.shouldKeepLeft(data(Splitting, src, "dst1"), 
data2(Deleted, src, "dst2")));
+        assertTrue(strategy.shouldKeepLeft(data(Splitting, "dst1"), 
data2(Deleted, "dst2")));
+        assertTrue(strategy.shouldKeepLeft(data(Splitting, "src1", dst), 
data2(Deleted, "src2", dst)));
+        assertFalse(strategy.shouldKeepLeft(data(Splitting, dst), 
data2(Deleted, dst)));
+        assertFalse(strategy.shouldKeepLeft(data(Splitting, src, dst), 
data2(Deleted, src, dst)));
+
+        assertFalse(strategy.shouldKeepLeft(data(Deleted), data2(Init)));
+        assertTrue(strategy.shouldKeepLeft(data(Deleted), data2(Free)));
+        assertTrue(strategy.shouldKeepLeft(data(Deleted), data2(Assigning)));
+        assertTrue(strategy.shouldKeepLeft(data(Deleted), data2(Owned)));
+        assertTrue(strategy.shouldKeepLeft(data(Deleted), data2(Releasing)));
+        assertTrue(strategy.shouldKeepLeft(data(Deleted), data2(Splitting)));
+        assertTrue(strategy.shouldKeepLeft(data(Deleted), data2(Deleted)));
 
-        assertTrue(strategy.shouldKeepLeft(data(Init), data(Init)));
-        assertFalse(strategy.shouldKeepLeft(data(Init), data(Free)));
-        assertFalse(strategy.shouldKeepLeft(data(Init), data(Assigning)));
-        assertFalse(strategy.shouldKeepLeft(data(Init), data(Owned)));
-        assertFalse(strategy.shouldKeepLeft(data(Init), data(Releasing)));
-        assertFalse(strategy.shouldKeepLeft(data(Init), data(Splitting)));
-        assertFalse(strategy.shouldKeepLeft(data(Init), data(Deleted)));
-
-        assertTrue(strategy.shouldKeepLeft(data(Assigning), data(Init)));
-        assertTrue(strategy.shouldKeepLeft(data(Assigning), data(Free)));
-        assertTrue(strategy.shouldKeepLeft(data(Assigning), data(Assigning)));
-        assertTrue(strategy.shouldKeepLeft(data(Assigning, "dst1"), 
data(Owned, "dst2")));
-        assertTrue(strategy.shouldKeepLeft(data(Assigning, dst), data(Owned, 
src, dst)));
-        assertFalse(strategy.shouldKeepLeft(data(Assigning, dst), data(Owned, 
dst)));
-        assertTrue(strategy.shouldKeepLeft(data(Assigning, src, dst), 
data(Releasing, dst)));
-        assertTrue(strategy.shouldKeepLeft(data(Assigning, src, "dst1"), 
data(Releasing, src, "dst2")));
-        assertTrue(strategy.shouldKeepLeft(data(Assigning, "src1", dst), 
data(Releasing, "src2", dst)));
-        assertFalse(strategy.shouldKeepLeft(data(Assigning, src, dst), 
data(Releasing, src, dst)));
-        assertTrue(strategy.shouldKeepLeft(data(Assigning), data(Splitting, 
dst)));
-        assertTrue(strategy.shouldKeepLeft(data(Assigning), data(Deleted, 
dst)));
-
-        assertTrue(strategy.shouldKeepLeft(data(Owned), data(Init)));
-        assertTrue(strategy.shouldKeepLeft(data(Owned), data(Free)));
-        assertTrue(strategy.shouldKeepLeft(data(Owned, src, "dst1"), 
data(Assigning, src, "dst2")));
-        assertTrue(strategy.shouldKeepLeft(data(Owned, src, dst), 
data(Assigning, dst)));
-        assertTrue(strategy.shouldKeepLeft(data(Owned, src, dst), 
data(Assigning, src, dst)));
-        assertTrue(strategy.shouldKeepLeft(data(Owned, src, dst), 
data(Assigning, dst, dst)));
-        assertFalse(strategy.shouldKeepLeft(data(Owned, src, dst), 
data(Assigning, dst, "dst1")));
-        assertTrue(strategy.shouldKeepLeft(data(Owned), data(Owned)));
-        assertTrue(strategy.shouldKeepLeft(data(Owned), data(Releasing, dst)));
-        assertTrue(strategy.shouldKeepLeft(data(Owned, src, "dst1"), 
data(Releasing, src, "dst2")));
-        assertTrue(strategy.shouldKeepLeft(data(Owned, "dst1"), 
data(Releasing, "dst2")));
-        assertFalse(strategy.shouldKeepLeft(data(Owned, dst), data(Releasing, 
dst)));
-        assertFalse(strategy.shouldKeepLeft(data(Owned, src, dst), 
data(Releasing, dst)));
-        assertTrue(strategy.shouldKeepLeft(data(Owned, src, "dst1"), 
data(Splitting, src, "dst2")));
-        assertTrue(strategy.shouldKeepLeft(data(Owned, "dst1"), 
data(Splitting, "dst2")));
-        assertFalse(strategy.shouldKeepLeft(data(Owned, dst), data(Splitting, 
dst)));
-        assertFalse(strategy.shouldKeepLeft(data(Owned, src, dst), 
data(Splitting, dst)));
-        assertTrue(strategy.shouldKeepLeft(data(Owned), data(Deleted, dst)));
-
-        assertTrue(strategy.shouldKeepLeft(data(Releasing), data(Init)));
-        assertFalse(strategy.shouldKeepLeft(data(Releasing), data(Free)));
-        assertTrue(strategy.shouldKeepLeft(data(Releasing, "dst1"), data(Free, 
"dst2")));
-        assertTrue(strategy.shouldKeepLeft(data(Releasing, "src1", dst), 
data(Free, "src2", dst)));
-        assertTrue(strategy.shouldKeepLeft(data(Releasing), data(Assigning)));
-        assertTrue(strategy.shouldKeepLeft(data(Releasing, "dst1"), 
data(Owned, "dst2")));
-        assertTrue(strategy.shouldKeepLeft(data(Releasing, src, "dst1"), 
data(Owned, src, "dst2")));
-        assertTrue(strategy.shouldKeepLeft(data(Releasing, "src1", dst), 
data(Owned, "src2", dst)));
-        assertFalse(strategy.shouldKeepLeft(data(Releasing, src, dst), 
data(Owned, src, dst)));
-        assertTrue(strategy.shouldKeepLeft(data(Releasing), data(Releasing)));
-        assertTrue(strategy.shouldKeepLeft(data(Releasing), data(Splitting)));
-        assertTrue(strategy.shouldKeepLeft(data(Releasing), data(Deleted, 
dst)));
-
-        assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Init)));
-        assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Free)));
-        assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Assigning)));
-        assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Owned)));
-        assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Releasing)));
-        assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Splitting)));
-        assertTrue(strategy.shouldKeepLeft(data(Splitting, src, "dst1"), 
data(Deleted, src, "dst2")));
-        assertTrue(strategy.shouldKeepLeft(data(Splitting, "dst1"), 
data(Deleted, "dst2")));
-        assertTrue(strategy.shouldKeepLeft(data(Splitting, "src1", dst), 
data(Deleted, "src2", dst)));
-        assertFalse(strategy.shouldKeepLeft(data(Splitting, dst), 
data(Deleted, dst)));
-        assertFalse(strategy.shouldKeepLeft(data(Splitting, src, dst), 
data(Deleted, src, dst)));
-
-        assertFalse(strategy.shouldKeepLeft(data(Deleted), data(Init)));
-        assertTrue(strategy.shouldKeepLeft(data(Deleted), data(Free)));
-        assertTrue(strategy.shouldKeepLeft(data(Deleted), data(Assigning)));
-        assertTrue(strategy.shouldKeepLeft(data(Deleted), data(Owned)));
-        assertTrue(strategy.shouldKeepLeft(data(Deleted), data(Releasing)));
-        assertTrue(strategy.shouldKeepLeft(data(Deleted), data(Splitting)));
-        assertTrue(strategy.shouldKeepLeft(data(Deleted), data(Deleted)));
-
-        assertFalse(strategy.shouldKeepLeft(data(Free), data(Init)));
-        assertTrue(strategy.shouldKeepLeft(data(Free), data(Free)));
-        assertFalse(strategy.shouldKeepLeft(data(Free), data(Assigning)));
-        assertTrue(strategy.shouldKeepLeft(data(Free), data(Assigning, src, 
dst)));
-        assertTrue(strategy.shouldKeepLeft(data(Free), data(Owned)));
-        assertTrue(strategy.shouldKeepLeft(data(Free), data(Releasing)));
-        assertTrue(strategy.shouldKeepLeft(data(Free), data(Splitting)));
-        assertTrue(strategy.shouldKeepLeft(data(Free), data(Deleted)));
+        assertFalse(strategy.shouldKeepLeft(data(Free), data2(Init)));
+        assertTrue(strategy.shouldKeepLeft(data(Free), data2(Free)));
+        assertFalse(strategy.shouldKeepLeft(data(Free), data2(Assigning)));
+        assertTrue(strategy.shouldKeepLeft(data(Free), data2(Assigning, src, 
dst)));
+        assertTrue(strategy.shouldKeepLeft(data(Free), data2(Owned)));
+        assertTrue(strategy.shouldKeepLeft(data(Free), data2(Releasing)));
+        assertTrue(strategy.shouldKeepLeft(data(Free), data2(Splitting)));
+        assertTrue(strategy.shouldKeepLeft(data(Free), data2(Deleted)));
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataTest.java
index 9617c8a8c2b..a48e2a4db8b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataTest.java
@@ -33,15 +33,16 @@ public class ServiceUnitStateDataTest {
 
     @Test
     public void testConstructors() throws InterruptedException {
-        ServiceUnitStateData data1 = new ServiceUnitStateData(Owned, "A");
+        ServiceUnitStateData data1 = new ServiceUnitStateData(Owned, "A", 1);
         assertEquals(data1.state(), Owned);
         assertEquals(data1.broker(), "A");
         assertNull(data1.sourceBroker());
-        assertThat(data1.timestamp()).isGreaterThan(0);;
+        assertThat(data1.timestamp()).isGreaterThan(0);
+        ;
 
         Thread.sleep(10);
 
-        ServiceUnitStateData data2 = new ServiceUnitStateData(Assigning, "A", 
"B");
+        ServiceUnitStateData data2 = new ServiceUnitStateData(Assigning, "A", 
"B", 1);
         assertEquals(data2.state(), Assigning);
         assertEquals(data2.broker(), "A");
         assertEquals(data2.sourceBroker(), "B");
@@ -50,23 +51,28 @@ public class ServiceUnitStateDataTest {
 
     @Test(expectedExceptions = NullPointerException.class)
     public void testNullState() {
-        new ServiceUnitStateData(null, "A");
+        new ServiceUnitStateData(null, "A", 1);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testNullBroker() {
-        new ServiceUnitStateData(Owned, null);
+        new ServiceUnitStateData(Owned, null, 1);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testEmptyBroker() {
-        new ServiceUnitStateData(Owned, "");
+        new ServiceUnitStateData(Owned, "", 1);
+    }
+
+    @Test
+    public void testZeroVersionId() {
+        new ServiceUnitStateData(Owned, "A", Long.MAX_VALUE + 1);
     }
 
     @Test
     public void jsonWriteAndReadTest() throws JsonProcessingException {
         ObjectMapper mapper = ObjectMapperFactory.create();
-        final ServiceUnitStateData src = new ServiceUnitStateData(Assigning, 
"A", "B");
+        final ServiceUnitStateData src = new ServiceUnitStateData(Assigning, 
"A", "B", 1);
         String json = mapper.writeValueAsString(src);
         ServiceUnitStateData dst = mapper.readValue(json, 
ServiceUnitStateData.class);
         assertEquals(dst, src);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
index 4c1d4f7d2a8..543b7c629ac 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
@@ -54,6 +54,7 @@ import org.apache.commons.lang.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy;
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -88,20 +89,24 @@ public class ServiceUnitStateCompactionTest extends 
MockedPulsarServiceBaseTest
 
     private ServiceUnitState testState = Init;
 
+    private ServiceUnitStateData testData = null;
+
     private static Random RANDOM = new Random();
 
 
     private ServiceUnitStateData testValue(ServiceUnitState state, String 
broker) {
         if (state == Init) {
-            return null;
+            testData = null;
+        } else {
+            testData = new ServiceUnitStateData(state, broker, 
versionId(testData) + 1);
         }
-        return new ServiceUnitStateData(state, broker);
+
+        return testData;
     }
 
     private ServiceUnitStateData testValue(String broker) {
-        ServiceUnitState to = nextValidStateNonSplit(testState);
-        testState = to;
-        return testValue(to, broker);
+        testState = nextValidStateNonSplit(testState);
+        return testValue(testState, broker);
     }
 
     private ServiceUnitState nextValidState(ServiceUnitState from) {
@@ -149,6 +154,7 @@ public class ServiceUnitStateCompactionTest extends 
MockedPulsarServiceBaseTest
         strategy.checkBrokers(false);
 
         testState = Init;
+        testData = null;
     }
 
 
@@ -202,13 +208,14 @@ public class ServiceUnitStateCompactionTest extends 
MockedPulsarServiceBaseTest
             ServiceUnitState state = invalid ? nextInvalidState(prevState) :
                     nextValidState(prevState);
             ServiceUnitStateData value;
+            long versionId = versionId(prev) + 1;
             if (invalid) {
-                value = new ServiceUnitStateData(state, key + ":" + j, false);
+                value = new ServiceUnitStateData(state, key + ":" + j, false, 
versionId);
             } else {
                 if (state == Init) {
-                    value = new ServiceUnitStateData(state, key + ":" + j, 
true);
+                    value = new ServiceUnitStateData(state, key + ":" + j, 
true, versionId);
                 } else {
-                    value = new ServiceUnitStateData(state, key + ":" + j, 
false);
+                    value = new ServiceUnitStateData(state, key + ":" + j, 
false, versionId);
                 }
             }
 
@@ -560,15 +567,16 @@ public class ServiceUnitStateCompactionTest extends 
MockedPulsarServiceBaseTest
         String bundle = "bundle1";
         String src = "broker0";
         String dst = "broker1";
-        producer.newMessage().key(bundle).value(new 
ServiceUnitStateData(Owned, src)).send();
+        long versionId = 1;
+        producer.newMessage().key(bundle).value(new 
ServiceUnitStateData(Owned, src, versionId++)).send();
         for (int i = 0; i < 3; i++) {
-            var assignedStateData = new ServiceUnitStateData(Assigning, dst, 
src);
+            var assignedStateData = new ServiceUnitStateData(Assigning, dst, 
src, versionId++);
             producer.newMessage().key(bundle).value(assignedStateData).send();
             producer.newMessage().key(bundle).value(assignedStateData).send();
-            var releasedStateData = new ServiceUnitStateData(Releasing, dst, 
src);
+            var releasedStateData = new ServiceUnitStateData(Releasing, dst, 
src, versionId++);
             producer.newMessage().key(bundle).value(releasedStateData).send();
             producer.newMessage().key(bundle).value(releasedStateData).send();
-            var ownedStateData = new ServiceUnitStateData(Owned, dst, src);
+            var ownedStateData = new ServiceUnitStateData(Owned, dst, src, 
versionId++);
             producer.newMessage().key(bundle).value(ownedStateData).send();
             producer.newMessage().key(bundle).value(ownedStateData).send();
             compactor.compact(topic, strategy).get();
@@ -945,4 +953,8 @@ public class ServiceUnitStateCompactionTest extends 
MockedPulsarServiceBaseTest
             assertNull(none);
         }
     }
+
+    public static long versionId(ServiceUnitStateData data) {
+        return data == null ? ServiceUnitStateChannelImpl.VERSION_ID_INIT - 1 
: data.versionId();
+    }
 }


Reply via email to