This is an automated email from the ASF dual-hosted git repository. liubao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git
commit fbad3da4b31f5040fdb9d1b8788d5dfb58cde893 Author: heyile <[email protected]> AuthorDate: Wed Aug 1 20:25:30 2018 +0800 [SCB-782]support revision check when use pull mode with config center --- .../config/client/ConfigCenterClient.java | 49 ++++++++++----- .../config/client/ParseConfigUtils.java | 7 +++ .../config/client/TestConfigCenterClient.java | 72 +++++++++++++++++++++- 3 files changed, 110 insertions(+), 18 deletions(-) diff --git a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java index a58de4d..1925aa4 100644 --- a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java +++ b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java @@ -160,13 +160,14 @@ public class ConfigCenterClient { String configCenter = memberDiscovery.getConfigServer(); IpPort ipPort = NetUtils.parseIpPortFromURI(configCenter); clientMgr.findThreadBindClientPool().runOnContext(client -> { - HttpClientRequest request = client.get(ipPort.getPort(), ipPort.getHostOrIp(), uriConst.MEMBERS, rsp -> { - if (rsp.statusCode() == HttpResponseStatus.OK.code()) { - rsp.bodyHandler(buf -> { - memberDiscovery.refreshMembers(buf.toJsonObject()); + HttpClientRequest request = + client.get(ipPort.getPort(), ipPort.getHostOrIp(), uriConst.MEMBERS, rsp -> { + if (rsp.statusCode() == HttpResponseStatus.OK.code()) { + rsp.bodyHandler(buf -> { + memberDiscovery.refreshMembers(buf.toJsonObject()); + }); + } }); - } - }); SignRequest signReq = createSignRequest(request.method().toString(), configCenter + uriConst.MEMBERS, new HashMap<>(), @@ -174,7 +175,8 @@ public class ConfigCenterClient { if (ConfigCenterConfig.INSTANCE.getToken() != null) { request.headers().add("X-Auth-Token", ConfigCenterConfig.INSTANCE.getToken()); } - authHeaderProviders.forEach(provider -> request.headers().addAll(provider.getSignAuthHeaders(signReq))); + authHeaderProviders.forEach(provider -> request.headers() + .addAll(provider.getSignAuthHeaders(signReq))); request.exceptionHandler(e -> { LOGGER.error("Fetch member from {} failed. Error message is [{}].", configCenter, e.getMessage()); }); @@ -235,14 +237,15 @@ public class ConfigCenterClient { } public void run(boolean wait) { - // this will be single threaded, so we don't care about concurrent - // staffs try { String configCenter = memberdis.getConfigServer(); if (refreshMode == 1) { - refreshConfig(configCenter, wait); + //make sure that revision is updated timely,wait sub thread to finish it's pull task + refreshConfig(configCenter, true); } else if (!isWatching) { // 重新监听时需要先加载,避免在断开期间丢失变更 + //we do not need worry about that the revision may not be updated timely, because we do not need + //revision info in the push mode. the config-center will push the changing to us refreshConfig(configCenter, wait); doWatch(configCenter); } @@ -302,6 +305,7 @@ public class ConfigCenterClient { LOGGER.info("watching config recieved {}", action); Map<String, Object> mAction = action.toJsonObject().getMap(); if ("CREATE".equals(mAction.get("action"))) { + //event loop can not be blocked,we just keep nothing changed in push mode refreshConfig(configCenter, false); } else if ("MEMBER_CHANGE".equals(mAction.get("action"))) { refreshMembers(memberdis); @@ -314,7 +318,8 @@ public class ConfigCenterClient { waiter.countDown(); }, e -> { - LOGGER.error("watcher connect to config center {} refresh port {} failed. Error message is [{}]", + LOGGER.error( + "watcher connect to config center {} refresh port {} failed. Error message is [{}]", configCenter, refreshPort, e.getMessage()); @@ -352,12 +357,14 @@ public class ConfigCenterClient { CountDownLatch latch = new CountDownLatch(1); String encodeServiceName = ""; try { - encodeServiceName = URLEncoder.encode(StringUtils.deleteWhitespace(serviceName), StandardCharsets.UTF_8.name()); + encodeServiceName = + URLEncoder.encode(StringUtils.deleteWhitespace(serviceName), StandardCharsets.UTF_8.name()); } catch (UnsupportedEncodingException e) { LOGGER.error("encode failed. Error message: {}", e.getMessage()); encodeServiceName = StringUtils.deleteWhitespace(serviceName); } - String path = uriConst.ITEMS + "?dimensionsInfo=" + encodeServiceName; + String path = uriConst.ITEMS + "?dimensionsInfo=" + encodeServiceName + "&revision=" + + ParseConfigUtils.CURRENT_VERSION_INFO; clientMgr.findThreadBindClientPool().runOnContext(client -> { IpPort ipPort = NetUtils.parseIpPortFromURI(configcenter); HttpClientRequest request = client.get(ipPort.getPort(), ipPort.getHostOrIp(), path, rsp -> { @@ -370,11 +377,17 @@ public class ConfigCenterClient { })); EventManager.post(new ConnSuccEvent()); } catch (IOException e) { - EventManager.post(new ConnFailEvent("config refresh result parse fail " + e.getMessage())); - LOGGER.error("Config refresh from {} failed. Error message is [{}].", configcenter, e.getMessage()); + EventManager.post(new ConnFailEvent( + "config refresh result parse fail " + e.getMessage())); + LOGGER.error("Config refresh from {} failed. Error message is [{}].", + configcenter, + e.getMessage()); } latch.countDown(); }); + } else if (rsp.statusCode() == HttpResponseStatus.NOT_MODIFIED.code()) { + //nothing changed + latch.countDown(); } else { rsp.bodyHandler(buf -> { LOGGER.error("Server error message is [{}].", buf); @@ -383,7 +396,7 @@ public class ConfigCenterClient { EventManager.post(new ConnFailEvent("fetch config fail")); LOGGER.error("Config refresh from {} failed.", configcenter); } - }); + }).setTimeout((BOOTUP_WAIT_TIME - 1) * 1000); Map<String, String> headers = new HashMap<>(); headers.put("x-domain-name", tenantName); if (ConfigCenterConfig.INSTANCE.getToken() != null) { @@ -398,7 +411,9 @@ public class ConfigCenterClient { null)))); request.exceptionHandler(e -> { EventManager.post(new ConnFailEvent("fetch config fail")); - LOGGER.error("Config refresh from {} failed. Error message is [{}].", configcenter, e.getMessage()); + LOGGER.error("Config refresh from {} failed. Error message is [{}].", + configcenter, + e.getMessage()); latch.countDown(); }); request.end(); diff --git a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ParseConfigUtils.java b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ParseConfigUtils.java index a891088..0ced4fd 100644 --- a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ParseConfigUtils.java +++ b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ParseConfigUtils.java @@ -40,6 +40,8 @@ public class ParseConfigUtils { public static final Map<String, Object> flatItems = new HashMap<>(); + public static String CURRENT_VERSION_INFO = "default"; + private UpdateHandler updateHandler; public ParseConfigUtils(UpdateHandler updateHandler) { @@ -47,6 +49,11 @@ public class ParseConfigUtils { } public void refreshConfigItems(Map<String, Map<String, Object>> remoteItems) { + CURRENT_VERSION_INFO = + remoteItems.getOrDefault("revision", new HashMap<>()).getOrDefault("version", "default").toString(); + //make sure the CURRENT_VERSION_INFO != "" + CURRENT_VERSION_INFO = CURRENT_VERSION_INFO.equals("") ? "default" : CURRENT_VERSION_INFO; + remoteItems.remove("revision");//the key revision is not the config setting multiDimensionItems.clear(); multiDimensionItems.putAll(remoteItems); doRefreshItems(); diff --git a/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java b/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java index 83398f1..58d76af 100644 --- a/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java +++ b/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java @@ -115,7 +115,77 @@ public class TestConfigCenterClient { @SuppressWarnings("unchecked") @Test - public void testConfigRefresh(@Mocked ClientPoolManager<HttpClientWithContext> clientMgr, + public void testConfigRefreshModeOne(@Mocked ClientPoolManager<HttpClientWithContext> clientMgr, + @Mocked HttpClientWithContext httpClientWithContext) { + String version1 = refreshAndGetCurrentRevision(clientMgr, httpClientWithContext, 200, "huawei"); + //test the sdk get and change the latestRevision + Assert.assertEquals("huawei", version1); + String version2 = refreshAndGetCurrentRevision(clientMgr, httpClientWithContext, 304, "rkd"); + //test that when return code is 304, the sdk do not change the latestRevision + Assert.assertNotEquals("rkd", version2); + String version3 = refreshAndGetCurrentRevision(clientMgr, httpClientWithContext, 200, ""); + //make sure the current version is not "" + Assert.assertNotEquals("", version3); + } + + @SuppressWarnings("unchecked") + private String refreshAndGetCurrentRevision(ClientPoolManager<HttpClientWithContext> clientMgr, + HttpClientWithContext httpClientWithContext, int statusCode, String version) { + + ConfigCenterConfigurationSourceImpl impl = new ConfigCenterConfigurationSourceImpl(); + UpdateHandler updateHandler = impl.new UpdateHandler(); + HttpClientRequest request = Mockito.mock(HttpClientRequest.class); + Mockito.when(request.headers()).thenReturn(MultiMap.caseInsensitiveMultiMap()); + Buffer rsp = Mockito.mock(Buffer.class); + Mockito.when(rsp.toString()) + .thenReturn(String.format( + "{\"application\":{\"3\":\"2\",\"aa\":\"1\"},\"vmalledge\":{\"aa\":\"3\"},\"revision\": { \"version\": \"%s\"} }", + version)); + + HttpClientResponse httpClientResponse = Mockito.mock(HttpClientResponse.class); + Mockito.when(httpClientResponse.bodyHandler(Mockito.any(Handler.class))).then(invocation -> { + Handler<Buffer> handler = invocation.getArgumentAt(0, Handler.class); + handler.handle(rsp); + return null; + }); + Mockito.when(httpClientResponse.statusCode()).thenReturn(statusCode); + + HttpClient httpClient = Mockito.mock(HttpClient.class); + Mockito.when( + httpClient.get(Mockito.anyInt(), Mockito.anyString(), Mockito.anyString(), Mockito.any(Handler.class))) + .then(invocation -> { + Handler<HttpClientResponse> handler = invocation.getArgumentAt(3, Handler.class); + handler.handle(httpClientResponse); + return request; + }); + + new MockUp<HttpClientWithContext>() { + @Mock + public void runOnContext(RunHandler handler) { + handler.run(httpClient); + } + }; + new Expectations() { + { + clientMgr.findThreadBindClientPool(); + result = httpClientWithContext; + } + }; + + ConfigCenterClient cc = new ConfigCenterClient(updateHandler); + Deencapsulation.setField(cc, "clientMgr", clientMgr); + ParseConfigUtils parseConfigUtils = new ParseConfigUtils(updateHandler); + MemberDiscovery memberdis = new MemberDiscovery(Arrays.asList("http://configcentertest:30103")); + ConfigRefresh refresh = cc.new ConfigRefresh(parseConfigUtils, memberdis); + Deencapsulation.setField(cc, "refreshMode", 1); + refresh.run(); + String currentVersionInfo = Deencapsulation.getField(parseConfigUtils, "CURRENT_VERSION_INFO").toString(); + return currentVersionInfo; + } + + @SuppressWarnings("unchecked") + @Test + public void testConfigRefreshModeZero(@Mocked ClientPoolManager<HttpClientWithContext> clientMgr, @Mocked HttpClientWithContext httpClientWithContext) { ConfigCenterConfigurationSourceImpl impl = new ConfigCenterConfigurationSourceImpl(); UpdateHandler updateHandler = impl.new UpdateHandler();
