This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a93a67b004a6a965e8045f48e8bdc349e4e6bfc2 Author: Lari Hotari <[email protected]> AuthorDate: Sat Apr 30 04:57:42 2022 +0300 [Broker] Fix typo in enum name and handle closing of the channel properly since writeAndFlush is asynchronous (#15384) (cherry picked from commit cd3816aa351ba8a1f0e9876eefe019b7f0d282d8) --- .../pulsar/broker/service/ConnectionController.java | 16 ++++++++-------- .../java/org/apache/pulsar/broker/service/ServerCnx.java | 15 ++++++++------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java index 51540e179be..65c3a6c4f2a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java @@ -36,7 +36,7 @@ public interface ConnectionController { * @param remoteAddress * @return */ - Sate increaseConnection(SocketAddress remoteAddress); + State increaseConnection(SocketAddress remoteAddress); /** * Decrease the number of connections counter. 
@@ -44,7 +44,7 @@ public interface ConnectionController { */ void decreaseConnection(SocketAddress remoteAddress); - enum Sate { + enum State { OK, REACH_MAX_CONNECTION_PER_IP, REACH_MAX_CONNECTION; } @@ -68,13 +68,13 @@ public interface ConnectionController { } @Override - public Sate increaseConnection(SocketAddress remoteAddress) { + public State increaseConnection(SocketAddress remoteAddress) { if (!maxConnectionsLimitEnabled && !maxConnectionsLimitPerIpEnabled) { - return Sate.OK; + return State.OK; } if (!(remoteAddress instanceof InetSocketAddress) || !isLegalIpAddress(((InetSocketAddress) remoteAddress).getHostString())) { - return Sate.OK; + return State.OK; } lock.lock(); try { @@ -88,20 +88,20 @@ public interface ConnectionController { if (maxConnectionsLimitEnabled && totalConnectionNum > maxConnections) { log.info("Reject connect request from {}, because reached the maximum number of connections {}", remoteAddress, totalConnectionNum); - return Sate.REACH_MAX_CONNECTION; + return State.REACH_MAX_CONNECTION; } if (maxConnectionsLimitPerIpEnabled && CONNECTIONS.get(ip).getValue() > maxConnectionPerIp) { log.info("Reject connect request from {}, because reached the maximum number " + "of connections per Ip {}", remoteAddress, CONNECTIONS.get(ip).getValue()); - return Sate.REACH_MAX_CONNECTION_PER_IP; + return State.REACH_MAX_CONNECTION_PER_IP; } } catch (Exception e) { log.error("increase connection failed", e); } finally { lock.unlock(); } - return Sate.OK; + return State.OK; } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index a81652ec3c9..b0ff87f8d70 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -27,6 +27,7 @@ import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse; import 
com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOption; @@ -258,13 +259,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); - ConnectionController.Sate sate = connectionController.increaseConnection(remoteAddress); - if (!sate.equals(ConnectionController.Sate.OK)) { - ctx.channel().writeAndFlush(Commands.newError(-1, ServerError.NotAllowedError, - sate.equals(ConnectionController.Sate.REACH_MAX_CONNECTION) - ? "Reached the maximum number of connections" - : "Reached the maximum number of connections on address" + remoteAddress)); - ctx.channel().close(); + ConnectionController.State state = connectionController.increaseConnection(remoteAddress); + if (!state.equals(ConnectionController.State.OK)) { + ctx.writeAndFlush(Commands.newError(-1, ServerError.NotAllowedError, + state.equals(ConnectionController.State.REACH_MAX_CONNECTION) + ? "Reached the maximum number of connections" + : "Reached the maximum number of connections on address" + remoteAddress)) + .addListener(ChannelFutureListener.CLOSE); return; } log.info("New connection from {}", remoteAddress);
