AuroraTwinkle commented on code in PR #24387:
URL: https://github.com/apache/pulsar/pull/24387#discussion_r2199326408


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java:
##########
@@ -96,15 +131,143 @@ public void updateServiceUrl(String serviceUrl) throws 
InvalidServiceURL {
                 throw new InvalidServiceURL(e);
             }
         }
-        this.addressList = addresses;
+        this.allAddressList = addresses;
+        this.allAddressSet = ImmutableSet.copyOf(addresses);
         this.serviceUrl = serviceUrl;
         this.serviceUri = uri;
         this.currentIndex = randomIndex(addresses.size());
+        if (enableServiceUrlQuarantine) {
+            this.availableAddressList = new ArrayList<>(addresses);
+            hostAvailabilityMap.keySet().removeIf(host -> 
!allAddressSet.contains(host));
+            allAddressSet.forEach(
+                    address -> hostAvailabilityMap.putIfAbsent(address, 
createEndpointStatus(true, address)));
+        }
     }
 
     private static int randomIndex(int numAddresses) {
         return numAddresses == 1
                 ?
                 0 : 
io.netty.util.internal.PlatformDependent.threadLocalRandom().nextInt(numAddresses);
     }
+
+    /**
+     * The method is executed under a synchronized lock and cannot execute 
code that may block, such as network io.
+     * @param address the host address to mark availability for
+     * @param isAvailable true if the host is available, false otherwise
+     */
+    @Override
+    public synchronized void markHostAvailability(InetSocketAddress address, 
boolean isAvailable) {
+        if (!enableServiceUrlQuarantine) {
+            return;
+        }
+
+        if (!allAddressSet.contains(address)) {
+            // If the address is not part of the original service URL, we 
ignore it.
+            log.debug("Address {} is not part of the original service URL, 
ignoring availability update", address);
+            return;
+        }
+
+        AtomicBoolean availableHostsChanged = new AtomicBoolean(false);
+        hostAvailabilityMap.compute(address, (key, oldStatus) -> {
+            if (oldStatus == null) {
+                EndpointStatus endpointStatus = 
createEndpointStatus(isAvailable, key);
+                availableHostsChanged.set(true);
+                return endpointStatus;
+            }
+            if (oldStatus.isAvailable() != isAvailable) {
+                availableHostsChanged.set(true);
+            }
+            computeEndpointStatus(isAvailable, oldStatus);
+            return oldStatus;
+        });
+
+        hostAvailabilityMap.forEach((__, endpointStatus) -> {
+            if (!endpointStatus.isAvailable()) {
+                computeEndpointStatus(false, endpointStatus);
+                if (!availableHostsChanged.get() && 
endpointStatus.isAvailable()) {
+                    availableHostsChanged.set(true);
+                }
+            }
+        });
+
+        if (availableHostsChanged.get()) {
+            availableAddressList = hostAvailabilityMap.entrySet()

Review Comment:
   The main purpose of `synchronized` is to ensure the thread safety of the 
`EndpointStatus` calculation; using only immutable collections will not achieve 
that goal. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to