This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a12f5541cee [improve][broker] PIP-192: Added VersionId in
ServiceUnitStateData (#19620)
a12f5541cee is described below
commit a12f5541cee0d51fad654e97487edf1140de3286
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Mar 1 00:58:33 2023 -0800
[improve][broker] PIP-192: Added VersionId in ServiceUnitStateData (#19620)
---
.../channel/ServiceUnitStateChannelImpl.java | 59 +++--
.../ServiceUnitStateCompactionStrategy.java | 11 +-
.../extensions/channel/ServiceUnitStateData.java | 15 +-
.../channel/ServiceUnitStateChannelTest.java | 12 +-
.../ServiceUnitStateCompactionStrategyTest.java | 241 +++++++++++++--------
.../channel/ServiceUnitStateDataTest.java | 20 +-
.../compaction/ServiceUnitStateCompactionTest.java | 36 ++-
7 files changed, 251 insertions(+), 143 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 9f205f85c54..4819a54fcad 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -103,7 +103,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
NamespaceName.SYSTEM_NAMESPACE,
"loadbalancer-service-unit-state").toString();
private static final long MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS = 30
* 1000; // 30sec
-
+ public static final long VERSION_ID_INIT = 1; // initial versionId
private static final long OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS = 60;
public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3
mins
private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs
to clean immediately
@@ -451,11 +451,25 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
}
+ private long getNextVersionId(String serviceUnit) {
+ var data = tableview.get(serviceUnit);
+ return getNextVersionId(data);
+ }
+
+ private long getNextVersionId(ServiceUnitStateData data) {
+ return data == null ? VERSION_ID_INIT : data.versionId() + 1;
+ }
+
public CompletableFuture<String> publishAssignEventAsync(String
serviceUnit, String broker) {
+ if (!validateChannelState(Started, true)) {
+ return CompletableFuture.failedFuture(
+ new IllegalStateException("Invalid channel state:" +
channelState.name()));
+ }
EventType eventType = Assign;
eventCounters.get(eventType).getTotal().incrementAndGet();
CompletableFuture<String> getOwnerRequest =
deferGetOwnerRequest(serviceUnit);
- pubAsync(serviceUnit, new ServiceUnitStateData(Assigning, broker))
+
+ pubAsync(serviceUnit, new ServiceUnitStateData(Assigning, broker,
getNextVersionId(serviceUnit)))
.whenComplete((__, ex) -> {
if (ex != null) {
getOwnerRequests.remove(serviceUnit, getOwnerRequest);
@@ -469,16 +483,20 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
public CompletableFuture<Void> publishUnloadEventAsync(Unload unload) {
+ if (!validateChannelState(Started, true)) {
+ return CompletableFuture.failedFuture(
+ new IllegalStateException("Invalid channel state:" +
channelState.name()));
+ }
EventType eventType = Unload;
eventCounters.get(eventType).getTotal().incrementAndGet();
String serviceUnit = unload.serviceUnit();
CompletableFuture<MessageId> future;
if (isTransferCommand(unload)) {
future = pubAsync(serviceUnit, new ServiceUnitStateData(
- Assigning, unload.destBroker().get(),
unload.sourceBroker()));
+ Assigning, unload.destBroker().get(),
unload.sourceBroker(), getNextVersionId(serviceUnit)));
} else {
future = pubAsync(serviceUnit, new ServiceUnitStateData(
- Releasing, unload.sourceBroker()));
+ Releasing, unload.sourceBroker(),
getNextVersionId(serviceUnit)));
}
return future.whenComplete((__, ex) -> {
@@ -489,10 +507,15 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
public CompletableFuture<Void> publishSplitEventAsync(Split split) {
+ if (!validateChannelState(Started, true)) {
+ return CompletableFuture.failedFuture(
+ new IllegalStateException("Invalid channel state:" +
channelState.name()));
+ }
EventType eventType = Split;
eventCounters.get(eventType).getTotal().incrementAndGet();
String serviceUnit = split.serviceUnit();
- ServiceUnitStateData next = new ServiceUnitStateData(Splitting,
split.sourceBroker());
+ ServiceUnitStateData next =
+ new ServiceUnitStateData(Splitting, split.sourceBroker(),
getNextVersionId(serviceUnit));
return pubAsync(serviceUnit, next).whenComplete((__, ex) -> {
if (ex != null) {
eventCounters.get(eventType).getFailure().incrementAndGet();
@@ -599,7 +622,8 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private void handleAssignEvent(String serviceUnit, ServiceUnitStateData
data) {
if (isTargetBroker(data.broker())) {
ServiceUnitStateData next = new ServiceUnitStateData(
- isTransferCommand(data) ? Releasing : Owned,
data.broker(), data.sourceBroker());
+ isTransferCommand(data) ? Releasing : Owned,
data.broker(), data.sourceBroker(),
+ getNextVersionId(data));
pubAsync(serviceUnit, next)
.whenComplete((__, e) -> log(e, serviceUnit, data, next));
}
@@ -608,7 +632,8 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData
data) {
if (isTransferCommand(data)) {
if (isTargetBroker(data.sourceBroker())) {
- ServiceUnitStateData next = new ServiceUnitStateData(Owned,
data.broker(), data.sourceBroker());
+ ServiceUnitStateData next =
+ new ServiceUnitStateData(Owned, data.broker(),
data.sourceBroker(), getNextVersionId(data));
// TODO: when close, pass message to clients to connect to the
new broker
closeServiceUnit(serviceUnit)
.thenCompose(__ -> pubAsync(serviceUnit, next))
@@ -616,7 +641,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
} else {
if (isTargetBroker(data.broker())) {
- ServiceUnitStateData next = new ServiceUnitStateData(Free,
data.broker());
+ ServiceUnitStateData next = new ServiceUnitStateData(Free,
data.broker(), getNextVersionId(data));
closeServiceUnit(serviceUnit)
.thenCompose(__ -> pubAsync(serviceUnit, next))
.whenComplete((__, e) -> log(e, serviceUnit, data,
next));
@@ -660,10 +685,6 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
private CompletableFuture<MessageId> pubAsync(String serviceUnit,
ServiceUnitStateData data) {
- if (!validateChannelState(Started, true)) {
- return CompletableFuture.failedFuture(
- new IllegalStateException("Invalid channel state:" +
channelState.name()));
- }
CompletableFuture<MessageId> future = new CompletableFuture<>();
producer.newMessage()
.key(serviceUnit)
@@ -774,7 +795,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
updateFuture.completeExceptionally(new
BrokerServiceException.ServiceUnitNotReadyException(msg));
return;
}
- ServiceUnitStateData next = new ServiceUnitStateData(Owned,
data.broker());
+ ServiceUnitStateData next = new ServiceUnitStateData(Owned,
data.broker(), VERSION_ID_INIT);
NamespaceBundles targetNsBundle = splitBundlesPair.getLeft();
List<NamespaceBundle> splitBundles =
Collections.unmodifiableList(splitBundlesPair.getRight());
List<NamespaceBundle> successPublishedBundles =
@@ -812,7 +833,8 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
updateFuture.thenAccept(r -> {
// Delete the old bundle
- pubAsync(serviceUnit, new ServiceUnitStateData(Deleted,
data.broker())).thenRun(() -> {
+ pubAsync(serviceUnit, new ServiceUnitStateData(Deleted,
data.broker(), getNextVersionId(data)))
+ .thenRun(() -> {
// Update bundled_topic cache for load-report-generation
pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
// TODO: Update the load data immediately if needed.
@@ -938,7 +960,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
Optional<String> selectedBroker =
brokerSelector.select(availableBrokers, null, getContext());
if (selectedBroker.isPresent()) {
- var override = new ServiceUnitStateData(Owned,
selectedBroker.get(), true);
+ var override = new ServiceUnitStateData(Owned,
selectedBroker.get(), true, getNextVersionId(orphanData));
log.info("Overriding ownership serviceUnit:{} from orphanData:{}
to overrideData:{}",
serviceUnit, orphanData, override);
pubAsync(serviceUnit, override).whenComplete((__, e) -> {
@@ -1007,19 +1029,20 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private Optional<ServiceUnitStateData> getOverrideStateData(String
serviceUnit, ServiceUnitStateData orphanData,
Set<String>
availableBrokers,
LoadManagerContext context) {
+ long nextVersionId = getNextVersionId(orphanData);
if (isTransferCommand(orphanData)) {
// rollback to the src
- return Optional.of(new ServiceUnitStateData(Owned,
orphanData.sourceBroker(), true));
+ return Optional.of(new ServiceUnitStateData(Owned,
orphanData.sourceBroker(), true, nextVersionId));
} else if (orphanData.state() == Assigning) { // assign
// roll-forward to another broker
Optional<String> selectedBroker =
brokerSelector.select(availableBrokers, null, context);
if (selectedBroker.isEmpty()) {
return Optional.empty();
}
- return Optional.of(new ServiceUnitStateData(Owned,
selectedBroker.get(), true));
+ return Optional.of(new ServiceUnitStateData(Owned,
selectedBroker.get(), true, nextVersionId));
} else if (orphanData.state() == Splitting || orphanData.state() ==
Releasing) {
// rollback to the target broker for split and unload
- return Optional.of(new ServiceUnitStateData(Owned,
orphanData.broker(), true));
+ return Optional.of(new ServiceUnitStateData(Owned,
orphanData.broker(), true, nextVersionId));
} else {
var msg = String.format("Failed to get the overrideStateData from
serviceUnit=%s, orphanData=%s",
serviceUnit, orphanData);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
index d2a585af9d9..8af0f0c027d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
@@ -50,14 +50,19 @@ public class ServiceUnitStateCompactionStrategy implements
TopicCompactionStrate
public boolean shouldKeepLeft(ServiceUnitStateData from,
ServiceUnitStateData to) {
if (to == null) {
return false;
- } else if (to.force()) {
- return false;
}
+ // Skip the compaction case where from = null and to.versionId > 1
+ if (from != null && from.versionId() + 1 != to.versionId()) {
+ return true;
+ }
+
+ if (to.force()) {
+ return false;
+ }
ServiceUnitState prevState = state(from);
ServiceUnitState state = state(to);
-
if (!ServiceUnitState.isValidTransition(prevState, state)) {
return true;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
index 6a04431de64..ef25acff10a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.channel;
-
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
@@ -28,7 +27,7 @@ import org.apache.commons.lang3.StringUtils;
*/
public record ServiceUnitStateData(
- ServiceUnitState state, String broker, String sourceBroker, boolean
force, long timestamp) {
+ ServiceUnitState state, String broker, String sourceBroker, boolean
force, long timestamp, long versionId) {
public ServiceUnitStateData {
Objects.requireNonNull(state);
@@ -37,16 +36,16 @@ public record ServiceUnitStateData(
}
}
- public ServiceUnitStateData(ServiceUnitState state, String broker, String
sourceBroker) {
- this(state, broker, sourceBroker, false, System.currentTimeMillis());
+ public ServiceUnitStateData(ServiceUnitState state, String broker, String
sourceBroker, long versionId) {
+ this(state, broker, sourceBroker, false, System.currentTimeMillis(),
versionId);
}
- public ServiceUnitStateData(ServiceUnitState state, String broker) {
- this(state, broker, null, false, System.currentTimeMillis());
+ public ServiceUnitStateData(ServiceUnitState state, String broker, long
versionId) {
+ this(state, broker, null, false, System.currentTimeMillis(),
versionId);
}
- public ServiceUnitStateData(ServiceUnitState state, String broker, boolean
force) {
- this(state, broker, null, force, System.currentTimeMillis());
+ public ServiceUnitStateData(ServiceUnitState state, String broker, boolean
force, long versionId) {
+ this(state, broker, null, force, System.currentTimeMillis(),
versionId);
}
public static ServiceUnitState state(ServiceUnitStateData data) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 49eee6ecb7a..6aa8fe38760 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -924,26 +924,26 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
@Test(priority = 11)
public void ownerLookupCountTests() throws IllegalAccessException {
- overrideTableView(channel1, bundle, new
ServiceUnitStateData(Assigning, "b1"));
+ overrideTableView(channel1, bundle, new
ServiceUnitStateData(Assigning, "b1", 1));
channel1.getOwnerAsync(bundle);
channel1.getOwnerAsync(bundle);
- overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned,
"b1"));
+ overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned,
"b1", 1));
channel1.getOwnerAsync(bundle);
channel1.getOwnerAsync(bundle);
channel1.getOwnerAsync(bundle);
- overrideTableView(channel1, bundle, new
ServiceUnitStateData(Releasing, "b1"));
+ overrideTableView(channel1, bundle, new
ServiceUnitStateData(Releasing, "b1", 1));
channel1.getOwnerAsync(bundle);
channel1.getOwnerAsync(bundle);
- overrideTableView(channel1, bundle, new
ServiceUnitStateData(Splitting, "b1"));
+ overrideTableView(channel1, bundle, new
ServiceUnitStateData(Splitting, "b1", 1));
channel1.getOwnerAsync(bundle);
- overrideTableView(channel1, bundle, new ServiceUnitStateData(Free,
"b1"));
+ overrideTableView(channel1, bundle, new ServiceUnitStateData(Free,
"b1", 1));
channel1.getOwnerAsync(bundle);
- overrideTableView(channel1, bundle, new ServiceUnitStateData(Deleted,
"b1"));
+ overrideTableView(channel1, bundle, new ServiceUnitStateData(Deleted,
"b1", 1));
channel1.getOwnerAsync(bundle);
channel1.getOwnerAsync(bundle);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java
index 1a4aba15f9e..0cd05d8bd75 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java
@@ -32,110 +32,173 @@ import org.testng.annotations.Test;
@Test(groups = "broker")
public class ServiceUnitStateCompactionStrategyTest {
ServiceUnitStateCompactionStrategy strategy = new
ServiceUnitStateCompactionStrategy();
+ String dst = "dst";
+ String src = "src";
ServiceUnitStateData data(ServiceUnitState state) {
- return new ServiceUnitStateData(state, "broker");
+ return new ServiceUnitStateData(state, "broker", 1);
}
ServiceUnitStateData data(ServiceUnitState state, String dst) {
- return new ServiceUnitStateData(state, dst, null);
+ return new ServiceUnitStateData(state, dst, null, 1);
}
+
ServiceUnitStateData data(ServiceUnitState state, String src, String dst) {
- return new ServiceUnitStateData(state, dst, src);
+ return new ServiceUnitStateData(state, dst, src, 1);
+ }
+
+ ServiceUnitStateData data2(ServiceUnitState state) {
+ return new ServiceUnitStateData(state, "broker", 2);
+ }
+
+ ServiceUnitStateData data2(ServiceUnitState state, String dst) {
+ return new ServiceUnitStateData(state, dst, null, 2);
+ }
+
+ ServiceUnitStateData data2(ServiceUnitState state, String src, String dst)
{
+ return new ServiceUnitStateData(state, dst, src, 2);
}
@Test
- public void test() throws InterruptedException {
- String dst = "dst";
- String src = "src";
+ public void testVersionId(){
+ assertTrue(strategy.shouldKeepLeft(
+ new ServiceUnitStateData(Assigning, dst, 1),
+ new ServiceUnitStateData(Assigning, dst, 1)));
+
+ assertTrue(strategy.shouldKeepLeft(
+ new ServiceUnitStateData(Assigning, dst, 1),
+ new ServiceUnitStateData(Assigning, dst, 2)));
+
+ assertFalse(strategy.shouldKeepLeft(
+ new ServiceUnitStateData(Owned, dst, src, 10),
+ new ServiceUnitStateData(Assigning, "broker2", dst, 11)));
+
+ assertFalse(strategy.shouldKeepLeft(
+ new ServiceUnitStateData(Owned, dst, src, Long.MAX_VALUE),
+ new ServiceUnitStateData(Assigning, "broker2", dst,
Long.MAX_VALUE + 1)));
+
+ assertFalse(strategy.shouldKeepLeft(
+ new ServiceUnitStateData(Owned, dst, src, Long.MAX_VALUE + 1),
+ new ServiceUnitStateData(Assigning, "broker2", dst,
Long.MAX_VALUE + 2)));
+ assertTrue(strategy.shouldKeepLeft(
+ new ServiceUnitStateData(Owned, dst, src, 10),
+ new ServiceUnitStateData(Assigning, "broker2", dst, 5)));
+
+ }
+
+ @Test
+ public void testForce(){
assertFalse(strategy.shouldKeepLeft(
- new ServiceUnitStateData(Init, dst),
- new ServiceUnitStateData(Init, dst, true)));
+ new ServiceUnitStateData(Init, dst, 1),
+ new ServiceUnitStateData(Init, dst, true, 2)));
+ assertTrue(strategy.shouldKeepLeft(
+ new ServiceUnitStateData(Init, dst, 1),
+ new ServiceUnitStateData(Init, dst, true, 1)));
+ }
+
+ @Test
+ public void testTombstone() {
+ assertFalse(strategy.shouldKeepLeft(
+ data(Init), null));
+ assertFalse(strategy.shouldKeepLeft(
+ data(Assigning), null));
assertFalse(strategy.shouldKeepLeft(
data(Owned), null));
+ assertFalse(strategy.shouldKeepLeft(
+ data(Releasing), null));
+ assertFalse(strategy.shouldKeepLeft(
+ data(Splitting), null));
+ assertFalse(strategy.shouldKeepLeft(
+ data(Free), null));
+ assertFalse(strategy.shouldKeepLeft(
+ data(Deleted), null));
+ }
+
+ @Test
+ public void testTransitionsAndBrokers() {
+
+ assertTrue(strategy.shouldKeepLeft(data(Init), data2(Init)));
+ assertFalse(strategy.shouldKeepLeft(data(Init), data2(Free)));
+ assertFalse(strategy.shouldKeepLeft(data(Init), data2(Assigning)));
+ assertFalse(strategy.shouldKeepLeft(data(Init), data2(Owned)));
+ assertFalse(strategy.shouldKeepLeft(data(Init), data2(Releasing)));
+ assertFalse(strategy.shouldKeepLeft(data(Init), data2(Splitting)));
+ assertFalse(strategy.shouldKeepLeft(data(Init), data2(Deleted)));
+
+ assertTrue(strategy.shouldKeepLeft(data(Assigning), data2(Init)));
+ assertTrue(strategy.shouldKeepLeft(data(Assigning), data2(Free)));
+ assertTrue(strategy.shouldKeepLeft(data(Assigning), data2(Assigning)));
+ assertTrue(strategy.shouldKeepLeft(data(Assigning, "dst1"),
data2(Owned, "dst2")));
+ assertTrue(strategy.shouldKeepLeft(data(Assigning, dst), data2(Owned,
src, dst)));
+ assertFalse(strategy.shouldKeepLeft(data(Assigning, dst), data2(Owned,
dst)));
+ assertTrue(strategy.shouldKeepLeft(data(Assigning, src, dst),
data2(Releasing, dst)));
+ assertTrue(strategy.shouldKeepLeft(data(Assigning, src, "dst1"),
data2(Releasing, src, "dst2")));
+ assertTrue(strategy.shouldKeepLeft(data(Assigning, "src1", dst),
data2(Releasing, "src2", dst)));
+ assertFalse(strategy.shouldKeepLeft(data(Assigning, src, dst),
data2(Releasing, src, dst)));
+ assertTrue(strategy.shouldKeepLeft(data(Assigning), data2(Splitting,
dst)));
+ assertTrue(strategy.shouldKeepLeft(data(Assigning), data2(Deleted,
dst)));
+
+ assertTrue(strategy.shouldKeepLeft(data(Owned), data2(Init)));
+ assertTrue(strategy.shouldKeepLeft(data(Owned), data2(Free)));
+ assertTrue(strategy.shouldKeepLeft(data(Owned, src, "dst1"),
data2(Assigning, src, "dst2")));
+ assertTrue(strategy.shouldKeepLeft(data(Owned, src, dst),
data2(Assigning, dst)));
+ assertTrue(strategy.shouldKeepLeft(data(Owned, src, dst),
data2(Assigning, src, dst)));
+ assertTrue(strategy.shouldKeepLeft(data(Owned, src, dst),
data2(Assigning, dst, dst)));
+ assertFalse(strategy.shouldKeepLeft(data(Owned, src, dst),
data2(Assigning, dst, "dst1")));
+ assertTrue(strategy.shouldKeepLeft(data(Owned), data2(Owned)));
+ assertTrue(strategy.shouldKeepLeft(data(Owned), data2(Releasing,
dst)));
+ assertTrue(strategy.shouldKeepLeft(data(Owned, src, "dst1"),
data2(Releasing, src, "dst2")));
+ assertTrue(strategy.shouldKeepLeft(data(Owned, "dst1"),
data2(Releasing, "dst2")));
+ assertFalse(strategy.shouldKeepLeft(data(Owned, dst), data2(Releasing,
dst)));
+ assertFalse(strategy.shouldKeepLeft(data(Owned, src, dst),
data2(Releasing, dst)));
+ assertTrue(strategy.shouldKeepLeft(data(Owned, src, "dst1"),
data2(Splitting, src, "dst2")));
+ assertTrue(strategy.shouldKeepLeft(data(Owned, "dst1"),
data2(Splitting, "dst2")));
+ assertFalse(strategy.shouldKeepLeft(data(Owned, dst), data2(Splitting,
dst)));
+ assertFalse(strategy.shouldKeepLeft(data(Owned, src, dst),
data2(Splitting, dst)));
+ assertTrue(strategy.shouldKeepLeft(data(Owned), data2(Deleted, dst)));
+
+ assertTrue(strategy.shouldKeepLeft(data(Releasing), data2(Init)));
+ assertFalse(strategy.shouldKeepLeft(data(Releasing), data2(Free)));
+ assertTrue(strategy.shouldKeepLeft(data(Releasing, "dst1"),
data2(Free, "dst2")));
+ assertTrue(strategy.shouldKeepLeft(data(Releasing, "src1", dst),
data2(Free, "src2", dst)));
+ assertTrue(strategy.shouldKeepLeft(data(Releasing), data2(Assigning)));
+ assertTrue(strategy.shouldKeepLeft(data(Releasing, "dst1"),
data2(Owned, "dst2")));
+ assertTrue(strategy.shouldKeepLeft(data(Releasing, src, "dst1"),
data2(Owned, src, "dst2")));
+ assertTrue(strategy.shouldKeepLeft(data(Releasing, "src1", dst),
data2(Owned, "src2", dst)));
+ assertFalse(strategy.shouldKeepLeft(data(Releasing, src, dst),
data2(Owned, src, dst)));
+ assertTrue(strategy.shouldKeepLeft(data(Releasing), data2(Releasing)));
+ assertTrue(strategy.shouldKeepLeft(data(Releasing), data2(Splitting)));
+ assertTrue(strategy.shouldKeepLeft(data(Releasing), data2(Deleted,
dst)));
+
+ assertTrue(strategy.shouldKeepLeft(data(Splitting), data2(Init)));
+ assertTrue(strategy.shouldKeepLeft(data(Splitting), data2(Free)));
+ assertTrue(strategy.shouldKeepLeft(data(Splitting), data2(Assigning)));
+ assertTrue(strategy.shouldKeepLeft(data(Splitting), data2(Owned)));
+ assertTrue(strategy.shouldKeepLeft(data(Splitting), data2(Releasing)));
+ assertTrue(strategy.shouldKeepLeft(data(Splitting), data2(Splitting)));
+ assertTrue(strategy.shouldKeepLeft(data(Splitting, src, "dst1"),
data2(Deleted, src, "dst2")));
+ assertTrue(strategy.shouldKeepLeft(data(Splitting, "dst1"),
data2(Deleted, "dst2")));
+ assertTrue(strategy.shouldKeepLeft(data(Splitting, "src1", dst),
data2(Deleted, "src2", dst)));
+ assertFalse(strategy.shouldKeepLeft(data(Splitting, dst),
data2(Deleted, dst)));
+ assertFalse(strategy.shouldKeepLeft(data(Splitting, src, dst),
data2(Deleted, src, dst)));
+
+ assertFalse(strategy.shouldKeepLeft(data(Deleted), data2(Init)));
+ assertTrue(strategy.shouldKeepLeft(data(Deleted), data2(Free)));
+ assertTrue(strategy.shouldKeepLeft(data(Deleted), data2(Assigning)));
+ assertTrue(strategy.shouldKeepLeft(data(Deleted), data2(Owned)));
+ assertTrue(strategy.shouldKeepLeft(data(Deleted), data2(Releasing)));
+ assertTrue(strategy.shouldKeepLeft(data(Deleted), data2(Splitting)));
+ assertTrue(strategy.shouldKeepLeft(data(Deleted), data2(Deleted)));
- assertTrue(strategy.shouldKeepLeft(data(Init), data(Init)));
- assertFalse(strategy.shouldKeepLeft(data(Init), data(Free)));
- assertFalse(strategy.shouldKeepLeft(data(Init), data(Assigning)));
- assertFalse(strategy.shouldKeepLeft(data(Init), data(Owned)));
- assertFalse(strategy.shouldKeepLeft(data(Init), data(Releasing)));
- assertFalse(strategy.shouldKeepLeft(data(Init), data(Splitting)));
- assertFalse(strategy.shouldKeepLeft(data(Init), data(Deleted)));
-
- assertTrue(strategy.shouldKeepLeft(data(Assigning), data(Init)));
- assertTrue(strategy.shouldKeepLeft(data(Assigning), data(Free)));
- assertTrue(strategy.shouldKeepLeft(data(Assigning), data(Assigning)));
- assertTrue(strategy.shouldKeepLeft(data(Assigning, "dst1"),
data(Owned, "dst2")));
- assertTrue(strategy.shouldKeepLeft(data(Assigning, dst), data(Owned,
src, dst)));
- assertFalse(strategy.shouldKeepLeft(data(Assigning, dst), data(Owned,
dst)));
- assertTrue(strategy.shouldKeepLeft(data(Assigning, src, dst),
data(Releasing, dst)));
- assertTrue(strategy.shouldKeepLeft(data(Assigning, src, "dst1"),
data(Releasing, src, "dst2")));
- assertTrue(strategy.shouldKeepLeft(data(Assigning, "src1", dst),
data(Releasing, "src2", dst)));
- assertFalse(strategy.shouldKeepLeft(data(Assigning, src, dst),
data(Releasing, src, dst)));
- assertTrue(strategy.shouldKeepLeft(data(Assigning), data(Splitting,
dst)));
- assertTrue(strategy.shouldKeepLeft(data(Assigning), data(Deleted,
dst)));
-
- assertTrue(strategy.shouldKeepLeft(data(Owned), data(Init)));
- assertTrue(strategy.shouldKeepLeft(data(Owned), data(Free)));
- assertTrue(strategy.shouldKeepLeft(data(Owned, src, "dst1"),
data(Assigning, src, "dst2")));
- assertTrue(strategy.shouldKeepLeft(data(Owned, src, dst),
data(Assigning, dst)));
- assertTrue(strategy.shouldKeepLeft(data(Owned, src, dst),
data(Assigning, src, dst)));
- assertTrue(strategy.shouldKeepLeft(data(Owned, src, dst),
data(Assigning, dst, dst)));
- assertFalse(strategy.shouldKeepLeft(data(Owned, src, dst),
data(Assigning, dst, "dst1")));
- assertTrue(strategy.shouldKeepLeft(data(Owned), data(Owned)));
- assertTrue(strategy.shouldKeepLeft(data(Owned), data(Releasing, dst)));
- assertTrue(strategy.shouldKeepLeft(data(Owned, src, "dst1"),
data(Releasing, src, "dst2")));
- assertTrue(strategy.shouldKeepLeft(data(Owned, "dst1"),
data(Releasing, "dst2")));
- assertFalse(strategy.shouldKeepLeft(data(Owned, dst), data(Releasing,
dst)));
- assertFalse(strategy.shouldKeepLeft(data(Owned, src, dst),
data(Releasing, dst)));
- assertTrue(strategy.shouldKeepLeft(data(Owned, src, "dst1"),
data(Splitting, src, "dst2")));
- assertTrue(strategy.shouldKeepLeft(data(Owned, "dst1"),
data(Splitting, "dst2")));
- assertFalse(strategy.shouldKeepLeft(data(Owned, dst), data(Splitting,
dst)));
- assertFalse(strategy.shouldKeepLeft(data(Owned, src, dst),
data(Splitting, dst)));
- assertTrue(strategy.shouldKeepLeft(data(Owned), data(Deleted, dst)));
-
- assertTrue(strategy.shouldKeepLeft(data(Releasing), data(Init)));
- assertFalse(strategy.shouldKeepLeft(data(Releasing), data(Free)));
- assertTrue(strategy.shouldKeepLeft(data(Releasing, "dst1"), data(Free,
"dst2")));
- assertTrue(strategy.shouldKeepLeft(data(Releasing, "src1", dst),
data(Free, "src2", dst)));
- assertTrue(strategy.shouldKeepLeft(data(Releasing), data(Assigning)));
- assertTrue(strategy.shouldKeepLeft(data(Releasing, "dst1"),
data(Owned, "dst2")));
- assertTrue(strategy.shouldKeepLeft(data(Releasing, src, "dst1"),
data(Owned, src, "dst2")));
- assertTrue(strategy.shouldKeepLeft(data(Releasing, "src1", dst),
data(Owned, "src2", dst)));
- assertFalse(strategy.shouldKeepLeft(data(Releasing, src, dst),
data(Owned, src, dst)));
- assertTrue(strategy.shouldKeepLeft(data(Releasing), data(Releasing)));
- assertTrue(strategy.shouldKeepLeft(data(Releasing), data(Splitting)));
- assertTrue(strategy.shouldKeepLeft(data(Releasing), data(Deleted,
dst)));
-
- assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Init)));
- assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Free)));
- assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Assigning)));
- assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Owned)));
- assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Releasing)));
- assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Splitting)));
- assertTrue(strategy.shouldKeepLeft(data(Splitting, src, "dst1"),
data(Deleted, src, "dst2")));
- assertTrue(strategy.shouldKeepLeft(data(Splitting, "dst1"),
data(Deleted, "dst2")));
- assertTrue(strategy.shouldKeepLeft(data(Splitting, "src1", dst),
data(Deleted, "src2", dst)));
- assertFalse(strategy.shouldKeepLeft(data(Splitting, dst),
data(Deleted, dst)));
- assertFalse(strategy.shouldKeepLeft(data(Splitting, src, dst),
data(Deleted, src, dst)));
-
- assertFalse(strategy.shouldKeepLeft(data(Deleted), data(Init)));
- assertTrue(strategy.shouldKeepLeft(data(Deleted), data(Free)));
- assertTrue(strategy.shouldKeepLeft(data(Deleted), data(Assigning)));
- assertTrue(strategy.shouldKeepLeft(data(Deleted), data(Owned)));
- assertTrue(strategy.shouldKeepLeft(data(Deleted), data(Releasing)));
- assertTrue(strategy.shouldKeepLeft(data(Deleted), data(Splitting)));
- assertTrue(strategy.shouldKeepLeft(data(Deleted), data(Deleted)));
-
- assertFalse(strategy.shouldKeepLeft(data(Free), data(Init)));
- assertTrue(strategy.shouldKeepLeft(data(Free), data(Free)));
- assertFalse(strategy.shouldKeepLeft(data(Free), data(Assigning)));
- assertTrue(strategy.shouldKeepLeft(data(Free), data(Assigning, src,
dst)));
- assertTrue(strategy.shouldKeepLeft(data(Free), data(Owned)));
- assertTrue(strategy.shouldKeepLeft(data(Free), data(Releasing)));
- assertTrue(strategy.shouldKeepLeft(data(Free), data(Splitting)));
- assertTrue(strategy.shouldKeepLeft(data(Free), data(Deleted)));
+ assertFalse(strategy.shouldKeepLeft(data(Free), data2(Init)));
+ assertTrue(strategy.shouldKeepLeft(data(Free), data2(Free)));
+ assertFalse(strategy.shouldKeepLeft(data(Free), data2(Assigning)));
+ assertTrue(strategy.shouldKeepLeft(data(Free), data2(Assigning, src,
dst)));
+ assertTrue(strategy.shouldKeepLeft(data(Free), data2(Owned)));
+ assertTrue(strategy.shouldKeepLeft(data(Free), data2(Releasing)));
+ assertTrue(strategy.shouldKeepLeft(data(Free), data2(Splitting)));
+ assertTrue(strategy.shouldKeepLeft(data(Free), data2(Deleted)));
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataTest.java
index 9617c8a8c2b..a48e2a4db8b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataTest.java
@@ -33,15 +33,16 @@ public class ServiceUnitStateDataTest {
@Test
public void testConstructors() throws InterruptedException {
- ServiceUnitStateData data1 = new ServiceUnitStateData(Owned, "A");
+ ServiceUnitStateData data1 = new ServiceUnitStateData(Owned, "A", 1);
assertEquals(data1.state(), Owned);
assertEquals(data1.broker(), "A");
assertNull(data1.sourceBroker());
- assertThat(data1.timestamp()).isGreaterThan(0);;
+ assertThat(data1.timestamp()).isGreaterThan(0);
+ ;
Thread.sleep(10);
- ServiceUnitStateData data2 = new ServiceUnitStateData(Assigning, "A",
"B");
+ ServiceUnitStateData data2 = new ServiceUnitStateData(Assigning, "A",
"B", 1);
assertEquals(data2.state(), Assigning);
assertEquals(data2.broker(), "A");
assertEquals(data2.sourceBroker(), "B");
@@ -50,23 +51,28 @@ public class ServiceUnitStateDataTest {
@Test(expectedExceptions = NullPointerException.class)
public void testNullState() {
- new ServiceUnitStateData(null, "A");
+ new ServiceUnitStateData(null, "A", 1);
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testNullBroker() {
- new ServiceUnitStateData(Owned, null);
+ new ServiceUnitStateData(Owned, null, 1);
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testEmptyBroker() {
- new ServiceUnitStateData(Owned, "");
+ new ServiceUnitStateData(Owned, "", 1);
+ }
+
+ @Test
+ public void testZeroVersionId() {
+ new ServiceUnitStateData(Owned, "A", Long.MAX_VALUE + 1);
}
@Test
public void jsonWriteAndReadTest() throws JsonProcessingException {
ObjectMapper mapper = ObjectMapperFactory.create();
- final ServiceUnitStateData src = new ServiceUnitStateData(Assigning,
"A", "B");
+ final ServiceUnitStateData src = new ServiceUnitStateData(Assigning,
"A", "B", 1);
String json = mapper.writeValueAsString(src);
ServiceUnitStateData dst = mapper.readValue(json,
ServiceUnitStateData.class);
assertEquals(dst, src);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
index 4c1d4f7d2a8..543b7c629ac 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
@@ -54,6 +54,7 @@ import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -88,20 +89,24 @@ public class ServiceUnitStateCompactionTest extends
MockedPulsarServiceBaseTest
private ServiceUnitState testState = Init;
+ private ServiceUnitStateData testData = null;
+
private static Random RANDOM = new Random();
private ServiceUnitStateData testValue(ServiceUnitState state, String
broker) {
if (state == Init) {
- return null;
+ testData = null;
+ } else {
+ testData = new ServiceUnitStateData(state, broker,
versionId(testData) + 1);
}
- return new ServiceUnitStateData(state, broker);
+
+ return testData;
}
private ServiceUnitStateData testValue(String broker) {
- ServiceUnitState to = nextValidStateNonSplit(testState);
- testState = to;
- return testValue(to, broker);
+ testState = nextValidStateNonSplit(testState);
+ return testValue(testState, broker);
}
private ServiceUnitState nextValidState(ServiceUnitState from) {
@@ -149,6 +154,7 @@ public class ServiceUnitStateCompactionTest extends
MockedPulsarServiceBaseTest
strategy.checkBrokers(false);
testState = Init;
+ testData = null;
}
@@ -202,13 +208,14 @@ public class ServiceUnitStateCompactionTest extends
MockedPulsarServiceBaseTest
ServiceUnitState state = invalid ? nextInvalidState(prevState) :
nextValidState(prevState);
ServiceUnitStateData value;
+ long versionId = versionId(prev) + 1;
if (invalid) {
- value = new ServiceUnitStateData(state, key + ":" + j, false);
+ value = new ServiceUnitStateData(state, key + ":" + j, false,
versionId);
} else {
if (state == Init) {
- value = new ServiceUnitStateData(state, key + ":" + j,
true);
+ value = new ServiceUnitStateData(state, key + ":" + j,
true, versionId);
} else {
- value = new ServiceUnitStateData(state, key + ":" + j,
false);
+ value = new ServiceUnitStateData(state, key + ":" + j,
false, versionId);
}
}
@@ -560,15 +567,16 @@ public class ServiceUnitStateCompactionTest extends
MockedPulsarServiceBaseTest
String bundle = "bundle1";
String src = "broker0";
String dst = "broker1";
- producer.newMessage().key(bundle).value(new
ServiceUnitStateData(Owned, src)).send();
+ long versionId = 1;
+ producer.newMessage().key(bundle).value(new
ServiceUnitStateData(Owned, src, versionId++)).send();
for (int i = 0; i < 3; i++) {
- var assignedStateData = new ServiceUnitStateData(Assigning, dst,
src);
+ var assignedStateData = new ServiceUnitStateData(Assigning, dst,
src, versionId++);
producer.newMessage().key(bundle).value(assignedStateData).send();
producer.newMessage().key(bundle).value(assignedStateData).send();
- var releasedStateData = new ServiceUnitStateData(Releasing, dst,
src);
+ var releasedStateData = new ServiceUnitStateData(Releasing, dst,
src, versionId++);
producer.newMessage().key(bundle).value(releasedStateData).send();
producer.newMessage().key(bundle).value(releasedStateData).send();
- var ownedStateData = new ServiceUnitStateData(Owned, dst, src);
+ var ownedStateData = new ServiceUnitStateData(Owned, dst, src,
versionId++);
producer.newMessage().key(bundle).value(ownedStateData).send();
producer.newMessage().key(bundle).value(ownedStateData).send();
compactor.compact(topic, strategy).get();
@@ -945,4 +953,8 @@ public class ServiceUnitStateCompactionTest extends
MockedPulsarServiceBaseTest
assertNull(none);
}
}
+
+ public static long versionId(ServiceUnitStateData data) {
+ return data == null ? ServiceUnitStateChannelImpl.VERSION_ID_INIT - 1
: data.versionId();
+ }
}