This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4fa29b94a [ISSUE #5013] allow broker to bind specified network interface (#5014)
4fa29b94a is described below

commit 4fa29b94a9813add80d2b7323b9636a5615c181a
Author: SSpirits <[email protected]>
AuthorDate: Thu Sep 8 14:08:49 2022 +0800

    [ISSUE #5013] allow broker to bind specified network interface (#5014)
    
    * allow broker to bind specified network interface
    
    * remove bind ip check
    
    * Improve pull request
    
    * Log the listening InetAddress on remoting server started
    
    Co-authored-by: Zhanhui Li <[email protected]>
---
 .../org/apache/rocketmq/broker/BrokerStartup.java  |  11 +-
 .../remoting/netty/NettyRemotingServer.java        | 111 +++++++++++----------
 .../rocketmq/remoting/netty/NettyServerConfig.java |  14 +++
 3 files changed, 76 insertions(+), 60 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index 87c651379..f000668a7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -18,6 +18,11 @@ package org.apache.rocketmq.broker;
 
 import ch.qos.logback.classic.LoggerContext;
 import ch.qos.logback.classic.joran.JoranConfigurator;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.Option;
@@ -40,12 +45,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
 
 public class BrokerStartup {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index bdcfdc95e..e9d5c0dc2 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -39,7 +39,6 @@ import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.cert.CertificateException;
@@ -53,7 +52,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.ChannelEventListener;
@@ -102,7 +100,8 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
         this(nettyServerConfig, null);
     }
 
-    public NettyRemotingServer(final NettyServerConfig nettyServerConfig, 
final ChannelEventListener channelEventListener) {
+    public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
+        final ChannelEventListener channelEventListener) {
         super(nettyServerConfig.getServerOnewaySemaphoreValue(), 
nettyServerConfig.getServerAsyncSemaphoreValue());
         this.serverBootstrap = new ServerBootstrap();
         this.nettyServerConfig = nettyServerConfig;
@@ -195,60 +194,64 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
 
     private boolean useEpoll() {
         return RemotingUtil.isLinuxPlatform()
-                && nettyServerConfig.isUseEpollNativeSelector()
-                && Epoll.isAvailable();
+            && nettyServerConfig.isUseEpollNativeSelector()
+            && Epoll.isAvailable();
     }
 
     @Override
     public void start() {
         this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
-                nettyServerConfig.getServerWorkerThreads(),
-                new ThreadFactory() {
+            nettyServerConfig.getServerWorkerThreads(),
+            new ThreadFactory() {
 
-                    private final AtomicInteger threadIndex = new 
AtomicInteger(0);
+                private final AtomicInteger threadIndex = new AtomicInteger(0);
 
-                    @Override
-                    public Thread newThread(Runnable r) {
-                        return new Thread(r, "NettyServerCodecThread_" + 
this.threadIndex.incrementAndGet());
-                    }
-                });
+                @Override
+                public Thread newThread(Runnable r) {
+                    return new Thread(r, "NettyServerCodecThread_" + 
this.threadIndex.incrementAndGet());
+                }
+            });
 
         prepareSharableHandlers();
 
         serverBootstrap.group(this.eventLoopGroupBoss, 
this.eventLoopGroupSelector)
-                .channel(useEpoll() ? EpollServerSocketChannel.class : 
NioServerSocketChannel.class)
-                .option(ChannelOption.SO_BACKLOG, 1024)
-                .option(ChannelOption.SO_REUSEADDR, true)
-                .childOption(ChannelOption.SO_KEEPALIVE, false)
-                .childOption(ChannelOption.TCP_NODELAY, true)
-                .localAddress(new 
InetSocketAddress(this.nettyServerConfig.getListenPort()))
-                .childHandler(new ChannelInitializer<SocketChannel>() {
-                    @Override
-                    public void initChannel(SocketChannel ch) {
-                        ch.pipeline()
-                                .addLast(defaultEventExecutorGroup, 
HANDSHAKE_HANDLER_NAME, handshakeHandler)
-                                .addLast(defaultEventExecutorGroup,
-                                        encoder,
-                                        new NettyDecoder(),
-                                        new IdleStateHandler(0, 0, 
nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
-                                        connectionManageHandler,
-                                        serverHandler
-                                );
-                    }
-                });
+            .channel(useEpoll() ? EpollServerSocketChannel.class : 
NioServerSocketChannel.class)
+            .option(ChannelOption.SO_BACKLOG, 1024)
+            .option(ChannelOption.SO_REUSEADDR, true)
+            .childOption(ChannelOption.SO_KEEPALIVE, false)
+            .childOption(ChannelOption.TCP_NODELAY, true)
+            .localAddress(new 
InetSocketAddress(this.nettyServerConfig.getBindAddress(),
+                this.nettyServerConfig.getListenPort()))
+            .childHandler(new ChannelInitializer<SocketChannel>() {
+                @Override
+                public void initChannel(SocketChannel ch) {
+                    ch.pipeline()
+                        .addLast(defaultEventExecutorGroup, 
HANDSHAKE_HANDLER_NAME, handshakeHandler)
+                        .addLast(defaultEventExecutorGroup,
+                            encoder,
+                            new NettyDecoder(),
+                            new IdleStateHandler(0, 0,
+                                
nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
+                            connectionManageHandler,
+                            serverHandler
+                        );
+                }
+            });
 
         addCustomConfig(serverBootstrap);
 
         try {
-            ChannelFuture sync = 
serverBootstrap.bind(nettyServerConfig.getListenPort()).sync();
+            ChannelFuture sync = serverBootstrap.bind().sync();
             InetSocketAddress addr = (InetSocketAddress) 
sync.channel().localAddress();
             if (0 == nettyServerConfig.getListenPort()) {
                 this.nettyServerConfig.setListenPort(addr.getPort());
-                log.debug("Server is listening {}", 
this.nettyServerConfig.getListenPort());
             }
+            log.info("RemotingServer started, listening {}:{}", 
this.nettyServerConfig.getBindAddress(),
+                this.nettyServerConfig.getListenPort());
             
this.remotingServerTable.put(this.nettyServerConfig.getListenPort(), this);
-        } catch (InterruptedException e1) {
-            throw new RuntimeException("this.serverBootstrap.bind().sync() 
InterruptedException", e1);
+        } catch (Exception e) {
+            throw new IllegalStateException(String.format("Failed to bind to 
%s:%d", nettyServerConfig.getBindAddress(),
+                nettyServerConfig.getListenPort()), e);
         }
 
         if (this.channelEventListener != null) {
@@ -279,9 +282,9 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
         }
         if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && 
nettyServerConfig.getWriteBufferHighWaterMark() > 0) {
             log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",
-                    nettyServerConfig.getWriteBufferLowWaterMark(), 
nettyServerConfig.getWriteBufferHighWaterMark());
+                nettyServerConfig.getWriteBufferLowWaterMark(), 
nettyServerConfig.getWriteBufferHighWaterMark());
             childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, 
new WriteBufferWaterMark(
-                    nettyServerConfig.getWriteBufferLowWaterMark(), 
nettyServerConfig.getWriteBufferHighWaterMark()));
+                nettyServerConfig.getWriteBufferLowWaterMark(), 
nettyServerConfig.getWriteBufferHighWaterMark()));
         }
 
         if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
