This is an automated email from the ASF dual-hosted git repository.
guangning pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 362c4f4966a [fix][broker] Gracefully shutdown does not work with admin
cli in standalone (#20709)
362c4f4966a is described below
commit 362c4f4966a59bdb612ba4369d63bcfb23419e32
Author: Kim, Joo Hyuk <[email protected]>
AuthorDate: Wed Jul 26 15:08:16 2023 +0900
[fix][broker] Gracefully shutdown does not work with admin cli in
standalone (#20709)
Fixes : #20617
### Motivation
Currently, the clients' shutdown API does not behave consistently, in the sense
that asynchronicity is not handled explicitly; this is what causes issue #20617.
This PR lets clients know that the shutdown has actually been triggered.
### Modifications
- Synchronize the call to `POST /shutdown` on the client side
- Explicitly handle the asynchronous `pulsar().closeAsync()` invocation
---
.../pulsar/broker/admin/impl/BrokersBase.java | 20 +++++++++----
.../org/apache/pulsar/client/admin/Brokers.java | 9 +++---
.../java/org/apache/pulsar/admin/cli/CmdBase.java | 34 ++++++++++++++++++++++
.../org/apache/pulsar/admin/cli/CmdBrokers.java | 4 +--
4 files changed, 56 insertions(+), 11 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index b367ce7aad9..be8390f15f8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -545,16 +545,26 @@ public class BrokersBase extends AdminResource {
@ApiParam(name = "maxConcurrentUnloadPerSec",
value = "if the value absent(value=0) means no concurrent
limitation.")
@QueryParam("maxConcurrentUnloadPerSec") int
maxConcurrentUnloadPerSec,
- @QueryParam("forcedTerminateTopic") @DefaultValue("true") boolean
forcedTerminateTopic
+ @QueryParam("forcedTerminateTopic") @DefaultValue("true") boolean
forcedTerminateTopic,
+ @Suspended final AsyncResponse asyncResponse
) {
validateSuperUserAccess();
- doShutDownBrokerGracefully(maxConcurrentUnloadPerSec,
forcedTerminateTopic);
+ doShutDownBrokerGracefullyAsync(maxConcurrentUnloadPerSec,
forcedTerminateTopic)
+ .thenAccept(__ -> {
+ LOG.info("[{}] Successfully shutdown broker gracefully",
clientAppId());
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+ LOG.error("[{}] Failed to shutdown broker gracefully",
clientAppId(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
- private void doShutDownBrokerGracefully(int maxConcurrentUnloadPerSec,
- boolean forcedTerminateTopic) {
+ private CompletableFuture<Void> doShutDownBrokerGracefullyAsync(int
maxConcurrentUnloadPerSec,
+ boolean
forcedTerminateTopic) {
pulsar().getBrokerService().unloadNamespaceBundlesGracefully(maxConcurrentUnloadPerSec,
forcedTerminateTopic);
- pulsar().closeAsync();
+ return pulsar().closeAsync();
}
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
index 464d02121cf..29c280f8ba5 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
@@ -326,10 +326,11 @@ public interface Brokers {
CompletableFuture<Void> healthcheckAsync(TopicVersion topicVersion);
/**
- * Shutdown current broker gracefully.
- * @param maxConcurrentUnloadPerSec
- * @param forcedTerminateTopic
- * @return
+ * Trigger the current broker to graceful-shutdown asynchronously.
+ *
+ * @param maxConcurrentUnloadPerSec the maximum number of topics to unload
per second.
+ * This helps control the speed of the
unload operation during shutdown.
+ * @param forcedTerminateTopic if true, topics will be forcefully
terminated during the shutdown process.
*/
CompletableFuture<Void> shutDownBrokerGracefully(int
maxConcurrentUnloadPerSec,
boolean
forcedTerminateTopic);
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
index ce2c44ec1e8..381bc8abcaa 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.admin.cli;
+import static
org.apache.pulsar.client.admin.internal.BaseResource.getApiException;
import com.beust.jcommander.DefaultUsageFormatter;
import com.beust.jcommander.IUsageFormatter;
import com.beust.jcommander.JCommander;
@@ -26,10 +27,15 @@ import com.beust.jcommander.ParameterException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.ConnectException;
+import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
public abstract class CmdBase {
protected final JCommander jcommander;
@@ -37,6 +43,12 @@ public abstract class CmdBase {
private PulsarAdmin admin;
private IUsageFormatter usageFormatter;
+ /**
+ * Default read timeout in milliseconds.
+ * Used if not found from configuration data in {@link #getReadTimeoutMs()}
+ */
+ private static final long DEFAULT_READ_TIMEOUT_MILLIS = 60000;
+
@Parameter(names = { "--help", "-h" }, help = true, hidden = true)
private boolean help = false;
@@ -124,6 +136,28 @@ public abstract class CmdBase {
return admin;
}
+ protected long getReadTimeoutMs() {
+ PulsarAdmin pulsarAdmin = getAdmin();
+ if (pulsarAdmin instanceof PulsarAdminImpl) {
+ return ((PulsarAdminImpl)
pulsarAdmin).getClientConfigData().getReadTimeoutMs();
+ }
+ return DEFAULT_READ_TIMEOUT_MILLIS;
+ }
+
+ protected <T> T sync(Supplier<CompletableFuture<T>> executor) throws
PulsarAdminException {
+ try {
+ return executor.get().get(getReadTimeoutMs(),
TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ } catch (ExecutionException e) {
+ throw PulsarAdminException.wrap(getApiException(e.getCause()));
+ } catch (Exception e) {
+ throw PulsarAdminException.wrap(getApiException(e));
+ }
+ }
static Map<String, String> parseListKeyValueMap(List<String> metadata) {
Map<String, String> map = null;
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
index 1e86edcf59c..f1571e96c65 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
@@ -150,8 +150,8 @@ public class CmdBrokers extends CmdBase {
@Override
void run() throws Exception {
-
getAdmin().brokers().shutDownBrokerGracefully(maxConcurrentUnloadPerSec,
forcedTerminateTopic);
- System.out.println("Successfully trigger broker shutdown
gracefully");
+ sync(() ->
getAdmin().brokers().shutDownBrokerGracefully(maxConcurrentUnloadPerSec,
forcedTerminateTopic));
+ System.out.println("Successfully shutdown broker gracefully");
}
}