This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0e877bdff9e01c9a9d69d207865d22240bf60e9f
Author: yangyijun <1012293...@qq.com>
AuthorDate: Fri Sep 19 18:53:34 2025 +0800

    [improve][broker] If there is a deadlock in the service, the probe should return a failure because the service may be unavailable (#23634)
    
    Co-authored-by: Lari Hotari <lhot...@apache.org>
    Co-authored-by: Lari Hotari <lhot...@users.noreply.github.com>
    (cherry picked from commit d833b8bef21cb9e85f0f313eb9d49c7ca550fbbd)
---
 pulsar-broker-common/pom.xml                       |   6 +
 .../pulsar/common/configuration/VipStatus.java     |  99 +++++++++++--
 .../pulsar/common/configuration/VipStatusTest.java | 161 +++++++++++++++++++++
 3 files changed, 254 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml
index 4cc01f33b45..59dc93d7b39 100644
--- a/pulsar-broker-common/pom.xml
+++ b/pulsar-broker-common/pom.xml
@@ -89,6 +89,12 @@
       <artifactId>rest-assured</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.glassfish.jersey.core</groupId>
+      <artifactId>jersey-server</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java
index 5e6a31b323f..aa4ec1109a6 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java
@@ -18,8 +18,15 @@
  */
 package org.apache.pulsar.common.configuration;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.time.Clock;
+import java.util.Arrays;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import javax.servlet.ServletContext;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
@@ -27,6 +34,7 @@ import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response.Status;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.ThreadDumpUtil;
 
 /**
  * Web resource used by the VIP service to check to availability of the 
service instance.
@@ -38,25 +46,92 @@ public class VipStatus {
     public static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath";
     public static final String ATTRIBUTE_IS_READY_PROBE = "isReadyProbe";
 
+    // log a full thread dump when a deadlock is detected in status check once 
every 10 minutes
+    // to prevent excessive logging
+    private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 
600000L;
+    // Rate limit status checks to every 500ms to prevent DoS
+    private static final long CHECK_STATUS_INTERVAL = 500L;
+
+    private static volatile long lastCheckStatusTimestamp;
+    private static volatile long lastPrintThreadDumpTimestamp;
+    private static volatile boolean lastCheckStatusResult;
+
+    private long printThreadDumpIntervalMs;
+    private Clock clock;
+
     @Context
     protected ServletContext servletContext;
 
+    public VipStatus() {
+        this.clock = Clock.systemUTC();
+        this.printThreadDumpIntervalMs = 
LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED;
+    }
+
+    @VisibleForTesting
+    public VipStatus(ServletContext servletContext, long 
printThreadDumpIntervalMs) {
+        this.servletContext = servletContext;
+        this.printThreadDumpIntervalMs = printThreadDumpIntervalMs;
+        this.clock = Clock.systemUTC();
+    }
+
+    @VisibleForTesting
+    static void reset() {
+        lastCheckStatusTimestamp = 0L;
+        lastPrintThreadDumpTimestamp = 0L;
+        lastCheckStatusResult = false;
+    }
+
     @GET
     public String checkStatus() {
-        String statusFilePath = (String) 
servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH);
-        @SuppressWarnings("unchecked")
-        Supplier<Boolean> isReadyProbe = (Supplier<Boolean>) 
servletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE);
+        // Locking classes to avoid deadlock detection in multi-thread 
concurrent requests.
+        synchronized (VipStatus.class) {
+            if (clock.millis() - lastCheckStatusTimestamp < 
CHECK_STATUS_INTERVAL) {
+                if (lastCheckStatusResult) {
+                    return "OK";
+                } else {
+                    throw new 
WebApplicationException(Status.SERVICE_UNAVAILABLE);
+                }
+            }
+            lastCheckStatusTimestamp = clock.millis();
 
-        boolean isReady = isReadyProbe != null ? isReadyProbe.get() : true;
+            String statusFilePath = (String) 
servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH);
+            @SuppressWarnings("unchecked")
+            Supplier<Boolean> isReadyProbe = (Supplier<Boolean>) 
servletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE);
+            boolean isReady = isReadyProbe != null ? isReadyProbe.get() : true;
 
-        if (statusFilePath != null) {
-            File statusFile = new File(statusFilePath);
-            if (isReady && statusFile.exists() && statusFile.isFile()) {
-                return "OK";
+            if (statusFilePath != null) {
+                File statusFile = new File(statusFilePath);
+                if (isReady && statusFile.exists() && statusFile.isFile()) {
+                    // check deadlock
+                    ThreadMXBean threadBean = 
ManagementFactory.getThreadMXBean();
+                    long[] threadIds = threadBean.findDeadlockedThreads();
+                    if (threadIds != null && threadIds.length > 0) {
+                        ThreadInfo[] threadInfos = 
threadBean.getThreadInfo(threadIds, false,
+                                false);
+                        String threadNames = Arrays.stream(threadInfos)
+                                .map(threadInfo -> threadInfo.getThreadName()
+                                        + "(tid=" + threadInfo.getThreadId() + 
")")
+                                .collect(Collectors.joining(", "));
+                        if (clock.millis() - lastPrintThreadDumpTimestamp > 
printThreadDumpIntervalMs) {
+                            String diagnosticResult = 
ThreadDumpUtil.buildThreadDiagnosticString();
+                            log.error("Deadlocked threads detected. {}. 
Service may be unavailable, "
+                                    + "thread stack details are as 
follows:\n{}", threadNames, diagnosticResult);
+                            lastPrintThreadDumpTimestamp = clock.millis();
+                        } else {
+                            log.error("Deadlocked threads detected. {}", 
threadNames);
+                        }
+                        lastCheckStatusResult = false;
+                        throw new 
WebApplicationException(Status.SERVICE_UNAVAILABLE);
+                    } else {
+                        lastCheckStatusResult = true;
+                        return "OK";
+                    }
+                }
             }
+            lastCheckStatusResult = false;
+            log.warn("Status file '{}' doesn't exist or ready probe value ({}) 
isn't true. The service is not ready",
+                    statusFilePath, isReady);
+            throw new WebApplicationException(Status.NOT_FOUND);
         }
-        log.warn("Failed to access \"status.html\". The service is not ready");
-        throw new WebApplicationException(Status.NOT_FOUND);
     }
-
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java
new file mode 100644
index 00000000000..36542d237b5
--- /dev/null
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.configuration;
+
+import static org.testng.Assert.assertEquals;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import javax.servlet.ServletContext;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.assertj.core.util.Files;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class VipStatusTest {
+
+    public static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath";
+    public static final String ATTRIBUTE_IS_READY_PROBE = "isReadyProbe";
+    private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 
10000L;
+    // Rate limit status checks to every 500ms to prevent DoS
+    private static final long CHECK_STATUS_INTERVAL = 500L;
+
+    private ServletContext mockServletContext;
+    private VipStatus vipStatus;
+    private File file;
+
+    @BeforeMethod
+    public void setup() throws IOException {
+        file = Files.newTemporaryFile();
+        Supplier<Boolean> isReadyProbe = () -> true;
+        mockServletContext = Mockito.mock(ServletContext.class);
+        
Mockito.when(mockServletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH)).thenReturn(file.getAbsolutePath());
+        
Mockito.when(mockServletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE)).thenReturn(isReadyProbe);
+        vipStatus = new VipStatus(mockServletContext, 
LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED);
+        VipStatus.reset();
+    }
+
+    @Test
+    public void testVipStatusCheckStatus() {
+        // No deadlocks
+        testVipStatusCheckStatusWithoutDeadlock();
+        // There is a deadlock
+        testVipStatusCheckStatusWithDeadlock();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void release() throws IOException {
+        if (file != null) {
+            file.delete();
+            file = null;
+        }
+    }
+
+    @Test
+    public void testVipStatusCheckStatusWithoutDeadlock() {
+        assertEquals(vipStatus.checkStatus(), "OK");
+    }
+
+    @Test
+    public void testVipStatusCheckStatusWithDeadlock() {
+        MockDeadlock mockDeadlock = new MockDeadlock();
+        boolean asExpected = true;
+        try {
+            mockDeadlock.startDeadlock();
+            vipStatus.checkStatus();
+            asExpected = false;
+            System.out.println("Simulated deadlock, no deadlock detected, not 
as expected.");
+        } catch (Exception wae) {
+            System.out.println("Simulated deadlock and detected it, as 
expected.");
+        } finally {
+            mockDeadlock.close();
+        }
+
+        if (!asExpected) {
+            throw new 
WebApplicationException(Response.Status.SERVICE_UNAVAILABLE);
+        }
+    }
+
+    static class MockDeadlock implements Closeable {
+        private ExecutorService executorService = 
Executors.newCachedThreadPool();
+        private ReentrantLock lockA = new ReentrantLock();
+        private ReentrantLock lockB = new ReentrantLock();
+        private Phaser phaser = new Phaser(2);
+
+        @SneakyThrows
+        public void startDeadlock() {
+            executorService.execute(new TaskOne());
+            executorService.execute(new TaskTwo());
+            Thread.sleep(CHECK_STATUS_INTERVAL);
+        }
+
+        @Override
+        public void close() {
+            executorService.shutdownNow();
+        }
+
+        private class TaskOne implements Runnable {
+            @Override
+            public void run() {
+                try {
+                    lockA.lock();
+                    System.out.println("ThreadOne acquired lockA");
+                    phaser.arriveAndAwaitAdvance();
+                    while (!lockB.tryLock(1, TimeUnit.SECONDS)) {
+                        System.out.println("ThreadOne acquired lockB");
+                    }
+                } catch (InterruptedException e) {
+                    //e.printStackTrace();
+                } finally {
+                    lockA.unlock();
+                }
+            }
+        }
+
+        private class TaskTwo implements Runnable {
+            @Override
+            public void run() {
+                try {
+                    lockB.lock();
+                    System.out.println("ThreadOne acquired lockB");
+                    phaser.arriveAndAwaitAdvance();
+                    while (!lockA.tryLock(1, TimeUnit.SECONDS)) {
+                        System.out.println("ThreadOne acquired lockA");
+                    }
+                } catch (InterruptedException e) {
+                    //e.printStackTrace();
+                } finally {
+                    lockB.unlock();
+                }
+            }
+        }
+    }
+}

Reply via email to