This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 949260f190c [fix][broker] Immediately tombstone Deleted and Free state bundles (#22743)
949260f190c is described below
commit 949260f190c3ff48d16f9450083c2e8c5c9ff302
Author: Heesung Sohn <[email protected]>
AuthorDate: Tue May 21 15:53:26 2024 -0700
[fix][broker] Immediately tombstone Deleted and Free state bundles (#22743)
---
.../extensions/ExtensibleLoadManagerImpl.java | 5 +--
.../channel/ServiceUnitStateChannelImpl.java | 18 +++++++---
.../extensions/manager/SplitManager.java | 2 +-
.../extensions/manager/UnloadManager.java | 14 ++++++--
.../pulsar/broker/namespace/NamespaceService.java | 5 +--
.../extensions/ExtensibleLoadManagerImplTest.java | 33 +++++++++++++++--
.../channel/ServiceUnitStateChannelTest.java | 42 ++++++++++++----------
.../extensions/manager/SplitManagerTest.java | 27 +++-----------
.../extensions/manager/UnloadManagerTest.java | 27 +++++++++++---
9 files changed, 113 insertions(+), 60 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index c22a4086a63..1e519b3284f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -667,7 +667,8 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
}
public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId
bundle,
- Optional<String>
destinationBroker) {
+ Optional<String>
destinationBroker,
+ boolean force) {
if
(NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString()))
{
log.info("Skip unloading namespace bundle: {}.", bundle);
return CompletableFuture.completedFuture(null);
@@ -686,7 +687,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
log.warn(msg);
throw new IllegalArgumentException(msg);
}
- Unload unload = new Unload(sourceBroker,
bundle.toString(), destinationBroker, true);
+ Unload unload = new Unload(sourceBroker,
bundle.toString(), destinationBroker, force);
UnloadDecision unloadDecision =
new UnloadDecision(unload,
UnloadDecision.Label.Success, UnloadDecision.Reason.Admin);
return unloadAsync(unloadDecision,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 7c91cf92708..069ac516551 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -846,6 +846,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
Free, null, data.sourceBroker(),
getNextVersionId(data));
unloadFuture = closeServiceUnit(serviceUnit, true);
}
+ // If the optimized bundle unload is disabled, disconnect the clients at time of RELEASE.
stateChangeListeners.notifyOnCompletion(unloadFuture
.thenCompose(__ -> pubAsync(serviceUnit, next)),
serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, next));
@@ -866,9 +867,12 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
if (isTargetBroker(data.sourceBroker())) {
- stateChangeListeners.notifyOnCompletion(
- data.force() ? closeServiceUnit(serviceUnit, true)
- : CompletableFuture.completedFuture(0),
serviceUnit, data)
+ // If data.force(), try closeServiceUnit and tombstone the bundle.
+ CompletableFuture<Void> future =
+ (data.force() ? closeServiceUnit(serviceUnit, true)
+ .thenCompose(__ -> tombstoneAsync(serviceUnit))
+ :
CompletableFuture.completedFuture(0)).thenApply(__ -> null);
+ stateChangeListeners.notifyOnCompletion(future, serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
} else {
stateChangeListeners.notify(serviceUnit, data, null);
@@ -880,9 +884,13 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
if (getOwnerRequest != null) {
getOwnerRequest.completeExceptionally(new
IllegalStateException(serviceUnit + "has been deleted."));
}
- stateChangeListeners.notify(serviceUnit, data, null);
+
if (isTargetBroker(data.sourceBroker())) {
- log(null, serviceUnit, data, null);
+ stateChangeListeners.notifyOnCompletion(
+ tombstoneAsync(serviceUnit), serviceUnit, data)
+ .whenComplete((__, e) -> log(e, serviceUnit, data, null));
+ } else {
+ stateChangeListeners.notify(serviceUnit, data, null);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java
index 71ebbc92a87..ac21e4c6241 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java
@@ -97,7 +97,7 @@ public class SplitManager implements StateChangeListener {
return;
}
switch (state) {
- case Deleted, Owned, Init -> this.complete(serviceUnit, t);
+ case Init -> this.complete(serviceUnit, t);
default -> {
if (log.isDebugEnabled()) {
log.debug("Handling {} for service unit {}", data,
serviceUnit);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
index ffae9475243..6b745345c0a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
@@ -175,7 +175,7 @@ public class UnloadManager implements StateChangeListener {
public void handleEvent(String serviceUnit, ServiceUnitStateData data,
Throwable t) {
ServiceUnitState state = ServiceUnitStateData.state(data);
- if (StringUtils.isBlank(data.sourceBroker()) && (state == Owned ||
state == Assigning)) {
+ if ((state == Owned || state == Assigning) &&
StringUtils.isBlank(data.sourceBroker())) {
if (log.isDebugEnabled()) {
log.debug("Skipping {} for service unit {} from the assignment
command.", data, serviceUnit);
}
@@ -195,7 +195,17 @@ public class UnloadManager implements StateChangeListener {
}
switch (state) {
- case Free, Owned -> complete(serviceUnit, t);
+ case Free -> {
+ if (!data.force()) {
+ complete(serviceUnit, t);
+ }
+ }
+ case Init -> {
+ if (data.force()) {
+ complete(serviceUnit, t);
+ }
+ }
+ case Owned -> complete(serviceUnit, t);
case Releasing ->
LatencyMetric.RELEASE.endMeasurement(serviceUnit);
case Assigning -> LatencyMetric.ASSIGN.endMeasurement(serviceUnit);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 96936b3a5c0..80559b736c6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -837,7 +837,7 @@ public class NamespaceService implements AutoCloseable {
boolean
closeWithoutWaitingClientDisconnect) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return ExtensibleLoadManagerImpl.get(loadManager.get())
- .unloadNamespaceBundleAsync(bundle, destinationBroker);
+ .unloadNamespaceBundleAsync(bundle, destinationBroker,
false);
}
// unload namespace bundle
OwnedBundle ob = ownershipCache.getOwnedBundle(bundle);
@@ -1286,7 +1286,8 @@ public class NamespaceService implements AutoCloseable {
CompletableFuture<Void> future;
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager =
ExtensibleLoadManagerImpl.get(loadManager.get());
- future =
extensibleLoadManager.unloadNamespaceBundleAsync(nsBundle, Optional.empty());
+ future = extensibleLoadManager.unloadNamespaceBundleAsync(
+ nsBundle, Optional.empty(), true);
} else {
future = ownershipCache.removeOwnership(nsBundle);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 8b96ed04f64..07855fda4d7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -739,7 +739,8 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
@Test(timeOut = 30 * 1000)
public void testSplitBundleAdminAPI() throws Exception {
- String namespace = defaultTestNamespace;
+ final String namespace = "public/testSplitBundleAdminAPI";
+ admin.namespaces().createNamespace(namespace, 1);
Pair<TopicName, NamespaceBundle> topicAndBundle =
getBundleIsNotOwnByChangeEventTopic("test-split");
TopicName topicName = topicAndBundle.getLeft();
admin.topics().createPartitionedTopic(topicName.toString(), 10);
@@ -793,6 +794,30 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
} catch (PulsarAdminException ex) {
assertTrue(ex.getMessage().contains("Invalid bundle range"));
}
+
+
+ // delete and retry
+ Awaitility.await()
+ .atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> {
+ admin.namespaces().deleteNamespace(namespace);
+ });
+ admin.namespaces().createNamespace(namespace, 1);
+ admin.namespaces().splitNamespaceBundle(namespace, firstBundle, true,
null);
+
+ Awaitility.await()
+ .atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> {
+ BundlesData bundlesData =
admin.namespaces().getBundles(namespace);
+ assertEquals(bundlesData.getNumBundles(), numBundles + 1);
+ String lowBundle = String.format("0x%08x",
bundleRanges.get(0));
+ String midBundle = String.format("0x%08x", mid);
+ String highBundle = String.format("0x%08x",
bundleRanges.get(1));
+
assertTrue(bundlesData.getBoundaries().contains(lowBundle));
+
assertTrue(bundlesData.getBoundaries().contains(midBundle));
+
assertTrue(bundlesData.getBoundaries().contains(highBundle));
+ assertEquals(splitCount.get(), 2);
+ });
}
@Test(timeOut = 30 * 1000)
@@ -1656,7 +1681,11 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
NamespaceEphemeralData namespaceEphemeralData =
primaryLoadManager.tryAcquiringOwnership(bundle).get();
assertTrue(Set.of(pulsar1.getBrokerServiceUrl(),
pulsar2.getBrokerServiceUrl())
.contains(namespaceEphemeralData.getNativeUrl()));
- admin.namespaces().deleteNamespace(namespace);
+ Awaitility.await()
+ .atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> {
+ admin.namespaces().deleteNamespace(namespace, true);
+ });
}
@Test(timeOut = 30 * 1000)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 1076f92037f..c0fdd95a6a3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -576,11 +576,11 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
childBundle1Range, Optional.empty(), childBundle2Range,
Optional.empty()));
channel1.publishSplitEventAsync(split);
- waitUntilState(channel1, bundle, Deleted);
- waitUntilState(channel2, bundle, Deleted);
+ waitUntilState(channel1, bundle, Init);
+ waitUntilState(channel2, bundle, Init);
- validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0,
1, 0);
- validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0,
1, 0);
+ validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 1, 0,
1, 0);
+ validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 1, 0,
1, 0);
validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);
// Verify the retry count
@@ -620,7 +620,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
var leader = channel1.isChannelOwnerAsync().get() ? channel1 :
channel2;
validateMonitorCounters(leader,
0,
- 1,
+ 0,
0,
0,
0,
@@ -1236,15 +1236,15 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
var leader = channel1.isChannelOwnerAsync().get() ? channel1 :
channel2;
- waitUntilStateWithMonitor(leader, bundle, Deleted);
- waitUntilStateWithMonitor(channel1, bundle, Deleted);
- waitUntilStateWithMonitor(channel2, bundle, Deleted);
+ waitUntilStateWithMonitor(leader, bundle, Init);
+ waitUntilStateWithMonitor(channel1, bundle, Init);
+ waitUntilStateWithMonitor(channel2, bundle, Init);
var ownerAddr1 = channel1.getOwnerAsync(bundle);
var ownerAddr2 = channel2.getOwnerAsync(bundle);
- assertTrue(ownerAddr1.isCompletedExceptionally());
- assertTrue(ownerAddr2.isCompletedExceptionally());
+ assertTrue(ownerAddr1.get().isEmpty());
+ assertTrue(ownerAddr2.get().isEmpty());
FieldUtils.writeDeclaredField(channel1,
@@ -1428,13 +1428,15 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
var leader = channel1.isChannelOwnerAsync().get() ? channel1 :
channel2;
((ServiceUnitStateChannelImpl) leader)
.monitorOwnerships(List.of(brokerId1, brokerId2));
- waitUntilState(leader, bundle3, Deleted);
- waitUntilState(channel1, bundle3, Deleted);
- waitUntilState(channel2, bundle3, Deleted);
+
+ waitUntilState(leader, bundle3, Init);
+ waitUntilState(channel1, bundle3, Init);
+ waitUntilState(channel2, bundle3, Init);
- validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 0, 0,
1, 0);
- validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 0, 0,
1, 0);
+
+ validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 1, 0,
1, 0);
+ validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 1, 0,
1, 0);
validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);
@@ -1464,7 +1466,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
validateMonitorCounters(leader,
0,
- 1,
+ 0,
1,
0,
0,
@@ -1542,7 +1544,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
waitUntilNewOwner(channel2, ownedBundle, brokerId2);
assertEquals(Optional.empty(),
channel2.getOwnerAsync(freeBundle).get());
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
-
assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally());
+ assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty());
// clean-up
FieldUtils.writeDeclaredField(leaderChannel,
"maxCleanupDelayTimeInSecs", 3 * 60, true);
@@ -1605,7 +1607,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
waitUntilNewOwner(channel2, ownedBundle, broker);
assertEquals(Optional.empty(),
channel2.getOwnerAsync(freeBundle).get());
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
-
assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally());
+ assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty());
// clean-up
FieldUtils.writeDeclaredField(channel1,
@@ -1663,8 +1665,10 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
"inFlightStateWaitingTimeInMillis", 20 * 1000, true);
start = System.currentTimeMillis();
assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty());
- assertTrue(System.currentTimeMillis() - start < 20_000);
+ waitUntilState(channel1, bundle, Init);
+ waitUntilState(channel2, bundle, Init);
+ assertTrue(System.currentTimeMillis() - start < 20_000);
// simulate ownership cleanup(brokerId1 selected owner) by the leader channel
overrideTableViews(bundle,
new ServiceUnitStateData(Owned, broker, null, 1));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java
index 3287306ab48..57b7830214b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java
@@ -123,40 +123,23 @@ public class SplitManagerTest {
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Free, dstBroker,
VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequests.size(), 1);
- assertEquals(counter.toMetrics(null).toString(),
- counterExpected.toMetrics(null).toString());
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Deleted, dstBroker,
VERSION_ID_INIT), null);
- counterExpected.update(SplitDecision.Label.Success, Sessions);
- assertEquals(inFlightUnloadRequests.size(), 0);
- assertEquals(counter.toMetrics(null).toString(),
- counterExpected.toMetrics(null).toString());
+ assertEquals(inFlightUnloadRequests.size(), 1);
- // Success with Init state.
- future = manager.waitAsync(CompletableFuture.completedFuture(null),
- bundle, decision, 5, TimeUnit.SECONDS);
- inFlightUnloadRequests = getinFlightUnloadRequests(manager);
+ manager.handleEvent(bundle,
+ new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker,
VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequests.size(), 1);
+
+ // Success with Init state.
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Init, dstBroker,
VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequests.size(), 0);
counterExpected.update(SplitDecision.Label.Success, Sessions);
assertEquals(counter.toMetrics(null).toString(),
counterExpected.toMetrics(null).toString());
- future.get();
- // Success with Owned state.
- future = manager.waitAsync(CompletableFuture.completedFuture(null),
- bundle, decision, 5, TimeUnit.SECONDS);
- inFlightUnloadRequests = getinFlightUnloadRequests(manager);
- assertEquals(inFlightUnloadRequests.size(), 1);
- manager.handleEvent(bundle,
- new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker,
VERSION_ID_INIT), null);
- assertEquals(inFlightUnloadRequests.size(), 0);
- counterExpected.update(SplitDecision.Label.Success, Sessions);
- assertEquals(counter.toMetrics(null).toString(),
- counterExpected.toMetrics(null).toString());
future.get();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
index 5d0abea3357..f7deb072688 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
@@ -122,11 +122,15 @@ public class UnloadManagerTest {
assertEquals(inFlightUnloadRequestMap.size(), 1);
manager.handleEvent(bundle,
- new ServiceUnitStateData(ServiceUnitState.Init, null,
srcBroker, VERSION_ID_INIT), null);
+ new ServiceUnitStateData(ServiceUnitState.Free, null,
srcBroker, true, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);
+ // Success with Init state.
manager.handleEvent(bundle,
- new ServiceUnitStateData(ServiceUnitState.Free, null,
srcBroker, VERSION_ID_INIT), null);
+ new ServiceUnitStateData(ServiceUnitState.Init, null,
srcBroker, false, VERSION_ID_INIT), null);
+ assertEquals(inFlightUnloadRequestMap.size(), 1);
+ manager.handleEvent(bundle,
+ new ServiceUnitStateData(ServiceUnitState.Init, null,
srcBroker, true, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 0);
future.get();
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 1);
@@ -136,17 +140,30 @@ public class UnloadManagerTest {
bundle, unloadDecision, 5, TimeUnit.SECONDS);
inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);
assertEquals(inFlightUnloadRequestMap.size(), 1);
-
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker,
null, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);
-
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker,
srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 0);
-
future.get();
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 2);
+
+ // Success with Free state.
+ future = manager.waitAsync(CompletableFuture.completedFuture(null),
+ bundle, unloadDecision, 5, TimeUnit.SECONDS);
+ inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);
+ assertEquals(inFlightUnloadRequestMap.size(), 1);
+ manager.handleEvent(bundle,
+ new ServiceUnitStateData(ServiceUnitState.Free, dstBroker,
srcBroker, true, VERSION_ID_INIT), null);
+ assertEquals(inFlightUnloadRequestMap.size(), 1);
+ manager.handleEvent(bundle,
+ new ServiceUnitStateData(ServiceUnitState.Free, dstBroker,
srcBroker, false, VERSION_ID_INIT), null);
+ assertEquals(inFlightUnloadRequestMap.size(), 0);
+ future.get();
+
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 3);
+
+
}
@Test