dragosvictor commented on code in PR #21894:
URL: https://github.com/apache/pulsar/pull/21894#discussion_r1451592628
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderBroker.java:
##########
@@ -30,5 +30,23 @@
@AllArgsConstructor
@NoArgsConstructor
public class LeaderBroker {
+ private String lookupServiceAddress;
private String serviceUrl;
+
+ public String getLookupServiceAddress() {
+ if (lookupServiceAddress != null) {
+ return lookupServiceAddress;
+ } else {
+ // for backward compatibility at runtime with older versions of Pulsar
+ return parseHostAndPort(serviceUrl);
+ }
+ }
+
+ private static String parseHostAndPort(String serviceUrl) {
+ int uriSeparatorPos = serviceUrl.indexOf("://");
+ if (uriSeparatorPos == -1) {
+ throw new IllegalArgumentException("'" + serviceUrl + "' isn't an URI.");
+ }
+ return serviceUrl.substring(uriSeparatorPos + 3);
+ }
Review Comment:
This can be an instance method instead of static.
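
For illustration, a minimal sketch of the fallback with the helper as an instance method. `LeaderBrokerSketch` is a hypothetical stand-in, not the PR's class, and the example URL follows the `http://broker-xyz:port` shape mentioned in the removed `ServiceUnitStateChannelImpl` comment:

```java
// Hypothetical stand-in for LeaderBroker; only the fallback logic is modeled.
final class LeaderBrokerSketch {
    // May be null when the data was written by an older broker version.
    private final String lookupServiceAddress;
    private final String serviceUrl;

    LeaderBrokerSketch(String lookupServiceAddress, String serviceUrl) {
        this.lookupServiceAddress = lookupServiceAddress;
        this.serviceUrl = serviceUrl;
    }

    String getLookupServiceAddress() {
        if (lookupServiceAddress != null) {
            return lookupServiceAddress;
        }
        // Backward-compatible path: derive host:port from the service URL.
        return parseHostAndPort(serviceUrl);
    }

    // Instance method, as suggested in the comment above.
    private String parseHostAndPort(String url) {
        int uriSeparatorPos = url.indexOf("://");
        if (uriSeparatorPos == -1) {
            throw new IllegalArgumentException("'" + url + "' isn't a URI.");
        }
        // Drop the scheme and the "://" separator.
        return url.substring(uriSeparatorPos + 3);
    }

    public static void main(String[] args) {
        LeaderBrokerSketch legacy = new LeaderBrokerSketch(null, "http://broker-xyz:8080");
        System.out.println(legacy.getLookupServiceAddress()); // prints "broker-xyz:8080"
    }
}
```

Note that the helper keeps everything after `://`, so a URL with a path or trailing slash would not yield a bare `host:port`.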
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -1695,9 +1703,10 @@ public String getSafeBrokerServiceUrl() {
}
public String getLookupServiceAddress() {
- return String.format("%s:%s", advertisedAddress, config.getWebServicePortTls().isPresent()
- ? config.getWebServicePortTls().get()
- : config.getWebServicePort().orElseThrow());
+ if (lookupServiceAddress == null) {
+ throw new IllegalStateException("lookupServiceAddress is not initialized before start has been called");
+ }
+ return lookupServiceAddress;
Review Comment:
Is this an assertion? If so, it seems fair to express it as such:
```suggestion
assert lookupServiceAddress != null;
return lookupServiceAddress;
```
If not, we could simplify it to:
```suggestion
return Objects.requireNonNull(lookupServiceAddress);
```
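
To make the trade-off concrete, here is a small sketch of the two options side by side. `LookupAddressHolder` and its `start` method are hypothetical stand-ins for the real service class; the behavior shown is that of standard Java `assert` and `Objects.requireNonNull`:

```java
import java.util.Objects;

// Hypothetical holder class; only the null-check styles are of interest.
final class LookupAddressHolder {
    private String lookupServiceAddress; // assumed to be set during start

    void start(String address) {
        this.lookupServiceAddress = address;
    }

    // Option 1: checked only when the JVM runs with assertions enabled (-ea).
    // With assertions disabled (the default), this returns null before start.
    String getWithAssert() {
        assert lookupServiceAddress != null : "lookupServiceAddress is not initialized";
        return lookupServiceAddress;
    }

    // Option 2: always checked; throws NullPointerException with the message.
    String getWithRequireNonNull() {
        return Objects.requireNonNull(lookupServiceAddress,
                "lookupServiceAddress is not initialized before start has been called");
    }

    public static void main(String[] args) {
        LookupAddressHolder holder = new LookupAddressHolder();
        try {
            holder.getWithRequireNonNull();
        } catch (NullPointerException e) {
            System.out.println("rejected before start: " + e.getMessage());
        }
        holder.start("broker-xyz:8080");
        System.out.println(holder.getWithRequireNonNull());
    }
}
```

One difference from the PR's version: `requireNonNull` raises `NullPointerException` rather than `IllegalStateException`, which matters only if callers catch the specific type.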
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java:
##########
@@ -235,17 +233,17 @@ public static CompletableFuture<Set<String>> applyNamespacePoliciesAsync(
} else {
// non-persistent topic can be assigned to only those brokers that enabled for non-persistent topic
if (isNonPersistentTopic
- && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString)) {
+ && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(broker)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Filter broker- [{}] because it doesn't support non-persistent namespace - [{}]",
- brokerUrl.getHost(), namespace.toString());
+ broker, namespace.toString());
}
} else if (!isNonPersistentTopic
- && !brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString)) {
+ && !brokerTopicLoadingPredicate.isEnablePersistentTopics(broker)) {
Review Comment:
Nit: this can fit on one line now.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java:
##########
@@ -142,13 +141,13 @@ public static void applyNamespacePolicies(final ServiceUnitId serviceUnit,
} else {
// non-persistent topic can be assigned to only those brokers that enabled for non-persistent topic
if (isNonPersistentTopic
- && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString)) {
+ && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(broker)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Filter broker- [{}] because it doesn't support non-persistent namespace - [{}]",
brokerUrl.getHost(), namespace.toString());
}
} else if (!isNonPersistentTopic
- && !brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString)) {
+ && !brokerTopicLoadingPredicate.isEnablePersistentTopics(broker)) {
Review Comment:
Nit: this can fit on one line now.
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java:
##########
@@ -272,10 +272,9 @@ public void testPrimary() throws Exception {
sortedRankingsInstance.get().put(lr.getRank(rd), rus);
setObjectField(SimpleLoadManagerImpl.class, loadManager, "sortedRankings", sortedRankingsInstance);
- final Optional<ResourceUnit> leastLoaded = loadManager.getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10"));
- // broker is not active so found should be null
- assertFalse(leastLoaded.isPresent());
-
+ ResourceUnit found = loadManager.getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10")).get();
+ // TODO: this test doesn't make sense. This was the original assertion.
+ assertNotEquals(found, null, "did not find a broker when expected one to be found");
Review Comment:
I'm curious how this was passing before, since the condition is negated now.
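
A rough sketch of why the two versions cannot both pass against the same state, using plain `java.util.Optional`. `getLeastLoaded` here is a hypothetical stand-in that returns a broker name instead of a `ResourceUnit`, and I have not checked which state the real test sets up:

```java
import java.util.NoSuchElementException;
import java.util.Optional;

final class LeastLoadedSketch {
    // Hypothetical stand-in for loadManager.getLeastLoaded(...).
    static Optional<String> getLeastLoaded(boolean brokerActive) {
        return brokerActive ? Optional.of("broker-1:8080") : Optional.empty();
    }

    public static void main(String[] args) {
        // Old check: passes only when no broker is found.
        System.out.println(getLeastLoaded(false).isPresent()); // prints "false"

        // New check: get() on an empty Optional throws before any
        // assertion on `found` can run.
        try {
            getLeastLoaded(false).get();
        } catch (NoSuchElementException e) {
            System.out.println("no broker found");
        }

        // With an active broker, get() succeeds and the old check would fail.
        System.out.println(getLeastLoaded(true).get()); // prints "broker-1:8080"
    }
}
```

So either the lookup result itself changed with this PR, or one of the two versions fails.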
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java:
##########
@@ -142,13 +141,13 @@ public static void applyNamespacePolicies(final ServiceUnitId serviceUnit,
} else {
// non-persistent topic can be assigned to only those brokers that enabled for non-persistent topic
if (isNonPersistentTopic
- && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString)) {
+ && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(broker)) {
Review Comment:
Nit: this can fit on one line now.
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java:
##########
@@ -102,8 +102,8 @@ void setup() throws Exception {
createTenant(pulsarAdmins[BROKER_COUNT - 1]);
for (int i = 0; i < BROKER_COUNT; i++) {
- String topic = String.format("%s/%s/%s:%s", NamespaceService.SLA_NAMESPACE_PROPERTY, "my-cluster",
- pulsarServices[i].getAdvertisedAddress(), brokerWebServicePorts[i]);
+ String topic = String.format("%s/%s/%s", NamespaceService.SLA_NAMESPACE_PROPERTY, "my-cluster",
+ pulsarServices[i].getLookupServiceAddress());
pulsarAdmins[0].namespaces().createNamespace(topic);
Review Comment:
Nit: `org.apache.pulsar.broker.namespace.NamespaceService#getSLAMonitorNamespace` does what we want here:
```suggestion
var namespaceName = NamespaceService.getSLAMonitorNamespace(pulsarServices[i].getLookupServiceAddress(),
pulsarServices[i].getConfig());
pulsarAdmins[0].namespaces().createNamespace(namespaceName.toString());
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -430,13 +430,8 @@ public CompletableFuture<Optional<String>> getChannelOwnerAsync() {
}
return leaderElectionService.readCurrentLeader().thenApply(leader -> {
- //expecting http://broker-xyz:port
- // TODO: discard this protocol prefix removal
- // by a util func that returns lookupServiceAddress(serviceUrl)
if (leader.isPresent()) {
- String broker = leader.get().getServiceUrl();
- broker = broker.substring(broker.lastIndexOf('/') + 1);
- return Optional.of(broker);
+ return Optional.of(leader.get().getLookupServiceAddress());
} else {
return Optional.empty();
}
Review Comment:
This reads more concisely, IMO:
```
return leader.map(LeaderBroker::getLookupServiceAddress);
```
and it could further be rolled up into a lambda by the caller.
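
A self-contained sketch of the equivalence, with a hypothetical `Leader` class standing in for `LeaderBroker`:

```java
import java.util.Optional;

final class ChannelOwnerSketch {
    // Hypothetical stand-in for LeaderBroker.
    static final class Leader {
        private final String lookupServiceAddress;

        Leader(String lookupServiceAddress) {
            this.lookupServiceAddress = lookupServiceAddress;
        }

        String getLookupServiceAddress() {
            return lookupServiceAddress;
        }
    }

    // Shape of the code in the diff: explicit isPresent()/get().
    static Optional<String> verbose(Optional<Leader> leader) {
        if (leader.isPresent()) {
            return Optional.of(leader.get().getLookupServiceAddress());
        } else {
            return Optional.empty();
        }
    }

    // Suggested shape: map() handles the empty case itself.
    static Optional<String> concise(Optional<Leader> leader) {
        return leader.map(Leader::getLookupServiceAddress);
    }

    public static void main(String[] args) {
        Optional<Leader> present = Optional.of(new Leader("broker-xyz:8080"));
        System.out.println(verbose(present).equals(concise(present)));
        System.out.println(concise(Optional.empty()).isPresent());
    }
}
```

The two differ only if the getter returns null: `Optional.of(null)` throws, while `map` yields an empty `Optional`. As far as the quoted `LeaderBroker` diff shows, the getter returns either the stored address or a parsed substring, so that case should not arise.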
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java:
##########
@@ -81,9 +81,9 @@ public void testGetLeaderBroker()
assertTrue(leaderBroker.isPresent());
log.info("Leader broker is {}", leaderBroker);
for (PulsarAdmin admin : getAllAdmins()) {
- String serviceUrl = admin.brokers().getLeaderBroker().getServiceUrl();
- log.info("Pulsar admin get leader broker is {}", serviceUrl);
- assertEquals(leaderBroker.get().getServiceUrl(), serviceUrl);
+ String lookupServiceAddress = admin.brokers().getLeaderBroker().getLookupServiceAddress();
+ log.info("Pulsar admin get leader broker is {}", lookupServiceAddress);
Review Comment:
Nit: I vote we remove this log statement. If the assertion below fails, we already get a message that includes its value.
```suggestion
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java:
##########
@@ -235,17 +233,17 @@ public static CompletableFuture<Set<String>> applyNamespacePoliciesAsync(
} else {
// non-persistent topic can be assigned to only those brokers that enabled for non-persistent topic
if (isNonPersistentTopic
- && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString)) {
+ && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(broker)) {
Review Comment:
Nit: this can fit on one line now.