[
https://issues.apache.org/jira/browse/SCB-526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461981#comment-16461981
]
ASF GitHub Bot commented on SCB-526:
------------------------------------
liubao68 closed pull request #681: [SCB-526]fetch once from dynamic config
source when boot up
URL: https://github.com/apache/incubator-servicecomb-java-chassis/pull/681
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/PojoClient.java
b/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/PojoClient.java
index 7ffd70dcd..3549b47a2 100644
---
a/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/PojoClient.java
+++
b/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/PojoClient.java
@@ -102,22 +102,32 @@ public static void run() throws Exception {
ArchaiusUtils.setProperty("cse.loadbalance.strategy.name",
"WeightedResponse");
testStringArray(test);
boolean checkerStated = false;
- Set<Thread> allThreads = Thread.getAllStackTraces().keySet();
- for (Thread t : allThreads) {
- if (t.getName().equals("NFLoadBalancer-serverWeightTimer-unknown")) {
- checkerStated = true;
+ // Timer may not start thread very fast so check for 3 times.
+ for (int i = 0; i < 3; i++) {
+ Set<Thread> allThreads = Thread.getAllStackTraces().keySet();
+ for (Thread t : allThreads) {
+ if (t.getName().equals("NFLoadBalancer-serverWeightTimer-unknown")) {
+ checkerStated = true;
+ break;
+ }
}
+ Thread.sleep(1000);
}
TestMgr.check(checkerStated, true);
-
+
ArchaiusUtils.setProperty("cse.loadbalance.strategy.name", "RoundRobin");
testStringArray(test);
-
- allThreads = Thread.getAllStackTraces().keySet();
+
boolean checkerDestroyed = true;
- for (Thread t : allThreads) {
- if (t.getName().equals("NFLoadBalancer-serverWeightTimer-unknown")) {
- checkerDestroyed = false;
+ // Timer cancel may not destroy thread very fast so check for 3 times.
+ for (int i = 0; i < 3; i++) {
+ checkerDestroyed = true;
+ Set<Thread> allThreads = Thread.getAllStackTraces().keySet();
+ for (Thread t : allThreads) {
+ if (t.getName().equals("NFLoadBalancer-serverWeightTimer-unknown")) {
+ checkerDestroyed = false;
+ Thread.sleep(1000);
+ }
}
}
TestMgr.check(checkerDestroyed, true);
diff --git
a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java
b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java
index 4407c1488..aaef866f0 100644
---
a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java
+++
b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java
@@ -86,6 +86,8 @@
private static final long HEARTBEAT_INTERVAL = 30000;
+ private static final long BOOTUP_WAIT_TIME = 10;
+
private ScheduledExecutorService heartbeatTask = null;
private int refreshMode = CONFIG_CENTER_CONFIG.getRefreshMode();
@@ -129,7 +131,9 @@ public void connectServer() {
throw new IllegalStateException(e);
}
refreshMembers(memberDiscovery);
- EXECUTOR.scheduleWithFixedDelay(new ConfigRefresh(parseConfigUtils,
memberDiscovery),
+ ConfigRefresh refreshTask = new ConfigRefresh(parseConfigUtils,
memberDiscovery);
+ refreshTask.run(true);
+ EXECUTOR.scheduleWithFixedDelay(refreshTask,
firstRefreshInterval,
refreshInterval,
TimeUnit.MILLISECONDS);
@@ -155,6 +159,9 @@ private void refreshMembers(MemberDiscovery
memberDiscovery) {
request.headers().add("X-Auth-Token",
ConfigCenterConfig.INSTANCE.getToken());
}
authHeaderProviders.forEach(provider ->
request.headers().addAll(provider.getSignAuthHeaders(signReq)));
+ request.exceptionHandler(e -> {
+ LOGGER.error("Fetch member from {} failed. Error message is [{}].",
configCenter, e.getMessage());
+ });
request.end();
});
}
@@ -211,18 +218,16 @@ private HttpClientOptions createHttpClientOptions() {
this.memberdis = memberdis;
}
- // 具体动作
- @Override
- public void run() {
+ public void run(boolean wait) {
// this will be single threaded, so we don't care about concurrent
// staffs
try {
String configCenter = memberdis.getConfigServer();
if (refreshMode == 1) {
- refreshConfig(configCenter);
+ refreshConfig(configCenter, wait);
} else if (!isWatching) {
// 重新监听时需要先加载,避免在断开期间丢失变更
- refreshConfig(configCenter);
+ refreshConfig(configCenter, wait);
doWatch(configCenter);
}
} catch (Exception e) {
@@ -230,6 +235,12 @@ public void run() {
}
}
+ // 具体动作
+ @Override
+ public void run() {
+ run(false);
+ }
+
// create watch and wait for done
public void doWatch(String configCenter)
throws UnsupportedEncodingException, InterruptedException {
@@ -270,7 +281,7 @@ public void doWatch(String configCenter)
LOGGER.info("watching config recieved {}", action);
Map<String, Object> mAction = action.toJsonObject().getMap();
if ("CREATE".equals(mAction.get("action"))) {
- refreshConfig(configCenter);
+ refreshConfig(configCenter, false);
} else if ("MEMBER_CHANGE".equals(mAction.get("action"))) {
refreshMembers(memberdis);
} else {
@@ -282,7 +293,10 @@ public void doWatch(String configCenter)
waiter.countDown();
},
e -> {
- LOGGER.error("watcher connect to config center {} refresh port
{} failed. Error message is [{}]", configCenter, refreshPort, e.getMessage());
+ LOGGER.error("watcher connect to config center {} refresh port
{} failed. Error message is [{}]",
+ configCenter,
+ refreshPort,
+ e.getMessage());
waiter.countDown();
});
});
@@ -313,7 +327,8 @@ private void sendHeartbeat(WebSocket ws) {
}
}
- public void refreshConfig(String configcenter) {
+ public void refreshConfig(String configcenter, boolean wait) {
+ CountDownLatch latch = new CountDownLatch(1);
clientMgr.findThreadBindClientPool().runOnContext(client -> {
String path = URIConst.ITEMS + "?dimensionsInfo=" +
StringUtils.deleteWhitespace(serviceName);
IpPort ipPort = NetUtils.parseIpPortFromURI(configcenter);
@@ -330,10 +345,12 @@ public void refreshConfig(String configcenter) {
EventManager.post(new ConnFailEvent("config refresh result
parse fail " + e.getMessage()));
LOGGER.error("Config refresh from {} failed. Error message is
[{}].", configcenter, e.getMessage());
}
+ latch.countDown();
});
} else {
rsp.bodyHandler(buf -> {
LOGGER.error("Server error message is [{}].", buf);
+ latch.countDown();
});
EventManager.post(new ConnFailEvent("fetch config fail"));
LOGGER.error("Config refresh from {} failed.", configcenter);
@@ -354,9 +371,19 @@ public void refreshConfig(String configcenter) {
request.exceptionHandler(e -> {
EventManager.post(new ConnFailEvent("fetch config fail"));
LOGGER.error("Config refresh from {} failed. Error message is
[{}].", configcenter, e.getMessage());
+ latch.countDown();
});
request.end();
});
+ if (wait) {
+ LOGGER.info("Refreshing remote config...");
+ try {
+ latch.await(BOOTUP_WAIT_TIME, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.warn(e.getMessage());
+ }
+ LOGGER.info("Refreshing remote config is done.");
+ }
}
}
diff --git
a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/MemberDiscovery.java
b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/MemberDiscovery.java
index 6db80e6f0..cee4c0466 100644
---
a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/MemberDiscovery.java
+++
b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/MemberDiscovery.java
@@ -41,6 +41,8 @@
private List<String> configServerAddresses = new ArrayList<>();
+ private Object lock = new Object();
+
private AtomicInteger counter = new AtomicInteger(0);
public MemberDiscovery(List<String> configCenterUri) {
@@ -52,11 +54,13 @@ public MemberDiscovery(List<String> configCenterUri) {
}
public String getConfigServer() {
- if(configServerAddresses.isEmpty()) {
- throw new IllegalStateException("Config center address is not
available.");
+ synchronized (lock) {
+ if (configServerAddresses.isEmpty()) {
+ throw new IllegalStateException("Config center address is not
available.");
+ }
+ int index = Math.abs(counter.get() % configServerAddresses.size());
+ return configServerAddresses.get(index);
}
- int index = Math.abs(counter.get() % configServerAddresses.size());
- return configServerAddresses.get(index);
}
@Subscribe
@@ -65,17 +69,22 @@ public void onConnFailEvent(ConnFailEvent e) {
}
public void refreshMembers(JsonObject members) {
- configServerAddresses.clear();
+ List<String> newServerAddresses = new ArrayList<>();
members.getJsonArray("instances").forEach(m -> {
JsonObject instance = (JsonObject) m;
if ("UP".equals(instance.getString("status", "UP"))) {
String endpoint = instance.getJsonArray("endpoints").getString(0);
String scheme = instance.getBoolean("isHttps", false) ? "https" :
"http";
- configServerAddresses.add(scheme + SCHEMA_SEPRATOR
+ newServerAddresses.add(scheme + SCHEMA_SEPRATOR
+ endpoint.substring(endpoint.indexOf(SCHEMA_SEPRATOR) +
SCHEMA_SEPRATOR.length()));
}
});
- Collections.shuffle(configServerAddresses);
- LOGGER.info("config center members: {}", configServerAddresses);
+
+ synchronized (lock) {
+ this.configServerAddresses.clear();
+ this.configServerAddresses.addAll(newServerAddresses);
+ Collections.shuffle(this.configServerAddresses);
+ }
+ LOGGER.info("New config center members: {}", this.configServerAddresses);
}
}
diff --git
a/foundations/foundation-config/src/main/java/org/apache/servicecomb/config/ConfigUtil.java
b/foundations/foundation-config/src/main/java/org/apache/servicecomb/config/ConfigUtil.java
index bb3b41322..6e2a6f690 100644
---
a/foundations/foundation-config/src/main/java/org/apache/servicecomb/config/ConfigUtil.java
+++
b/foundations/foundation-config/src/main/java/org/apache/servicecomb/config/ConfigUtil.java
@@ -181,7 +181,8 @@ private static void
duplicateServiceCombConfigToCse(ConcurrentCompositeConfigura
compositeConfiguration.addConfiguration(source, sourceName);
}
- public static DynamicWatchedConfiguration
createConfigFromConfigCenter(Configuration localConfiguration) {
+ private static ConfigCenterConfigurationSource
createConfigCenterConfigurationSource(
+ Configuration localConfiguration) {
ConfigCenterConfigurationSource configCenterConfigurationSource =
SPIServiceUtils.getTargetService(ConfigCenterConfigurationSource.class);
if (null == configCenterConfigurationSource) {
@@ -194,26 +195,30 @@ public static DynamicWatchedConfiguration
createConfigFromConfigCenter(Configura
LOGGER.info("Config Source serverUri is not correctly configured.");
return null;
}
+ return configCenterConfigurationSource;
+ }
- configCenterConfigurationSource.init(localConfiguration);
- return new DynamicWatchedConfiguration(configCenterConfigurationSource);
+ private static void createDynamicWatchedConfiguration(
+ ConcurrentCompositeConfiguration localConfiguration,
+ ConfigCenterConfigurationSource configCenterConfigurationSource) {
+ ConcurrentMapConfiguration injectConfig = new ConcurrentMapConfiguration();
+ localConfiguration.addConfigurationAtFront(injectConfig,
"extraInjectConfig");
+ configCenterConfigurationSource.addUpdateListener(new
ServiceCombPropertyUpdateListener(injectConfig));
+
+ DynamicWatchedConfiguration configFromConfigCenter =
+ new DynamicWatchedConfiguration(configCenterConfigurationSource);
+ duplicateServiceCombConfigToCse(configFromConfigCenter);
+ localConfiguration.addConfigurationAtFront(configFromConfigCenter,
"configCenterConfig");
}
public static AbstractConfiguration createDynamicConfig() {
- LOGGER.info("create dynamic config:");
- ConcurrentCompositeConfiguration config = ConfigUtil.createLocalConfig();
- DynamicWatchedConfiguration configFromConfigCenter =
createConfigFromConfigCenter(config);
- if (configFromConfigCenter != null) {
- ConcurrentMapConfiguration injectConfig = new
ConcurrentMapConfiguration();
- config.addConfigurationAtFront(injectConfig, "extraInjectConfig");
-
- duplicateServiceCombConfigToCse(configFromConfigCenter);
- config.addConfigurationAtFront(configFromConfigCenter,
"configCenterConfig");
-
- configFromConfigCenter.getSource().addUpdateListener(new
ServiceCombPropertyUpdateListener(injectConfig));
+ ConcurrentCompositeConfiguration compositeConfig =
ConfigUtil.createLocalConfig();
+ ConfigCenterConfigurationSource configCenterConfigurationSource =
+ createConfigCenterConfigurationSource(compositeConfig);
+ if (configCenterConfigurationSource != null) {
+ createDynamicWatchedConfiguration(compositeConfig,
configCenterConfigurationSource);
}
-
- return config;
+ return compositeConfig;
}
public static void installDynamicConfig() {
@@ -222,8 +227,18 @@ public static void installDynamicConfig() {
return;
}
- AbstractConfiguration dynamicConfig = ConfigUtil.createDynamicConfig();
- ConfigurationManager.install(dynamicConfig);
+ ConcurrentCompositeConfiguration compositeConfig =
ConfigUtil.createLocalConfig();
+ ConfigCenterConfigurationSource configCenterConfigurationSource =
+ createConfigCenterConfigurationSource(compositeConfig);
+ if (configCenterConfigurationSource != null) {
+ createDynamicWatchedConfiguration(compositeConfig,
configCenterConfigurationSource);
+ }
+
+ ConfigurationManager.install(compositeConfig);
+
+ if (configCenterConfigurationSource != null) {
+ configCenterConfigurationSource.init(compositeConfig);
+ }
}
public static void addExtraConfig(String extraConfigName, Map<String,
Object> extraConfig) {
diff --git
a/foundations/foundation-config/src/test/java/org/apache/servicecomb/config/TestConfigUtil.java
b/foundations/foundation-config/src/test/java/org/apache/servicecomb/config/TestConfigUtil.java
index cd04d4e70..119c6add4 100644
---
a/foundations/foundation-config/src/test/java/org/apache/servicecomb/config/TestConfigUtil.java
+++
b/foundations/foundation-config/src/test/java/org/apache/servicecomb/config/TestConfigUtil.java
@@ -30,7 +30,6 @@
import java.util.Map;
import org.apache.commons.configuration.AbstractConfiguration;
-import org.apache.commons.configuration.Configuration;
import org.apache.servicecomb.config.archaius.sources.ConfigModel;
import org.apache.servicecomb.config.archaius.sources.MicroserviceConfigLoader;
import org.apache.servicecomb.config.spi.ConfigCenterConfigurationSource;
@@ -49,7 +48,6 @@
import mockit.Deencapsulation;
import mockit.Expectations;
-import mockit.Mocked;
public class TestConfigUtil {
@@ -96,12 +94,6 @@ public void testAddConfig() {
Assert.assertEquals(configuration.getInt("cse.test.num"), 10);
}
- @Test
- public void testCreateConfigFromConfigCenterNoUrl(@Mocked Configuration
localConfiguration) {
- AbstractConfiguration configFromConfigCenter =
ConfigUtil.createConfigFromConfigCenter(localConfiguration);
- Assert.assertNull(configFromConfigCenter);
- }
-
@Test
public void testCreateDynamicConfigNoConfigCenterSPI() {
new Expectations(SPIServiceUtils.class) {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> When creating dynamic configuration, we need to fetch once for the
> configuration when startup
> ---------------------------------------------------------------------------------------------
>
> Key: SCB-526
> URL: https://issues.apache.org/jira/browse/SCB-526
> Project: Apache ServiceComb
> Issue Type: Improvement
> Reporter: liubao
> Assignee: liubao
> Priority: Major
>
> Currently, dynamic config is fetched asynchronously, so using dynamic
> configuration in the following scenarios will fail:
> # make some environment, e.g. credentials
> # spring @Value mechanism
> and so on.
>
> When users put these configurations on a dynamic configuration server,
> they can assume the server is running OK and that a fetch attempt will
> return the correct response.
>
> So it's very useful to fetch the configurations once at boot-up.
>
>
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)