This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new ab62742aee5 [Broker] Fix typo in enum name and handle closing of the
channel properly since writeAndFlush is asynchronous (#15384)
ab62742aee5 is described below
commit ab62742aee564c31a670bde6952b2d5d085f7240
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Apr 30 04:57:42 2022 +0300
[Broker] Fix typo in enum name and handle closing of the channel properly
since writeAndFlush is asynchronous (#15384)
(cherry picked from commit cd3816aa351ba8a1f0e9876eefe019b7f0d282d8)
---
.../pulsar/broker/service/ConnectionController.java | 16 ++++++++--------
.../java/org/apache/pulsar/broker/service/ServerCnx.java | 15 ++++++++-------
2 files changed, 16 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
index 51540e179be..65c3a6c4f2a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
@@ -36,7 +36,7 @@ public interface ConnectionController {
* @param remoteAddress
* @return
*/
- Sate increaseConnection(SocketAddress remoteAddress);
+ State increaseConnection(SocketAddress remoteAddress);
/**
* Decrease the number of connections counter.
@@ -44,7 +44,7 @@ public interface ConnectionController {
*/
void decreaseConnection(SocketAddress remoteAddress);
- enum Sate {
+ enum State {
OK, REACH_MAX_CONNECTION_PER_IP, REACH_MAX_CONNECTION;
}
@@ -68,13 +68,13 @@ public interface ConnectionController {
}
@Override
- public Sate increaseConnection(SocketAddress remoteAddress) {
+ public State increaseConnection(SocketAddress remoteAddress) {
if (!maxConnectionsLimitEnabled &&
!maxConnectionsLimitPerIpEnabled) {
- return Sate.OK;
+ return State.OK;
}
if (!(remoteAddress instanceof InetSocketAddress)
|| !isLegalIpAddress(((InetSocketAddress)
remoteAddress).getHostString())) {
- return Sate.OK;
+ return State.OK;
}
lock.lock();
try {
@@ -88,20 +88,20 @@ public interface ConnectionController {
if (maxConnectionsLimitEnabled && totalConnectionNum >
maxConnections) {
log.info("Reject connect request from {}, because reached
the maximum number of connections {}",
remoteAddress, totalConnectionNum);
- return Sate.REACH_MAX_CONNECTION;
+ return State.REACH_MAX_CONNECTION;
}
if (maxConnectionsLimitPerIpEnabled &&
CONNECTIONS.get(ip).getValue() > maxConnectionPerIp) {
log.info("Reject connect request from {}, because reached
the maximum number "
+ "of connections per Ip {}",
remoteAddress, CONNECTIONS.get(ip).getValue());
- return Sate.REACH_MAX_CONNECTION_PER_IP;
+ return State.REACH_MAX_CONNECTION_PER_IP;
}
} catch (Exception e) {
log.error("increase connection failed", e);
} finally {
lock.unlock();
}
- return Sate.OK;
+ return State.OK;
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index d8a51f60d65..03a6a80aed3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -27,6 +27,7 @@ import static
org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
@@ -270,13 +271,13 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
- ConnectionController.Sate sate =
connectionController.increaseConnection(remoteAddress);
- if (!sate.equals(ConnectionController.Sate.OK)) {
- ctx.channel().writeAndFlush(Commands.newError(-1,
ServerError.NotAllowedError,
- sate.equals(ConnectionController.Sate.REACH_MAX_CONNECTION)
- ? "Reached the maximum number of connections"
- : "Reached the maximum number of connections on
address" + remoteAddress));
- ctx.channel().close();
+ ConnectionController.State state =
connectionController.increaseConnection(remoteAddress);
+ if (!state.equals(ConnectionController.State.OK)) {
+ ctx.writeAndFlush(Commands.newError(-1,
ServerError.NotAllowedError,
+
state.equals(ConnectionController.State.REACH_MAX_CONNECTION)
+ ? "Reached the maximum number of
connections"
+ : "Reached the maximum number of
connections on address" + remoteAddress))
+ .addListener(ChannelFutureListener.CLOSE);
return;
}
log.info("New connection from {}", remoteAddress);