This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 4a59a1daa4a [fix][broker] Gracefully shutdown does not work with admin 
cli in standalone (#20709)
4a59a1daa4a is described below

commit 4a59a1daa4ae31f48fc5aed2150de6f5d31cf794
Author: Kim, Joo Hyuk <[email protected]>
AuthorDate: Wed Jul 26 15:08:16 2023 +0900

    [fix][broker] Gracefully shutdown does not work with admin cli in 
standalone (#20709)
    
    Fixes: #20617
    
    Currently, the client's shutdown API does not behave consistently, in the 
sense that asynchronicity is not handled explicitly. This causes issue #20617.
    
    This PR lets clients know that the shutdown has actually been triggered.
    
    - Make the call to `POST /shutdown` synchronous on the client side
    - Explicitly handle the asynchronous `pulsar().closeAsync()` invocation
    
    (cherry picked from commit 362c4f4966a59bdb612ba4369d63bcfb23419e32)
---
 .../pulsar/broker/admin/impl/BrokersBase.java      | 20 ++++++++---
 .../org/apache/pulsar/client/admin/Brokers.java    |  9 ++---
 .../java/org/apache/pulsar/admin/cli/CmdBase.java  | 42 ++++++++++++++++++++--
 .../org/apache/pulsar/admin/cli/CmdBrokers.java    |  4 +--
 4 files changed, 62 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 2aa6ee78864..ac730c01eab 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -546,16 +546,26 @@ public class BrokersBase extends AdminResource {
             @ApiParam(name = "maxConcurrentUnloadPerSec",
                     value = "if the value absent(value=0) means no concurrent 
limitation.")
             @QueryParam("maxConcurrentUnloadPerSec") int 
maxConcurrentUnloadPerSec,
-            @QueryParam("forcedTerminateTopic") @DefaultValue("true") boolean 
forcedTerminateTopic
+            @QueryParam("forcedTerminateTopic") @DefaultValue("true") boolean 
forcedTerminateTopic,
+            @Suspended final AsyncResponse asyncResponse
     ) {
         validateSuperUserAccess();
-        doShutDownBrokerGracefully(maxConcurrentUnloadPerSec, 
forcedTerminateTopic);
+        doShutDownBrokerGracefullyAsync(maxConcurrentUnloadPerSec, 
forcedTerminateTopic)
+                .thenAccept(__ -> {
+                    LOG.info("[{}] Successfully shutdown broker gracefully", 
clientAppId());
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+            LOG.error("[{}] Failed to shutdown broker gracefully", 
clientAppId(), ex);
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
+            return null;
+        });
     }
 
-    private void doShutDownBrokerGracefully(int maxConcurrentUnloadPerSec,
-                                            boolean forcedTerminateTopic) {
+    private CompletableFuture<Void> doShutDownBrokerGracefullyAsync(int 
maxConcurrentUnloadPerSec,
+                                                                    boolean 
forcedTerminateTopic) {
         
pulsar().getBrokerService().unloadNamespaceBundlesGracefully(maxConcurrentUnloadPerSec,
 forcedTerminateTopic);
-        pulsar().closeAsync();
+        return pulsar().closeAsync();
     }
 }
 
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
index 9f77e4615b1..b5c7cad9278 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
@@ -326,10 +326,11 @@ public interface Brokers {
     CompletableFuture<Void> healthcheckAsync(TopicVersion topicVersion);
 
     /**
-     * Shutdown current broker gracefully.
-     * @param maxConcurrentUnloadPerSec
-     * @param forcedTerminateTopic
-     * @return
+     * Trigger the current broker to graceful-shutdown asynchronously.
+     *
+     * @param maxConcurrentUnloadPerSec the maximum number of topics to unload 
per second.
+     *                                  This helps control the speed of the 
unload operation during shutdown.
+     * @param forcedTerminateTopic if true, topics will be forcefully 
terminated during the shutdown process.
      */
     CompletableFuture<Void> shutDownBrokerGracefully(int 
maxConcurrentUnloadPerSec,
                                                      boolean 
forcedTerminateTopic);
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
index 8a3ecf14ec8..9f2a97a4156 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.admin.cli;
 
+import static 
org.apache.pulsar.client.admin.internal.BaseResource.getApiException;
 import com.beust.jcommander.DefaultUsageFormatter;
 import com.beust.jcommander.IUsageFormatter;
 import com.beust.jcommander.JCommander;
@@ -26,10 +27,15 @@ import com.beust.jcommander.ParameterException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.PulsarAdminException.ConnectException;
+import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
 
 public abstract class CmdBase {
     protected final JCommander jcommander;
@@ -37,8 +43,18 @@ public abstract class CmdBase {
     private PulsarAdmin admin;
     private IUsageFormatter usageFormatter;
 
-    @Parameter(names = { "-h", "--help" }, help = true, hidden = true)
-    private boolean help;
+    /**
+     * Default read timeout in milliseconds.
+     * Used if not found from configuration data in {@link #getReadTimeoutMs()}
+     */
+    private static final long DEFAULT_READ_TIMEOUT_MILLIS = 60000;
+
+    @Parameter(names = { "--help", "-h" }, help = true, hidden = true)
+    private boolean help = false;
+
+    public boolean isHelp() {
+        return help;
+    }
 
     public CmdBase(String cmdName, Supplier<PulsarAdmin> adminSupplier) {
         this.adminSupplier = adminSupplier;
@@ -114,6 +130,28 @@ public abstract class CmdBase {
         return admin;
     }
 
+    protected long getReadTimeoutMs() {
+        PulsarAdmin pulsarAdmin = getAdmin();
+        if (pulsarAdmin instanceof PulsarAdminImpl) {
+            return ((PulsarAdminImpl) 
pulsarAdmin).getClientConfigData().getReadTimeoutMs();
+        }
+        return DEFAULT_READ_TIMEOUT_MILLIS;
+    }
+
+    protected <T> T sync(Supplier<CompletableFuture<T>> executor) throws 
PulsarAdminException {
+        try {
+            return executor.get().get(getReadTimeoutMs(), 
TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        } catch (ExecutionException e) {
+            throw PulsarAdminException.wrap(getApiException(e.getCause()));
+        } catch (Exception e) {
+            throw PulsarAdminException.wrap(getApiException(e));
+        }
+    }
 
     static Map<String, String> parseListKeyValueMap(List<String> metadata) {
         Map<String, String> map = null;
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
index a713198db10..15a2136bd3a 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
@@ -150,8 +150,8 @@ public class CmdBrokers extends CmdBase {
 
         @Override
         void run() throws Exception {
-            
getAdmin().brokers().shutDownBrokerGracefully(maxConcurrentUnloadPerSec, 
forcedTerminateTopic);
-            System.out.println("Successfully trigger broker shutdown 
gracefully");
+            sync(() -> 
getAdmin().brokers().shutDownBrokerGracefully(maxConcurrentUnloadPerSec, 
forcedTerminateTopic));
+            System.out.println("Successfully shutdown broker gracefully");
         }
 
     }

Reply via email to