dragosvictor commented on code in PR #21408:
URL: https://github.com/apache/pulsar/pull/21408#discussion_r1381138909


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -732,15 +770,20 @@ private void handleOwnEvent(String serviceUnit, 
ServiceUnitStateData data) {
         if (getOwnerRequest != null) {
             getOwnerRequest.complete(data.dstBroker());
         }
-        stateChangeListeners.notify(serviceUnit, data, null);
+        CompletableFuture<Integer> ownFuture = null;

Review Comment:
   The null assignment doesn't seem necessary here.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -286,6 +300,37 @@ public static void createSystemTopic(PulsarService pulsar, 
String topic) throws
         }
     }
 
+    /**
+     * Gets the assigned broker for the given topic.
+     * @param pulsar PulsarService instance
+     * @param topic Topic Name
+     * @return the assigned broker's BrokerLookupData instance. Empty, if not 
assigned by Extensible LoadManager.
+     */
+    public static CompletableFuture<Optional<BrokerLookupData>> 
getAssignedBrokerLookupData(PulsarService pulsar,
+                                                                          
String topic) {
+        if 
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar.getConfig())) {
+            var topicName = TopicName.get(topic);
+            try {
+                return pulsar.getNamespaceService().getBundleAsync(topicName)
+                        .thenCompose(bundle -> {
+                                    var loadManager = 
ExtensibleLoadManagerImpl.get(pulsar);
+                                    var assigned = 
loadManager.getServiceUnitStateChannel()
+                                            .getAssigned(bundle.toString());
+                                    if (assigned.isPresent()) {
+                                        return 
loadManager.getBrokerRegistry().lookupAsync(assigned.get());
+                                    } else {
+                                        return 
CompletableFuture.completedFuture(Optional.empty());
+                                    }
+                                }
+                        );
+            } catch (Throwable e) {
+                log.error("Failed to DestinationBrokerLookupData for 
topic:{}", topic, e);

Review Comment:
   Nit: can we improve the error message here?
   
   ```suggestion
                   log.error("Failed to lookup destination broker for 
topic:{}", topic, e);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##########
@@ -484,14 +485,22 @@ private CompletableFuture<Void> delete(boolean 
failIfHasSubscriptions, boolean c
         return deleteFuture;
     }
 
+
+    @Override
+    public CompletableFuture<Void> close(boolean 
closeWithoutWaitingClientDisconnect) {
+        return close(false, closeWithoutWaitingClientDisconnect);
+    }
+
     /**
      * Close this topic - close all producers and subscriptions associated 
with this topic.
      *
+     * @param closeWithoutDisconnectingClients don't disconnect clients

Review Comment:
   Agree, the naming is confusing at first.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java:
##########
@@ -156,6 +167,10 @@ void reconnectLater(Throwable exception) {
     }
 
     public void connectionClosed(ClientCnx cnx) {
+        connectionClosed(cnx, null, Optional.empty());
+    }
+
+    public void connectionClosed(ClientCnx cnx, Long initialConnectionDelayMs, 
Optional<URI> hostUrl) {

Review Comment:
   Just an observation: it's not clear to me when we're using a raw reference 
and when an Optional. Seems like we could've used `Optional<Long> 
initialConnectionDelayMs` just as well.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -538,6 +538,44 @@ public CompletableFuture<Optional<String>> 
getOwnerAsync(String serviceUnit) {
         }
     }
 
+    @Override
+    public Optional<String> getAssigned(String serviceUnit) {
+        if (!validateChannelState(Started, true)) {
+            return Optional.empty();
+        }
+
+        ServiceUnitStateData data = tableview.get(serviceUnit);
+        if (data == null) {
+            return Optional.empty();
+        }
+        ServiceUnitState state = state(data);
+        switch (state) {
+            case Owned, Assigning -> {
+                return Optional.of(data.dstBroker());
+            }
+            case Releasing -> {
+                if (data.dstBroker() != null) {
+                    return Optional.of(data.dstBroker());
+                }
+                return Optional.empty();

Review Comment:
   Nit: we can simplify this to:
   
   ```suggestion
                   return Optional.ofNullable(data.dstBroker());
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -732,15 +770,20 @@ private void handleOwnEvent(String serviceUnit, 
ServiceUnitStateData data) {
         if (getOwnerRequest != null) {
             getOwnerRequest.complete(data.dstBroker());
         }
-        stateChangeListeners.notify(serviceUnit, data, null);
+        CompletableFuture<Integer> ownFuture = null;
         if (isTargetBroker(data.dstBroker())) {
-            log(null, serviceUnit, data, null);
             pulsar.getNamespaceService()
                     
.onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(pulsar, 
serviceUnit));
             lastOwnEventHandledAt = System.currentTimeMillis();
-        } else if (data.force() && isTargetBroker(data.sourceBroker())) {
-            closeServiceUnit(serviceUnit);
+            ownFuture = CompletableFuture.completedFuture(null);
+        } else if ((data.force() || isTransferCommand(data)) && 
isTargetBroker(data.sourceBroker())) {
+            ownFuture = closeServiceUnit(serviceUnit, false);
+        } else {
+            ownFuture = CompletableFuture.completedFuture(null);
         }
+
+        stateChangeListeners.notifyOnCompletion(ownFuture, serviceUnit, data)
+                .whenComplete((__, e) -> log(e, serviceUnit, data, null));

Review Comment:
   Question: why this is needed now?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to