This is an automated email from the ASF dual-hosted git repository.
kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 421d707419e [improve][broker] PIP-192 made split handler idempotent
(#19988)
421d707419e is described below
commit 421d707419ee42378b9c5b37812bd5c617ef2c9a
Author: Heesung Sohn <[email protected]>
AuthorDate: Sat Apr 8 03:49:16 2023 -0700
[improve][broker] PIP-192 made split handler idempotent (#19988)
Master Issue: https://github.com/apache/pulsar/issues/16691
### Motivation
Raising a PR to implement: https://github.com/apache/pulsar/issues/16691
### Modifications
This PR
- Makes the split handler idempotent.
- Makes the leader's orphan monitor keep retrying the split message until
the split finishes.
- Selects the bundle split boundaries in the SplitScheduler so that every
split-handler retry uses the same boundaries.
- Adds a split condition that skips a bundle while its parent bundle is still in the Splitting state.
- Makes the admin Unload command forceful so that it can unload bundles in
invalid states.
---
conf/broker.conf | 2 +-
.../apache/pulsar/broker/ServiceConfiguration.java | 2 +-
.../extensions/ExtensibleLoadManagerImpl.java | 2 +-
.../channel/ServiceUnitStateChannelImpl.java | 263 +++++++++-----
.../extensions/channel/ServiceUnitStateData.java | 15 +
.../loadbalance/extensions/models/Split.java | 6 +-
.../loadbalance/extensions/models/Unload.java | 10 +-
.../DefaultNamespaceBundleSplitStrategyImpl.java | 76 +++-
.../pulsar/broker/namespace/NamespaceService.java | 10 +-
.../common/naming/NamespaceBundleFactory.java | 8 +-
.../naming/NamespaceBundleSplitAlgorithm.java | 3 +
.../SpecifiedPositionsBundleSplitAlgorithm.java | 47 ++-
.../extensions/ExtensibleLoadManagerImplTest.java | 1 +
.../channel/ServiceUnitStateChannelTest.java | 399 ++++++++++++++++++---
.../extensions/channel/models/SplitTest.java | 8 +-
.../extensions/scheduler/SplitSchedulerTest.java | 19 +-
.../DefaultNamespaceBundleSplitStrategyTest.java | 81 ++++-
...SpecifiedPositionsBundleSplitAlgorithmTest.java | 15 +
18 files changed, 767 insertions(+), 200 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 1bbb12313f9..414756abcb7 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1455,7 +1455,7 @@
loadBalancerNamespaceBundleSplitConditionHitCountThreshold=3
# the service-unit state channel when there are a large number of bundles.
# minimum value = 30 secs
# (only used in load balancer extension logics)
-loadBalancerServiceUnitStateCleanUpDelayTimeInSeconds=604800
+loadBalancerServiceUnitStateTombstoneDelayTimeInSeconds=3600
### --- Replication --- ###
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 52cebe15f6a..c86358b9b5b 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2609,7 +2609,7 @@ public class ServiceConfiguration implements
PulsarConfiguration {
+ "minimum value = 30 secs"
+ "(only used in load balancer extension logics)"
)
- private long loadBalancerServiceUnitStateCleanUpDelayTimeInSeconds =
604800;
+ private long loadBalancerServiceUnitStateTombstoneDelayTimeInSeconds =
3600;
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index c1234b6dab2..e054a1bc662 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -449,7 +449,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
log.warn(msg);
throw new IllegalArgumentException(msg);
}
- Unload unload = new Unload(sourceBroker,
bundle.toString(), destinationBroker);
+ Unload unload = new Unload(sourceBroker,
bundle.toString(), destinationBroker, true);
UnloadDecision unloadDecision =
new UnloadDecision(unload,
UnloadDecision.Label.Success, UnloadDecision.Reason.Admin);
return unloadAsync(unloadDecision,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index ec79698db1f..bd62c53be60 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -46,7 +46,6 @@ import static
org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
import static
org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -61,6 +60,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -200,7 +200,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
CompletableFuture<String>>newBuilder().build();
this.cleanupJobs = ConcurrentOpenHashMap.<String,
CompletableFuture<Void>>newBuilder().build();
this.stateChangeListeners = new StateChangeListeners();
- this.semiTerminalStateWaitingTimeInMillis =
config.getLoadBalancerServiceUnitStateCleanUpDelayTimeInSeconds()
+ this.semiTerminalStateWaitingTimeInMillis =
config.getLoadBalancerServiceUnitStateTombstoneDelayTimeInSeconds()
* 1000;
this.inFlightStateWaitingTimeInMillis =
MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS;
this.ownershipMonitorDelayTimeInSecs =
OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS;
@@ -563,10 +563,11 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
ServiceUnitStateData next;
if (isTransferCommand(unload)) {
next = new ServiceUnitStateData(
- Releasing, unload.destBroker().get(),
unload.sourceBroker(), getNextVersionId(serviceUnit));
+ Releasing, unload.destBroker().get(),
unload.sourceBroker(),
+ unload.force(), getNextVersionId(serviceUnit));
} else {
next = new ServiceUnitStateData(
- Releasing, null, unload.sourceBroker(),
getNextVersionId(serviceUnit));
+ Releasing, null, unload.sourceBroker(), unload.force(),
getNextVersionId(serviceUnit));
}
return pubAsync(serviceUnit, next).whenComplete((__, ex) -> {
if (ex != null) {
@@ -660,7 +661,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
long handlerTotalCount = getHandlerTotalCounter(data).get();
long handlerFailureCount =
getHandlerFailureCounter(data).get();
log.info("{} handled {} event for serviceUnit:{}, cur:{},
next:{}, "
- + "totalHandledRequests{},
totalFailedRequests:{}",
+ + "totalHandledRequests:{},
totalFailedRequests:{}",
lookupServiceAddress, getLogEventTag(data),
serviceUnit,
data == null ? "" : data,
next == null ? "" : next,
@@ -671,7 +672,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
long handlerTotalCount = getHandlerTotalCounter(data).get();
long handlerFailureCount =
getHandlerFailureCounter(data).incrementAndGet();
log.error("{} failed to handle {} event for serviceUnit:{},
cur:{}, next:{}, "
- + "totalHandledRequests{}, totalFailedRequests:{}",
+ + "totalHandledRequests:{},
totalFailedRequests:{}",
lookupServiceAddress, getLogEventTag(data), serviceUnit,
data == null ? "" : data,
next == null ? "" : next,
@@ -857,110 +858,158 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
boundariesSet.add(subBundle.getKeyRange().upperEndpoint());
});
boundaries = new ArrayList<>(boundariesSet);
- nsBundleSplitAlgorithm =
NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE_ALGO;
+ nsBundleSplitAlgorithm =
NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE_FORCE_ALGO;
}
final AtomicInteger counter = new AtomicInteger(0);
+ var childBundles =
data.splitServiceUnitToDestBroker().keySet().stream()
+ .map(child -> bundleFactory.getBundle(
+ bundle.getNamespaceObject().toString(), child))
+ .collect(Collectors.toList());
this.splitServiceUnitOnceAndRetry(namespaceService, bundleFactory,
nsBundleSplitAlgorithm,
- bundle, boundaries, serviceUnit, data, counter, startTime,
completionFuture);
+ bundle, childBundles, boundaries, data, counter, startTime,
completionFuture);
return completionFuture;
}
+
+
@VisibleForTesting
protected void splitServiceUnitOnceAndRetry(NamespaceService
namespaceService,
NamespaceBundleFactory
bundleFactory,
NamespaceBundleSplitAlgorithm
algorithm,
- NamespaceBundle bundle,
+ NamespaceBundle parentBundle,
+ List<NamespaceBundle>
childBundles,
List<Long> boundaries,
- String serviceUnit,
- ServiceUnitStateData data,
+ ServiceUnitStateData
parentData,
AtomicInteger counter,
long startTime,
CompletableFuture<Void>
completionFuture) {
- CompletableFuture<List<NamespaceBundle>> updateFuture = new
CompletableFuture<>();
-
- namespaceService.getSplitBoundary(bundle, algorithm, boundaries)
- .thenAccept(splitBundlesPair -> {
- // Split and updateNamespaceBundles. Update may fail because of
concurrent write to Zookeeper.
- if (splitBundlesPair == null) {
- String msg = format("Bundle %s not found under namespace",
serviceUnit);
- updateFuture.completeExceptionally(new
BrokerServiceException.ServiceUnitNotReadyException(msg));
- return;
- }
- ServiceUnitStateData next = new ServiceUnitStateData(Owned,
data.sourceBroker(), VERSION_ID_INIT);
- NamespaceBundles targetNsBundle = splitBundlesPair.getLeft();
- List<NamespaceBundle> splitBundles =
Collections.unmodifiableList(splitBundlesPair.getRight());
- List<NamespaceBundle> successPublishedBundles =
- Collections.synchronizedList(new
ArrayList<>(splitBundles.size()));
- List<CompletableFuture<Void>> futures = new
ArrayList<>(splitBundles.size());
- for (NamespaceBundle sBundle : splitBundles) {
- futures.add(pubAsync(sBundle.toString(), next).thenAccept(__
-> successPublishedBundles.add(sBundle)));
- }
- NamespaceName nsname = bundle.getNamespaceObject();
- FutureUtil.waitForAll(futures)
- .thenCompose(__ ->
namespaceService.updateNamespaceBundles(nsname, targetNsBundle))
- .thenCompose(__ ->
namespaceService.updateNamespaceBundlesForPolicies(nsname, targetNsBundle))
- .thenRun(() -> {
-
bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
- updateFuture.complete(splitBundles);
- }).exceptionally(e -> {
- // Clean the new bundle when has exception.
- List<CompletableFuture<Void>> futureList = new
ArrayList<>();
- for (NamespaceBundle sBundle :
successPublishedBundles) {
-
futureList.add(tombstoneAsync(sBundle.toString()).thenAccept(__ -> {}));
- }
- FutureUtil.waitForAll(futureList)
- .whenComplete((__, ex) -> {
- if (ex != null) {
- log.warn("Clean new bundles failed,",
ex);
- }
- updateFuture.completeExceptionally(e);
- });
- return null;
- });
- }).exceptionally(e -> {
- updateFuture.completeExceptionally(e);
- return null;
- });
+ ownChildBundles(childBundles, parentData)
+ .thenCompose(__ -> getSplitNamespaceBundles(
+ namespaceService, bundleFactory, algorithm,
parentBundle, childBundles, boundaries))
+ .thenCompose(namespaceBundles ->
updateSplitNamespaceBundlesAsync(
+ namespaceService, bundleFactory, parentBundle,
namespaceBundles))
+ .thenAccept(__ -> // Update bundled_topic cache for
load-report-generation
+
pulsar.getBrokerService().refreshTopicToStatsMaps(parentBundle))
+ .thenAccept(__ -> pubAsync(parentBundle.toString(), new
ServiceUnitStateData(
+ Deleted, null, parentData.sourceBroker(),
getNextVersionId(parentData))))
+ .thenAccept(__ -> {
+ double splitBundleTime =
TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime));
+ log.info("Successfully split {} parent namespace-bundle to
{} in {} ms",
+ parentBundle, childBundles, splitBundleTime);
+ completionFuture.complete(null);
+ })
+ .exceptionally(ex -> {
+ // Retry several times on BadVersion
+ Throwable throwable =
FutureUtil.unwrapCompletionException(ex);
+ if ((throwable instanceof
MetadataStoreException.BadVersionException)
+ && (counter.incrementAndGet() <
NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT)) {
+ log.warn("Failed to update bundle range in metadata
store. Retrying {} th / {} limit",
+ counter.get(),
NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT, ex);
+ pulsar.getExecutor().schedule(() ->
splitServiceUnitOnceAndRetry(
+ namespaceService, bundleFactory, algorithm,
parentBundle, childBundles,
+ boundaries, parentData, counter,
startTime, completionFuture),
+ 100, MILLISECONDS);
+ } else {
+ // Retry enough, or meet other exception
+ String msg = format("Failed to split bundle %s,
Retried %d th / %d limit, reason %s",
+ parentBundle.toString(), counter.get(),
+ NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT,
throwable.getMessage());
+ log.warn(msg, throwable);
+ completionFuture.completeExceptionally(
+ new
BrokerServiceException.ServiceUnitNotReadyException(msg));
+ }
+ return null;
+ });
+ }
- updateFuture.thenAccept(r -> {
- // Delete the old bundle
- pubAsync(serviceUnit, new ServiceUnitStateData(
- Deleted, null, data.sourceBroker(),
getNextVersionId(data)))
- .thenRun(() -> {
- // Update bundled_topic cache for load-report-generation
- pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
- // TODO: Update the load data immediately if needed.
- completionFuture.complete(null);
- double splitBundleTime =
TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime));
- log.info("Successfully split {} parent namespace-bundle to {}
in {} ms", serviceUnit, r,
- splitBundleTime);
- }).exceptionally(e -> {
- double splitBundleTime =
TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime));
- String msg = format("Failed to free bundle %s in %s ms, under
namespace [%s] with error %s",
- bundle.getNamespaceObject().toString(),
splitBundleTime, bundle, e.getMessage());
- completionFuture.completeExceptionally(new
BrokerServiceException.ServiceUnitNotReadyException(msg));
- return null;
- });
- }).exceptionally(ex -> {
- // Retry several times on BadVersion
- Throwable throwable = FutureUtil.unwrapCompletionException(ex);
- if ((throwable instanceof
MetadataStoreException.BadVersionException)
- && (counter.incrementAndGet() <
NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT)) {
- log.warn("Failed to update bundle range in metadata store.
Retrying {} th / {} limit",
- counter.get(),
NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT, ex);
- pulsar.getExecutor().schedule(() ->
splitServiceUnitOnceAndRetry(namespaceService, bundleFactory,
- algorithm, bundle, boundaries, serviceUnit, data,
counter, startTime, completionFuture),
- 100, MILLISECONDS);
- } else if (throwable instanceof IllegalArgumentException) {
- completionFuture.completeExceptionally(throwable);
+ private CompletableFuture<Void> ownChildBundles(List<NamespaceBundle>
childBundles,
+ ServiceUnitStateData
parentData) {
+ List<CompletableFuture<Void>> futures = new
ArrayList<>(childBundles.size());
+ var debug = debug();
+ for (var childBundle : childBundles) {
+ var childBundleStr = childBundle.toString();
+ var childData = tableview.get(childBundleStr);
+ if (childData != null) {
+ if (debug) {
+ log.info("Already owned child bundle:{}", childBundleStr);
+ }
} else {
- // Retry enough, or meet other exception
- String msg = format("Bundle: %s not success update nsBundles,
counter %d, reason %s",
- bundle.toString(), counter.get(),
throwable.getMessage());
- completionFuture.completeExceptionally(new
BrokerServiceException.ServiceUnitNotReadyException(msg));
+ childData = new ServiceUnitStateData(Owned,
parentData.sourceBroker(),
+ VERSION_ID_INIT);
+ futures.add(pubAsync(childBundleStr, childData).thenApply(__
-> null));
}
- return null;
- });
+ }
+
+ if (!futures.isEmpty()) {
+ return FutureUtil.waitForAll(futures);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ private CompletableFuture<NamespaceBundles>
getSplitNamespaceBundles(NamespaceService namespaceService,
+
NamespaceBundleFactory bundleFactory,
+
NamespaceBundleSplitAlgorithm algorithm,
+
NamespaceBundle parentBundle,
+
List<NamespaceBundle> childBundles,
+
List<Long> boundaries) {
+ CompletableFuture future = new CompletableFuture();
+ final var debug = debug();
+ var targetNsBundle =
bundleFactory.getBundles(parentBundle.getNamespaceObject());
+ boolean found = false;
+ try {
+ targetNsBundle.validateBundle(parentBundle);
+ } catch (IllegalArgumentException e) {
+ if (debug) {
+ log.info("Namespace bundles do not contain the parent
bundle:{}",
+ parentBundle);
+ }
+ for (var childBundle : childBundles) {
+ try {
+ targetNsBundle.validateBundle(childBundle);
+ if (debug) {
+ log.info("Namespace bundles contain the child
bundle:{}",
+ childBundle);
+ }
+ } catch (Exception ex) {
+ future.completeExceptionally(
+ new
BrokerServiceException.ServiceUnitNotReadyException(
+ "Namespace bundles do not contain the
child bundle:" + childBundle, e));
+ return future;
+ }
+ }
+ found = true;
+ } catch (Exception e) {
+ future.completeExceptionally(
+ new BrokerServiceException.ServiceUnitNotReadyException(
+ "Failed to validate the parent bundle in the
namespace bundles.", e));
+ return future;
+ }
+ if (found) {
+ future.complete(targetNsBundle);
+ return future;
+ } else {
+ return namespaceService.getSplitBoundary(parentBundle, algorithm,
boundaries)
+ .thenApply(splitBundlesPair -> splitBundlesPair.getLeft());
+ }
+ }
+
+ private CompletableFuture<Void> updateSplitNamespaceBundlesAsync(
+ NamespaceService namespaceService,
+ NamespaceBundleFactory bundleFactory,
+ NamespaceBundle parentBundle,
+ NamespaceBundles splitNamespaceBundles) {
+ var namespaceName = parentBundle.getNamespaceObject();
+ return namespaceService.updateNamespaceBundles(
+ namespaceName, splitNamespaceBundles)
+ .thenCompose(__ ->
namespaceService.updateNamespaceBundlesForPolicies(
+ namespaceName, splitNamespaceBundles))
+ .thenAccept(__ -> {
+
bundleFactory.invalidateBundleCache(parentBundle.getNamespaceObject());
+ if (debug()) {
+ log.info("Successfully updated split namespace bundles
and namespace bundle cache.");
+ }
+ });
}
public void handleMetadataSessionEvent(SessionEvent e) {
@@ -1053,11 +1102,23 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
broker, delayInSecs, cleanupJobs.size());
}
+
+ private ServiceUnitStateData
getOverrideInactiveBrokerStateData(ServiceUnitStateData orphanData,
+ String
selectedBroker) {
+ if (orphanData.state() == Splitting) {
+ return new ServiceUnitStateData(Splitting, orphanData.dstBroker(),
selectedBroker,
+ Map.copyOf(orphanData.splitServiceUnitToDestBroker()),
+ true, getNextVersionId(orphanData));
+ } else {
+ return new ServiceUnitStateData(Owned, selectedBroker, true,
getNextVersionId(orphanData));
+ }
+ }
+
private void overrideOwnership(String serviceUnit, ServiceUnitStateData
orphanData) {
+
Optional<String> selectedBroker = selectBroker(serviceUnit);
if (selectedBroker.isPresent()) {
- var override =
- new ServiceUnitStateData(Owned, selectedBroker.get(),
true, getNextVersionId(orphanData));
+ var override = getOverrideInactiveBrokerStateData(orphanData,
selectedBroker.get());
log.info("Overriding ownership serviceUnit:{} from orphanData:{}
to overrideData:{}",
serviceUnit, orphanData, override);
publishOverrideEventAsync(serviceUnit, orphanData, override)
@@ -1157,7 +1218,13 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
case Assigning: {
return getRollForwardStateData(serviceUnit, nextVersionId);
}
- case Splitting, Releasing: {
+ case Splitting: {
+ return Optional.of(new ServiceUnitStateData(Splitting,
+ orphanData.dstBroker(), orphanData.sourceBroker(),
+ Map.copyOf(orphanData.splitServiceUnitToDestBroker()),
+ true, nextVersionId));
+ }
+ case Releasing: {
if (availableBrokers.contains(orphanData.sourceBroker())) {
// rollback to the src
return Optional.of(new ServiceUnitStateData(Owned,
orphanData.sourceBroker(), true, nextVersionId));
@@ -1219,7 +1286,6 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
inactiveBrokers.add(dstBroker);
} else if (isInFlightState(state)
&& now - stateData.timestamp() >
inFlightStateWaitingTimeInMillis) {
- log.warn("Found orphan serviceUnit:{}, stateData:{}",
serviceUnit, stateData);
orphanServiceUnits.put(serviceUnit, stateData);
}
} else if (now - stateData.timestamp() >
semiTerminalStateWaitingTimeInMillis) {
@@ -1249,6 +1315,9 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
var overrideData = getOverrideInFlightStateData(
orphanServiceUnit, orphanData, activeBrokers);
if (overrideData.isPresent()) {
+ log.info("Overriding in-flight state ownership
serviceUnit:{} "
+ + "from orphanData:{} to overrideData:{}",
+ orphanServiceUnit, orphanData, overrideData);
publishOverrideEventAsync(orphanServiceUnit, orphanData,
overrideData.get())
.whenComplete((__, e) -> {
if (e != null) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
index 52c7e27d650..307d3a4acb1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
@@ -45,10 +45,25 @@ public record ServiceUnitStateData(
System.currentTimeMillis(), versionId);
}
+ public ServiceUnitStateData(ServiceUnitState state, String dstBroker,
String sourceBroker,
+ Map<String, Optional<String>>
splitServiceUnitToDestBroker, boolean force,
+ long versionId) {
+ this(state, dstBroker, sourceBroker, splitServiceUnitToDestBroker,
force,
+ System.currentTimeMillis(), versionId);
+ }
+
public ServiceUnitStateData(ServiceUnitState state, String dstBroker,
String sourceBroker, long versionId) {
this(state, dstBroker, sourceBroker, null, false,
System.currentTimeMillis(), versionId);
}
+ public ServiceUnitStateData(ServiceUnitState state, String dstBroker,
String sourceBroker, boolean force,
+ long versionId) {
+ this(state, dstBroker, sourceBroker, null, force,
+ System.currentTimeMillis(), versionId);
+ }
+
+
+
public ServiceUnitStateData(ServiceUnitState state, String dstBroker, long
versionId) {
this(state, dstBroker, null, null, false, System.currentTimeMillis(),
versionId);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/Split.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/Split.java
index ac9a36e6dbf..690fac59bc9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/Split.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/Split.java
@@ -30,12 +30,8 @@ public record Split(
public Split {
Objects.requireNonNull(serviceUnit);
- if (splitServiceUnitToDestBroker != null &&
splitServiceUnitToDestBroker.size() != 2) {
+ if (splitServiceUnitToDestBroker == null ||
splitServiceUnitToDestBroker.size() != 2) {
throw new IllegalArgumentException("Split service unit should be
split into 2 service units.");
}
}
-
- public Split(String serviceUnit, String sourceBroker) {
- this(serviceUnit, sourceBroker, null);
- }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/Unload.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/Unload.java
index d474011919d..753f8a942ce 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/Unload.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/Unload.java
@@ -24,12 +24,18 @@ import java.util.Optional;
/**
* Defines the information required to unload or transfer a service unit(e.g.
bundle).
*/
-public record Unload(String sourceBroker, String serviceUnit, Optional<String>
destBroker) {
+public record Unload(String sourceBroker, String serviceUnit, Optional<String>
destBroker, boolean force) {
public Unload {
Objects.requireNonNull(sourceBroker);
Objects.requireNonNull(serviceUnit);
}
+
+
public Unload(String sourceBroker, String serviceUnit) {
- this(sourceBroker, serviceUnit, Optional.empty());
+ this(sourceBroker, serviceUnit, Optional.empty(), false);
+ }
+
+ public Unload(String sourceBroker, String serviceUnit, Optional<String>
destBroker) {
+ this(sourceBroker, serviceUnit, destBroker, false);
}
}
\ No newline at end of file
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
index bd8b3aa6654..7875c07b122 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
@@ -26,18 +26,23 @@ import static
org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecisi
import static
org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
@@ -53,12 +58,14 @@ public class DefaultNamespaceBundleSplitStrategyImpl
implements NamespaceBundleS
private final Set<SplitDecision> decisionCache;
private final Map<String, Integer> namespaceBundleCount;
private final Map<String, Integer> splitConditionHitCounts;
+ private final Map<String, String> splittingBundles;
private final SplitCounter counter;
public DefaultNamespaceBundleSplitStrategyImpl(SplitCounter counter) {
decisionCache = new HashSet<>();
namespaceBundleCount = new HashMap<>();
splitConditionHitCounts = new HashMap<>();
+ splittingBundles = new HashMap<>();
this.counter = counter;
}
@@ -67,6 +74,7 @@ public class DefaultNamespaceBundleSplitStrategyImpl
implements NamespaceBundleS
public Set<SplitDecision> findBundlesToSplit(LoadManagerContext context,
PulsarService pulsar) {
decisionCache.clear();
namespaceBundleCount.clear();
+ splittingBundles.clear();
final ServiceConfiguration conf = pulsar.getConfiguration();
int maxBundleCount = conf.getLoadBalancerNamespaceMaximumBundles();
long maxBundleTopics = conf.getLoadBalancerNamespaceBundleMaxTopics();
@@ -78,6 +86,15 @@ public class DefaultNamespaceBundleSplitStrategyImpl
implements NamespaceBundleS
boolean debug = log.isDebugEnabled() ||
conf.isLoadBalancerDebugModeEnabled();
var channel = ServiceUnitStateChannelImpl.get(pulsar);
+ for (var etr : channel.getOwnershipEntrySet()) {
+ var eData = etr.getValue();
+ if (eData.state() == ServiceUnitState.Splitting) {
+ String bundle = etr.getKey();
+ final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundle);
+ splittingBundles.put(bundle, bundleRange);
+ }
+ }
+
Map<String, NamespaceBundleStats> bundleStatsMap =
pulsar.getBrokerService().getBundleStats();
NamespaceBundleFactory namespaceBundleFactory =
pulsar.getNamespaceService().getNamespaceBundleFactory();
@@ -177,6 +194,27 @@ public class DefaultNamespaceBundleSplitStrategyImpl
implements NamespaceBundleS
continue;
}
+ var ranges = bundleRange.split("_");
+ var foundSplittingBundle = false;
+ for (var etr : splittingBundles.entrySet()) {
+ var splittingBundle = etr.getKey();
+ if (splittingBundle.startsWith(namespace)) {
+ var splittingBundleRange = etr.getValue();
+ if (splittingBundleRange.startsWith(ranges[0])
+ || splittingBundleRange.endsWith(ranges[1])) {
+ if (debug) {
+ log.info(String.format(CANNOT_SPLIT_BUNDLE_MSG
+ + " (parent) bundle:%s is in Splitting
state.", bundle, splittingBundle));
+ }
+ foundSplittingBundle = true;
+ break;
+ }
+ }
+ }
+ if (foundSplittingBundle) {
+ continue;
+ }
+
if (debug) {
log.info(String.format(
"Splitting bundle: %s. "
@@ -193,7 +231,43 @@ public class DefaultNamespaceBundleSplitStrategyImpl
implements NamespaceBundleS
));
}
var decision = new SplitDecision();
- decision.setSplit(new Split(bundle,
context.brokerRegistry().getBrokerId()));
+ var namespaceService = pulsar.getNamespaceService();
+ var namespaceBundle = namespaceService.getNamespaceBundleFactory()
+ .getBundle(namespaceName, bundleRange);
+ NamespaceBundleSplitAlgorithm algorithm =
+ namespaceService.getNamespaceBundleSplitAlgorithmByName(
+ conf.getDefaultNamespaceBundleSplitAlgorithm());
+ List<Long> splitBoundary = null;
+ try {
+ splitBoundary = namespaceService
+ .getSplitBoundary(namespaceBundle, null, algorithm)
+ .get(conf.getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS);
+ } catch (Throwable e) {
+ counter.update(Failure, Unknown);
+ log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG + " Failed to
get split boundaries.", bundle, e));
+ continue;
+ }
+ if (splitBoundary == null) {
+ counter.update(Failure, Unknown);
+ log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG + " The split
boundaries is null.", bundle));
+ continue;
+ }
+ if (splitBoundary.size() != 1) {
+ counter.update(Failure, Unknown);
+ log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG + " The size of
split boundaries is not 1. "
+ + "splitBoundary:%s", bundle, splitBoundary));
+ continue;
+ }
+
+ var parentRange = namespaceBundle.getKeyRange();
+ var leftChildBundle =
namespaceBundleFactory.getBundle(namespaceBundle.getNamespaceObject(),
+
NamespaceBundleFactory.getRange(parentRange.lowerEndpoint(),
splitBoundary.get(0)));
+ var rightChildBundle =
namespaceBundleFactory.getBundle(namespaceBundle.getNamespaceObject(),
+ NamespaceBundleFactory.getRange(splitBoundary.get(0),
parentRange.upperEndpoint()));
+ Map<String, Optional<String>> splitServiceUnitToDestBroker =
Map.of(
+ leftChildBundle.getBundleRange(), Optional.empty(),
+ rightChildBundle.getBundleRange(), Optional.empty());
+ decision.setSplit(new Split(bundle,
context.brokerRegistry().getBrokerId(), splitServiceUnitToDestBroker));
decision.succeed(reason);
decisionCache.add(decision);
int bundleNum = namespaceBundleCount.getOrDefault(namespace, 0);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index d092ef04018..c9cd33d6d87 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -967,9 +967,7 @@ public class NamespaceService implements AutoCloseable {
*/
public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>>
getSplitBoundary(
NamespaceBundle bundle, NamespaceBundleSplitAlgorithm
nsBundleSplitAlgorithm, List<Long> boundaries) {
- BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle,
boundaries, config);
- CompletableFuture<List<Long>> splitBoundary =
- nsBundleSplitAlgorithm.getSplitBoundary(bundleSplitOption);
+ CompletableFuture<List<Long>> splitBoundary = getSplitBoundary(bundle,
boundaries, nsBundleSplitAlgorithm);
return splitBoundary.thenCompose(splitBoundaries -> {
if (splitBoundaries == null || splitBoundaries.size() ==
0) {
LOG.info("[{}] No valid boundary found in {} to split
bundle {}",
@@ -981,6 +979,12 @@ public class NamespaceService implements AutoCloseable {
});
}
+ public CompletableFuture<List<Long>> getSplitBoundary(
+ NamespaceBundle bundle, List<Long> boundaries,
NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm) {
+ BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle,
boundaries, config);
+ return nsBundleSplitAlgorithm.getSplitBoundary(bundleSplitOption);
+ }
+
private BundleSplitOption getBundleSplitOption(NamespaceBundle bundle,
List<Long> boundaries,
ServiceConfiguration
config) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
index f0f7deb940d..937d2763767 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
@@ -272,8 +272,7 @@ public class NamespaceBundleFactory {
String[] boundaries = bundleRange.split("_");
Long lowerEndpoint = Long.decode(boundaries[0]);
Long upperEndpoint = Long.decode(boundaries[1]);
- Range<Long> hashRange = Range.range(lowerEndpoint, BoundType.CLOSED,
upperEndpoint,
- (upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) ?
BoundType.CLOSED : BoundType.OPEN);
+ Range<Long> hashRange = getRange(lowerEndpoint, upperEndpoint);
return getBundle(NamespaceName.get(namespace), hashRange);
}
@@ -414,4 +413,9 @@ public class NamespaceBundleFactory {
return Joiner.on("/").join(i);
}
+ public static Range<Long> getRange(Long lowerEndpoint, Long upperEndpoint)
{
+ return Range.range(lowerEndpoint, BoundType.CLOSED, upperEndpoint,
+ (upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) ?
BoundType.CLOSED : BoundType.OPEN);
+ }
+
}
\ No newline at end of file
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java
index 01a61d81665..1fd6fbcd6ea 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java
@@ -39,6 +39,9 @@ public interface NamespaceBundleSplitAlgorithm {
NamespaceBundleSplitAlgorithm TOPIC_COUNT_EQUALLY_DIVIDE_ALGO = new
TopicCountEquallyDivideBundleSplitAlgorithm();
NamespaceBundleSplitAlgorithm SPECIFIED_POSITIONS_DIVIDE_ALGO =
new SpecifiedPositionsBundleSplitAlgorithm();
+
+ NamespaceBundleSplitAlgorithm SPECIFIED_POSITIONS_DIVIDE_FORCE_ALGO =
+ new SpecifiedPositionsBundleSplitAlgorithm(true);
NamespaceBundleSplitAlgorithm FLOW_OR_QPS_EQUALLY_DIVIDE_ALGO = new
FlowOrQpsEquallyDivideBundleSplitAlgorithm();
static NamespaceBundleSplitAlgorithm of(String algorithmName) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithm.java
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithm.java
index 1fd9a85cfa7..4a5f8a831c2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithm.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithm.java
@@ -29,6 +29,16 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
* This algorithm divides the bundle into several parts by the specified
positions.
*/
public class SpecifiedPositionsBundleSplitAlgorithm implements
NamespaceBundleSplitAlgorithm{
+
+ private boolean force;
+
+ public SpecifiedPositionsBundleSplitAlgorithm() {
+ force = false;
+ }
+
+ public SpecifiedPositionsBundleSplitAlgorithm(boolean force) {
+ this.force = force;
+ }
@Override
public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption
bundleSplitOption) {
NamespaceService service = bundleSplitOption.getService();
@@ -39,19 +49,28 @@ public class SpecifiedPositionsBundleSplitAlgorithm
implements NamespaceBundleSp
}
// sort all positions
Collections.sort(positions);
- return
service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
- if (topics == null || topics.size() <= 1) {
- return CompletableFuture.completedFuture(null);
- }
- List<Long> splitBoundaries = positions
- .stream()
- .filter(position -> position > bundle.getLowerEndpoint()
&& position < bundle.getUpperEndpoint())
- .collect(Collectors.toList());
-
- if (splitBoundaries.size() == 0) {
- return CompletableFuture.completedFuture(null);
- }
- return CompletableFuture.completedFuture(splitBoundaries);
- });
+ if (force) {
+ return getBoundaries(bundle, positions);
+ } else {
+ return service.getOwnedTopicListForNamespaceBundle(bundle)
+ .thenCompose(topics -> {
+ if (topics == null || topics.size() <= 1) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return getBoundaries(bundle, positions);
+ });
+ }
+ }
+
+ private CompletableFuture<List<Long>> getBoundaries(NamespaceBundle
bundle, List<Long> positions) {
+ List<Long> splitBoundaries = positions
+ .stream()
+ .filter(position -> position > bundle.getLowerEndpoint() &&
position < bundle.getUpperEndpoint())
+ .collect(Collectors.toList());
+
+ if (splitBoundaries.size() == 0) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return CompletableFuture.completedFuture(splitBoundaries);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 57590d8d7da..ae7ceeed928 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -133,6 +133,7 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
conf.setLoadBalancerSheddingEnabled(false);
+ conf.setLoadBalancerDebugModeEnabled(true);
super.internalSetup(conf);
pulsar1 = pulsar;
ServiceConfiguration defaultConf = getDefaultConf();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 2d98bc5cae6..77c80187a63 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -107,9 +107,16 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
private String lookupServiceAddress1;
private String lookupServiceAddress2;
private String bundle;
-
private String bundle1;
private String bundle2;
+ private String bundle3;
+ private String childBundle1Range;
+ private String childBundle2Range;
+ private String childBundle11;
+ private String childBundle12;
+
+ private String childBundle31;
+ private String childBundle32;
private PulsarTestContext additionalPulsarTestContext;
private LoadManagerContext loadManagerContext;
@@ -122,6 +129,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
@Override
protected void setup() throws Exception {
conf.setAllowAutoTopicCreation(true);
+ conf.setLoadBalancerDebugModeEnabled(true);
conf.setBrokerServiceCompactionMonitorIntervalInSeconds(10);
super.internalSetup(conf);
@@ -149,15 +157,23 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
lookupServiceAddress2 = (String)
FieldUtils.readDeclaredField(channel2, "lookupServiceAddress",
true);
- bundle = String.format("%s/%s", "public/default",
"0x00000000_0xffffffff");
- bundle1 = String.format("%s/%s", "public/default",
"0x00000000_0xfffffff0");
- bundle2 = String.format("%s/%s", "public/default",
"0xfffffff0_0xffffffff");
+ bundle = "public/default/0x00000000_0xffffffff";
+ bundle1 = "public/default/0x00000000_0xfffffff0";
+ bundle2 = "public/default/0xfffffff0_0xffffffff";
+ bundle3 = "public/default3/0x00000000_0xffffffff";
+ childBundle1Range = "0x7fffffff_0xffffffff";
+ childBundle2Range = "0x00000000_0x7fffffff";
+
+ childBundle11 = "public/default/" + childBundle1Range;
+ childBundle12 = "public/default/" + childBundle2Range;
+
+ childBundle31 = "public/default3/" + childBundle1Range;
+ childBundle32 = "public/default3/" + childBundle2Range;
}
@BeforeMethod
protected void initChannels() throws Exception {
- cleanTableView(channel1, bundle);
- cleanTableView(channel2, bundle);
+ cleanTableViews();
cleanOwnershipMonitorCounters(channel1);
cleanOwnershipMonitorCounters(channel2);
cleanOpsCounters(channel1);
@@ -299,7 +315,9 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
}
}
try {
- channel.publishSplitEventAsync(new Split(bundle,
lookupServiceAddress1))
+ Split split = new Split(bundle, lookupServiceAddress1, Map.of(
+ childBundle1Range, Optional.empty(), childBundle2Range,
Optional.empty()));
+ channel.publishSplitEventAsync(split)
.get(2, TimeUnit.SECONDS);
} catch (ExecutionException e) {
if (e.getCause() instanceof IllegalStateException) {
@@ -496,7 +514,8 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
assertEquals(0, getOwnerRequests2.size());
// recovered, check the monitor update state : Assigned -> Owned
-
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1))).when(loadManager).selectAsync(any());
+
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1)))
+ .when(loadManager).selectAsync(any());
FieldUtils.writeDeclaredField(channel2, "producer", producer, true);
FieldUtils.writeDeclaredField(channel1,
"inFlightStateWaitingTimeInMillis", 1 , true);
@@ -564,32 +583,33 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
doReturn(CompletableFuture.completedFuture(List.of("test-topic-1",
"test-topic-2")))
.when(namespaceService).getOwnedTopicListForNamespaceBundle(any());
- Split split = new Split(bundle, ownerAddr1.get());
+ // Assert child bundle ownerships in the channels.
+
+ Split split = new Split(bundle, ownerAddr1.get(), Map.of(
+ childBundle1Range, Optional.empty(), childBundle2Range,
Optional.empty()));
channel1.publishSplitEventAsync(split);
waitUntilState(channel1, bundle, Deleted);
waitUntilState(channel2, bundle, Deleted);
- validateHandlerCounters(channel1, 1, 0, 9, 0, 0, 0, 1, 0, 0, 0, 6, 0,
1, 0);
- validateHandlerCounters(channel2, 1, 0, 9, 0, 0, 0, 1, 0, 0, 0, 6, 0,
1, 0);
+ validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0,
1, 0);
+ validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0,
1, 0);
validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);
// Verify the retry count
verify(((ServiceUnitStateChannelImpl) channel1),
times(badVersionExceptionCount + 1))
.splitServiceUnitOnceAndRetry(any(), any(), any(), any(),
any(), any(), any(), any(), anyLong(), any());
- // Assert child bundle ownerships in the channels.
- String childBundle1 = "public/default/0x7fffffff_0xffffffff";
- String childBundle2 = "public/default/0x00000000_0x7fffffff";
- waitUntilNewOwner(channel1, childBundle1, lookupServiceAddress1);
- waitUntilNewOwner(channel1, childBundle2, lookupServiceAddress1);
- waitUntilNewOwner(channel2, childBundle1, lookupServiceAddress1);
- waitUntilNewOwner(channel2, childBundle2, lookupServiceAddress1);
- assertEquals(Optional.of(lookupServiceAddress1),
channel1.getOwnerAsync(childBundle1).get());
- assertEquals(Optional.of(lookupServiceAddress1),
channel1.getOwnerAsync(childBundle2).get());
- assertEquals(Optional.of(lookupServiceAddress1),
channel2.getOwnerAsync(childBundle1).get());
- assertEquals(Optional.of(lookupServiceAddress1),
channel2.getOwnerAsync(childBundle2).get());
+
+ waitUntilNewOwner(channel1, childBundle11, lookupServiceAddress1);
+ waitUntilNewOwner(channel1, childBundle12, lookupServiceAddress1);
+ waitUntilNewOwner(channel2, childBundle11, lookupServiceAddress1);
+ waitUntilNewOwner(channel2, childBundle12, lookupServiceAddress1);
+ assertEquals(Optional.of(lookupServiceAddress1),
channel1.getOwnerAsync(childBundle11).get());
+ assertEquals(Optional.of(lookupServiceAddress1),
channel1.getOwnerAsync(childBundle12).get());
+ assertEquals(Optional.of(lookupServiceAddress1),
channel2.getOwnerAsync(childBundle11).get());
+ assertEquals(Optional.of(lookupServiceAddress1),
channel2.getOwnerAsync(childBundle12).get());
// try the monitor and check the monitor moves `Deleted` -> `Init`
@@ -620,10 +640,10 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
0,
0);
- cleanTableView(channel1, childBundle1);
- cleanTableView(channel2, childBundle1);
- cleanTableView(channel1, childBundle2);
- cleanTableView(channel2, childBundle2);
+ cleanTableView(channel1, childBundle11);
+ cleanTableView(channel2, childBundle11);
+ cleanTableView(channel1, childBundle12);
+ cleanTableView(channel2, childBundle12);
FieldUtils.writeDeclaredField(channel1,
"inFlightStateWaitingTimeInMillis", 30 * 1000, true);
@@ -714,8 +734,8 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
var owner1 = channel1.getOwnerAsync(bundle1);
var owner2 = channel2.getOwnerAsync(bundle2);
-
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2))).when(loadManager).selectAsync(any());
-
+
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
+ .when(loadManager).selectAsync(any());
assertTrue(owner1.get().isEmpty());
assertTrue(owner2.get().isEmpty());
@@ -916,8 +936,12 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
Awaitility.await()
.pollInterval(200, TimeUnit.MILLISECONDS)
.atMost(140, TimeUnit.SECONDS)
- .untilAsserted(() -> verify(compactor, times(1))
- .compact(eq(ServiceUnitStateChannelImpl.TOPIC),
any()));
+ .untilAsserted(() -> {
+ channel1.publishAssignEventAsync(bundle,
lookupServiceAddress1);
+ verify(compactor, times(1))
+ .compact(eq(ServiceUnitStateChannelImpl.TOPIC),
any());
+ });
+
var channel3 = createChannel(pulsar);
channel3.start();
@@ -1049,7 +1073,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
}
@Test(priority = 13)
- public void assignTestWhenDestBrokerFails()
+ public void assignTestWhenDestBrokerProducerFails()
throws ExecutionException, InterruptedException,
IllegalAccessException {
Unload unload = new Unload(lookupServiceAddress1, bundle,
Optional.empty());
@@ -1076,7 +1100,8 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
"inFlightStateWaitingTimeInMillis", 3 * 1000, true);
FieldUtils.writeDeclaredField(channel2,
"inFlightStateWaitingTimeInMillis", 3 * 1000, true);
-
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2))).when(loadManager).selectAsync(any());
+
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
+ .when(loadManager).selectAsync(any());
channel1.publishAssignEventAsync(bundle, lookupServiceAddress2);
// channel1 is broken. the assign won't be complete.
waitUntilState(channel1, bundle);
@@ -1132,7 +1157,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
}
@Test(priority = 14)
- public void splitTestWhenDestBrokerFails()
+ public void splitTestWhenProducerFails()
throws ExecutionException, InterruptedException,
IllegalAccessException {
@@ -1165,7 +1190,12 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
"inFlightStateWaitingTimeInMillis", 3 * 1000, true);
FieldUtils.writeDeclaredField(channel2,
"inFlightStateWaitingTimeInMillis", 3 * 1000, true);
- channel2.publishSplitEventAsync(new Split(bundle,
lookupServiceAddress1, null));
+ // Assert child bundle ownerships in the channels.
+
+
+ Split split = new Split(bundle, lookupServiceAddress1, Map.of(
+ childBundle1Range, Optional.empty(), childBundle2Range,
Optional.empty()));
+ channel2.publishSplitEventAsync(split);
// channel1 is broken. the split won't be complete.
waitUntilState(channel1, bundle);
waitUntilState(channel2, bundle);
@@ -1180,29 +1210,19 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
FieldUtils.writeDeclaredField(channel2,
"inFlightStateWaitingTimeInMillis", 1 , true);
- ((ServiceUnitStateChannelImpl) channel1).monitorOwnerships(
- List.of(lookupServiceAddress1, lookupServiceAddress2));
- ((ServiceUnitStateChannelImpl) channel2).monitorOwnerships(
- List.of(lookupServiceAddress1, lookupServiceAddress2));
+ var leader = channel1.isChannelOwnerAsync().get() ? channel1 :
channel2;
- waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
- waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
- var ownerAddr1 = channel1.getOwnerAsync(bundle).get();
- var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
+ waitUntilStateWithMonitor(leader, bundle, Deleted);
+ waitUntilStateWithMonitor(channel1, bundle, Deleted);
+ waitUntilStateWithMonitor(channel2, bundle, Deleted);
- assertEquals(ownerAddr1, ownerAddr2);
- assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
+ var ownerAddr1 = channel1.getOwnerAsync(bundle);
+ var ownerAddr2 = channel2.getOwnerAsync(bundle);
+
+ assertTrue(ownerAddr1.isCompletedExceptionally());
+ assertTrue(ownerAddr2.isCompletedExceptionally());
- var leader = channel1.isChannelOwnerAsync().get() ? channel1 :
channel2;
- validateMonitorCounters(leader,
- 0,
- 0,
- 1,
- 0,
- 0,
- 0,
- 0);
FieldUtils.writeDeclaredField(channel1,
"inFlightStateWaitingTimeInMillis", 30 * 1000, true);
@@ -1273,6 +1293,243 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
assertFalse(channel1.isOwner(bundle));
}
+ @Test(priority = 16)
+ public void splitAndRetryFailureTest() throws Exception {
+ channel1.publishAssignEventAsync(bundle3, lookupServiceAddress1);
+ waitUntilNewOwner(channel1, bundle3, lookupServiceAddress1);
+ waitUntilNewOwner(channel2, bundle3, lookupServiceAddress1);
+ var ownerAddr1 = channel1.getOwnerAsync(bundle3).get();
+ var ownerAddr2 = channel2.getOwnerAsync(bundle3).get();
+ assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
+ assertEquals(ownerAddr2, Optional.of(lookupServiceAddress1));
+ assertTrue(ownerAddr1.isPresent());
+
+ NamespaceService namespaceService = spy(pulsar1.getNamespaceService());
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ int badVersionExceptionCount = 10;
+ AtomicInteger count = new AtomicInteger(badVersionExceptionCount);
+ future.completeExceptionally(new
MetadataStoreException.BadVersionException("BadVersion"));
+ doAnswer(invocationOnMock -> {
+ if (count.decrementAndGet() > 0) {
+ return future;
+ }
+ // Call the real method
+ reset(namespaceService);
+ doReturn(CompletableFuture.completedFuture(List.of("test-topic-1",
"test-topic-2")))
+
.when(namespaceService).getOwnedTopicListForNamespaceBundle(any());
+ return future;
+ }).when(namespaceService).updateNamespaceBundlesForPolicies(any(),
any());
+ doReturn(namespaceService).when(pulsar1).getNamespaceService();
+ doReturn(CompletableFuture.completedFuture(List.of("test-topic-1",
"test-topic-2")))
+
.when(namespaceService).getOwnedTopicListForNamespaceBundle(any());
+
+ // Assert child bundle ownerships in the channels.
+
+ Split split = new Split(bundle3, ownerAddr1.get(), Map.of(
+ childBundle1Range, Optional.empty(), childBundle2Range,
Optional.empty()));
+ channel1.publishSplitEventAsync(split);
+
+ FieldUtils.writeDeclaredField(channel1,
+ "inFlightStateWaitingTimeInMillis", 1 , true);
+ FieldUtils.writeDeclaredField(channel2,
+ "inFlightStateWaitingTimeInMillis", 1 , true);
+
+ Awaitility.await()
+ .pollInterval(200, TimeUnit.MILLISECONDS)
+ .atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> {
+ assertEquals(3, count.get());
+ });
+ var leader = channel1.isChannelOwnerAsync().get() ? channel1 :
channel2;
+ ((ServiceUnitStateChannelImpl) leader)
+ .monitorOwnerships(List.of(lookupServiceAddress1,
lookupServiceAddress2));
+ waitUntilState(leader, bundle3, Deleted);
+ waitUntilState(channel1, bundle3, Deleted);
+ waitUntilState(channel2, bundle3, Deleted);
+
+
+ validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 0, 0,
1, 0);
+ validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 0, 0,
1, 0);
+ validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
+ validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);
+
+ waitUntilNewOwner(channel1, childBundle31, lookupServiceAddress1);
+ waitUntilNewOwner(channel1, childBundle32, lookupServiceAddress1);
+ waitUntilNewOwner(channel2, childBundle31, lookupServiceAddress1);
+ waitUntilNewOwner(channel2, childBundle32, lookupServiceAddress1);
+ assertEquals(Optional.of(lookupServiceAddress1),
channel1.getOwnerAsync(childBundle31).get());
+ assertEquals(Optional.of(lookupServiceAddress1),
channel1.getOwnerAsync(childBundle32).get());
+ assertEquals(Optional.of(lookupServiceAddress1),
channel2.getOwnerAsync(childBundle31).get());
+ assertEquals(Optional.of(lookupServiceAddress1),
channel2.getOwnerAsync(childBundle32).get());
+
+
+ // try the monitor and check the monitor moves `Deleted` -> `Init`
+
+ FieldUtils.writeDeclaredField(channel1,
+ "semiTerminalStateWaitingTimeInMillis", 1, true);
+ FieldUtils.writeDeclaredField(channel2,
+ "semiTerminalStateWaitingTimeInMillis", 1, true);
+
+ ((ServiceUnitStateChannelImpl) channel1).monitorOwnerships(
+ List.of(lookupServiceAddress1, lookupServiceAddress2));
+ ((ServiceUnitStateChannelImpl) channel2).monitorOwnerships(
+ List.of(lookupServiceAddress1, lookupServiceAddress2));
+ waitUntilState(channel1, bundle3, Init);
+ waitUntilState(channel2, bundle3, Init);
+
+ validateMonitorCounters(leader,
+ 0,
+ 1,
+ 1,
+ 0,
+ 0,
+ 0,
+ 0);
+
+
+ cleanTableViews();
+
+ FieldUtils.writeDeclaredField(channel1,
+ "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
+ FieldUtils.writeDeclaredField(channel1,
+ "semiTerminalStateWaitingTimeInMillis", 300 * 1000, true);
+
+ FieldUtils.writeDeclaredField(channel2,
+ "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
+ FieldUtils.writeDeclaredField(channel2,
+ "semiTerminalStateWaitingTimeInMillis", 300 * 1000, true);
+ }
+
+ @Test(priority = 17)
+ public void testOverrideInactiveBrokerStateData()
+ throws IllegalAccessException, ExecutionException,
InterruptedException, TimeoutException {
+
+ var leaderChannel = channel1;
+ var followerChannel = channel2;
+ String leader = channel1.getChannelOwnerAsync().get(2,
TimeUnit.SECONDS).get();
+ String leader2 = channel2.getChannelOwnerAsync().get(2,
TimeUnit.SECONDS).get();
+ assertEquals(leader, leader2);
+ if (leader.equals(lookupServiceAddress2)) {
+ leaderChannel = channel2;
+ followerChannel = channel1;
+ }
+
+ String broker = lookupServiceAddress1;
+
+ // test override states
+ String releasingBundle = "public/releasing/0xfffffff0_0xffffffff";
+ String splittingBundle = bundle;
+ String assigningBundle = "public/assigning/0xfffffff0_0xffffffff";
+ String freeBundle = "public/free/0xfffffff0_0xffffffff";
+ String deletedBundle = "public/deleted/0xfffffff0_0xffffffff";
+ String ownedBundle = "public/owned/0xfffffff0_0xffffffff";
+ overrideTableViews(releasingBundle,
+ new ServiceUnitStateData(Releasing, null, broker, 1));
+ overrideTableViews(splittingBundle,
+ new ServiceUnitStateData(Splitting, null, broker,
+ Map.of(childBundle1Range, Optional.empty(),
+ childBundle2Range, Optional.empty()), 1));
+ overrideTableViews(assigningBundle,
+ new ServiceUnitStateData(Assigning, broker, null, 1));
+ overrideTableViews(freeBundle,
+ new ServiceUnitStateData(Free, null, broker, 1));
+ overrideTableViews(deletedBundle,
+ new ServiceUnitStateData(Deleted, null, broker, 1));
+ overrideTableViews(ownedBundle,
+ new ServiceUnitStateData(Owned, broker, null, 1));
+
+ // test stable metadata state
+
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
+ .when(loadManager).selectAsync(any());
+ leaderChannel.handleMetadataSessionEvent(SessionReestablished);
+ followerChannel.handleMetadataSessionEvent(SessionReestablished);
+ FieldUtils.writeDeclaredField(leaderChannel,
"lastMetadataSessionEventTimestamp",
+ System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS
* 1000 + 1000), true);
+ FieldUtils.writeDeclaredField(followerChannel,
"lastMetadataSessionEventTimestamp",
+ System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS
* 1000 + 1000), true);
+ leaderChannel.handleBrokerRegistrationEvent(broker,
NotificationType.Deleted);
+ followerChannel.handleBrokerRegistrationEvent(broker,
NotificationType.Deleted);
+
+ waitUntilNewOwner(channel2, releasingBundle, lookupServiceAddress2);
+ waitUntilNewOwner(channel2, childBundle11, lookupServiceAddress2);
+ waitUntilNewOwner(channel2, childBundle12, lookupServiceAddress2);
+ waitUntilNewOwner(channel2, assigningBundle, lookupServiceAddress2);
+ waitUntilNewOwner(channel2, ownedBundle, lookupServiceAddress2);
+ assertEquals(Optional.empty(),
channel2.getOwnerAsync(freeBundle).get());
+
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
+
assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally());
+
+ // clean-up
+ FieldUtils.writeDeclaredField(leaderChannel,
"maxCleanupDelayTimeInSecs", 3 * 60, true);
+ cleanTableViews();
+
+ }
+
+ @Test(priority = 18)
+ public void testOverrideOrphanStateData()
+ throws IllegalAccessException, ExecutionException,
InterruptedException, TimeoutException {
+
+ var leaderChannel = channel1;
+ var followerChannel = channel2;
+ String leader = channel1.getChannelOwnerAsync().get(2,
TimeUnit.SECONDS).get();
+ String leader2 = channel2.getChannelOwnerAsync().get(2,
TimeUnit.SECONDS).get();
+ assertEquals(leader, leader2);
+ if (leader.equals(lookupServiceAddress2)) {
+ leaderChannel = channel2;
+ followerChannel = channel1;
+ }
+
+ String broker = lookupServiceAddress1;
+
+ // test override states
+ String releasingBundle = "public/releasing/0xfffffff0_0xffffffff";
+ String splittingBundle = bundle;
+ String assigningBundle = "public/assigning/0xfffffff0_0xffffffff";
+ String freeBundle = "public/free/0xfffffff0_0xffffffff";
+ String deletedBundle = "public/deleted/0xfffffff0_0xffffffff";
+ String ownedBundle = "public/owned/0xfffffff0_0xffffffff";
+ overrideTableViews(releasingBundle,
+ new ServiceUnitStateData(Releasing, null, broker, 1));
+ overrideTableViews(splittingBundle,
+ new ServiceUnitStateData(Splitting, null, broker,
+ Map.of(childBundle1Range, Optional.empty(),
+ childBundle2Range, Optional.empty()), 1));
+ overrideTableViews(assigningBundle,
+ new ServiceUnitStateData(Assigning, broker, null, 1));
+ overrideTableViews(freeBundle,
+ new ServiceUnitStateData(Free, null, broker, 1));
+ overrideTableViews(deletedBundle,
+ new ServiceUnitStateData(Deleted, null, broker, 1));
+ overrideTableViews(ownedBundle,
+ new ServiceUnitStateData(Owned, broker, null, 1));
+
+ // test stable metadata state
+
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
+ .when(loadManager).selectAsync(any());
+ FieldUtils.writeDeclaredField(leaderChannel,
"inFlightStateWaitingTimeInMillis",
+ -1, true);
+ FieldUtils.writeDeclaredField(followerChannel,
"inFlightStateWaitingTimeInMillis",
+ -1, true);
+ ((ServiceUnitStateChannelImpl) leaderChannel)
+ .monitorOwnerships(List.of(lookupServiceAddress1,
lookupServiceAddress2));
+
+ waitUntilNewOwner(channel2, releasingBundle, broker);
+ waitUntilNewOwner(channel2, childBundle11, broker);
+ waitUntilNewOwner(channel2, childBundle12, broker);
+ waitUntilNewOwner(channel2, assigningBundle, lookupServiceAddress2);
+ waitUntilNewOwner(channel2, ownedBundle, broker);
+ assertEquals(Optional.empty(),
channel2.getOwnerAsync(freeBundle).get());
+
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
+
assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally());
+
+ // clean-up
+ FieldUtils.writeDeclaredField(channel1,
+ "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
+ FieldUtils.writeDeclaredField(channel2,
+ "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
+ cleanTableViews();
+ }
+
private static ConcurrentOpenHashMap<String,
CompletableFuture<Optional<String>>> getOwnerRequests(
ServiceUnitStateChannel channel) throws IllegalAccessException {
@@ -1375,6 +1632,22 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
});
}
+ private void waitUntilStateWithMonitor(ServiceUnitStateChannel channel,
String key, ServiceUnitState expected)
+ throws IllegalAccessException {
+ TableViewImpl<ServiceUnitStateData> tv =
(TableViewImpl<ServiceUnitStateData>)
+ FieldUtils.readField(channel, "tableview", true);
+ Awaitility.await()
+ .pollInterval(200, TimeUnit.MILLISECONDS)
+ .atMost(10, TimeUnit.SECONDS)
+ .until(() -> { // wait until true
+ ((ServiceUnitStateChannelImpl) channel)
+ .monitorOwnerships(List.of(lookupServiceAddress1,
lookupServiceAddress2));
+ ServiceUnitStateData data = tv.get(key);
+ ServiceUnitState actual = state(data);
+ return actual == expected;
+ });
+ }
+
private static void cleanTableView(ServiceUnitStateChannel channel, String
serviceUnit)
throws IllegalAccessException {
var tv = (TableViewImpl<ServiceUnitStateData>)
@@ -1384,6 +1657,26 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
cache.remove(serviceUnit);
}
+ private void cleanTableViews()
+ throws IllegalAccessException {
+ var tv1 = (TableViewImpl<ServiceUnitStateData>)
+ FieldUtils.readField(channel1, "tableview", true);
+ var cache1 = (ConcurrentMap<String, ServiceUnitStateData>)
+ FieldUtils.readField(tv1, "data", true);
+ cache1.clear();
+
+ var tv2 = (TableViewImpl<ServiceUnitStateData>)
+ FieldUtils.readField(channel2, "tableview", true);
+ var cache2 = (ConcurrentMap<String, ServiceUnitStateData>)
+ FieldUtils.readField(tv2, "data", true);
+ cache2.clear();
+ }
+
+ private void overrideTableViews(String serviceUnit, ServiceUnitStateData
val) throws IllegalAccessException {
+ overrideTableView(channel1, serviceUnit, val);
+ overrideTableView(channel2, serviceUnit, val);
+ }
+
private static void overrideTableView(ServiceUnitStateChannel channel,
String serviceUnit, ServiceUnitStateData val)
throws IllegalAccessException {
var tv = (TableViewImpl<ServiceUnitStateData>)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/models/SplitTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/models/SplitTest.java
index 1e23ea74dca..fd50f0d8aed 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/models/SplitTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/models/SplitTest.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.loadbalance.extensions.channel.models;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -56,12 +55,9 @@ public class SplitTest {
new Split("A", "B", map);
}
- @Test
+ @Test(expectedExceptions = IllegalArgumentException.class)
public void testNullSplitServiceUnitToDestBroker() {
- var split = new Split("A", "B");
- assertEquals(split.serviceUnit(), "A");
- assertEquals(split.sourceBroker(), "B");
- assertNull(split.splitServiceUnitToDestBroker());
+ new Split("A", "B", null); // a null splitServiceUnitToDestBroker map must be rejected by the constructor
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java
index f7726f8e7a1..f56ad90833b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java
@@ -28,6 +28,8 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
@@ -56,6 +58,15 @@ public class SplitSchedulerTest {
NamespaceBundleSplitStrategy strategy;
String bundle1 = "tenant/namespace/0x00000000_0xFFFFFFFF";
String bundle2 = "tenant/namespace/0x00000000_0x0FFFFFFF";
+
+ String childBundle12 = "tenant/namespace/0x7fffffff_0xffffffff";
+
+ String childBundle11 = "tenant/namespace/0x00000000_0x7fffffff";
+
+ String childBundle22 = "tenant/namespace/0x07ffffff_0x0fffffff"; // upper child of bundle2 (0x00000000_0x0FFFFFFF split at 0x07ffffff)
+
+ String childBundle21 = "tenant/namespace/0x00000000_0x07ffffff"; // lower child of bundle2
+
String broker = "broker-1";
SplitDecision decision1;
SplitDecision decision2;
@@ -77,11 +88,15 @@ public class SplitSchedulerTest {
doReturn(CompletableFuture.completedFuture(null)).when(channel).publishSplitEventAsync(any());
decision1 = new SplitDecision();
- decision1.setSplit(new Split(bundle1, broker));
+ Split split = new Split(bundle1, broker, Map.of(
+ childBundle11, Optional.empty(), childBundle12,
Optional.empty()));
+ decision1.setSplit(split);
decision1.succeed(SplitDecision.Reason.MsgRate);
decision2 = new SplitDecision();
- decision2.setSplit(new Split(bundle2, broker));
+ Split split2 = new Split(bundle2, broker, Map.of(
+ childBundle21, Optional.empty(), childBundle22,
Optional.empty()));
+ decision2.setSplit(split2);
decision2.succeed(SplitDecision.Reason.Sessions);
Set<SplitDecision> decisions = Set.of(decision1, decision2);
doReturn(decisions).when(strategy).findBundlesToSplit(any(), any());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java
index 8c765e7a3df..8b489c92af0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java
@@ -28,9 +28,13 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
+import com.google.common.hash.Hashing;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -38,15 +42,19 @@ import
org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
-import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.PulsarStats;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -71,7 +79,12 @@ public class DefaultNamespaceBundleSplitStrategyTest {
String bundle1 = "tenant/namespace/0x00000000_0xFFFFFFFF";
String bundle2 = "tenant/namespace/0x00000000_0x0FFFFFFF";
-
+ Long splitBoundary1 = 0x7fffffffL;
+ Long splitBoundary2 = 0x07ffffffL;
+ String childBundle12 = "0x7fffffff_0xffffffff";
+ String childBundle11 = "0x00000000_0x7fffffff";
+ String childBundle22 = "0x07ffffff_0x0fffffff";
+ String childBundle21 = "0x00000000_0x07ffffff";
String broker = "broker-1";
@BeforeMethod
@@ -90,19 +103,21 @@ public class DefaultNamespaceBundleSplitStrategyTest {
brokerService = mock(BrokerService.class);
pulsarStats = mock(PulsarStats.class);
namespaceService = mock(NamespaceService.class);
- namespaceBundleFactory = mock(NamespaceBundleFactory.class);
+
loadManagerContext = mock(LoadManagerContext.class);
brokerRegistry = mock(BrokerRegistry.class);
loadManagerWrapper = mock(ExtensibleLoadManagerWrapper.class);
loadManager = mock(ExtensibleLoadManagerImpl.class);
channel = mock(ServiceUnitStateChannelImpl.class);
+
+
doReturn(mock(MetadataStoreExtended.class)).when(pulsar).getLocalMetadataStore();
+ namespaceBundleFactory = spy(new NamespaceBundleFactory(pulsar,
Hashing.crc32()));
doReturn(brokerService).when(pulsar).getBrokerService();
doReturn(config).when(pulsar).getConfiguration();
doReturn(pulsarStats).when(brokerService).getPulsarStats();
doReturn(namespaceService).when(pulsar).getNamespaceService();
doReturn(namespaceBundleFactory).when(namespaceService).getNamespaceBundleFactory();
- doReturn(true).when(namespaceBundleFactory).canSplitBundle(any());
doReturn(brokerRegistry).when(loadManagerContext).brokerRegistry();
doReturn(broker).when(brokerRegistry).getBrokerId();
doReturn(new
AtomicReference(loadManagerWrapper)).when(pulsar).getLoadManager();
@@ -110,6 +125,18 @@ public class DefaultNamespaceBundleSplitStrategyTest {
doReturn(channel).when(loadManager).getServiceUnitStateChannel();
doReturn(true).when(channel).isOwner(any());
+ var namespaceBundle1 = namespaceBundleFactory.getBundle(
+ LoadManagerShared.getNamespaceNameFromBundleName(bundle1),
+ LoadManagerShared.getBundleRangeFromBundleName(bundle1));
+ var namespaceBundle2 = namespaceBundleFactory.getBundle(
+ LoadManagerShared.getNamespaceNameFromBundleName(bundle2),
+ LoadManagerShared.getBundleRangeFromBundleName(bundle2));
+ doReturn(CompletableFuture.completedFuture(
+
List.of(splitBoundary1))).when(namespaceService).getSplitBoundary(
+ eq(namespaceBundle1), eq((List<Long>)null), any());
+ doReturn(CompletableFuture.completedFuture(
+
List.of(splitBoundary2))).when(namespaceService).getSplitBoundary(
+ eq(namespaceBundle2), eq((List<Long>)null), any());
bundleStats = new LinkedHashMap<>();
NamespaceBundleStats stats1 = new NamespaceBundleStats();
@@ -184,6 +211,20 @@ public class DefaultNamespaceBundleSplitStrategyTest {
verify(counter, times(2)).update(eq(SplitDecision.Label.Failure),
eq(Unknown));
}
+ public void testSplittingBundle() { // With bundle1 already in Splitting state in the channel, expects no split decisions and no Failure/Unknown counter update.
+ var counter = spy(new SplitCounter());
+
config.setLoadBalancerNamespaceBundleSplitConditionHitCountThreshold(0);
+ bundleStats.values().forEach(v -> v.msgRateIn =
config.getLoadBalancerNamespaceBundleMaxMsgRate() + 1);
+ doReturn(Map.of(bundle1,
+ new ServiceUnitStateData(ServiceUnitState.Splitting, broker,
1)).entrySet())
+ .when(channel).getOwnershipEntrySet();
+ var strategy = new DefaultNamespaceBundleSplitStrategyImpl(counter);
+ var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar);
+ var expected = Set.of();
+ assertEquals(actual, expected);
+ verify(counter, times(0)).update(eq(SplitDecision.Label.Failure),
eq(Unknown));
+ }
+
public void testMaxMsgRate() {
var counter = spy(new SplitCounter());
var strategy = new DefaultNamespaceBundleSplitStrategyImpl(counter);
@@ -196,14 +237,18 @@ public class DefaultNamespaceBundleSplitStrategyTest {
var actual = strategy.findBundlesToSplit(loadManagerContext,
pulsar);
if (i == threshold) {
SplitDecision decision1 = new SplitDecision();
- decision1.setSplit(new Split(bundle1, broker));
+ Split split = new Split(bundle1, broker, Map.of(
+ childBundle11, Optional.empty(), childBundle12,
Optional.empty()));
+ decision1.setSplit(split);
decision1.succeed(SplitDecision.Reason.MsgRate);
assertEquals(actual, Set.of(decision1));
verify(counter,
times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown));
} else if (i == threshold + 1) {
SplitDecision decision1 = new SplitDecision();
- decision1.setSplit(new Split(bundle2, broker));
+ Split split = new Split(bundle2, broker, Map.of(
+ childBundle21, Optional.empty(), childBundle22,
Optional.empty()));
+ decision1.setSplit(split);
decision1.succeed(SplitDecision.Reason.MsgRate);
assertEquals(actual, Set.of(decision1));
@@ -224,14 +269,18 @@ public class DefaultNamespaceBundleSplitStrategyTest {
var actual = strategy.findBundlesToSplit(loadManagerContext,
pulsar);
if (i == threshold) {
SplitDecision decision1 = new SplitDecision();
- decision1.setSplit(new Split(bundle1, broker));
+ Split split = new Split(bundle1, broker, Map.of(
+ childBundle11, Optional.empty(), childBundle12,
Optional.empty()));
+ decision1.setSplit(split);
decision1.succeed(SplitDecision.Reason.Topics);
assertEquals(actual, Set.of(decision1));
verify(counter,
times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown));
} else if (i == threshold + 1) {
SplitDecision decision1 = new SplitDecision();
- decision1.setSplit(new Split(bundle2, broker));
+ Split split = new Split(bundle2, broker, Map.of(
+ childBundle21, Optional.empty(), childBundle22,
Optional.empty()));
+ decision1.setSplit(split);
decision1.succeed(SplitDecision.Reason.Topics);
assertEquals(actual, Set.of(decision1));
@@ -255,14 +304,18 @@ public class DefaultNamespaceBundleSplitStrategyTest {
var actual = strategy.findBundlesToSplit(loadManagerContext,
pulsar);
if (i == threshold) {
SplitDecision decision1 = new SplitDecision();
- decision1.setSplit(new Split(bundle1, broker));
+ Split split = new Split(bundle1, broker, Map.of(
+ childBundle11, Optional.empty(), childBundle12,
Optional.empty()));
+ decision1.setSplit(split);
decision1.succeed(SplitDecision.Reason.Sessions);
assertEquals(actual, Set.of(decision1));
verify(counter,
times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown));
} else if (i == threshold + 1) {
SplitDecision decision1 = new SplitDecision();
- decision1.setSplit(new Split(bundle2, broker));
+ Split split = new Split(bundle2, broker, Map.of(
+ childBundle21, Optional.empty(), childBundle22,
Optional.empty()));
+ decision1.setSplit(split);
decision1.succeed(SplitDecision.Reason.Sessions);
assertEquals(actual, Set.of(decision1));
@@ -286,14 +339,18 @@ public class DefaultNamespaceBundleSplitStrategyTest {
var actual = strategy.findBundlesToSplit(loadManagerContext,
pulsar);
if (i == threshold) {
SplitDecision decision1 = new SplitDecision();
- decision1.setSplit(new Split(bundle1, broker));
+ Split split = new Split(bundle1, broker, Map.of(
+ childBundle11, Optional.empty(), childBundle12,
Optional.empty()));
+ decision1.setSplit(split);
decision1.succeed(SplitDecision.Reason.Bandwidth);
assertEquals(actual, Set.of(decision1));
verify(counter,
times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown));
} else if (i == threshold + 1) {
SplitDecision decision1 = new SplitDecision();
- decision1.setSplit(new Split(bundle2, broker));
+ Split split = new Split(bundle2, broker, Map.of(
+ childBundle21, Optional.empty(), childBundle22,
Optional.empty()));
+ decision1.setSplit(split);
decision1.succeed(SplitDecision.Reason.Bandwidth);
assertEquals(actual, Set.of(decision1));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithmTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithmTest.java
index ec458def65b..80f9c279a55 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithmTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithmTest.java
@@ -34,6 +34,21 @@ import static org.testng.Assert.fail;
public class SpecifiedPositionsBundleSplitAlgorithmTest {
+ @Test
+ public void testEmptyTopicWithForce() { // force=true, bundle range [1, 1000] with no owned topics: from candidate positions [1, 2] expects only [2].
+ SpecifiedPositionsBundleSplitAlgorithm algorithm = new
SpecifiedPositionsBundleSplitAlgorithm(true);
+ NamespaceService mockNamespaceService = mock(NamespaceService.class);
+ NamespaceBundle mockNamespaceBundle = mock(NamespaceBundle.class);
+ doReturn(1L).when(mockNamespaceBundle).getLowerEndpoint();
+ doReturn(1000L).when(mockNamespaceBundle).getUpperEndpoint();
+ doReturn(CompletableFuture.completedFuture(List.of()))
+
.when(mockNamespaceService).getOwnedTopicListForNamespaceBundle(mockNamespaceBundle);
+ List<Long> splitPositions =
+ algorithm.getSplitBoundary(new
BundleSplitOption(mockNamespaceService, mockNamespaceBundle,
+ Arrays.asList(1L, 2L))).join();
+ assertEquals(splitPositions, Arrays.asList(2L));
+ }
+
@Test
public void testTotalTopicsSizeLessThan1() {
SpecifiedPositionsBundleSplitAlgorithm algorithm = new
SpecifiedPositionsBundleSplitAlgorithm();