This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new a3fcaa2  Revert "Fix Broker HealthCheck Endpoint Exposes Race Conditions."
a3fcaa2 is described below

commit a3fcaa2de105f723f967dd9aea2a6a4877b98319
Author: technoboy <[email protected]>
AuthorDate: Wed Mar 9 16:17:52 2022 +0800

    Revert "Fix Broker HealthCheck Endpoint Exposes Race Conditions."
    
    This reverts commit df10859d1dbb4def553a321e92215eb980adf151.
---
 .../pulsar/broker/admin/impl/BrokersBase.java      | 195 +++++++++------------
 .../pulsar/broker/web/PulsarWebResource.java       |  16 --
 .../broker/admin/AdminApiHealthCheckTest.java      |  91 ----------
 .../org/apache/pulsar/common/util/FutureUtil.java  |  17 +-
 4 files changed, 82 insertions(+), 237 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 15303b1..ad4a6ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -24,12 +24,9 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -50,10 +47,9 @@ import org.apache.pulsar.broker.loadbalance.LeaderBroker;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.Subscription;
-import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -304,133 +300,102 @@ public class BrokersBase extends PulsarWebResource {
         @ApiResponse(code = 403, message = "Don't have admin permission"),
         @ApiResponse(code = 404, message = "Cluster doesn't exist"),
         @ApiResponse(code = 500, message = "Internal server error")})
