dragosvictor commented on code in PR #21894:
URL: https://github.com/apache/pulsar/pull/21894#discussion_r1451592628


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderBroker.java:
##########
@@ -30,5 +30,23 @@
 @AllArgsConstructor
 @NoArgsConstructor
 public class LeaderBroker {
+    private String lookupServiceAddress;
     private String serviceUrl;
+
+    public String getLookupServiceAddress() {
+        if (lookupServiceAddress != null) {
+            return lookupServiceAddress;
+        } else {
+            // for backward compatibility at runtime with older versions of Pulsar
+            return parseHostAndPort(serviceUrl);
+        }
+    }
+
+    private static String parseHostAndPort(String serviceUrl) {
+        int uriSeparatorPos = serviceUrl.indexOf("://");
+        if (uriSeparatorPos == -1) {
+            throw new IllegalArgumentException("'" + serviceUrl + "' isn't an URI.");
+        }
+        return serviceUrl.substring(uriSeparatorPos + 3);
+    }

Review Comment:
   This can be an instance method instead of static.
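
   For reference, a minimal standalone sketch of what the helper in this hunk returns for a couple of inputs. The class name `HostAndPortSketch` and the sample URLs are hypothetical; only the method body mirrors the diff above. Note that anything after the authority (a trailing slash or path) is kept as-is.

```java
final class HostAndPortSketch {
    // Same logic as the parseHostAndPort method in the hunk above:
    // strip everything up to and including "://".
    static String parseHostAndPort(String serviceUrl) {
        int uriSeparatorPos = serviceUrl.indexOf("://");
        if (uriSeparatorPos == -1) {
            throw new IllegalArgumentException("'" + serviceUrl + "' isn't a URI.");
        }
        return serviceUrl.substring(uriSeparatorPos + 3);
    }

    public static void main(String[] args) {
        // Hypothetical sample values, not taken from the PR.
        System.out.println(parseHostAndPort("http://broker-xyz:8080"));  // broker-xyz:8080
        System.out.println(parseHostAndPort("http://broker-xyz:8080/")); // broker-xyz:8080/
    }
}
```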



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -1695,9 +1703,10 @@ public String getSafeBrokerServiceUrl() {
     }
 
     public String getLookupServiceAddress() {
-        return String.format("%s:%s", advertisedAddress, config.getWebServicePortTls().isPresent()
-                ? config.getWebServicePortTls().get()
-                : config.getWebServicePort().orElseThrow());
+        if (lookupServiceAddress == null) {
+            throw new IllegalStateException("lookupServiceAddress is not initialized before start has been called");
+        }
+        return lookupServiceAddress;

Review Comment:
   Is this an assertion? If so, seems fair to express it as such:
   ```suggestion
           assert lookupServiceAddress != null;
           return lookupServiceAddress;
   ```
   
   If not, we can maybe simplify it to
   
   ```suggestion
           return Objects.requireNonNull(lookupServiceAddress);
   ```
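
   To make the trade-off concrete, here is a rough sketch of how the three variants behave when the field is still null. `NullCheckSketch` is a hypothetical stand-in, not the real `PulsarService`. The explicit check throws `IllegalStateException`, `Objects.requireNonNull` throws `NullPointerException`, and `assert` only fires when the JVM runs with assertions enabled (`-ea`), so it would silently return null otherwise.

```java
import java.util.Objects;

final class NullCheckSketch {
    // Hypothetical stand-in for the field in PulsarService; null until "start".
    private String lookupServiceAddress;

    // Variant from the PR: explicit check, IllegalStateException.
    String viaExplicitCheck() {
        if (lookupServiceAddress == null) {
            throw new IllegalStateException(
                    "lookupServiceAddress is not initialized before start has been called");
        }
        return lookupServiceAddress;
    }

    // Second suggestion: shorter, but the exception type becomes NullPointerException.
    String viaRequireNonNull() {
        return Objects.requireNonNull(lookupServiceAddress,
                "lookupServiceAddress is not initialized before start has been called");
    }

    // First suggestion: only checked when assertions are enabled (-ea).
    String viaAssert() {
        assert lookupServiceAddress != null;
        return lookupServiceAddress;
    }
}
```

   If callers are expected to catch or log a specific exception type, that difference may matter more than the line count.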



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java:
##########
@@ -235,17 +233,17 @@ public static CompletableFuture<Set<String>> applyNamespacePoliciesAsync(
                 } else {
                     // non-persistent topic can be assigned to only those brokers that enabled for non-persistent topic
                     if (isNonPersistentTopic
-                            && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString)) {
+                            && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(broker)) {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Filter broker- [{}] because it doesn't support non-persistent namespace - [{}]",
-                                    brokerUrl.getHost(), namespace.toString());
+                                    broker, namespace.toString());
                         }
                     } else if (!isNonPersistentTopic
-                            && !brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString)) {
+                            && !brokerTopicLoadingPredicate.isEnablePersistentTopics(broker)) {

Review Comment:
   Nit: this can fit on one line now.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java:
##########
@@ -142,13 +141,13 @@ public static void applyNamespacePolicies(final ServiceUnitId serviceUnit,
             } else {
                 // non-persistent topic can be assigned to only those brokers that enabled for non-persistent topic
                 if (isNonPersistentTopic
-                        && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString)) {
+                        && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(broker)) {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Filter broker- [{}] because it doesn't support non-persistent namespace - [{}]",
                                 brokerUrl.getHost(), namespace.toString());
                     }
                 } else if (!isNonPersistentTopic
-                        && !brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString)) {
+                        && !brokerTopicLoadingPredicate.isEnablePersistentTopics(broker)) {

Review Comment:
   Nit: this can fit on one line now.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java:
##########
@@ -272,10 +272,9 @@ public void testPrimary() throws Exception {
         sortedRankingsInstance.get().put(lr.getRank(rd), rus);
         setObjectField(SimpleLoadManagerImpl.class, loadManager, "sortedRankings", sortedRankingsInstance);
 
-        final Optional<ResourceUnit> leastLoaded = loadManager.getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10"));
-        // broker is not active so found should be null
-        assertFalse(leastLoaded.isPresent());
-
+        ResourceUnit found = loadManager.getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10")).get();
+        // TODO: this test doesn't make sense. This was the original assertion.
+        assertNotEquals(found, null, "did not find a broker when expected one to be found");

Review Comment:
   I'm curious how this was passing before since the condition is negated now.
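
   A side note on the new form, sketched below with a hypothetical helper (`OptionalGetSketch` is not part of the PR): `Optional.get()` throws `NoSuchElementException` on an empty value, so if no broker were found the test would fail at the `.get()` call, and the `assertNotEquals(found, null, ...)` line would never be the one that reports it.

```java
import java.util.NoSuchElementException;
import java.util.Optional;

final class OptionalGetSketch {
    // Mimics the shape of the new test line: unwrap first, assert afterwards.
    static String describe(Optional<String> leastLoaded) {
        try {
            String found = leastLoaded.get(); // throws when the Optional is empty
            return "found " + found;
        } catch (NoSuchElementException e) {
            return "get() threw before any assertion ran";
        }
    }
}
```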



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java:
##########
@@ -142,13 +141,13 @@ public static void applyNamespacePolicies(final ServiceUnitId serviceUnit,
             } else {
                 // non-persistent topic can be assigned to only those brokers that enabled for non-persistent topic
                 if (isNonPersistentTopic
-                        && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString)) {
+                        && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(broker)) {

Review Comment:
   Nit: this can fit on one line now.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java:
##########
@@ -102,8 +102,8 @@ void setup() throws Exception {
 
         createTenant(pulsarAdmins[BROKER_COUNT - 1]);
         for (int i = 0; i < BROKER_COUNT; i++) {
-            String topic = String.format("%s/%s/%s:%s", NamespaceService.SLA_NAMESPACE_PROPERTY, "my-cluster",
-                    pulsarServices[i].getAdvertisedAddress(), brokerWebServicePorts[i]);
+            String topic = String.format("%s/%s/%s", NamespaceService.SLA_NAMESPACE_PROPERTY, "my-cluster",
+                    pulsarServices[i].getLookupServiceAddress());
             pulsarAdmins[0].namespaces().createNamespace(topic);

Review Comment:
   Nit: `org.apache.pulsar.broker.namespace.NamespaceService#getSLAMonitorNamespace` does what we want here:
   
   ```suggestion
               var namespaceName = NamespaceService.getSLAMonitorNamespace(pulsarServices[i].getLookupServiceAddress(),
                       pulsarServices[i].getConfig());
               pulsarAdmins[0].namespaces().createNamespace(namespaceName.toString());
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -430,13 +430,8 @@ public CompletableFuture<Optional<String>> getChannelOwnerAsync() {
         }
 
         return leaderElectionService.readCurrentLeader().thenApply(leader -> {
-                    //expecting http://broker-xyz:port
-                    // TODO: discard this protocol prefix removal
-                    //  by a util func that returns lookupServiceAddress(serviceUrl)
                     if (leader.isPresent()) {
-                        String broker = leader.get().getServiceUrl();
-                        broker = broker.substring(broker.lastIndexOf('/') + 1);
-                        return Optional.of(broker);
+                        return Optional.of(leader.get().getLookupServiceAddress());
                     } else {
                         return Optional.empty();
                     }

Review Comment:
   This reads more concisely, IMO:
   
   ```
   return leader.map(LeaderBroker::getLookupServiceAddress);
   ```
   
   and it could further be rolled up into a lambda by the caller.
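
   A small self-contained sketch of the equivalence, using a hypothetical `Leader` class in place of `LeaderBroker`. One subtle difference worth keeping in mind: `Optional.map` yields an empty `Optional` when the mapper returns null, whereas `Optional.of(...)` would throw a `NullPointerException` in that case.

```java
import java.util.Optional;

final class LeaderLookupSketch {
    // Hypothetical, simplified stand-in for LeaderBroker.
    static final class Leader {
        private final String lookupServiceAddress;

        Leader(String lookupServiceAddress) {
            this.lookupServiceAddress = lookupServiceAddress;
        }

        String getLookupServiceAddress() {
            return lookupServiceAddress;
        }
    }

    // Shape of the code in the diff: explicit isPresent()/get() branches.
    static Optional<String> verbose(Optional<Leader> leader) {
        if (leader.isPresent()) {
            return Optional.of(leader.get().getLookupServiceAddress());
        } else {
            return Optional.empty();
        }
    }

    // Shape of the suggestion: a single map call.
    static Optional<String> concise(Optional<Leader> leader) {
        return leader.map(Leader::getLookupServiceAddress);
    }
}
```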



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java:
##########
@@ -81,9 +81,9 @@ public void testGetLeaderBroker()
         assertTrue(leaderBroker.isPresent());
         log.info("Leader broker is {}", leaderBroker);
         for (PulsarAdmin admin : getAllAdmins()) {
-            String serviceUrl = admin.brokers().getLeaderBroker().getServiceUrl();
-            log.info("Pulsar admin get leader broker is {}", serviceUrl);
-            assertEquals(leaderBroker.get().getServiceUrl(), serviceUrl);
+            String lookupServiceAddress = admin.brokers().getLeaderBroker().getLookupServiceAddress();
+            log.info("Pulsar admin get leader broker is {}", lookupServiceAddress);

Review Comment:
   Nit: I vote we remove this log statement. We're already getting a log statement with its value if the assertion below fails.
   
   ```suggestion
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java:
##########
@@ -235,17 +233,17 @@ public static CompletableFuture<Set<String>> applyNamespacePoliciesAsync(
                 } else {
                     // non-persistent topic can be assigned to only those brokers that enabled for non-persistent topic
                     if (isNonPersistentTopic
-                            && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString)) {
+                            && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(broker)) {

Review Comment:
   Nit: this can fit on one line now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
