This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new ab62742aee5 [Broker] Fix typo in enum name and handle closing of the 
channel properly since writeAndFlush is asynchronous (#15384)
ab62742aee5 is described below

commit ab62742aee564c31a670bde6952b2d5d085f7240
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Apr 30 04:57:42 2022 +0300

    [Broker] Fix typo in enum name and handle closing of the channel properly 
since writeAndFlush is asynchronous (#15384)
    
    (cherry picked from commit cd3816aa351ba8a1f0e9876eefe019b7f0d282d8)
---
 .../pulsar/broker/service/ConnectionController.java      | 16 ++++++++--------
 .../java/org/apache/pulsar/broker/service/ServerCnx.java | 15 ++++++++-------
 2 files changed, 16 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
index 51540e179be..65c3a6c4f2a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
@@ -36,7 +36,7 @@ public interface ConnectionController {
      * @param remoteAddress
      * @return
      */
-    Sate increaseConnection(SocketAddress remoteAddress);
+    State increaseConnection(SocketAddress remoteAddress);
 
     /**
      * Decrease the number of connections counter.
@@ -44,7 +44,7 @@ public interface ConnectionController {
      */
     void decreaseConnection(SocketAddress remoteAddress);
 
-    enum Sate {
+    enum State {
         OK, REACH_MAX_CONNECTION_PER_IP, REACH_MAX_CONNECTION;
     }
 
@@ -68,13 +68,13 @@ public interface ConnectionController {
         }
 
         @Override
-        public Sate increaseConnection(SocketAddress remoteAddress) {
+        public State increaseConnection(SocketAddress remoteAddress) {
             if (!maxConnectionsLimitEnabled && 
!maxConnectionsLimitPerIpEnabled) {
-                return Sate.OK;
+                return State.OK;
             }
             if (!(remoteAddress instanceof InetSocketAddress)
                     || !isLegalIpAddress(((InetSocketAddress) 
remoteAddress).getHostString())) {
-                return Sate.OK;
+                return State.OK;
             }
             lock.lock();
             try {
@@ -88,20 +88,20 @@ public interface ConnectionController {
                 if (maxConnectionsLimitEnabled && totalConnectionNum > 
maxConnections) {
                     log.info("Reject connect request from {}, because reached 
the maximum number of connections {}",
                             remoteAddress, totalConnectionNum);
-                    return Sate.REACH_MAX_CONNECTION;
+                    return State.REACH_MAX_CONNECTION;
                 }
                 if (maxConnectionsLimitPerIpEnabled && 
CONNECTIONS.get(ip).getValue() > maxConnectionPerIp) {
                     log.info("Reject connect request from {}, because reached 
the maximum number "
                                     + "of connections per Ip {}",
                             remoteAddress, CONNECTIONS.get(ip).getValue());
-                    return Sate.REACH_MAX_CONNECTION_PER_IP;
+                    return State.REACH_MAX_CONNECTION_PER_IP;
                 }
             } catch (Exception e) {
                 log.error("increase connection failed", e);
             } finally {
                 lock.unlock();
             }
-            return Sate.OK;
+            return State.OK;
         }
 
         @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index d8a51f60d65..03a6a80aed3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -27,6 +27,7 @@ import static 
org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelOption;
@@ -270,13 +271,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         super.channelActive(ctx);
-        ConnectionController.Sate sate = 
connectionController.increaseConnection(remoteAddress);
-        if (!sate.equals(ConnectionController.Sate.OK)) {
-            ctx.channel().writeAndFlush(Commands.newError(-1, 
ServerError.NotAllowedError,
-                    sate.equals(ConnectionController.Sate.REACH_MAX_CONNECTION)
-                            ? "Reached the maximum number of connections"
-                            : "Reached the maximum number of connections on 
address" + remoteAddress));
-            ctx.channel().close();
+        ConnectionController.State state = 
connectionController.increaseConnection(remoteAddress);
+        if (!state.equals(ConnectionController.State.OK)) {
+            ctx.writeAndFlush(Commands.newError(-1, 
ServerError.NotAllowedError,
+                            
state.equals(ConnectionController.State.REACH_MAX_CONNECTION)
+                                    ? "Reached the maximum number of 
connections"
+                                    : "Reached the maximum number of 
connections on address" + remoteAddress))
+                    .addListener(ChannelFutureListener.CLOSE);
             return;
         }
         log.info("New connection from {}", remoteAddress);

Reply via email to