@@ -350,8 +353,8 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
     @Override
     public RemotingServer newRemotingServer(final int port) {
         SubRemotingServer remotingServer = new SubRemotingServer(port,
-                this.nettyServerConfig.getServerOnewaySemaphoreValue(),
-                this.nettyServerConfig.getServerAsyncSemaphoreValue());
+            this.nettyServerConfig.getServerOnewaySemaphoreValue(),
+            this.nettyServerConfig.getServerAsyncSemaphoreValue());
         NettyRemotingAbstract existingServer = 
this.remotingServerTable.putIfAbsent(port, remotingServer);
         if (existingServer != null) {
             throw new RuntimeException("The port " + port + " already in use 
by another RemotingServer");
@@ -366,19 +369,19 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
 
     @Override
     public RemotingCommand invokeSync(final Channel channel, final 
RemotingCommand request, final long timeoutMillis)
-            throws InterruptedException, RemotingSendRequestException, 
RemotingTimeoutException {
+        throws InterruptedException, RemotingSendRequestException, 
RemotingTimeoutException {
         return this.invokeSyncImpl(channel, request, timeoutMillis);
     }
 
     @Override
     public void invokeAsync(Channel channel, RemotingCommand request, long 
timeoutMillis, InvokeCallback invokeCallback)
-            throws InterruptedException, RemotingTooMuchRequestException, 
RemotingTimeoutException, RemotingSendRequestException {
+        throws InterruptedException, RemotingTooMuchRequestException, 
RemotingTimeoutException, RemotingSendRequestException {
         this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
     }
 
     @Override
     public void invokeOneway(Channel channel, RemotingCommand request, long 
timeoutMillis) throws InterruptedException,
-            RemotingTooMuchRequestException, RemotingTimeoutException, 
RemotingSendRequestException {
+        RemotingTooMuchRequestException, RemotingTimeoutException, 
RemotingSendRequestException {
         this.invokeOnewayImpl(channel, request, timeoutMillis);
     }
 
@@ -429,8 +432,8 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
                     case ENFORCING:
                         if (null != sslContext) {
                             ctx.pipeline()
-                                    .addAfter(defaultEventExecutorGroup, 
HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, 
sslContext.newHandler(ctx.channel().alloc()))
-                                    .addAfter(defaultEventExecutorGroup, 
TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
+                                .addAfter(defaultEventExecutorGroup, 
HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, 
sslContext.newHandler(ctx.channel().alloc()))
+                                .addAfter(defaultEventExecutorGroup, 
TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
                             log.info("Handlers prepended to channel pipeline 
to establish SSL connection");
                         } else {
                             ctx.close();
@@ -466,7 +469,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
     class NettyServerHandler extends 
SimpleChannelInboundHandler<RemotingCommand> {
 
         @Override
-        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand 
msg) throws Exception {
+        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand 
msg) {
             int localPort = 
RemotingHelper.parseSocketAddressPort(ctx.channel().localAddress());
             NettyRemotingAbstract remotingAbstract = 
NettyRemotingServer.this.remotingServerTable.get(localPort);
             if (localPort != -1 && remotingAbstract != null) {
@@ -526,7 +529,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
                     RemotingUtil.closeChannel(ctx.channel());
                     if (NettyRemotingServer.this.channelEventListener != null) 
{
                         NettyRemotingServer.this
-                                .putNettyEvent(new 
NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
+                            .putNettyEvent(new NettyEvent(NettyEventType.IDLE, 
remoteAddress, ctx.channel()));
                     }
                 }
             }
@@ -564,7 +567,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
 
         @Override
         public void registerProcessor(final int requestCode, final 
NettyRequestProcessor processor,
-                                      final ExecutorService executor) {
+            final ExecutorService executor) {
             ExecutorService executorThis = executor;
             if (null == executor) {
                 executorThis = NettyRemotingServer.this.publicExecutor;
@@ -597,30 +600,30 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
         @Override
         public RemotingServer newRemotingServer(final int port) {
             throw new UnsupportedOperationException("The SubRemotingServer of 
NettyRemotingServer " +
-                    "doesn't support new nested RemotingServer");
+                "doesn't support new nested RemotingServer");
         }
 
         @Override
         public void removeRemotingServer(final int port) {
             throw new UnsupportedOperationException("The SubRemotingServer of 
NettyRemotingServer " +
-                    "doesn't support remove nested RemotingServer");
+                "doesn't support remove nested RemotingServer");
         }
 
         @Override
         public RemotingCommand invokeSync(final Channel channel, final 
RemotingCommand request,
-                                          final long timeoutMillis) throws 
InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
+            final long timeoutMillis) throws InterruptedException, 
RemotingSendRequestException, RemotingTimeoutException {
             return this.invokeSyncImpl(channel, request, timeoutMillis);
         }
 
         @Override
         public void invokeAsync(final Channel channel, final RemotingCommand 
request, final long timeoutMillis,
-                                final InvokeCallback invokeCallback) throws 
InterruptedException, RemotingTooMuchRequestException, 
RemotingTimeoutException, RemotingSendRequestException {
+            final InvokeCallback invokeCallback) throws InterruptedException, 
RemotingTooMuchRequestException, RemotingTimeoutException, 
RemotingSendRequestException {
             this.invokeAsyncImpl(channel, request, timeoutMillis, 
invokeCallback);
         }
 
         @Override
         public void invokeOneway(final Channel channel, final RemotingCommand 
request,
-                                 final long timeoutMillis) throws 
InterruptedException, RemotingTooMuchRequestException, 
RemotingTimeoutException, RemotingSendRequestException {
+            final long timeoutMillis) throws InterruptedException, 
RemotingTooMuchRequestException, RemotingTimeoutException, 
RemotingSendRequestException {
             this.invokeOnewayImpl(channel, request, timeoutMillis);
         }
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
index fd1fe9d09..59ef2c84f 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
@@ -17,6 +17,12 @@
 package org.apache.rocketmq.remoting.netty;
 
 public class NettyServerConfig implements Cloneable {
+
+    /**
+     * Bind address may be hostname, IPv4 or IPv6.
+     * By default, it's wildcard address, listening all network interfaces.
+     */
+    private String bindAddress = "0.0.0.0";
     private int listenPort = 0;
     private int serverWorkerThreads = 8;
     private int serverCallbackExecutorThreads = 0;
@@ -41,6 +47,14 @@ public class NettyServerConfig implements Cloneable {
      */
     private boolean useEpollNativeSelector = false;
 
+    public String getBindAddress() {
+        return bindAddress;
+    }
+
+    public void setBindAddress(String bindAddress) {
+        this.bindAddress = bindAddress;
+    }
+
     public int getListenPort() {
         return listenPort;
     }

Reply via email to