This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 4fa29b94a [ISSUE #5013] allow broker to bind specified network
interface (#5014)
4fa29b94a is described below
commit 4fa29b94a9813add80d2b7323b9636a5615c181a
Author: SSpirits <[email protected]>
AuthorDate: Thu Sep 8 14:08:49 2022 +0800
[ISSUE #5013] allow broker to bind specified network interface (#5014)
* allow broker to bind specified network interface
* remove bind ip check
* Improve pull request
* Log the listening InetAddress on remoting server started
Co-authored-by: Zhanhui Li <[email protected]>
---
.../org/apache/rocketmq/broker/BrokerStartup.java | 11 +-
.../remoting/netty/NettyRemotingServer.java | 111 +++++++++++----------
.../rocketmq/remoting/netty/NettyServerConfig.java | 14 +++
3 files changed, 76 insertions(+), 60 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index 87c651379..f000668a7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -18,6 +18,11 @@ package org.apache.rocketmq.broker;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
@@ -40,12 +45,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
public class BrokerStartup {
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index bdcfdc95e..e9d5c0dc2 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -39,7 +39,6 @@ import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.cert.CertificateException;
@@ -53,7 +52,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
@@ -102,7 +100,8 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
this(nettyServerConfig, null);
}
- public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
+ public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
+ final ChannelEventListener channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(),
nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
@@ -195,60 +194,64 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
private boolean useEpoll() {
return RemotingUtil.isLinuxPlatform()
- && nettyServerConfig.isUseEpollNativeSelector()
- && Epoll.isAvailable();
+ && nettyServerConfig.isUseEpollNativeSelector()
+ && Epoll.isAvailable();
}
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
- nettyServerConfig.getServerWorkerThreads(),
- new ThreadFactory() {
+ nettyServerConfig.getServerWorkerThreads(),
+ new ThreadFactory() {
- private final AtomicInteger threadIndex = new
AtomicInteger(0);
+ private final AtomicInteger threadIndex = new AtomicInteger(0);
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "NettyServerCodecThread_" +
this.threadIndex.incrementAndGet());
- }
- });
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "NettyServerCodecThread_" +
this.threadIndex.incrementAndGet());
+ }
+ });
prepareSharableHandlers();
serverBootstrap.group(this.eventLoopGroupBoss,
this.eventLoopGroupSelector)
- .channel(useEpoll() ? EpollServerSocketChannel.class :
NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 1024)
- .option(ChannelOption.SO_REUSEADDR, true)
- .childOption(ChannelOption.SO_KEEPALIVE, false)
- .childOption(ChannelOption.TCP_NODELAY, true)
- .localAddress(new
InetSocketAddress(this.nettyServerConfig.getListenPort()))
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) {
- ch.pipeline()
- .addLast(defaultEventExecutorGroup,
HANDSHAKE_HANDLER_NAME, handshakeHandler)
- .addLast(defaultEventExecutorGroup,
- encoder,
- new NettyDecoder(),
- new IdleStateHandler(0, 0,
nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
- connectionManageHandler,
- serverHandler
- );
- }
- });
+ .channel(useEpoll() ? EpollServerSocketChannel.class :
NioServerSocketChannel.class)
+ .option(ChannelOption.SO_BACKLOG, 1024)
+ .option(ChannelOption.SO_REUSEADDR, true)
+ .childOption(ChannelOption.SO_KEEPALIVE, false)
+ .childOption(ChannelOption.TCP_NODELAY, true)
+ .localAddress(new
InetSocketAddress(this.nettyServerConfig.getBindAddress(),
+ this.nettyServerConfig.getListenPort()))
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel ch) {
+ ch.pipeline()
+ .addLast(defaultEventExecutorGroup,
HANDSHAKE_HANDLER_NAME, handshakeHandler)
+ .addLast(defaultEventExecutorGroup,
+ encoder,
+ new NettyDecoder(),
+ new IdleStateHandler(0, 0,
+
nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
+ connectionManageHandler,
+ serverHandler
+ );
+ }
+ });
addCustomConfig(serverBootstrap);
try {
- ChannelFuture sync =
serverBootstrap.bind(nettyServerConfig.getListenPort()).sync();
+ ChannelFuture sync = serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress)
sync.channel().localAddress();
if (0 == nettyServerConfig.getListenPort()) {
this.nettyServerConfig.setListenPort(addr.getPort());
- log.debug("Server is listening {}",
this.nettyServerConfig.getListenPort());
}
+ log.info("RemotingServer started, listening {}:{}",
this.nettyServerConfig.getBindAddress(),
+ this.nettyServerConfig.getListenPort());
this.remotingServerTable.put(this.nettyServerConfig.getListenPort(), this);
- } catch (InterruptedException e1) {
- throw new RuntimeException("this.serverBootstrap.bind().sync()
InterruptedException", e1);
+ } catch (Exception e) {
+ throw new IllegalStateException(String.format("Failed to bind to
%s:%d", nettyServerConfig.getBindAddress(),
+ nettyServerConfig.getListenPort()), e);
}
if (this.channelEventListener != null) {
@@ -279,9 +282,9 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
}
if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 &&
nettyServerConfig.getWriteBufferHighWaterMark() > 0) {
log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",
- nettyServerConfig.getWriteBufferLowWaterMark(),
nettyServerConfig.getWriteBufferHighWaterMark());
+ nettyServerConfig.getWriteBufferLowWaterMark(),
nettyServerConfig.getWriteBufferHighWaterMark());
childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(
- nettyServerConfig.getWriteBufferLowWaterMark(),
nettyServerConfig.getWriteBufferHighWaterMark()));
+ nettyServerConfig.getWriteBufferLowWaterMark(),
nettyServerConfig.getWriteBufferHighWaterMark()));
}
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
@@ -350,8 +353,8 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
@Override
public RemotingServer newRemotingServer(final int port) {
SubRemotingServer remotingServer = new SubRemotingServer(port,
- this.nettyServerConfig.getServerOnewaySemaphoreValue(),
- this.nettyServerConfig.getServerAsyncSemaphoreValue());
+ this.nettyServerConfig.getServerOnewaySemaphoreValue(),
+ this.nettyServerConfig.getServerAsyncSemaphoreValue());
NettyRemotingAbstract existingServer =
this.remotingServerTable.putIfAbsent(port, remotingServer);
if (existingServer != null) {
throw new RuntimeException("The port " + port + " already in use
by another RemotingServer");
@@ -366,19 +369,19 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
@Override
public RemotingCommand invokeSync(final Channel channel, final
RemotingCommand request, final long timeoutMillis)
- throws InterruptedException, RemotingSendRequestException,
RemotingTimeoutException {
+ throws InterruptedException, RemotingSendRequestException,
RemotingTimeoutException {
return this.invokeSyncImpl(channel, request, timeoutMillis);
}
@Override
public void invokeAsync(Channel channel, RemotingCommand request, long
timeoutMillis, InvokeCallback invokeCallback)
- throws InterruptedException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException {
+ throws InterruptedException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException {
this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
}
@Override
public void invokeOneway(Channel channel, RemotingCommand request, long
timeoutMillis) throws InterruptedException,
- RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
+ RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
this.invokeOnewayImpl(channel, request, timeoutMillis);
}
@@ -429,8 +432,8 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
case ENFORCING:
if (null != sslContext) {
ctx.pipeline()
- .addAfter(defaultEventExecutorGroup,
HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME,
sslContext.newHandler(ctx.channel().alloc()))
- .addAfter(defaultEventExecutorGroup,
TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
+ .addAfter(defaultEventExecutorGroup,
HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME,
sslContext.newHandler(ctx.channel().alloc()))
+ .addAfter(defaultEventExecutorGroup,
TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
log.info("Handlers prepended to channel pipeline
to establish SSL connection");
} else {
ctx.close();
@@ -466,7 +469,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
class NettyServerHandler extends
SimpleChannelInboundHandler<RemotingCommand> {
@Override
- protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand
msg) throws Exception {
+ protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand
msg) {
int localPort =
RemotingHelper.parseSocketAddressPort(ctx.channel().localAddress());
NettyRemotingAbstract remotingAbstract =
NettyRemotingServer.this.remotingServerTable.get(localPort);
if (localPort != -1 && remotingAbstract != null) {
@@ -526,7 +529,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
RemotingUtil.closeChannel(ctx.channel());
if (NettyRemotingServer.this.channelEventListener != null)
{
NettyRemotingServer.this
- .putNettyEvent(new
NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
+ .putNettyEvent(new NettyEvent(NettyEventType.IDLE,
remoteAddress, ctx.channel()));
}
}
}
@@ -564,7 +567,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
@Override
public void registerProcessor(final int requestCode, final
NettyRequestProcessor processor,
- final ExecutorService executor) {
+ final ExecutorService executor) {
ExecutorService executorThis = executor;
if (null == executor) {
executorThis = NettyRemotingServer.this.publicExecutor;
@@ -597,30 +600,30 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
@Override
public RemotingServer newRemotingServer(final int port) {
throw new UnsupportedOperationException("The SubRemotingServer of
NettyRemotingServer " +
- "doesn't support new nested RemotingServer");
+ "doesn't support new nested RemotingServer");
}
@Override
public void removeRemotingServer(final int port) {
throw new UnsupportedOperationException("The SubRemotingServer of
NettyRemotingServer " +
- "doesn't support remove nested RemotingServer");
+ "doesn't support remove nested RemotingServer");
}
@Override
public RemotingCommand invokeSync(final Channel channel, final
RemotingCommand request,
- final long timeoutMillis) throws
InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
+ final long timeoutMillis) throws InterruptedException,
RemotingSendRequestException, RemotingTimeoutException {
return this.invokeSyncImpl(channel, request, timeoutMillis);
}
@Override
public void invokeAsync(final Channel channel, final RemotingCommand
request, final long timeoutMillis,
- final InvokeCallback invokeCallback) throws
InterruptedException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException {
+ final InvokeCallback invokeCallback) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
this.invokeAsyncImpl(channel, request, timeoutMillis,
invokeCallback);
}
@Override
public void invokeOneway(final Channel channel, final RemotingCommand
request,
- final long timeoutMillis) throws
InterruptedException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException {
+ final long timeoutMillis) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
this.invokeOnewayImpl(channel, request, timeoutMillis);
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
index fd1fe9d09..59ef2c84f 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
@@ -17,6 +17,12 @@
package org.apache.rocketmq.remoting.netty;
public class NettyServerConfig implements Cloneable {
+
+ /**
+ * Bind address may be hostname, IPv4 or IPv6.
+ * By default, it's wildcard address, listening all network interfaces.
+ */
+ private String bindAddress = "0.0.0.0";
private int listenPort = 0;
private int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 0;
@@ -41,6 +47,14 @@ public class NettyServerConfig implements Cloneable {
*/
private boolean useEpollNativeSelector = false;
+ public String getBindAddress() {
+ return bindAddress;
+ }
+
+ public void setBindAddress(String bindAddress) {
+ this.bindAddress = bindAddress;
+ }
+
public int getListenPort() {
return listenPort;
}