This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new a3e596550 [ISSUE #5180] Support domain resolution to obtain the 
nameserver address (#5189)
a3e596550 is described below

commit a3e596550d0bf0a3307becf38cb62e7d1646e1fd
Author: lizhimins <[email protected]>
AuthorDate: Mon Sep 26 19:15:29 2022 +0800

    [ISSUE #5180] Support domain resolution to obtain the nameserver address 
(#5189)
    
    * [ISSUE #5180] Support domain resolution to obtain the nameserver address
    
    * [ISSUE #5180] Support domain resolution to obtain the nameserver address
    
    Co-authored-by: 斜阳 <[email protected]>
---
 .../apache/rocketmq/broker/BrokerController.java   | 12 +++++++++--
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 24 ++++++++++++++++++++++
 .../apache/rocketmq/broker/BrokerOuterAPITest.java | 20 ++++++++++++++++++
 .../org/apache/rocketmq/common/BrokerConfig.java   | 13 ++++++++++++
 .../common/namesrv/DefaultTopAddressing.java       |  1 -
 .../apache/rocketmq/container/BrokerContainer.java | 13 ++++++++++--
 .../rocketmq/container/BrokerContainerConfig.java  | 11 ++++++++++
 7 files changed, 89 insertions(+), 5 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 6c79559c3..5676dbd0a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -681,14 +681,14 @@ public class BrokerController {
         }, 10, 5, TimeUnit.SECONDS);
 
         if (this.brokerConfig.getNamesrvAddr() != null) {
-            
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
+            this.updateNamesrvAddr();
             LOG.info("Set user specified name server address: {}", 
this.brokerConfig.getNamesrvAddr());
             // also auto update namesrv if specify
             this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                 @Override
                 public void run() {
                     try {
-                        
BrokerController.this.brokerOuterAPI.updateNameServerAddressList(BrokerController.this.brokerConfig.getNamesrvAddr());
+                        BrokerController.this.updateNamesrvAddr();
                     } catch (Throwable e) {
                         LOG.error("Failed to update nameServer address list", 
e);
                     }
@@ -709,6 +709,14 @@ public class BrokerController {
         }
     }
 
+    private void updateNamesrvAddr() {
+        if (this.brokerConfig.isFetchNameSrvAddrByDnsLookup()) {
+            
this.brokerOuterAPI.updateNameServerAddressListByDnsLookup(this.brokerConfig.getNamesrvAddr());
+        } else {
+            
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
+        }
+    }
+
     public boolean initialize() throws CloneNotSupportedException {
 
         boolean result = this.topicConfigManager.load();
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java 
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 188440e04..a91accb92 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.broker.out;
 
 import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -183,12 +184,35 @@ public class BrokerOuterAPI {
         return nameSrvAddr;
     }
 
+    private List<String> lookupNameServerAddress(String domain) {
+        List<String> addressList = new ArrayList<>();
+        try {
+            java.security.Security.setProperty("networkaddress.cache.ttl" , 
"10");
+            int index = domain.indexOf(":");
+            String portStr = domain.substring(index);
+            String domainStr = domain.substring(0, index);
+            InetAddress[] addresses = InetAddress.getAllByName(domainStr);
+            for (InetAddress address : addresses) {
+                addressList.add(address.getHostAddress() + portStr);
+            }
+            LOGGER.info("dns lookup address by domain success, domain={}, 
result={}", domain, addressList);
+        } catch (Exception e) {
+            LOGGER.error("dns lookup address by domain error, domain={}", 
domain, e);
+        }
+        return addressList;
+    }
+
     public void updateNameServerAddressList(final String addrs) {
         String[] addrArray = addrs.split(";");
         List<String> lst = new ArrayList<String>(Arrays.asList(addrArray));
         this.remotingClient.updateNameServerAddressList(lst);
     }
 
+    public void updateNameServerAddressListByDnsLookup(final String domain) {
+        List<String> lst = this.lookupNameServerAddress(domain);
+        this.remotingClient.updateNameServerAddressList(lst);
+    }
+
     public BrokerMemberGroup syncBrokerMemberGroup(String clusterName, String 
brokerName)
             throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException {
         return syncBrokerMemberGroup(clusterName, brokerName, false);
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java 
b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
index 845e445bf..ffb1d9522 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
@@ -22,10 +22,12 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import io.netty.channel.ChannelHandlerContext;
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.rocketmq.broker.out.BrokerOuterAPI;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.BrokerIdentity;
@@ -45,6 +47,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
@@ -234,4 +237,21 @@ public class BrokerOuterAPITest {
         responseHeader.setChanged(changed);
         return response;
     }
+
+    @Test
+    public void testLookupAddressByDomain() throws Exception {
+        init();
+        brokerOuterAPI.start();
+        Class<BrokerOuterAPI> clazz = BrokerOuterAPI.class;
+        Method method = clazz.getDeclaredMethod("lookupNameServerAddress", 
String.class);
+        method.setAccessible(true);
+        List<String> addressList = (List<String>) 
method.invoke(brokerOuterAPI, "localhost:6789");
+        AtomicBoolean result = new AtomicBoolean(false);
+        addressList.forEach(s -> {
+            if (s.contains("127.0.0.1:6789")) {
+                result.set(true);
+            }
+        });
+        Assert.assertTrue(result.get());
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 4c7b53143..fc49428bb 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -94,8 +94,13 @@ public class BrokerConfig extends BrokerIdentity {
 
     @ImportantField
     private boolean rejectTransactionMessage = false;
+
+    @ImportantField
+    private boolean fetchNameSrvAddrByDnsLookup = false;
+
     @ImportantField
     private boolean fetchNamesrvAddrByAddressServer = false;
+
     private int sendThreadPoolQueueCapacity = 10000;
     private int putThreadPoolQueueCapacity = 10000;
     private int pullThreadPoolQueueCapacity = 100000;
@@ -1343,4 +1348,12 @@ public class BrokerConfig extends BrokerIdentity {
     public void setRecoverThreadPoolNums(int recoverThreadPoolNums) {
         this.recoverThreadPoolNums = recoverThreadPoolNums;
     }
+
+    public boolean isFetchNameSrvAddrByDnsLookup() {
+        return fetchNameSrvAddrByDnsLookup;
+    }
+
+    public void setFetchNameSrvAddrByDnsLookup(boolean 
fetchNameSrvAddrByDnsLookup) {
+        this.fetchNameSrvAddrByDnsLookup = fetchNameSrvAddrByDnsLookup;
+    }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/namesrv/DefaultTopAddressing.java
 
b/common/src/main/java/org/apache/rocketmq/common/namesrv/DefaultTopAddressing.java
index 6e7a97d47..f7da77764 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/namesrv/DefaultTopAddressing.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/namesrv/DefaultTopAddressing.java
@@ -51,7 +51,6 @@ public class DefaultTopAddressing implements TopAddressing {
         this.topAddressingList = loadCustomTopAddressing();
     }
 
-
     public DefaultTopAddressing(final String unitName, final Map<String, 
String> para, final String wsAddr) {
         this.wsAddr = wsAddr;
         this.unitName = unitName;
diff --git 
a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java 
b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
index 91e8afefa..92e5aa211 100644
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
@@ -112,6 +112,7 @@ public class BrokerContainer implements IBrokerContainer {
         return nettyServerConfig;
     }
 
+    @Override
     public NettyClientConfig getNettyClientConfig() {
         return nettyClientConfig;
     }
@@ -130,6 +131,14 @@ public class BrokerContainer implements IBrokerContainer {
         return this.configuration;
     }
 
+    private void updateNamesrvAddr() {
+        if (this.brokerContainerConfig.isFetchNameSrvAddrByDnsLookup()) {
+            
this.brokerOuterAPI.updateNameServerAddressListByDnsLookup(this.brokerContainerConfig.getNamesrvAddr());
+        } else {
+            
this.brokerOuterAPI.updateNameServerAddressList(this.brokerContainerConfig.getNamesrvAddr());
+        }
+    }
+
     public boolean initialize() {
         this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, 
this.containerClientHouseKeepingService);
         this.fastRemotingServer = 
this.remotingServer.newRemotingServer(this.nettyServerConfig.getListenPort() - 
2);
@@ -145,14 +154,14 @@ public class BrokerContainer implements IBrokerContainer {
         this.registerProcessor();
 
         if (this.brokerContainerConfig.getNamesrvAddr() != null) {
-            
this.brokerOuterAPI.updateNameServerAddressList(this.brokerContainerConfig.getNamesrvAddr());
+            this.updateNamesrvAddr();
             LOG.info("Set user specified name server address: {}", 
this.brokerContainerConfig.getNamesrvAddr());
             // also auto update namesrv if specify
             this.scheduledExecutorService.scheduleAtFixedRate(new 
AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {
                 @Override
                 public void run2() {
                     try {
-                        
BrokerContainer.this.brokerOuterAPI.updateNameServerAddressList(BrokerContainer.this.brokerContainerConfig.getNamesrvAddr());
+                        BrokerContainer.this.updateNamesrvAddr();
                     } catch (Throwable e) {
                         LOG.error("ScheduledTask fetchNameServerAddr 
exception", e);
                     }
diff --git 
a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
 
b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
index 564cd37ab..c8516280b 100644
--- 
a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
+++ 
b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
@@ -28,6 +28,9 @@ public class BrokerContainerConfig {
     @ImportantField
     private String namesrvAddr = 
System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, 
System.getenv(MixAll.NAMESRV_ADDR_ENV));
 
+    @ImportantField
+    private boolean fetchNameSrvAddrByDnsLookup = false;
+
     @ImportantField
     private boolean fetchNamesrvAddrByAddressServer = false;
 
@@ -52,6 +55,14 @@ public class BrokerContainerConfig {
         this.namesrvAddr = namesrvAddr;
     }
 
+    public boolean isFetchNameSrvAddrByDnsLookup() {
+        return fetchNameSrvAddrByDnsLookup;
+    }
+
+    public void setFetchNameSrvAddrByDnsLookup(boolean 
fetchNameSrvAddrByDnsLookup) {
+        this.fetchNameSrvAddrByDnsLookup = fetchNameSrvAddrByDnsLookup;
+    }
+
     public boolean isFetchNamesrvAddrByAddressServer() {
         return fetchNamesrvAddrByAddressServer;
     }

Reply via email to