This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new a3e596550 [ISSUE #5180] Support domain resolution to obtain the
nameserver address (#5189)
a3e596550 is described below
commit a3e596550d0bf0a3307becf38cb62e7d1646e1fd
Author: lizhimins <[email protected]>
AuthorDate: Mon Sep 26 19:15:29 2022 +0800
[ISSUE #5180] Support domain resolution to obtain the nameserver address
(#5189)
* [ISSUE #5180] Support domain resolution to obtain the nameserver address
* [ISSUE #5180] Support domain resolution to obtain the nameserver address
Co-authored-by: 斜阳 <[email protected]>
---
.../apache/rocketmq/broker/BrokerController.java | 12 +++++++++--
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 24 ++++++++++++++++++++++
.../apache/rocketmq/broker/BrokerOuterAPITest.java | 20 ++++++++++++++++++
.../org/apache/rocketmq/common/BrokerConfig.java | 13 ++++++++++++
.../common/namesrv/DefaultTopAddressing.java | 1 -
.../apache/rocketmq/container/BrokerContainer.java | 13 ++++++++++--
.../rocketmq/container/BrokerContainerConfig.java | 11 ++++++++++
7 files changed, 89 insertions(+), 5 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 6c79559c3..5676dbd0a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -681,14 +681,14 @@ public class BrokerController {
}, 10, 5, TimeUnit.SECONDS);
if (this.brokerConfig.getNamesrvAddr() != null) {
-
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
+ this.updateNamesrvAddr();
LOG.info("Set user specified name server address: {}",
this.brokerConfig.getNamesrvAddr());
// also auto update namesrv if specify
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
-
BrokerController.this.brokerOuterAPI.updateNameServerAddressList(BrokerController.this.brokerConfig.getNamesrvAddr());
+ BrokerController.this.updateNamesrvAddr();
} catch (Throwable e) {
LOG.error("Failed to update nameServer address list",
e);
}
@@ -709,6 +709,14 @@ public class BrokerController {
}
}
+ private void updateNamesrvAddr() {
+ if (this.brokerConfig.isFetchNameSrvAddrByDnsLookup()) {
+
this.brokerOuterAPI.updateNameServerAddressListByDnsLookup(this.brokerConfig.getNamesrvAddr());
+ } else {
+
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
+ }
+ }
+
public boolean initialize() throws CloneNotSupportedException {
boolean result = this.topicConfigManager.load();
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 188440e04..a91accb92 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.broker.out;
import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -183,12 +184,35 @@ public class BrokerOuterAPI {
return nameSrvAddr;
}
+ private List<String> lookupNameServerAddress(String domain) {
+ List<String> addressList = new ArrayList<>();
+ try {
+ java.security.Security.setProperty("networkaddress.cache.ttl" ,
"10");
+ int index = domain.indexOf(":");
+ String portStr = domain.substring(index);
+ String domainStr = domain.substring(0, index);
+ InetAddress[] addresses = InetAddress.getAllByName(domainStr);
+ for (InetAddress address : addresses) {
+ addressList.add(address.getHostAddress() + portStr);
+ }
+ LOGGER.info("dns lookup address by domain success, domain={},
result={}", domain, addressList);
+ } catch (Exception e) {
+ LOGGER.error("dns lookup address by domain error, domain={}",
domain, e);
+ }
+ return addressList;
+ }
+
public void updateNameServerAddressList(final String addrs) {
String[] addrArray = addrs.split(";");
List<String> lst = new ArrayList<String>(Arrays.asList(addrArray));
this.remotingClient.updateNameServerAddressList(lst);
}
+ public void updateNameServerAddressListByDnsLookup(final String domain) {
+ List<String> lst = this.lookupNameServerAddress(domain);
+ this.remotingClient.updateNameServerAddressList(lst);
+ }
+
public BrokerMemberGroup syncBrokerMemberGroup(String clusterName, String
brokerName)
throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException {
return syncBrokerMemberGroup(clusterName, brokerName, false);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
index 845e445bf..ffb1d9522 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
@@ -22,10 +22,12 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.BrokerIdentity;
@@ -45,6 +47,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
@@ -234,4 +237,21 @@ public class BrokerOuterAPITest {
responseHeader.setChanged(changed);
return response;
}
+
+ @Test
+ public void testLookupAddressByDomain() throws Exception {
+ init();
+ brokerOuterAPI.start();
+ Class<BrokerOuterAPI> clazz = BrokerOuterAPI.class;
+ Method method = clazz.getDeclaredMethod("lookupNameServerAddress",
String.class);
+ method.setAccessible(true);
+ List<String> addressList = (List<String>)
method.invoke(brokerOuterAPI, "localhost:6789");
+ AtomicBoolean result = new AtomicBoolean(false);
+ addressList.forEach(s -> {
+ if (s.contains("127.0.0.1:6789")) {
+ result.set(true);
+ }
+ });
+ Assert.assertTrue(result.get());
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 4c7b53143..fc49428bb 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -94,8 +94,13 @@ public class BrokerConfig extends BrokerIdentity {
@ImportantField
private boolean rejectTransactionMessage = false;
+
+ @ImportantField
+ private boolean fetchNameSrvAddrByDnsLookup = false;
+
@ImportantField
private boolean fetchNamesrvAddrByAddressServer = false;
+
private int sendThreadPoolQueueCapacity = 10000;
private int putThreadPoolQueueCapacity = 10000;
private int pullThreadPoolQueueCapacity = 100000;
@@ -1343,4 +1348,12 @@ public class BrokerConfig extends BrokerIdentity {
public void setRecoverThreadPoolNums(int recoverThreadPoolNums) {
this.recoverThreadPoolNums = recoverThreadPoolNums;
}
+
+ public boolean isFetchNameSrvAddrByDnsLookup() {
+ return fetchNameSrvAddrByDnsLookup;
+ }
+
+ public void setFetchNameSrvAddrByDnsLookup(boolean
fetchNameSrvAddrByDnsLookup) {
+ this.fetchNameSrvAddrByDnsLookup = fetchNameSrvAddrByDnsLookup;
+ }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/namesrv/DefaultTopAddressing.java
b/common/src/main/java/org/apache/rocketmq/common/namesrv/DefaultTopAddressing.java
index 6e7a97d47..f7da77764 100644
---
a/common/src/main/java/org/apache/rocketmq/common/namesrv/DefaultTopAddressing.java
+++
b/common/src/main/java/org/apache/rocketmq/common/namesrv/DefaultTopAddressing.java
@@ -51,7 +51,6 @@ public class DefaultTopAddressing implements TopAddressing {
this.topAddressingList = loadCustomTopAddressing();
}
-
public DefaultTopAddressing(final String unitName, final Map<String,
String> para, final String wsAddr) {
this.wsAddr = wsAddr;
this.unitName = unitName;
diff --git
a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
index 91e8afefa..92e5aa211 100644
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
@@ -112,6 +112,7 @@ public class BrokerContainer implements IBrokerContainer {
return nettyServerConfig;
}
+ @Override
public NettyClientConfig getNettyClientConfig() {
return nettyClientConfig;
}
@@ -130,6 +131,14 @@ public class BrokerContainer implements IBrokerContainer {
return this.configuration;
}
+ private void updateNamesrvAddr() {
+ if (this.brokerContainerConfig.isFetchNameSrvAddrByDnsLookup()) {
+
this.brokerOuterAPI.updateNameServerAddressListByDnsLookup(this.brokerContainerConfig.getNamesrvAddr());
+ } else {
+
this.brokerOuterAPI.updateNameServerAddressList(this.brokerContainerConfig.getNamesrvAddr());
+ }
+ }
+
public boolean initialize() {
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig,
this.containerClientHouseKeepingService);
this.fastRemotingServer =
this.remotingServer.newRemotingServer(this.nettyServerConfig.getListenPort() -
2);
@@ -145,14 +154,14 @@ public class BrokerContainer implements IBrokerContainer {
this.registerProcessor();
if (this.brokerContainerConfig.getNamesrvAddr() != null) {
-
this.brokerOuterAPI.updateNameServerAddressList(this.brokerContainerConfig.getNamesrvAddr());
+ this.updateNamesrvAddr();
LOG.info("Set user specified name server address: {}",
this.brokerContainerConfig.getNamesrvAddr());
// also auto update namesrv if specify
this.scheduledExecutorService.scheduleAtFixedRate(new
AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {
@Override
public void run2() {
try {
-
BrokerContainer.this.brokerOuterAPI.updateNameServerAddressList(BrokerContainer.this.brokerContainerConfig.getNamesrvAddr());
+ BrokerContainer.this.updateNamesrvAddr();
} catch (Throwable e) {
LOG.error("ScheduledTask fetchNameServerAddr
exception", e);
}
diff --git
a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
index 564cd37ab..c8516280b 100644
---
a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
+++
b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
@@ -28,6 +28,9 @@ public class BrokerContainerConfig {
@ImportantField
private String namesrvAddr =
System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY,
System.getenv(MixAll.NAMESRV_ADDR_ENV));
+ @ImportantField
+ private boolean fetchNameSrvAddrByDnsLookup = false;
+
@ImportantField
private boolean fetchNamesrvAddrByAddressServer = false;
@@ -52,6 +55,14 @@ public class BrokerContainerConfig {
this.namesrvAddr = namesrvAddr;
}
+ public boolean isFetchNameSrvAddrByDnsLookup() {
+ return fetchNameSrvAddrByDnsLookup;
+ }
+
+ public void setFetchNameSrvAddrByDnsLookup(boolean
fetchNameSrvAddrByDnsLookup) {
+ this.fetchNameSrvAddrByDnsLookup = fetchNameSrvAddrByDnsLookup;
+ }
+
public boolean isFetchNamesrvAddrByAddressServer() {
return fetchNamesrvAddrByAddressServer;
}