dragosvictor commented on code in PR #21408:
URL: https://github.com/apache/pulsar/pull/21408#discussion_r1381138909
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -732,15 +770,20 @@ private void handleOwnEvent(String serviceUnit,
ServiceUnitStateData data) {
if (getOwnerRequest != null) {
getOwnerRequest.complete(data.dstBroker());
}
- stateChangeListeners.notify(serviceUnit, data, null);
+ CompletableFuture<Integer> ownFuture = null;
Review Comment:
The null assignment doesn't seem necessary here.
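To illustrate the point, here is a minimal sketch, assuming (as in the full hunk quoted later in this message) that every branch assigns the variable before it is read. The class and method names are hypothetical stand-ins, not code from the PR; the second branch stands in for the `closeServiceUnit(...)` call.

```java
import java.util.concurrent.CompletableFuture;

public class DefiniteAssignmentSketch {
    // Hypothetical stand-in for the three-way branch in handleOwnEvent.
    static CompletableFuture<Integer> pick(boolean isTarget, boolean mustClose) {
        // No initializer: the compiler checks that each path assigns it once.
        final CompletableFuture<Integer> ownFuture;
        if (isTarget) {
            ownFuture = CompletableFuture.completedFuture(null);
        } else if (mustClose) {
            // Placeholder value; the PR calls closeServiceUnit(...) here.
            ownFuture = CompletableFuture.completedFuture(1);
        } else {
            ownFuture = CompletableFuture.completedFuture(null);
        }
        return ownFuture;
    }

    public static void main(String[] args) {
        System.out.println(pick(false, true).join());
    }
}
```

Dropping the initializer also lets the compiler flag a branch that forgets to assign, which the `= null` form would silently hide.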
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -286,6 +300,37 @@ public static void createSystemTopic(PulsarService pulsar,
String topic) throws
}
}
+ /**
+ * Gets the assigned broker for the given topic.
+ * @param pulsar PulsarService instance
+ * @param topic Topic Name
+ * @return the assigned broker's BrokerLookupData instance. Empty, if not
assigned by Extensible LoadManager.
+ */
+ public static CompletableFuture<Optional<BrokerLookupData>>
getAssignedBrokerLookupData(PulsarService pulsar,
+
String topic) {
+ if
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar.getConfig())) {
+ var topicName = TopicName.get(topic);
+ try {
+ return pulsar.getNamespaceService().getBundleAsync(topicName)
+ .thenCompose(bundle -> {
+ var loadManager =
ExtensibleLoadManagerImpl.get(pulsar);
+ var assigned =
loadManager.getServiceUnitStateChannel()
+ .getAssigned(bundle.toString());
+ if (assigned.isPresent()) {
+ return
loadManager.getBrokerRegistry().lookupAsync(assigned.get());
+ } else {
+ return
CompletableFuture.completedFuture(Optional.empty());
+ }
+ }
+ );
+ } catch (Throwable e) {
+ log.error("Failed to DestinationBrokerLookupData for
topic:{}", topic, e);
Review Comment:
Nit: can we improve the error message here?
```suggestion
log.error("Failed to look up destination broker for topic:{}", topic, e);
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##########
@@ -484,14 +485,22 @@ private CompletableFuture<Void> delete(boolean
failIfHasSubscriptions, boolean c
return deleteFuture;
}
+
+ @Override
+ public CompletableFuture<Void> close(boolean
closeWithoutWaitingClientDisconnect) {
+ return close(false, closeWithoutWaitingClientDisconnect);
+ }
+
/**
* Close this topic - close all producers and subscriptions associated
with this topic.
*
+ * @param closeWithoutDisconnectingClients don't disconnect clients
Review Comment:
Agreed, the naming is confusing at first.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java:
##########
@@ -156,6 +167,10 @@ void reconnectLater(Throwable exception) {
}
public void connectionClosed(ClientCnx cnx) {
+ connectionClosed(cnx, null, Optional.empty());
+ }
+
+ public void connectionClosed(ClientCnx cnx, Long initialConnectionDelayMs,
Optional<URI> hostUrl) {
Review Comment:
Just an observation: it's not clear to me when we use a raw (nullable)
reference and when we use an Optional. It seems we could have used
`Optional<Long> initialConnectionDelayMs` just as well.
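For comparison, a rough sketch of the two parameter styles side by side. Everything here is hypothetical: the class, the method names, and the default delay are assumptions made for illustration, and this does not show how `ConnectionHandler` actually consumes the value.

```java
import java.util.Optional;

public class DelayParamSketch {
    // Hypothetical fallback used only for this illustration.
    static final long DEFAULT_DELAY_MS = 100L;

    // Nullable style: callers pass null to mean "no explicit delay".
    static long resolveNullable(Long initialConnectionDelayMs) {
        return initialConnectionDelayMs != null ? initialConnectionDelayMs : DEFAULT_DELAY_MS;
    }

    // Optional style: absence is explicit in the signature.
    static long resolveOptional(Optional<Long> initialConnectionDelayMs) {
        return initialConnectionDelayMs.orElse(DEFAULT_DELAY_MS);
    }

    public static void main(String[] args) {
        System.out.println(resolveNullable(null) == resolveOptional(Optional.empty()));
        System.out.println(resolveNullable(250L) == resolveOptional(Optional.of(250L)));
    }
}
```

Either style works; the concern is only that mixing both in one signature leaves the reader guessing which arguments may be absent.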
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -538,6 +538,44 @@ public CompletableFuture<Optional<String>>
getOwnerAsync(String serviceUnit) {
}
}
+ @Override
+ public Optional<String> getAssigned(String serviceUnit) {
+ if (!validateChannelState(Started, true)) {
+ return Optional.empty();
+ }
+
+ ServiceUnitStateData data = tableview.get(serviceUnit);
+ if (data == null) {
+ return Optional.empty();
+ }
+ ServiceUnitState state = state(data);
+ switch (state) {
+ case Owned, Assigning -> {
+ return Optional.of(data.dstBroker());
+ }
+ case Releasing -> {
+ if (data.dstBroker() != null) {
+ return Optional.of(data.dstBroker());
+ }
+ return Optional.empty();
Review Comment:
Nit: we can simplify this to:
```suggestion
return Optional.ofNullable(data.dstBroker());
```
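A small self-contained check of the equivalence behind this suggestion. The class and method names are hypothetical; `dstBroker` here is a plain `String` parameter standing in for `data.dstBroker()`.

```java
import java.util.Optional;

public class OfNullableSketch {
    // The explicit null check, as written in the hunk.
    static Optional<String> verbose(String dstBroker) {
        if (dstBroker != null) {
            return Optional.of(dstBroker);
        }
        return Optional.empty();
    }

    // The suggested one-liner: ofNullable returns empty for null, of(value) otherwise.
    static Optional<String> concise(String dstBroker) {
        return Optional.ofNullable(dstBroker);
    }

    public static void main(String[] args) {
        System.out.println(verbose(null).equals(concise(null)));
        System.out.println(verbose("broker-1").equals(concise("broker-1")));
    }
}
```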
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -732,15 +770,20 @@ private void handleOwnEvent(String serviceUnit,
ServiceUnitStateData data) {
if (getOwnerRequest != null) {
getOwnerRequest.complete(data.dstBroker());
}
- stateChangeListeners.notify(serviceUnit, data, null);
+ CompletableFuture<Integer> ownFuture = null;
if (isTargetBroker(data.dstBroker())) {
- log(null, serviceUnit, data, null);
pulsar.getNamespaceService()
.onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(pulsar,
serviceUnit));
lastOwnEventHandledAt = System.currentTimeMillis();
- } else if (data.force() && isTargetBroker(data.sourceBroker())) {
- closeServiceUnit(serviceUnit);
+ ownFuture = CompletableFuture.completedFuture(null);
+ } else if ((data.force() || isTransferCommand(data)) &&
isTargetBroker(data.sourceBroker())) {
+ ownFuture = closeServiceUnit(serviceUnit, false);
+ } else {
+ ownFuture = CompletableFuture.completedFuture(null);
}
+
+ stateChangeListeners.notifyOnCompletion(ownFuture, serviceUnit, data)
+ .whenComplete((__, e) -> log(e, serviceUnit, data, null));
Review Comment:
Question: why is this needed now?