This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 453872359f6 [improve][broker] fix broker irrational behavior when it 
is closing (#17085)
453872359f6 is described below

commit 453872359f6d824c19186f8f6d2fa954dae71e69
Author: Qiang Huang <[email protected]>
AuthorDate: Sun Aug 28 13:31:56 2022 +0800

    [improve][broker] fix broker irrational behavior when it is closing (#17085)
---
 .../src/main/java/org/apache/pulsar/broker/PulsarService.java  |  7 +++++++
 .../main/java/org/apache/pulsar/broker/service/ServerCnx.java  | 10 ++++++++++
 2 files changed, 17 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 0d5c4c59775..1c3b6b8aea1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1240,6 +1240,13 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         return this.state;
     }
 
+    /**
+     * check the current pulsar service is running, including Started and Init 
state.
+     */
+    public boolean isRunning() {
+        return this.state == State.Started || this.state == State.Init;
+    }
+
     /**
      * Get a reference of the current <code>LeaderElectionService</code> 
instance associated with the current
      * <code>PulsarService</code> instance.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 40ec486133d..604b824881a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -452,6 +452,16 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             return;
         }
 
+        if (!this.service.getPulsar().isRunning()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed lookup topic {} due to pulsar service 
is not ready: {} state", remoteAddress,
+                        topicName, 
this.service.getPulsar().getState().toString());
+            }
+            
ctx.writeAndFlush(newLookupErrorResponse(ServerError.ServiceNotReady,
+                    "Failed due to pulsar service is not ready", requestId));
+            return;
+        }
+
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
         if (lookupSemaphore.tryAcquire()) {
             if (invalidOriginalPrincipal(originalPrincipal)) {

Reply via email to