This is an automated email from the ASF dual-hosted git repository.

guangning pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 362c4f4966a [fix][broker] Gracefully shutdown does not work with admin 
cli in standalone (#20709)
362c4f4966a is described below

commit 362c4f4966a59bdb612ba4369d63bcfb23419e32
Author: Kim, Joo Hyuk <[email protected]>
AuthorDate: Wed Jul 26 15:08:16 2023 +0900

    [fix][broker] Gracefully shutdown does not work with admin cli in 
standalone (#20709)
    
    Fixes : #20617
    
    ### Motivation
    
    Currently, clients' shutdown API does not behave consistently in sense that 
asynchronicity is not handled explicitly. So issue #20617 happens.
    
    This PR will allow clients know that shutdown is actually triggered.
    
    ### Modifications
    
    - Synchronize call to `POST /shutdown` on client side
    - Asynchronize explicitly `pulsar().closeAsync()` invocation
---
 .../pulsar/broker/admin/impl/BrokersBase.java      | 20 +++++++++----
 .../org/apache/pulsar/client/admin/Brokers.java    |  9 +++---
 .../java/org/apache/pulsar/admin/cli/CmdBase.java  | 34 ++++++++++++++++++++++
 .../org/apache/pulsar/admin/cli/CmdBrokers.java    |  4 +--
 4 files changed, 56 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index b367ce7aad9..be8390f15f8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -545,16 +545,26 @@ public class BrokersBase extends AdminResource {
             @ApiParam(name = "maxConcurrentUnloadPerSec",
                     value = "if the value absent(value=0) means no concurrent 
limitation.")
             @QueryParam("maxConcurrentUnloadPerSec") int 
maxConcurrentUnloadPerSec,
-            @QueryParam("forcedTerminateTopic") @DefaultValue("true") boolean 
forcedTerminateTopic
+            @QueryParam("forcedTerminateTopic") @DefaultValue("true") boolean 
forcedTerminateTopic,
+            @Suspended final AsyncResponse asyncResponse
     ) {
         validateSuperUserAccess();
-        doShutDownBrokerGracefully(maxConcurrentUnloadPerSec, 
forcedTerminateTopic);
+        doShutDownBrokerGracefullyAsync(maxConcurrentUnloadPerSec, 
forcedTerminateTopic)
+                .thenAccept(__ -> {
+                    LOG.info("[{}] Successfully shutdown broker gracefully", 
clientAppId());
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+            LOG.error("[{}] Failed to shutdown broker gracefully", 
clientAppId(), ex);
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
+            return null;
+        });
     }
 
-    private void doShutDownBrokerGracefully(int maxConcurrentUnloadPerSec,
-                                            boolean forcedTerminateTopic) {
+    private CompletableFuture<Void> doShutDownBrokerGracefullyAsync(int 
maxConcurrentUnloadPerSec,
+                                                                    boolean 
forcedTerminateTopic) {
         
pulsar().getBrokerService().unloadNamespaceBundlesGracefully(maxConcurrentUnloadPerSec,
 forcedTerminateTopic);
-        pulsar().closeAsync();
+        return pulsar().closeAsync();
     }
 }
 
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
index 464d02121cf..29c280f8ba5 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
@@ -326,10 +326,11 @@ public interface Brokers {
     CompletableFuture<Void> healthcheckAsync(TopicVersion topicVersion);
 
     /**
-     * Shutdown current broker gracefully.
-     * @param maxConcurrentUnloadPerSec
-     * @param forcedTerminateTopic
-     * @return
+     * Trigger the current broker to graceful-shutdown asynchronously.
+     *
+     * @param maxConcurrentUnloadPerSec the maximum number of topics to unload 
per second.
+     *                                  This helps control the speed of the 
unload operation during shutdown.
+     * @param forcedTerminateTopic if true, topics will be forcefully 
terminated during the shutdown process.
      */
     CompletableFuture<Void> shutDownBrokerGracefully(int 
maxConcurrentUnloadPerSec,
                                                      boolean 
forcedTerminateTopic);
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
index ce2c44ec1e8..381bc8abcaa 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.admin.cli;
 
+import static 
org.apache.pulsar.client.admin.internal.BaseResource.getApiException;
 import com.beust.jcommander.DefaultUsageFormatter;
 import com.beust.jcommander.IUsageFormatter;
 import com.beust.jcommander.JCommander;
@@ -26,10 +27,15 @@ import com.beust.jcommander.ParameterException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.PulsarAdminException.ConnectException;
+import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
 
 public abstract class CmdBase {
     protected final JCommander jcommander;
@@ -37,6 +43,12 @@ public abstract class CmdBase {
     private PulsarAdmin admin;
     private IUsageFormatter usageFormatter;
 
+    /**
+     * Default read timeout in milliseconds.
+     * Used if not found from configuration data in {@link #getReadTimeoutMs()}
+     */
+    private static final long DEFAULT_READ_TIMEOUT_MILLIS = 60000;
+
     @Parameter(names = { "--help", "-h" }, help = true, hidden = true)
     private boolean help = false;
 
@@ -124,6 +136,28 @@ public abstract class CmdBase {
         return admin;
     }
 
+    protected long getReadTimeoutMs() {
+        PulsarAdmin pulsarAdmin = getAdmin();
+        if (pulsarAdmin instanceof PulsarAdminImpl) {
+            return ((PulsarAdminImpl) 
pulsarAdmin).getClientConfigData().getReadTimeoutMs();
+        }
+        return DEFAULT_READ_TIMEOUT_MILLIS;
+    }
+
+    protected <T> T sync(Supplier<CompletableFuture<T>> executor) throws 
PulsarAdminException {
+        try {
+            return executor.get().get(getReadTimeoutMs(), 
TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        } catch (ExecutionException e) {
+            throw PulsarAdminException.wrap(getApiException(e.getCause()));
+        } catch (Exception e) {
+            throw PulsarAdminException.wrap(getApiException(e));
+        }
+    }
 
     static Map<String, String> parseListKeyValueMap(List<String> metadata) {
         Map<String, String> map = null;
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
index 1e86edcf59c..f1571e96c65 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
@@ -150,8 +150,8 @@ public class CmdBrokers extends CmdBase {
 
         @Override
         void run() throws Exception {
-            
getAdmin().brokers().shutDownBrokerGracefully(maxConcurrentUnloadPerSec, 
forcedTerminateTopic);
-            System.out.println("Successfully trigger broker shutdown 
gracefully");
+            sync(() -> 
getAdmin().brokers().shutDownBrokerGracefully(maxConcurrentUnloadPerSec, 
forcedTerminateTopic));
+            System.out.println("Successfully shutdown broker gracefully");
         }
 
     }

Reply via email to