BewareMyPower commented on code in PR #23433:
URL: https://github.com/apache/pulsar/pull/23433#discussion_r1796387529


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -1462,6 +1478,10 @@ private CompletableFuture<Void> 
healthCheckBrokerAsync(String brokerId) {
     }
 
     private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int 
retry, CompletableFuture<Void> future) {
+        if (channelDisabled()) {

Review Comment:
   I'd like to do the following change to avoid `testOverrideOrphanStateData` 
fail. i.e. `doCleanup(..., false)` won't be skipped. Only the 
`healthCheckBrokerAsync` call will be skipped
   
   ```diff
   diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
   index ea1bf01be5..30d19415e7 100644
   --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
   +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
   @@ -1334,17 +1334,14 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
    
        private boolean channelDisabled() {
            final var channelState = this.channelState;
   -        if (channelState == Disabled || channelState == Closed) {
   -            log.warn("[{}] Skip scheduleCleanup because the state is {} 
now", brokerId, channelState);
   -            return true;
   -        }
   -        return false;
   +        return channelState == Disabled || channelState == Closed;
        }
    
        private void scheduleCleanup(String broker, long delayInSecs) {
            var scheduled = new MutableObject<CompletableFuture<Void>>();
            try {
                if (channelDisabled()) {
   +                log.warn("[{}] Skip scheduleCleanup because the state is {} 
now", brokerId, channelState);
                    return;
                }
                cleanupJobs.computeIfAbsent(broker, k -> {
   @@ -1478,10 +1475,6 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
        }
    
        private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int 
retry, CompletableFuture<Void> future) {
   -        if (channelDisabled()) {
   -            future.complete(null);
   -            return;
   -        }
            try {
                var admin = getPulsarAdmin();
                admin.brokers().healthcheckAsync(TopicVersion.V2, 
Optional.of(brokerId))
   @@ -1519,7 +1512,9 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
            }
    
            // if not gracefully, verify the broker is inactive by health-check.
   -        if (!gracefully) {
   +        // When the channel is disabled, the broker is during the close 
phase where the web service might not be
   +        // available so that calling healthCheckBrokerAsync() could fail 
with timeout.
   +        if (!gracefully && !channelDisabled()) {
                try {
                    healthCheckBrokerAsync(broker).get(
                            
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to