gaoran10 commented on code in PR #21213:
URL: https://github.com/apache/pulsar/pull/21213#discussion_r1339404998


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -443,30 +496,37 @@ private CompletableFuture<Optional<BrokerLookupData>> 
getBrokerLookupData(
     public CompletableFuture<NamespaceEphemeralData> 
tryAcquiringOwnership(NamespaceBundle namespaceBundle) {
         log.info("Try acquiring ownership for bundle: {} - {}.", 
namespaceBundle, brokerRegistry.getBrokerId());
         final String bundle = namespaceBundle.toString();
-        return dedupeLookupRequest(bundle, k -> {
-            final CompletableFuture<String> owner =
-                    this.getOwnerAsync(namespaceBundle, bundle, true);
-            return getBrokerLookupData(owner.thenApply(Optional::ofNullable), 
bundle);
-        }).thenApply(brokerLookupData -> {
-            if (brokerLookupData.isEmpty()) {
-                throw new IllegalStateException(
-                        "Failed to get the broker lookup data for bundle: " + 
bundle);
-            }
-            return brokerLookupData.get().toNamespaceEphemeralData();
-        });
+        return assign(Optional.empty(), namespaceBundle)
+                .thenApply(brokerLookupData -> {
+                    if (brokerLookupData.isEmpty()) {
+                        String errorMsg = String.format(
+                                "Failed to get the broker lookup data for 
bundle:%s", bundle);
+                        log.error(errorMsg);
+                        throw new IllegalStateException(errorMsg);
+                    }
+                    return brokerLookupData.get().toNamespaceEphemeralData();
+                });
     }
 
     private CompletableFuture<Optional<BrokerLookupData>> dedupeLookupRequest(
             String key, Function<String, 
CompletableFuture<Optional<BrokerLookupData>>> provider) {
-        CompletableFuture<Optional<BrokerLookupData>> future = 
lookupRequests.computeIfAbsent(key, provider);

Review Comment:
   Got it, thanks.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -1216,48 +1213,19 @@ private synchronized void doCleanup(String broker)  {
         log.info("Started ownership cleanup for the inactive broker:{}", 
broker);
         int orphanServiceUnitCleanupCnt = 0;
         long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
-        String heartbeatNamespace =
-                NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT, 
config.getClusterName(), broker)).toString();
-        String heartbeatNamespaceV2 =
-                NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, 
broker)).toString();
-
         Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new 
HashMap<>();
         for (var etr : tableview.entrySet()) {
             var stateData = etr.getValue();
             var serviceUnit = etr.getKey();
             var state = state(stateData);
-            if (StringUtils.equals(broker, stateData.dstBroker())) {
-                if (isActiveState(state)) {
-                    if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
-                        orphanSystemServiceUnits.put(serviceUnit, stateData);
-                    } else if (serviceUnit.startsWith(heartbeatNamespace)
-                            || serviceUnit.startsWith(heartbeatNamespaceV2)) {
-                        // Skip the heartbeat namespace
-                        log.info("Skip override heartbeat namespace bundle"

Review Comment:
   Thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to