[ https://issues.apache.org/jira/browse/SCB-782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16574086#comment-16574086 ]
ASF GitHub Bot commented on SCB-782: ------------------------------------ liubao68 closed pull request #851: [SCB-782]support revision check when use pull mode with config center URL: https://github.com/apache/incubator-servicecomb-java-chassis/pull/851 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java index a58de4da2..09d26c484 100644 --- a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java +++ b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java @@ -128,14 +128,14 @@ public void connectServer() { LOGGER.error("refreshMode must be 0 or 1."); return; } - ParseConfigUtils parseConfigUtils = new ParseConfigUtils(updateHandler); + ParseConfigUtils.getInstance().initWithUpdateHandler(updateHandler); try { deployConfigClient(); } catch (InterruptedException e) { throw new IllegalStateException(e); } refreshMembers(memberDiscovery); - ConfigRefresh refreshTask = new ConfigRefresh(parseConfigUtils, memberDiscovery); + ConfigRefresh refreshTask = new ConfigRefresh(ParseConfigUtils.getInstance(), memberDiscovery); refreshTask.run(true); executor.scheduleWithFixedDelay(refreshTask, firstRefreshInterval, @@ -160,13 +160,14 @@ private void refreshMembers(MemberDiscovery memberDiscovery) { String configCenter = memberDiscovery.getConfigServer(); IpPort ipPort = NetUtils.parseIpPortFromURI(configCenter); clientMgr.findThreadBindClientPool().runOnContext(client -> { - HttpClientRequest request = client.get(ipPort.getPort(), ipPort.getHostOrIp(), 
uriConst.MEMBERS, rsp -> { - if (rsp.statusCode() == HttpResponseStatus.OK.code()) { - rsp.bodyHandler(buf -> { - memberDiscovery.refreshMembers(buf.toJsonObject()); + HttpClientRequest request = + client.get(ipPort.getPort(), ipPort.getHostOrIp(), uriConst.MEMBERS, rsp -> { + if (rsp.statusCode() == HttpResponseStatus.OK.code()) { + rsp.bodyHandler(buf -> { + memberDiscovery.refreshMembers(buf.toJsonObject()); + }); + } }); - } - }); SignRequest signReq = createSignRequest(request.method().toString(), configCenter + uriConst.MEMBERS, new HashMap<>(), @@ -174,7 +175,8 @@ private void refreshMembers(MemberDiscovery memberDiscovery) { if (ConfigCenterConfig.INSTANCE.getToken() != null) { request.headers().add("X-Auth-Token", ConfigCenterConfig.INSTANCE.getToken()); } - authHeaderProviders.forEach(provider -> request.headers().addAll(provider.getSignAuthHeaders(signReq))); + authHeaderProviders.forEach(provider -> request.headers() + .addAll(provider.getSignAuthHeaders(signReq))); request.exceptionHandler(e -> { LOGGER.error("Fetch member from {} failed. Error message is [{}].", configCenter, e.getMessage()); }); @@ -235,14 +237,15 @@ private HttpClientOptions createHttpClientOptions() { } public void run(boolean wait) { - // this will be single threaded, so we don't care about concurrent - // staffs try { String configCenter = memberdis.getConfigServer(); if (refreshMode == 1) { - refreshConfig(configCenter, wait); + //make sure that revision is updated timely,wait sub thread to finish it's pull task + refreshConfig(configCenter, true); } else if (!isWatching) { // 重新监听时需要先加载,避免在断开期间丢失变更 + //we do not need worry about that the revision may not be updated timely, because we do not need + //revision info in the push mode. 
the config-center will push the changing to us refreshConfig(configCenter, wait); doWatch(configCenter); } @@ -302,6 +305,7 @@ public void doWatch(String configCenter) LOGGER.info("watching config recieved {}", action); Map<String, Object> mAction = action.toJsonObject().getMap(); if ("CREATE".equals(mAction.get("action"))) { + //event loop can not be blocked,we just keep nothing changed in push mode refreshConfig(configCenter, false); } else if ("MEMBER_CHANGE".equals(mAction.get("action"))) { refreshMembers(memberdis); @@ -314,7 +318,8 @@ public void doWatch(String configCenter) waiter.countDown(); }, e -> { - LOGGER.error("watcher connect to config center {} refresh port {} failed. Error message is [{}]", + LOGGER.error( + "watcher connect to config center {} refresh port {} failed. Error message is [{}]", configCenter, refreshPort, e.getMessage()); @@ -352,12 +357,14 @@ public void refreshConfig(String configcenter, boolean wait) { CountDownLatch latch = new CountDownLatch(1); String encodeServiceName = ""; try { - encodeServiceName = URLEncoder.encode(StringUtils.deleteWhitespace(serviceName), StandardCharsets.UTF_8.name()); + encodeServiceName = + URLEncoder.encode(StringUtils.deleteWhitespace(serviceName), StandardCharsets.UTF_8.name()); } catch (UnsupportedEncodingException e) { LOGGER.error("encode failed. 
Error message: {}", e.getMessage()); encodeServiceName = StringUtils.deleteWhitespace(serviceName); } - String path = uriConst.ITEMS + "?dimensionsInfo=" + encodeServiceName; + String path = uriConst.ITEMS + "?dimensionsInfo=" + encodeServiceName + "&revision=" + + ParseConfigUtils.getInstance().getCurrentVersionInfo(); clientMgr.findThreadBindClientPool().runOnContext(client -> { IpPort ipPort = NetUtils.parseIpPortFromURI(configcenter); HttpClientRequest request = client.get(ipPort.getPort(), ipPort.getHostOrIp(), path, rsp -> { @@ -370,11 +377,18 @@ public void refreshConfig(String configcenter, boolean wait) { })); EventManager.post(new ConnSuccEvent()); } catch (IOException e) { - EventManager.post(new ConnFailEvent("config refresh result parse fail " + e.getMessage())); - LOGGER.error("Config refresh from {} failed. Error message is [{}].", configcenter, e.getMessage()); + EventManager.post(new ConnFailEvent( + "config refresh result parse fail " + e.getMessage())); + LOGGER.error("Config refresh from {} failed. Error message is [{}].", + configcenter, + e.getMessage()); } latch.countDown(); }); + } else if (rsp.statusCode() == HttpResponseStatus.NOT_MODIFIED.code()) { + //nothing changed + EventManager.post(new ConnSuccEvent()); + latch.countDown(); } else { rsp.bodyHandler(buf -> { LOGGER.error("Server error message is [{}].", buf); @@ -383,7 +397,7 @@ public void refreshConfig(String configcenter, boolean wait) { EventManager.post(new ConnFailEvent("fetch config fail")); LOGGER.error("Config refresh from {} failed.", configcenter); } - }); + }).setTimeout((BOOTUP_WAIT_TIME - 1) * 1000); Map<String, String> headers = new HashMap<>(); headers.put("x-domain-name", tenantName); if (ConfigCenterConfig.INSTANCE.getToken() != null) { @@ -398,7 +412,9 @@ public void refreshConfig(String configcenter, boolean wait) { null)))); request.exceptionHandler(e -> { EventManager.post(new ConnFailEvent("fetch config fail")); - LOGGER.error("Config refresh from {} failed. 
Error message is [{}].", configcenter, e.getMessage()); + LOGGER.error("Config refresh from {} failed. Error message is [{}].", + configcenter, + e.getMessage()); latch.countDown(); }); request.end(); diff --git a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ParseConfigUtils.java b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ParseConfigUtils.java index a891088fe..3dd95c8e7 100644 --- a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ParseConfigUtils.java +++ b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ParseConfigUtils.java @@ -21,6 +21,8 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.servicecomb.config.archaius.sources.ConfigCenterConfigurationSourceImpl.UpdateHandler; import org.apache.servicecomb.foundation.common.utils.JsonUtils; @@ -36,36 +38,82 @@ private static final Logger LOGGER = LoggerFactory.getLogger(ParseConfigUtils.class); - private static LinkedHashMap<String, Map<String, Object>> multiDimensionItems = new LinkedHashMap<>(); + private static final ParseConfigUtils INSTANCE = new ParseConfigUtils(); - public static final Map<String, Object> flatItems = new HashMap<>(); + private LinkedHashMap<String, Map<String, Object>> multiDimensionItems = new LinkedHashMap<>(); + + //it's dangerous to make flatItems public + private final Map<String, Object> flatItems = new HashMap<>(); + + private String currentVersionInfo = "default"; private UpdateHandler updateHandler; + private Lock configLock = new ReentrantLock(); + + //for compatibility with other modules and JunitTest public ParseConfigUtils(UpdateHandler updateHandler) { this.updateHandler = updateHandler; } + private ParseConfigUtils() { + } + + public void initWithUpdateHandler(UpdateHandler updateHandler) { + if (updateHandler == null) 
{ + LOGGER.error("when init ParseConfigUtils, updateHandler can not be null"); + } + this.updateHandler = updateHandler; + } + + /* + as the data is returned, we can block the thread at a short time. consider that if the multiple verticle is deployed + and if we use pull mode and push mode at the same time , we must share a common lock with all methods which would + change the config setting + */ public void refreshConfigItems(Map<String, Map<String, Object>> remoteItems) { - multiDimensionItems.clear(); - multiDimensionItems.putAll(remoteItems); - doRefreshItems(); - LOGGER.debug("refresh config success"); + try { + configLock.lock(); + currentVersionInfo = + remoteItems.getOrDefault("revision", new HashMap<>()).getOrDefault("version", "default").toString(); + //make sure the currentVersionInfo != "" + currentVersionInfo = currentVersionInfo.equals("") ? "default" : currentVersionInfo; + remoteItems.remove("revision");//the key revision is not the config setting + multiDimensionItems.clear(); + multiDimensionItems.putAll(remoteItems); + doRefreshItems(); + LOGGER.debug("refresh config success"); + } finally { + configLock.unlock(); + } + } + + public static ParseConfigUtils getInstance() { + return INSTANCE; + } + + public String getCurrentVersionInfo() { + return this.currentVersionInfo; } public void refreshConfigItemsIncremental(Map<String, Object> action) { - if ("UPDATE".equals(action.get("action"))) { - try { - multiDimensionItems.put((String) action.get("key"), JsonUtils.OBJ_MAPPER - .readValue(action.get("value").toString(), new TypeReference<Map<String, Object>>() { - })); - } catch (IOException e) { - LOGGER.error("parse config change action fail"); + try { + configLock.lock(); + if ("UPDATE".equals(action.get("action"))) { + try { + multiDimensionItems.put((String) action.get("key"), JsonUtils.OBJ_MAPPER + .readValue(action.get("value").toString(), new TypeReference<Map<String, Object>>() { + })); + } catch (IOException e) { + LOGGER.error("parse config 
change action fail"); + } + doRefreshItems(); + } else if ("DELETE".equals(action.get("action"))) { + multiDimensionItems.remove(action.get("key")); + doRefreshItems(); } - doRefreshItems(); - } else if ("DELETE".equals(action.get("action"))) { - multiDimensionItems.remove(action.get("key")); - doRefreshItems(); + } finally { + configLock.unlock(); } } diff --git a/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java b/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java index 83398f107..f672d9e86 100644 --- a/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java +++ b/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java @@ -115,7 +115,77 @@ public void testConnectRefreshModeTwo() { @SuppressWarnings("unchecked") @Test - public void testConfigRefresh(@Mocked ClientPoolManager<HttpClientWithContext> clientMgr, + public void testConfigRefreshModeOne(@Mocked ClientPoolManager<HttpClientWithContext> clientMgr, + @Mocked HttpClientWithContext httpClientWithContext) { + String version1 = refreshAndGetCurrentRevision(clientMgr, httpClientWithContext, 200, "huawei"); + //test the sdk get and change the latestRevision + Assert.assertEquals("huawei", version1); + String version2 = refreshAndGetCurrentRevision(clientMgr, httpClientWithContext, 304, "rkd"); + //test that when return code is 304, the sdk do not change the latestRevision + Assert.assertNotEquals("rkd", version2); + String version3 = refreshAndGetCurrentRevision(clientMgr, httpClientWithContext, 200, ""); + //make sure the current version is not "" + Assert.assertNotEquals("", version3); + } + + @SuppressWarnings("unchecked") + private String refreshAndGetCurrentRevision(ClientPoolManager<HttpClientWithContext> clientMgr, + HttpClientWithContext httpClientWithContext, int statusCode, String version) { + + 
ConfigCenterConfigurationSourceImpl impl = new ConfigCenterConfigurationSourceImpl(); + UpdateHandler updateHandler = impl.new UpdateHandler(); + HttpClientRequest request = Mockito.mock(HttpClientRequest.class); + Mockito.when(request.headers()).thenReturn(MultiMap.caseInsensitiveMultiMap()); + Buffer rsp = Mockito.mock(Buffer.class); + Mockito.when(rsp.toString()) + .thenReturn(String.format( + "{\"application\":{\"3\":\"2\",\"aa\":\"1\"},\"vmalledge\":{\"aa\":\"3\"},\"revision\": { \"version\": \"%s\"} }", + version)); + + HttpClientResponse httpClientResponse = Mockito.mock(HttpClientResponse.class); + Mockito.when(httpClientResponse.bodyHandler(Mockito.any(Handler.class))).then(invocation -> { + Handler<Buffer> handler = invocation.getArgumentAt(0, Handler.class); + handler.handle(rsp); + return null; + }); + Mockito.when(httpClientResponse.statusCode()).thenReturn(statusCode); + + HttpClient httpClient = Mockito.mock(HttpClient.class); + Mockito.when( + httpClient.get(Mockito.anyInt(), Mockito.anyString(), Mockito.anyString(), Mockito.any(Handler.class))) + .then(invocation -> { + Handler<HttpClientResponse> handler = invocation.getArgumentAt(3, Handler.class); + handler.handle(httpClientResponse); + return request; + }); + + new MockUp<HttpClientWithContext>() { + @Mock + public void runOnContext(RunHandler handler) { + handler.run(httpClient); + } + }; + new Expectations() { + { + clientMgr.findThreadBindClientPool(); + result = httpClientWithContext; + } + }; + + ConfigCenterClient cc = new ConfigCenterClient(updateHandler); + Deencapsulation.setField(cc, "clientMgr", clientMgr); + ParseConfigUtils parseConfigUtils = new ParseConfigUtils(updateHandler); + MemberDiscovery memberdis = new MemberDiscovery(Arrays.asList("http://configcentertest:30103")); + ConfigRefresh refresh = cc.new ConfigRefresh(parseConfigUtils, memberdis); + Deencapsulation.setField(cc, "refreshMode", 1); + refresh.run(); + String currentVersionInfo = 
Deencapsulation.getField(parseConfigUtils, "currentVersionInfo").toString(); + return currentVersionInfo; + } + + @SuppressWarnings("unchecked") + @Test + public void testConfigRefreshModeZero(@Mocked ClientPoolManager<HttpClientWithContext> clientMgr, @Mocked HttpClientWithContext httpClientWithContext) { ConfigCenterConfigurationSourceImpl impl = new ConfigCenterConfigurationSourceImpl(); UpdateHandler updateHandler = impl.new UpdateHandler(); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > [SCB-782]support revision check when use pull mode with config center > --------------------------------------------------------------------- > > Key: SCB-782 > URL: https://issues.apache.org/jira/browse/SCB-782 > Project: Apache ServiceComb > Issue Type: Improvement > Components: Java-Chassis > Reporter: 何一乐 > Assignee: 何一乐 > Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)