This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 5d6ee6384 [ISSUE #6910] Extract the interval of fetchNameServerAddr
5d6ee6384 is described below
commit 5d6ee6384fea3159ec1ecad76942880cc7976e7a
Author: Guocheng Tang <[email protected]>
AuthorDate: Fri Jun 23 14:21:29 2023 +0800
[ISSUE #6910] Extract the interval of fetchNameServerAddr
Co-authored-by: RongtongJin <[email protected]>
---
.../apache/rocketmq/broker/BrokerController.java | 53 +++++++++++-----------
.../org/apache/rocketmq/common/BrokerConfig.java | 16 ++++++-
.../org/apache/rocketmq/common/BrokerIdentity.java | 5 +-
.../apache/rocketmq/container/BrokerContainer.java | 25 +++++-----
.../rocketmq/container/BrokerContainerConfig.java | 14 +++++-
5 files changed, 71 insertions(+), 42 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 191c690f9..196401e26 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -17,31 +17,6 @@
package org.apache.rocketmq.broker;
import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Function;
-import java.util.stream.Collectors;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
@@ -159,6 +134,32 @@ import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.timer.TimerMetrics;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
public class BrokerController {
protected static final Logger LOG =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final Logger LOG_PROTECTION =
LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
@@ -720,7 +721,7 @@ public class BrokerController {
LOG.error("Failed to fetch nameServer address", e);
}
}
- }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
+ }, 1000 * 10, this.brokerConfig.getFetchNamesrvAddrInterval(), TimeUnit.MILLISECONDS);
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 47ce2cb8d..f5f0db101 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.common;
-import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageRequestMode;
@@ -24,6 +23,8 @@ import org.apache.rocketmq.common.metrics.MetricsExporterType;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.NetworkUtil;
+import java.util.concurrent.TimeUnit;
+
public class BrokerConfig extends BrokerIdentity {
private String brokerConfigPath = null;
@@ -374,6 +375,11 @@ public class BrokerConfig extends BrokerIdentity {
private boolean usePIDColdCtrStrategy = true;
private long cgColdReadThreshold = 3 * 1024 * 1024;
private long globalColdReadThreshold = 100 * 1024 * 1024;
+
+ /**
+ * The interval to fetch namesrv addr, in milliseconds; default value is 10 seconds
+ */
+ private long fetchNamesrvAddrInterval = 10 * 1000;
public long getMaxPopPollingSize() {
return maxPopPollingSize;
@@ -1662,4 +1668,12 @@ public class BrokerConfig extends BrokerIdentity {
public void setUseStaticSubscription(boolean useStaticSubscription) {
this.useStaticSubscription = useStaticSubscription;
}
+
+ public long getFetchNamesrvAddrInterval() {
+ return fetchNamesrvAddrInterval;
+ }
+
+ public void setFetchNamesrvAddrInterval(final long fetchNamesrvAddrInterval) {
+ this.fetchNamesrvAddrInterval = fetchNamesrvAddrInterval;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerIdentity.java b/common/src/main/java/org/apache/rocketmq/common/BrokerIdentity.java
index 4115744a4..e85a3aac7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerIdentity.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerIdentity.java
@@ -17,8 +17,6 @@
package org.apache.rocketmq.common;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
@@ -27,6 +25,9 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
public class BrokerIdentity {
private static final String DEFAULT_CLUSTER_NAME = "DefaultCluster";
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
index 170c5d045..c6446f058 100644
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
@@ -16,17 +16,6 @@
*/
package org.apache.rocketmq.container;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
@@ -50,6 +39,18 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
public class BrokerContainer implements IBrokerContainer {
private static final Logger LOG =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -177,7 +178,7 @@ public class BrokerContainer implements IBrokerContainer {
LOG.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
- }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
+ }, 1000 * 10, this.brokerContainerConfig.getFetchNamesrvAddrInterval(), TimeUnit.MILLISECONDS);
}
this.scheduledExecutorService.scheduleAtFixedRate(new
AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
index b04d51e77..77422adde 100644
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
@@ -38,6 +38,11 @@ public class BrokerContainerConfig {
private String brokerContainerIP = NetworkUtil.getLocalAddress();
private String brokerConfigPaths = null;
+
+ /**
+ * The interval to fetch namesrv addr, in milliseconds; default value is 10 seconds
+ */
+ private long fetchNamesrvAddrInterval = 10 * 1000;
public String getRocketmqHome() {
return rocketmqHome;
@@ -82,5 +87,12 @@ public class BrokerContainerConfig {
public void setBrokerConfigPaths(String brokerConfigPaths) {
this.brokerConfigPaths = brokerConfigPaths;
}
-
+
+ public long getFetchNamesrvAddrInterval() {
+ return fetchNamesrvAddrInterval;
+ }
+
+ public void setFetchNamesrvAddrInterval(final long fetchNamesrvAddrInterval) {
+ this.fetchNamesrvAddrInterval = fetchNamesrvAddrInterval;
+ }
}