This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new a3fcaa2 Revert "Fix Broker HealthCheck Endpoint Exposes Race Conditions."
a3fcaa2 is described below
commit a3fcaa2de105f723f967dd9aea2a6a4877b98319
Author: technoboy <[email protected]>
AuthorDate: Wed Mar 9 16:17:52 2022 +0800
Revert "Fix Broker HealthCheck Endpoint Exposes Race Conditions."
This reverts commit df10859d1dbb4def553a321e92215eb980adf151.
---
.../pulsar/broker/admin/impl/BrokersBase.java | 195 +++++++++------------
.../pulsar/broker/web/PulsarWebResource.java | 16 --
.../broker/admin/AdminApiHealthCheckTest.java | 91 ----------
.../org/apache/pulsar/common/util/FutureUtil.java | 17 +-
4 files changed, 82 insertions(+), 237 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 15303b1..ad4a6ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -24,12 +24,9 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -50,10 +47,9 @@ import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.Subscription;
-import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
@@ -304,133 +300,102 @@ public class BrokersBase extends PulsarWebResource {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Cluster doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error")})
- public void healthcheck(@Suspended AsyncResponse asyncResponse) {
- validateSuperUserAccess();
- internalRunHealthCheck()
- .thenAccept(__ -> {
- LOG.info("[{}] Successfully run health check.",
clientAppId());
- asyncResponse.resume("ok");
- }).exceptionally(ex -> {
- LOG.error("[{}] Fail to run health check.", clientAppId(),
ex);
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
- });
- }
-
- private CompletableFuture<Void> internalRunHealthCheck() {
+ public void healthcheck(@Suspended AsyncResponse asyncResponse) throws
Exception {
String topic;
PulsarClient client;
try {
+ validateSuperUserAccess();
String heartbeatNamespace = NamespaceService.getHeartbeatNamespace(
pulsar().getAdvertisedAddress(),
pulsar().getConfiguration());
topic = String.format("persistent://%s/%s", heartbeatNamespace,
HEALTH_CHECK_TOPIC_SUFFIX);
LOG.info("Running healthCheck with topic={}", topic);
client = pulsar().getClient();
- } catch (PulsarServerException e) {
- LOG.error("[{}] Fail to run health check while get client.",
clientAppId());
- throw new RestException(e);
+ } catch (Exception e) {
+ LOG.error("Error getting heathcheck topic info", e);
+ throw new PulsarServerException(e);
}
String messageStr = UUID.randomUUID().toString();
- final String subscriptionName = "healthCheck-" + messageStr;
// create non-partitioned topic manually and close the previous reader
if present.
- return pulsar().getBrokerService().getTopic(topic,
true).thenCompose(topicOptional -> {
- if (!topicOptional.isPresent()) {
- LOG.error("[{}] Fail to run health check while get topic {}.
because get null value.",
- clientAppId(), topic);
- throw new RestException(Status.NOT_FOUND,
- String.format("Topic [%s] not found after create.",
topic));
- }
- CompletableFuture<Void> resultFuture = new CompletableFuture<>();
- client.newProducer(Schema.STRING).topic(topic).createAsync()
- .thenCompose(producer -> client.newReader(Schema.STRING)
- .topic(topic)
- .subscriptionName(subscriptionName)
- .startMessageId(MessageId.latest)
- .createAsync()
- .exceptionally(createException -> {
- producer.closeAsync().exceptionally(ex -> {
- LOG.error("[{}] Close producer fail while
heath check.", clientAppId());
- return null;
- });
- throw
FutureUtil.wrapToCompletionException(createException);
- })
- .thenCompose(reader ->
producer.sendAsync(messageStr)
- .thenCompose(__ ->
healthCheckRecursiveReadNext(reader, messageStr))
- .whenComplete((__, ex) -> {
- closeAndReCheck(producer, reader,
topicOptional.get(), subscriptionName)
- .whenComplete((unused,
innerEx) -> {
- if (ex != null) {
-
resultFuture.completeExceptionally(ex);
- } else {
-
resultFuture.complete(null);
- }
- });
- }))
- ).exceptionally(ex -> {
- resultFuture.completeExceptionally(ex);
- return null;
- });
- return resultFuture;
- });
- }
-
- private CompletableFuture<Void>
healthCheckRecursiveReadNext(Reader<String> reader, String content) {
- return reader.readNextAsync()
- .thenCompose(msg -> {
- if (!Objects.equals(content, msg.getValue())) {
- return healthCheckRecursiveReadNext(reader, content);
+ try {
+ pulsar().getBrokerService().getTopic(topic,
true).get().ifPresent(t -> {
+ t.getSubscriptions().forEach((__, value) -> {
+ try {
+ value.deleteForcefully();
+ } catch (Exception e) {
+ LOG.warn("Failed to delete previous subscription {}
for health check", value.getName(), e);
}
- return CompletableFuture.completedFuture(null);
});
- }
+ });
+ } catch (Exception e) {
+ LOG.warn("Failed to try to delete subscriptions for health check",
e);
+ }
+ CompletableFuture<Producer<String>> producerFuture =
+ client.newProducer(Schema.STRING).topic(topic).createAsync();
+ CompletableFuture<Reader<String>> readerFuture =
client.newReader(Schema.STRING)
+ .topic(topic).startMessageId(MessageId.latest).createAsync();
- /**
- * Close producer and reader and then to re-check if this operation is
success.
- *
- * Re-check
- * - Producer: If close fails we will print error log to notify user.
- * - Consumer: If close fails we will force delete subscription.
- *
- * @param producer Producer
- * @param reader Reader
- * @param topic Topic
- * @param subscriptionName Subscription name
- */
- private CompletableFuture<Void> closeAndReCheck(Producer<String> producer,
Reader<String> reader,
- Topic topic, String
subscriptionName) {
- // no matter exception or success, we still need to
- // close producer/reader
- CompletableFuture<Void> producerFuture = producer.closeAsync();
- CompletableFuture<Void> readerFuture = reader.closeAsync();
- List<CompletableFuture<Void>> futures = new ArrayList<>(2);
- futures.add(producerFuture);
- futures.add(readerFuture);
- return FutureUtil.waitForAll(Collections.unmodifiableList(futures))
- .exceptionally(closeException -> {
- if (readerFuture.isCompletedExceptionally()) {
- LOG.error("[{}] Close reader fail while heath check.",
clientAppId());
- Subscription subscription =
- topic.getSubscription(subscriptionName);
- // re-check subscription after reader close
- if (subscription != null) {
- LOG.warn("[{}] Force delete subscription {} "
- + "when it still exists after the"
- + " reader is closed.",
- clientAppId(), subscription);
- subscription.deleteForcefully()
- .exceptionally(ex -> {
- LOG.error("[{}] Force delete
subscription fail"
- + " while health
check",
- clientAppId(), ex);
- return null;
- });
- }
+ CompletableFuture<Void> completePromise = new CompletableFuture<>();
+
+ CompletableFuture.allOf(producerFuture, readerFuture).whenComplete(
+ (ignore, exception) -> {
+ if (exception != null) {
+ completePromise.completeExceptionally(exception);
} else {
- // producer future fail.
- LOG.error("[{}] Close producer fail while heath
check.", clientAppId());
+ producerFuture.thenCompose((producer) ->
producer.sendAsync(messageStr))
+ .whenComplete((ignore2, exception2) -> {
+ if (exception2 != null) {
+
completePromise.completeExceptionally(exception2);
+ }
+ });
+
+ healthcheckReadLoop(readerFuture, completePromise,
messageStr);
+
+ // timeout read loop after 10 seconds
+ FutureUtil.addTimeoutHandling(completePromise,
+ HEALTHCHECK_READ_TIMEOUT,
pulsar().getExecutor(),
+ () -> FutureUtil.createTimeoutException("Timed
out reading", getClass(),
+ "healthcheck(...)"));
}
- return null;
});
+
+ completePromise.whenComplete((ignore, exception) -> {
+ producerFuture.thenAccept((producer) -> {
+ producer.closeAsync().whenComplete((ignore2,
exception2) -> {
+ if (exception2 != null) {
+ LOG.warn("Error closing producer for
healthcheck", exception2);
+ }
+ });
+ });
+ readerFuture.thenAccept((reader) -> {
+ reader.closeAsync().whenComplete((ignore2, exception2)
-> {
+ if (exception2 != null) {
+ LOG.warn("Error closing reader for
healthcheck", exception2);
+ }
+ });
+ });
+ if (exception != null) {
+ asyncResponse.resume(new RestException(exception));
+ } else {
+ asyncResponse.resume("ok");
+ }
+ });
+ }
+
+ private void healthcheckReadLoop(CompletableFuture<Reader<String>>
readerFuture,
+ CompletableFuture<?> completablePromise,
+ String messageStr) {
+ readerFuture.thenAccept((reader) -> {
+ CompletableFuture<Message<String>> readFuture =
reader.readNextAsync()
+ .whenComplete((m, exception) -> {
+ if (exception != null) {
+
completablePromise.completeExceptionally(exception);
+ } else if (m.getValue().equals(messageStr)) {
+ completablePromise.complete(null);
+ } else {
+ healthcheckReadLoop(readerFuture,
completablePromise, messageStr);
+ }
+ });
+ });
}
private synchronized void deleteDynamicConfigurationOnZk(String
configName) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 8fe6375..71eae45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -43,7 +43,6 @@ import java.util.concurrent.TimeoutException;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
@@ -68,8 +67,6 @@ import
org.apache.pulsar.broker.resources.NamespaceResources.IsolationPolicyReso
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.ResourceGroupResources;
import org.apache.pulsar.broker.resources.TenantResources;
-import org.apache.pulsar.broker.service.BrokerServiceException;
-import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.common.naming.Constants;
@@ -1210,17 +1207,4 @@ public abstract class PulsarWebResource {
}
return tree;
}
-
- protected static void resumeAsyncResponseExceptionally(AsyncResponse
asyncResponse, Throwable exception) {
- Throwable realCause = FutureUtil.unwrapCompletionException(exception);
- if (realCause instanceof WebApplicationException) {
- asyncResponse.resume(realCause);
- } else if (realCause instanceof
BrokerServiceException.NotAllowedException) {
- asyncResponse.resume(new RestException(Status.CONFLICT,
realCause));
- } else if (realCause instanceof PulsarAdminException) {
- asyncResponse.resume(new RestException(((PulsarAdminException)
realCause)));
- } else {
- asyncResponse.resume(new RestException(realCause));
- }
- }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
deleted file mode 100644
index c44d771..0000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.admin;
-
-import com.google.common.collect.Sets;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.compaction.Compactor;
-import org.awaitility.Awaitility;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-@Test(groups = "broker")
-public class AdminApiHealthCheckTest extends MockedPulsarServiceBaseTest {
-
- @BeforeMethod
- @Override
- public void setup() throws Exception {
- resetConfig();
- super.internalSetup();
- admin.clusters().createCluster("test",
-
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
- TenantInfoImpl tenantInfo = new TenantInfoImpl(
- Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
- admin.tenants().createTenant("pulsar", tenantInfo);
- admin.namespaces().createNamespace("pulsar/system",
Sets.newHashSet("test"));
- admin.tenants().createTenant("public", tenantInfo);
- admin.namespaces().createNamespace("public/default",
Sets.newHashSet("test"));
- }
- @AfterMethod(alwaysRun = true)
- @Override
- public void cleanup() throws Exception {
- super.internalCleanup();
- }
-
- @Test
- public void testHealthCheckup() throws Exception {
- final int times = 30;
- CompletableFuture<Void> future = new CompletableFuture<>();
- pulsar.getExecutor().execute(() -> {
- try {
- for (int i = 0; i < times; i++) {
- admin.brokers().healthcheck();
- }
- future.complete(null);
- } catch (PulsarAdminException e) {
- future.completeExceptionally(e);
- }
- });
- for (int i = 0; i < times; i++) {
- admin.brokers().healthcheck();
- }
- // To ensure we don't have any subscription
- final String testHealthCheckTopic =
String.format("persistent://pulsar/test/localhost:%s/healthcheck",
- pulsar.getConfig().getWebServicePort().get());
- Awaitility.await().untilAsserted(() -> {
- Assert.assertFalse(future.isCompletedExceptionally());
- });
- Awaitility.await().untilAsserted(() ->
- Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
- .getSubscriptions(testHealthCheckTopic).stream()
- // All system topics are using compaction, even though
is not explicitly set in the policies.
- .filter(v ->
!v.equals(Compactor.COMPACTION_SUBSCRIPTION))
- .collect(Collectors.toList())
- ))
- );
- }
-}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 7d9fd79..45fd7f7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -78,7 +78,8 @@ public class FutureUtil {
}
/**
- * If the future is cancelled or times out, the cancel action will be
invoked.
+ * If the future is cancelled or times out, the cancel action will be
+ * invoked
*
* The action is executed once if the future completes with
* {@link java.util.concurrent.CancellationException} or {@link
TimeoutException}
@@ -202,18 +203,4 @@ public class FutureUtil {
}
return Optional.empty();
}
-
- /**
- * Wrap throwable exception to CompletionException if that exception is
not an instance of CompletionException.
- *
- * @param throwable Exception
- * @return CompletionException
- */
- public static CompletionException wrapToCompletionException(Throwable
throwable) {
- if (throwable instanceof CompletionException) {
- return (CompletionException) throwable;
- } else {
- return new CompletionException(throwable);
- }
- }
}