[ 
https://issues.apache.org/jira/browse/SCB-782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16574086#comment-16574086
 ] 

ASF GitHub Bot commented on SCB-782:
------------------------------------

liubao68 closed pull request #851:  [SCB-782]support revision check when use 
pull mode with config center
URL: https://github.com/apache/incubator-servicecomb-java-chassis/pull/851
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java
 
b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java
index a58de4da2..09d26c484 100644
--- 
a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java
+++ 
b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java
@@ -128,14 +128,14 @@ public void connectServer() {
       LOGGER.error("refreshMode must be 0 or 1.");
       return;
     }
-    ParseConfigUtils parseConfigUtils = new ParseConfigUtils(updateHandler);
+    ParseConfigUtils.getInstance().initWithUpdateHandler(updateHandler);
     try {
       deployConfigClient();
     } catch (InterruptedException e) {
       throw new IllegalStateException(e);
     }
     refreshMembers(memberDiscovery);
-    ConfigRefresh refreshTask = new ConfigRefresh(parseConfigUtils, 
memberDiscovery);
+    ConfigRefresh refreshTask = new 
ConfigRefresh(ParseConfigUtils.getInstance(), memberDiscovery);
     refreshTask.run(true);
     executor.scheduleWithFixedDelay(refreshTask,
         firstRefreshInterval,
@@ -160,13 +160,14 @@ private void refreshMembers(MemberDiscovery 
memberDiscovery) {
       String configCenter = memberDiscovery.getConfigServer();
       IpPort ipPort = NetUtils.parseIpPortFromURI(configCenter);
       clientMgr.findThreadBindClientPool().runOnContext(client -> {
-        HttpClientRequest request = client.get(ipPort.getPort(), 
ipPort.getHostOrIp(), uriConst.MEMBERS, rsp -> {
-          if (rsp.statusCode() == HttpResponseStatus.OK.code()) {
-            rsp.bodyHandler(buf -> {
-              memberDiscovery.refreshMembers(buf.toJsonObject());
+        HttpClientRequest request =
+            client.get(ipPort.getPort(), ipPort.getHostOrIp(), 
uriConst.MEMBERS, rsp -> {
+              if (rsp.statusCode() == HttpResponseStatus.OK.code()) {
+                rsp.bodyHandler(buf -> {
+                  memberDiscovery.refreshMembers(buf.toJsonObject());
+                });
+              }
             });
-          }
-        });
         SignRequest signReq = createSignRequest(request.method().toString(),
             configCenter + uriConst.MEMBERS,
             new HashMap<>(),
@@ -174,7 +175,8 @@ private void refreshMembers(MemberDiscovery 
memberDiscovery) {
         if (ConfigCenterConfig.INSTANCE.getToken() != null) {
           request.headers().add("X-Auth-Token", 
ConfigCenterConfig.INSTANCE.getToken());
         }
-        authHeaderProviders.forEach(provider -> 
request.headers().addAll(provider.getSignAuthHeaders(signReq)));
+        authHeaderProviders.forEach(provider -> request.headers()
+            .addAll(provider.getSignAuthHeaders(signReq)));
         request.exceptionHandler(e -> {
           LOGGER.error("Fetch member from {} failed. Error message is [{}].", 
configCenter, e.getMessage());
         });
@@ -235,14 +237,15 @@ private HttpClientOptions createHttpClientOptions() {
     }
 
     public void run(boolean wait) {
-      // this will be single threaded, so we don't care about concurrent
-      // staffs
       try {
         String configCenter = memberdis.getConfigServer();
         if (refreshMode == 1) {
-          refreshConfig(configCenter, wait);
+          //make sure that revision is updated timely,wait sub thread to 
finish it's pull task
+          refreshConfig(configCenter, true);
         } else if (!isWatching) {
           // 重新监听时需要先加载,避免在断开期间丢失变更
+          //we do not need worry about that the revision may not be updated 
timely, because we do not need
+          //revision info in the push mode. the config-center will push the 
changing to us
           refreshConfig(configCenter, wait);
           doWatch(configCenter);
         }
@@ -302,6 +305,7 @@ public void doWatch(String configCenter)
                 LOGGER.info("watching config recieved {}", action);
                 Map<String, Object> mAction = action.toJsonObject().getMap();
                 if ("CREATE".equals(mAction.get("action"))) {
+                  //event loop can not be blocked,we just keep nothing changed 
in push mode
                   refreshConfig(configCenter, false);
                 } else if ("MEMBER_CHANGE".equals(mAction.get("action"))) {
                   refreshMembers(memberdis);
@@ -314,7 +318,8 @@ public void doWatch(String configCenter)
               waiter.countDown();
             },
             e -> {
-              LOGGER.error("watcher connect to config center {} refresh port 
{} failed. Error message is [{}]",
+              LOGGER.error(
+                  "watcher connect to config center {} refresh port {} failed. 
Error message is [{}]",
                   configCenter,
                   refreshPort,
                   e.getMessage());
@@ -352,12 +357,14 @@ public void refreshConfig(String configcenter, boolean 
wait) {
       CountDownLatch latch = new CountDownLatch(1);
       String encodeServiceName = "";
       try {
-        encodeServiceName = 
URLEncoder.encode(StringUtils.deleteWhitespace(serviceName), 
StandardCharsets.UTF_8.name());
+        encodeServiceName =
+            URLEncoder.encode(StringUtils.deleteWhitespace(serviceName), 
StandardCharsets.UTF_8.name());
       } catch (UnsupportedEncodingException e) {
         LOGGER.error("encode failed. Error message: {}", e.getMessage());
         encodeServiceName = StringUtils.deleteWhitespace(serviceName);
       }
-      String path = uriConst.ITEMS + "?dimensionsInfo=" + encodeServiceName;
+      String path = uriConst.ITEMS + "?dimensionsInfo=" + encodeServiceName + 
"&revision="
+          + ParseConfigUtils.getInstance().getCurrentVersionInfo();
       clientMgr.findThreadBindClientPool().runOnContext(client -> {
         IpPort ipPort = NetUtils.parseIpPortFromURI(configcenter);
         HttpClientRequest request = client.get(ipPort.getPort(), 
ipPort.getHostOrIp(), path, rsp -> {
@@ -370,11 +377,18 @@ public void refreshConfig(String configcenter, boolean 
wait) {
                         }));
                 EventManager.post(new ConnSuccEvent());
               } catch (IOException e) {
-                EventManager.post(new ConnFailEvent("config refresh result 
parse fail " + e.getMessage()));
-                LOGGER.error("Config refresh from {} failed. Error message is 
[{}].", configcenter, e.getMessage());
+                EventManager.post(new ConnFailEvent(
+                    "config refresh result parse fail " + e.getMessage()));
+                LOGGER.error("Config refresh from {} failed. Error message is 
[{}].",
+                    configcenter,
+                    e.getMessage());
               }
               latch.countDown();
             });
+          } else if (rsp.statusCode() == 
HttpResponseStatus.NOT_MODIFIED.code()) {
+            //nothing changed
+            EventManager.post(new ConnSuccEvent());
+            latch.countDown();
           } else {
             rsp.bodyHandler(buf -> {
               LOGGER.error("Server error message is [{}].", buf);
@@ -383,7 +397,7 @@ public void refreshConfig(String configcenter, boolean 
wait) {
             EventManager.post(new ConnFailEvent("fetch config fail"));
             LOGGER.error("Config refresh from {} failed.", configcenter);
           }
-        });
+        }).setTimeout((BOOTUP_WAIT_TIME - 1) * 1000);
         Map<String, String> headers = new HashMap<>();
         headers.put("x-domain-name", tenantName);
         if (ConfigCenterConfig.INSTANCE.getToken() != null) {
@@ -398,7 +412,9 @@ public void refreshConfig(String configcenter, boolean 
wait) {
                 null))));
         request.exceptionHandler(e -> {
           EventManager.post(new ConnFailEvent("fetch config fail"));
-          LOGGER.error("Config refresh from {} failed. Error message is 
[{}].", configcenter, e.getMessage());
+          LOGGER.error("Config refresh from {} failed. Error message is [{}].",
+              configcenter,
+              e.getMessage());
           latch.countDown();
         });
         request.end();
diff --git 
a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ParseConfigUtils.java
 
b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ParseConfigUtils.java
index a891088fe..3dd95c8e7 100644
--- 
a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ParseConfigUtils.java
+++ 
b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ParseConfigUtils.java
@@ -21,6 +21,8 @@
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import 
org.apache.servicecomb.config.archaius.sources.ConfigCenterConfigurationSourceImpl.UpdateHandler;
 import org.apache.servicecomb.foundation.common.utils.JsonUtils;
@@ -36,36 +38,82 @@
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ParseConfigUtils.class);
 
-  private static LinkedHashMap<String, Map<String, Object>> 
multiDimensionItems = new LinkedHashMap<>();
+  private static final ParseConfigUtils INSTANCE = new ParseConfigUtils();
 
-  public static final Map<String, Object> flatItems = new HashMap<>();
+  private LinkedHashMap<String, Map<String, Object>> multiDimensionItems = new 
LinkedHashMap<>();
+
+  //it's dangerous to make flatItems public
+  private final Map<String, Object> flatItems = new HashMap<>();
+
+  private String currentVersionInfo = "default";
 
   private UpdateHandler updateHandler;
 
+  private Lock configLock = new ReentrantLock();
+
+  //for compatibility with other modules and JunitTest
   public ParseConfigUtils(UpdateHandler updateHandler) {
     this.updateHandler = updateHandler;
   }
 
+  private ParseConfigUtils() {
+  }
+
+  public void initWithUpdateHandler(UpdateHandler updateHandler) {
+    if (updateHandler == null) {
+      LOGGER.error("when init ParseConfigUtils, updateHandler can not be 
null");
+    }
+    this.updateHandler = updateHandler;
+  }
+
+  /*
+      as the data is returned, we can block the thread at a short time. 
consider that if the multiple verticle is deployed
+      and if we use pull mode and push mode at the same time , we must share a 
common lock with all methods which would
+      change the config setting
+     */
   public void refreshConfigItems(Map<String, Map<String, Object>> remoteItems) 
{
-    multiDimensionItems.clear();
-    multiDimensionItems.putAll(remoteItems);
-    doRefreshItems();
-    LOGGER.debug("refresh config success");
+    try {
+      configLock.lock();
+      currentVersionInfo =
+          remoteItems.getOrDefault("revision", new 
HashMap<>()).getOrDefault("version", "default").toString();
+      //make sure the currentVersionInfo != ""
+      currentVersionInfo = currentVersionInfo.equals("") ? "default" : 
currentVersionInfo;
+      remoteItems.remove("revision");//the key revision is not the config 
setting
+      multiDimensionItems.clear();
+      multiDimensionItems.putAll(remoteItems);
+      doRefreshItems();
+      LOGGER.debug("refresh config success");
+    } finally {
+      configLock.unlock();
+    }
+  }
+
+  public static ParseConfigUtils getInstance() {
+    return INSTANCE;
+  }
+
+  public String getCurrentVersionInfo() {
+    return this.currentVersionInfo;
   }
 
   public void refreshConfigItemsIncremental(Map<String, Object> action) {
-    if ("UPDATE".equals(action.get("action"))) {
-      try {
-        multiDimensionItems.put((String) action.get("key"), 
JsonUtils.OBJ_MAPPER
-            .readValue(action.get("value").toString(), new 
TypeReference<Map<String, Object>>() {
-            }));
-      } catch (IOException e) {
-        LOGGER.error("parse config change action fail");
+    try {
+      configLock.lock();
+      if ("UPDATE".equals(action.get("action"))) {
+        try {
+          multiDimensionItems.put((String) action.get("key"), 
JsonUtils.OBJ_MAPPER
+              .readValue(action.get("value").toString(), new 
TypeReference<Map<String, Object>>() {
+              }));
+        } catch (IOException e) {
+          LOGGER.error("parse config change action fail");
+        }
+        doRefreshItems();
+      } else if ("DELETE".equals(action.get("action"))) {
+        multiDimensionItems.remove(action.get("key"));
+        doRefreshItems();
       }
-      doRefreshItems();
-    } else if ("DELETE".equals(action.get("action"))) {
-      multiDimensionItems.remove(action.get("key"));
-      doRefreshItems();
+    } finally {
+      configLock.unlock();
     }
   }
 
diff --git 
a/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java
 
b/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java
index 83398f107..f672d9e86 100644
--- 
a/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java
+++ 
b/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java
@@ -115,7 +115,77 @@ public void testConnectRefreshModeTwo() {
 
   @SuppressWarnings("unchecked")
   @Test
-  public void testConfigRefresh(@Mocked 
ClientPoolManager<HttpClientWithContext> clientMgr,
+  public void testConfigRefreshModeOne(@Mocked 
ClientPoolManager<HttpClientWithContext> clientMgr,
+      @Mocked HttpClientWithContext httpClientWithContext) {
+    String version1 = refreshAndGetCurrentRevision(clientMgr, 
httpClientWithContext, 200, "huawei");
+    //test the sdk get and change the latestRevision
+    Assert.assertEquals("huawei", version1);
+    String version2 = refreshAndGetCurrentRevision(clientMgr, 
httpClientWithContext, 304, "rkd");
+    //test that when return code is 304, the sdk do not change the 
latestRevision
+    Assert.assertNotEquals("rkd", version2);
+    String version3 = refreshAndGetCurrentRevision(clientMgr, 
httpClientWithContext, 200, "");
+    //make sure the current version is not ""
+    Assert.assertNotEquals("", version3);
+  }
+
+  @SuppressWarnings("unchecked")
+  private String 
refreshAndGetCurrentRevision(ClientPoolManager<HttpClientWithContext> clientMgr,
+      HttpClientWithContext httpClientWithContext, int statusCode, String 
version) {
+
+    ConfigCenterConfigurationSourceImpl impl = new 
ConfigCenterConfigurationSourceImpl();
+    UpdateHandler updateHandler = impl.new UpdateHandler();
+    HttpClientRequest request = Mockito.mock(HttpClientRequest.class);
+    
Mockito.when(request.headers()).thenReturn(MultiMap.caseInsensitiveMultiMap());
+    Buffer rsp = Mockito.mock(Buffer.class);
+    Mockito.when(rsp.toString())
+        .thenReturn(String.format(
+            
"{\"application\":{\"3\":\"2\",\"aa\":\"1\"},\"vmalledge\":{\"aa\":\"3\"},\"revision\":
 { \"version\": \"%s\"} }",
+            version));
+
+    HttpClientResponse httpClientResponse = 
Mockito.mock(HttpClientResponse.class);
+    
Mockito.when(httpClientResponse.bodyHandler(Mockito.any(Handler.class))).then(invocation
 -> {
+      Handler<Buffer> handler = invocation.getArgumentAt(0, Handler.class);
+      handler.handle(rsp);
+      return null;
+    });
+    Mockito.when(httpClientResponse.statusCode()).thenReturn(statusCode);
+
+    HttpClient httpClient = Mockito.mock(HttpClient.class);
+    Mockito.when(
+        httpClient.get(Mockito.anyInt(), Mockito.anyString(), 
Mockito.anyString(), Mockito.any(Handler.class)))
+        .then(invocation -> {
+          Handler<HttpClientResponse> handler = invocation.getArgumentAt(3, 
Handler.class);
+          handler.handle(httpClientResponse);
+          return request;
+        });
+
+    new MockUp<HttpClientWithContext>() {
+      @Mock
+      public void runOnContext(RunHandler handler) {
+        handler.run(httpClient);
+      }
+    };
+    new Expectations() {
+      {
+        clientMgr.findThreadBindClientPool();
+        result = httpClientWithContext;
+      }
+    };
+
+    ConfigCenterClient cc = new ConfigCenterClient(updateHandler);
+    Deencapsulation.setField(cc, "clientMgr", clientMgr);
+    ParseConfigUtils parseConfigUtils = new ParseConfigUtils(updateHandler);
+    MemberDiscovery memberdis = new 
MemberDiscovery(Arrays.asList("http://configcentertest:30103"));
+    ConfigRefresh refresh = cc.new ConfigRefresh(parseConfigUtils, memberdis);
+    Deencapsulation.setField(cc, "refreshMode", 1);
+    refresh.run();
+    String currentVersionInfo = Deencapsulation.getField(parseConfigUtils, 
"currentVersionInfo").toString();
+    return currentVersionInfo;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testConfigRefreshModeZero(@Mocked 
ClientPoolManager<HttpClientWithContext> clientMgr,
       @Mocked HttpClientWithContext httpClientWithContext) {
     ConfigCenterConfigurationSourceImpl impl = new 
ConfigCenterConfigurationSourceImpl();
     UpdateHandler updateHandler = impl.new UpdateHandler();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> [SCB-782]support revision check when use pull mode with config center
> ---------------------------------------------------------------------
>
>                 Key: SCB-782
>                 URL: https://issues.apache.org/jira/browse/SCB-782
>             Project: Apache ServiceComb
>          Issue Type: Improvement
>          Components: Java-Chassis
>            Reporter: 何一乐
>            Assignee: 何一乐
>            Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to