-    public void healthcheck(@Suspended AsyncResponse asyncResponse) {
-        validateSuperUserAccess();
-        internalRunHealthCheck()
-                .thenAccept(__ -> {
-                    LOG.info("[{}] Successfully run health check.", 
clientAppId());
-                    asyncResponse.resume("ok");
-                }).exceptionally(ex -> {
-                    LOG.error("[{}] Fail to run health check.", clientAppId(), 
ex);
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    return null;
-                });
-    }
-
-    private CompletableFuture<Void> internalRunHealthCheck() {
+    public void healthcheck(@Suspended AsyncResponse asyncResponse) throws 
Exception {
         String topic;
         PulsarClient client;
         try {
+            validateSuperUserAccess();
             String heartbeatNamespace = NamespaceService.getHeartbeatNamespace(
                     pulsar().getAdvertisedAddress(), 
pulsar().getConfiguration());
             topic = String.format("persistent://%s/%s", heartbeatNamespace, 
HEALTH_CHECK_TOPIC_SUFFIX);
             LOG.info("Running healthCheck with topic={}", topic);
             client = pulsar().getClient();
-        } catch (PulsarServerException e) {
-            LOG.error("[{}] Fail to run health check while get client.", 
clientAppId());
-            throw new RestException(e);
+        } catch (Exception e) {
+            LOG.error("Error getting heathcheck topic info", e);
+            throw new PulsarServerException(e);
         }
         String messageStr = UUID.randomUUID().toString();
-        final String subscriptionName = "healthCheck-" + messageStr;
         // create non-partitioned topic manually and close the previous reader 
if present.
-        return pulsar().getBrokerService().getTopic(topic, 
true).thenCompose(topicOptional -> {
-            if (!topicOptional.isPresent()) {
-                LOG.error("[{}] Fail to run health check while get topic {}. 
because get null value.",
-                        clientAppId(), topic);
-                throw new RestException(Status.NOT_FOUND,
-                        String.format("Topic [%s] not found after create.", 
topic));
-            }
-            CompletableFuture<Void> resultFuture = new CompletableFuture<>();
-            client.newProducer(Schema.STRING).topic(topic).createAsync()
-                    .thenCompose(producer -> client.newReader(Schema.STRING)
-                            .topic(topic)
-                            .subscriptionName(subscriptionName)
-                            .startMessageId(MessageId.latest)
-                            .createAsync()
-                            .exceptionally(createException -> {
-                                producer.closeAsync().exceptionally(ex -> {
-                                    LOG.error("[{}] Close producer fail while 
heath check.", clientAppId());
-                                    return null;
-                                });
-                                throw 
FutureUtil.wrapToCompletionException(createException);
-                            })
-                            .thenCompose(reader -> 
producer.sendAsync(messageStr)
-                                    .thenCompose(__ -> 
healthCheckRecursiveReadNext(reader, messageStr))
-                                    .whenComplete((__, ex) -> {
-                                        closeAndReCheck(producer, reader, 
topicOptional.get(), subscriptionName)
-                                                .whenComplete((unused, 
innerEx) -> {
-                                                    if (ex != null) {
-                                                        
resultFuture.completeExceptionally(ex);
-                                                    } else {
-                                                        
resultFuture.complete(null);
-                                                    }
-                                                });
-                                    }))
-                    ).exceptionally(ex -> {
-                        resultFuture.completeExceptionally(ex);
-                        return null;
-                    });
-            return resultFuture;
-        });
-    }
-
-    private CompletableFuture<Void> 
healthCheckRecursiveReadNext(Reader<String> reader, String content) {
-        return reader.readNextAsync()
-                .thenCompose(msg -> {
-                    if (!Objects.equals(content, msg.getValue())) {
-                        return healthCheckRecursiveReadNext(reader, content);
+        try {
+            pulsar().getBrokerService().getTopic(topic, 
true).get().ifPresent(t -> {
+                t.getSubscriptions().forEach((__, value) -> {
+                    try {
+                        value.deleteForcefully();
+                    } catch (Exception e) {
+                        LOG.warn("Failed to delete previous subscription {} 
for health check", value.getName(), e);
                     }
-                    return CompletableFuture.completedFuture(null);
                 });
-    }
+            });
+        } catch (Exception e) {
+            LOG.warn("Failed to try to delete subscriptions for health check", 
e);
+        }
+        CompletableFuture<Producer<String>> producerFuture =
+            client.newProducer(Schema.STRING).topic(topic).createAsync();
+        CompletableFuture<Reader<String>> readerFuture = 
client.newReader(Schema.STRING)
+            .topic(topic).startMessageId(MessageId.latest).createAsync();
 
-    /**
-     * Close producer and reader and then to re-check if this operation is 
success.
-     *
-     * Re-check
-     * - Producer: If close fails we will print error log to notify user.
-     * - Consumer: If close fails we will force delete subscription.
-     *
-     * @param producer Producer
-     * @param reader Reader
-     * @param topic  Topic
-     * @param subscriptionName  Subscription name
-     */
-    private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, 
Reader<String> reader,
-                                                    Topic topic, String 
subscriptionName) {
-        // no matter exception or success, we still need to
-        // close producer/reader
-        CompletableFuture<Void> producerFuture = producer.closeAsync();
-        CompletableFuture<Void> readerFuture = reader.closeAsync();
-        List<CompletableFuture<Void>> futures = new ArrayList<>(2);
-        futures.add(producerFuture);
-        futures.add(readerFuture);
-        return FutureUtil.waitForAll(Collections.unmodifiableList(futures))
-                .exceptionally(closeException -> {
-                    if (readerFuture.isCompletedExceptionally()) {
-                        LOG.error("[{}] Close reader fail while heath check.", 
clientAppId());
-                        Subscription subscription =
-                                topic.getSubscription(subscriptionName);
-                        // re-check subscription after reader close
-                        if (subscription != null) {
-                            LOG.warn("[{}] Force delete subscription {} "
-                                            + "when it still exists after the"
-                                            + " reader is closed.",
-                                    clientAppId(), subscription);
-                            subscription.deleteForcefully()
-                                    .exceptionally(ex -> {
-                                        LOG.error("[{}] Force delete 
subscription fail"
-                                                        + " while health 
check",
-                                                clientAppId(), ex);
-                                        return null;
-                                    });
-                        }
+        CompletableFuture<Void> completePromise = new CompletableFuture<>();
+
+        CompletableFuture.allOf(producerFuture, readerFuture).whenComplete(
+                (ignore, exception) -> {
+                    if (exception != null) {
+                        completePromise.completeExceptionally(exception);
                     } else {
-                        // producer future fail.
-                        LOG.error("[{}] Close producer fail while heath 
check.", clientAppId());
+                        producerFuture.thenCompose((producer) -> 
producer.sendAsync(messageStr))
+                            .whenComplete((ignore2, exception2) -> {
+                                    if (exception2 != null) {
+                                        
completePromise.completeExceptionally(exception2);
+                                    }
+                                });
+
+                        healthcheckReadLoop(readerFuture, completePromise, 
messageStr);
+
+                        // timeout read loop after 10 seconds
+                        FutureUtil.addTimeoutHandling(completePromise,
+                                HEALTHCHECK_READ_TIMEOUT, 
pulsar().getExecutor(),
+                                () -> FutureUtil.createTimeoutException("Timed 
out reading", getClass(),
+                                        "healthcheck(...)"));
                     }
-                    return null;
                 });
+
+        completePromise.whenComplete((ignore, exception) -> {
+                producerFuture.thenAccept((producer) -> {
+                        producer.closeAsync().whenComplete((ignore2, 
exception2) -> {
+                                if (exception2 != null) {
+                                    LOG.warn("Error closing producer for 
healthcheck", exception2);
+                                }
+                            });
+                    });
+                readerFuture.thenAccept((reader) -> {
+                        reader.closeAsync().whenComplete((ignore2, exception2) 
-> {
+                                if (exception2 != null) {
+                                    LOG.warn("Error closing reader for 
healthcheck", exception2);
+                                }
+                            });
+                    });
+                if (exception != null) {
+                    asyncResponse.resume(new RestException(exception));
+                } else {
+                    asyncResponse.resume("ok");
+                }
+            });
+    }
+
+    private void healthcheckReadLoop(CompletableFuture<Reader<String>> 
readerFuture,
+                                     CompletableFuture<?> completablePromise,
+                                     String messageStr) {
+        readerFuture.thenAccept((reader) -> {
+                CompletableFuture<Message<String>> readFuture = 
reader.readNextAsync()
+                    .whenComplete((m, exception) -> {
+                            if (exception != null) {
+                                
completablePromise.completeExceptionally(exception);
+                            } else if (m.getValue().equals(messageStr)) {
+                                completablePromise.complete(null);
+                            } else {
+                                healthcheckReadLoop(readerFuture, 
completablePromise, messageStr);
+                            }
+                        });
+            });
     }
 
     private synchronized void deleteDynamicConfigurationOnZk(String 
configName) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 8fe6375..71eae45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -43,7 +43,6 @@ import java.util.concurrent.TimeoutException;
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
@@ -68,8 +67,6 @@ import org.apache.pulsar.broker.resources.NamespaceResources.IsolationPolicyReso
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.resources.ResourceGroupResources;
 import org.apache.pulsar.broker.resources.TenantResources;
-import org.apache.pulsar.broker.service.BrokerServiceException;
-import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
 import org.apache.pulsar.common.naming.Constants;
@@ -1210,17 +1207,4 @@ public abstract class PulsarWebResource {
         }
         return tree;
     }
-
-    protected static void resumeAsyncResponseExceptionally(AsyncResponse 
asyncResponse, Throwable exception) {
-        Throwable realCause = FutureUtil.unwrapCompletionException(exception);
-        if (realCause instanceof WebApplicationException) {
-            asyncResponse.resume(realCause);
-        } else if (realCause instanceof 
BrokerServiceException.NotAllowedException) {
-            asyncResponse.resume(new RestException(Status.CONFLICT, 
realCause));
-        } else if (realCause instanceof PulsarAdminException) {
-            asyncResponse.resume(new RestException(((PulsarAdminException) 
realCause)));
-        } else {
-            asyncResponse.resume(new RestException(realCause));
-        }
-    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
deleted file mode 100644
index c44d771..0000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.admin;
-
-import com.google.common.collect.Sets;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.compaction.Compactor;
-import org.awaitility.Awaitility;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-@Test(groups = "broker")
-public class AdminApiHealthCheckTest extends MockedPulsarServiceBaseTest {
-
-    @BeforeMethod
-    @Override
-    public void setup() throws Exception {
-        resetConfig();
-        super.internalSetup();
-        admin.clusters().createCluster("test",
-                
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
-        TenantInfoImpl tenantInfo = new TenantInfoImpl(
-                Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
-        admin.tenants().createTenant("pulsar", tenantInfo);
-        admin.namespaces().createNamespace("pulsar/system", 
Sets.newHashSet("test"));
-        admin.tenants().createTenant("public", tenantInfo);
-        admin.namespaces().createNamespace("public/default", 
Sets.newHashSet("test"));
-    }
-    @AfterMethod(alwaysRun = true)
-    @Override
-    public void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
-    @Test
-    public void testHealthCheckup() throws Exception {
-        final int times = 30;
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        pulsar.getExecutor().execute(() -> {
-            try {
-                for (int i = 0; i < times; i++) {
-                    admin.brokers().healthcheck();
-                }
-                future.complete(null);
-            } catch (PulsarAdminException e) {
-                future.completeExceptionally(e);
-            }
-        });
-        for (int i = 0; i < times; i++) {
-            admin.brokers().healthcheck();
-        }
-        // To ensure we don't have any subscription
-        final String testHealthCheckTopic = 
String.format("persistent://pulsar/test/localhost:%s/healthcheck",
-                pulsar.getConfig().getWebServicePort().get());
-        Awaitility.await().untilAsserted(() -> {
-            Assert.assertFalse(future.isCompletedExceptionally());
-        });
-        Awaitility.await().untilAsserted(() ->
-                Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
-                        .getSubscriptions(testHealthCheckTopic).stream()
-                        // All system topics are using compaction, even though 
is not explicitly set in the policies.
-                        .filter(v -> 
!v.equals(Compactor.COMPACTION_SUBSCRIPTION))
-                        .collect(Collectors.toList())
-                ))
-        );
-    }
-}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 7d9fd79..45fd7f7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -78,7 +78,8 @@ public class FutureUtil {
     }
 
     /**
-     * If the future is cancelled or times out, the cancel action will be 
invoked.
+     * If the future is cancelled or times out, the cancel action will be
+     * invoked
      *
      * The action is executed once if the future completes with
      * {@link java.util.concurrent.CancellationException} or {@link 
TimeoutException}
@@ -202,18 +203,4 @@ public class FutureUtil {
         }
         return Optional.empty();
     }
-
-    /**
-     * Wrap throwable exception to CompletionException if that exception is 
not an instance of CompletionException.
-     *
-     * @param throwable Exception
-     * @return CompletionException
-     */
-    public static CompletionException wrapToCompletionException(Throwable 
throwable) {
-        if (throwable instanceof CompletionException) {
-            return (CompletionException) throwable;
-        } else {
-            return new CompletionException(throwable);
-        }
-    }
 }

Reply via email to