AuroraTwinkle commented on code in PR #24387: URL: https://github.com/apache/pulsar/pull/24387#discussion_r2199326408
########## pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java: ########## @@ -96,15 +131,143 @@ public void updateServiceUrl(String serviceUrl) throws InvalidServiceURL { throw new InvalidServiceURL(e); } } - this.addressList = addresses; + this.allAddressList = addresses; + this.allAddressSet = ImmutableSet.copyOf(addresses); this.serviceUrl = serviceUrl; this.serviceUri = uri; this.currentIndex = randomIndex(addresses.size()); + if (enableServiceUrlQuarantine) { + this.availableAddressList = new ArrayList<>(addresses); + hostAvailabilityMap.keySet().removeIf(host -> !allAddressSet.contains(host)); + allAddressSet.forEach( + address -> hostAvailabilityMap.putIfAbsent(address, createEndpointStatus(true, address))); + } } private static int randomIndex(int numAddresses) { return numAddresses == 1 ? 0 : io.netty.util.internal.PlatformDependent.threadLocalRandom().nextInt(numAddresses); } + + /** + * The method is executed under a synchronized lock and cannot execute code that may block, such as network io. + * @param address the host address to mark availability for + * @param isAvailable true if the host is available, false otherwise + */ + @Override + public synchronized void markHostAvailability(InetSocketAddress address, boolean isAvailable) { + if (!enableServiceUrlQuarantine) { + return; + } + + if (!allAddressSet.contains(address)) { + // If the address is not part of the original service URL, we ignore it. 
+ log.debug("Address {} is not part of the original service URL, ignoring availability update", address); + return; + } + + AtomicBoolean availableHostsChanged = new AtomicBoolean(false); + hostAvailabilityMap.compute(address, (key, oldStatus) -> { + if (oldStatus == null) { + EndpointStatus endpointStatus = createEndpointStatus(isAvailable, key); + availableHostsChanged.set(true); + return endpointStatus; + } + if (oldStatus.isAvailable() != isAvailable) { + availableHostsChanged.set(true); + } + computeEndpointStatus(isAvailable, oldStatus); + return oldStatus; + }); + + hostAvailabilityMap.forEach((__, endpointStatus) -> { + if (!endpointStatus.isAvailable()) { + computeEndpointStatus(false, endpointStatus); + if (!availableHostsChanged.get() && endpointStatus.isAvailable()) { + availableHostsChanged.set(true); + } + } + }); + + if (availableHostsChanged.get()) { + availableAddressList = hostAvailabilityMap.entrySet() Review Comment: The main purpose of `synchronized` is to ensure the thread safety of the `EndpointStatus` calculation; using only immutable collections will not achieve that goal. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org