heesung-sn commented on code in PR #21408:
URL: https://github.com/apache/pulsar/pull/21408#discussion_r1382240906


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##########
@@ -484,14 +485,22 @@ private CompletableFuture<Void> delete(boolean 
failIfHasSubscriptions, boolean c
         return deleteFuture;
     }
 
+
+    @Override
+    public CompletableFuture<Void> close(boolean 
closeWithoutWaitingClientDisconnect) {
+        return close(false, closeWithoutWaitingClientDisconnect);
+    }
+
     /**
      * Close this topic - close all producers and subscriptions associated 
with this topic.
      *
+     * @param closeWithoutDisconnectingClients don't disconnect clients

Review Comment:
   updated.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java:
##########
@@ -156,6 +167,10 @@ void reconnectLater(Throwable exception) {
     }
 
     public void connectionClosed(ClientCnx cnx) {
+        connectionClosed(cnx, null, Optional.empty());
+    }
+
+    public void connectionClosed(ClientCnx cnx, Long initialConnectionDelayMs, 
Optional<URI> hostUrl) {

Review Comment:
   I agree. I will make this `Optional<Long> initialConnectionDelayMs`



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -1760,6 +1762,20 @@ protected void handleSend(CommandSend send, ByteBuf 
headersAndPayload) {
             printSendCommandDebug(send, headersAndPayload);
         }
 
+
+        ServiceConfiguration conf = 
getBrokerService().pulsar().getConfiguration();
+        if (producer.getTopic().isFenced()
+                && 
ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(conf)) {

Review Comment:
   This will hit only when the topic is fenced, and loadManagerClassName is a  
dynamic config, and it could change at runtime. Instead, I added a faster check 
func.
   
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -732,15 +770,20 @@ private void handleOwnEvent(String serviceUnit, 
ServiceUnitStateData data) {
         if (getOwnerRequest != null) {
             getOwnerRequest.complete(data.dstBroker());
         }
-        stateChangeListeners.notify(serviceUnit, data, null);
+        CompletableFuture<Integer> ownFuture = null;
         if (isTargetBroker(data.dstBroker())) {
-            log(null, serviceUnit, data, null);
             pulsar.getNamespaceService()
                     
.onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(pulsar, 
serviceUnit));
             lastOwnEventHandledAt = System.currentTimeMillis();
-        } else if (data.force() && isTargetBroker(data.sourceBroker())) {
-            closeServiceUnit(serviceUnit);
+            ownFuture = CompletableFuture.completedFuture(null);
+        } else if ((data.force() || isTransferCommand(data)) && 
isTargetBroker(data.sourceBroker())) {
+            ownFuture = closeServiceUnit(serviceUnit, false);
+        } else {
+            ownFuture = CompletableFuture.completedFuture(null);
         }
+
+        stateChangeListeners.notifyOnCompletion(ownFuture, serviceUnit, data)
+                .whenComplete((__, e) -> log(e, serviceUnit, data, null));

Review Comment:
   Good point.
   I think we need to log the first and second `if` cases only.
   
   Let me update this.
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -732,15 +770,20 @@ private void handleOwnEvent(String serviceUnit, 
ServiceUnitStateData data) {
         if (getOwnerRequest != null) {
             getOwnerRequest.complete(data.dstBroker());
         }
-        stateChangeListeners.notify(serviceUnit, data, null);
+        CompletableFuture<Integer> ownFuture = null;

Review Comment:
   sure. I will update this.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -538,6 +538,44 @@ public CompletableFuture<Optional<String>> 
getOwnerAsync(String serviceUnit) {
         }
     }
 
+    @Override
+    public Optional<String> getAssigned(String serviceUnit) {
+        if (!validateChannelState(Started, true)) {
+            return Optional.empty();
+        }
+
+        ServiceUnitStateData data = tableview.get(serviceUnit);
+        if (data == null) {
+            return Optional.empty();
+        }
+        ServiceUnitState state = state(data);
+        switch (state) {
+            case Owned, Assigning -> {
+                return Optional.of(data.dstBroker());
+            }
+            case Releasing -> {
+                if (data.dstBroker() != null) {
+                    return Optional.of(data.dstBroker());
+                }
+                return Optional.empty();

Review Comment:
   sure. I will update this.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -286,6 +300,37 @@ public static void createSystemTopic(PulsarService pulsar, 
String topic) throws
         }
     }
 
+    /**
+     * Gets the assigned broker for the given topic.
+     * @param pulsar PulsarService instance
+     * @param topic Topic Name
+     * @return the assigned broker's BrokerLookupData instance. Empty, if not 
assigned by Extensible LoadManager.
+     */
+    public static CompletableFuture<Optional<BrokerLookupData>> 
getAssignedBrokerLookupData(PulsarService pulsar,
+                                                                          
String topic) {
+        if 
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar.getConfig())) {
+            var topicName = TopicName.get(topic);
+            try {
+                return pulsar.getNamespaceService().getBundleAsync(topicName)
+                        .thenCompose(bundle -> {
+                                    var loadManager = 
ExtensibleLoadManagerImpl.get(pulsar);
+                                    var assigned = 
loadManager.getServiceUnitStateChannel()
+                                            .getAssigned(bundle.toString());
+                                    if (assigned.isPresent()) {
+                                        return 
loadManager.getBrokerRegistry().lookupAsync(assigned.get());
+                                    } else {
+                                        return 
CompletableFuture.completedFuture(Optional.empty());
+                                    }
+                                }
+                        );
+            } catch (Throwable e) {
+                log.error("Failed to DestinationBrokerLookupData for 
topic:{}", topic, e);

Review Comment:
   sure. I will update this.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java:
##########
@@ -66,6 +68,10 @@ protected ConnectionHandler(HandlerState state, Backoff 
backoff, Connection conn
     }
 
     protected void grabCnx() {
+        grabCnx(Optional.empty());
+    }
+
+    protected void grabCnx(Optional<URI> hostURI) {

Review Comment:
   I think our current practice is that we don't do the null check for optional 
args(make sure optional args are not null)
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to