ruanwenjun commented on a change in pull request #685:
URL: 
https://github.com/apache/incubator-eventmesh/pull/685#discussion_r776941857



##########
File path: 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java
##########
@@ -73,100 +73,97 @@ public void init() throws Exception {
 
     public void start() throws Exception {
         logger.info("consumerManager started......");
-        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                logger.info("clientInfo check start.....");
-                synchronized (eventMeshHTTPServer.localClientInfoMapping) {
-                    Map<String, List<Client>> clientInfoMap =
-                        eventMeshHTTPServer.localClientInfoMapping;
-                    if (clientInfoMap.size() > 0) {
-                        for (String key : clientInfoMap.keySet()) {
-                            String consumerGroup = key.split("@")[0];
-                            String topic = key.split("@")[1];
-                            List<Client> clientList = clientInfoMap.get(key);
-                            Iterator<Client> clientIterator = 
clientList.iterator();
-                            boolean isChange = false;
-                            while (clientIterator.hasNext()) {
-                                Client client = clientIterator.next();
-                                //The time difference is greater than 3 
heartbeat cycles
-                                if (System.currentTimeMillis() - 
client.lastUpTime.getTime()
-                                    > DEFAULT_UPDATE_TIME) {
-                                    logger.warn(
-                                        "client {} lastUpdate time {} over 
three heartbeat cycles",
-                                        JsonUtils.serialize(client), 
client.lastUpTime);
-                                    clientIterator.remove();
-                                    isChange = true;
-                                }
+        scheduledExecutorService.scheduleAtFixedRate(() -> {
+            logger.info("clientInfo check start.....");
+            synchronized (eventMeshHTTPServer.localClientInfoMapping) {
+                Map<String, List<Client>> clientInfoMap =
+                    eventMeshHTTPServer.localClientInfoMapping;
+                if (clientInfoMap.size() > 0) {
+                    for (String key : clientInfoMap.keySet()) {
+                        String consumerGroup = key.split("@")[0];
+                        String topic = key.split("@")[1];
+                        List<Client> clientList = clientInfoMap.get(key);
+                        Iterator<Client> clientIterator = 
clientList.iterator();
+                        boolean isChange = false;
+                        while (clientIterator.hasNext()) {
+                            Client client = clientIterator.next();
+                            //The time difference is greater than 3 heartbeat 
cycles
+                            if (System.currentTimeMillis() - 
client.lastUpTime.getTime()
+                                > DEFAULT_UPDATE_TIME) {
+                                logger.warn(
+                                    "client {} lastUpdate time {} over three 
heartbeat cycles",
+                                    JsonUtils.serialize(client), 
client.lastUpTime);
+                                clientIterator.remove();
+                                isChange = true;
                             }
-                            if (isChange) {
-                                if (clientList.size() > 0) {
-                                    //change url
-                                    logger.info("consumerGroup {} client info 
changing",
-                                        consumerGroup);
-                                    Map<String, List<String>> idcUrls = new 
HashMap<>();
-                                    Set<String> clientUrls = new HashSet<>();
-                                    for (Client client : clientList) {
-                                        clientUrls.add(client.url);
-                                        if (idcUrls.containsKey(client.idc)) {
-                                            idcUrls.get(client.idc)
-                                                
.add(StringUtils.deleteWhitespace(client.url));
-                                        } else {
-                                            List<String> urls = new 
ArrayList<>();
-                                            urls.add(client.url);
-                                            idcUrls.put(client.idc, urls);
-                                        }
+                        }
+                        if (isChange) {
+                            if (clientList.size() > 0) {
+                                //change url
+                                logger.info("consumerGroup {} client info 
changing",
+                                    consumerGroup);
+                                Map<String, List<String>> idcUrls = new 
HashMap<>();
+                                Set<String> clientUrls = new HashSet<>();
+                                for (Client client : clientList) {
+                                    clientUrls.add(client.url);
+                                    if (idcUrls.containsKey(client.idc)) {
+                                        idcUrls.get(client.idc)
+                                            
.add(StringUtils.deleteWhitespace(client.url));
+                                    } else {
+                                        List<String> urls = new ArrayList<>();
+                                        urls.add(client.url);
+                                        idcUrls.put(client.idc, urls);
                                     }
-                                    synchronized 
(eventMeshHTTPServer.localConsumerGroupMapping) {
-                                        ConsumerGroupConf consumerGroupConf =
-                                            
eventMeshHTTPServer.localConsumerGroupMapping
-                                                .get(consumerGroup);
-                                        Map<String, ConsumerGroupTopicConf> 
map =
-                                            
consumerGroupConf.getConsumerGroupTopicConf();
-                                        for (String topicKey : map.keySet()) {
-                                            if (StringUtils.equals(topic, 
topicKey)) {
-                                                ConsumerGroupTopicConf 
latestTopicConf =
-                                                    new 
ConsumerGroupTopicConf();
-                                                
latestTopicConf.setConsumerGroup(consumerGroup);
-                                                
latestTopicConf.setTopic(topic);
-                                                
latestTopicConf.setSubscriptionItem(
-                                                    
map.get(topicKey).getSubscriptionItem());
-                                                
latestTopicConf.setUrls(clientUrls);
-
-                                                
latestTopicConf.setIdcUrls(idcUrls);
-
-                                                map.put(topic, 
latestTopicConf);
-                                            }
-                                        }
+                                }
+                                synchronized 
(eventMeshHTTPServer.localConsumerGroupMapping) {
+                                    ConsumerGroupConf consumerGroupConf =
                                         
eventMeshHTTPServer.localConsumerGroupMapping
-                                            .put(consumerGroup, 
consumerGroupConf);
-                                        logger.info(
-                                            "consumerGroup {} client info 
changed, "
-                                                + "consumerGroupConf {}", 
consumerGroup,
-                                            
JsonUtils.serialize(consumerGroupConf));
-                                        try {
-                                            
notifyConsumerManager(consumerGroup, consumerGroupConf);
-                                        } catch (Exception e) {
-                                            e.printStackTrace();
+                                            .get(consumerGroup);
+                                    Map<String, ConsumerGroupTopicConf> map =
+                                        
consumerGroupConf.getConsumerGroupTopicConf();
+                                    for (String topicKey : map.keySet()) {
+                                        if (StringUtils.equals(topic, 
topicKey)) {
+                                            ConsumerGroupTopicConf 
latestTopicConf =
+                                                new ConsumerGroupTopicConf();
+                                            
latestTopicConf.setConsumerGroup(consumerGroup);
+                                            latestTopicConf.setTopic(topic);
+                                            
latestTopicConf.setSubscriptionItem(
+                                                
map.get(topicKey).getSubscriptionItem());
+                                            
latestTopicConf.setUrls(clientUrls);
+
+                                            
latestTopicConf.setIdcUrls(idcUrls);
+
+                                            map.put(topic, latestTopicConf);
                                         }
                                     }
-
-                                } else {
-                                    logger.info("consumerGroup {} client info 
removed",
-                                        consumerGroup);
-                                    //remove
+                                    
eventMeshHTTPServer.localConsumerGroupMapping
+                                        .put(consumerGroup, consumerGroupConf);
+                                    logger.info(
+                                        "consumerGroup {} client info changed, 
"
+                                            + "consumerGroupConf {}", 
consumerGroup,
+                                        
JsonUtils.serialize(consumerGroupConf));
                                     try {
-                                        notifyConsumerManager(consumerGroup, 
null);
+                                        notifyConsumerManager(consumerGroup, 
consumerGroupConf);
                                     } catch (Exception e) {
                                         e.printStackTrace();
                                     }
+                                }
 
-                                    
eventMeshHTTPServer.localConsumerGroupMapping.keySet()
-                                        .removeIf(s -> 
StringUtils.equals(consumerGroup, s));
+                            } else {
+                                logger.info("consumerGroup {} client info 
removed",
+                                    consumerGroup);
+                                //remove
+                                try {
+                                    notifyConsumerManager(consumerGroup, null);
+                                } catch (Exception e) {
+                                    e.printStackTrace();

Review comment:
       Use the logger here instead of e.printStackTrace(), so the exception is 
recorded in the application log.

##########
File path: 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java
##########
@@ -73,100 +73,97 @@ public void init() throws Exception {
 
     public void start() throws Exception {
         logger.info("consumerManager started......");
-        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                logger.info("clientInfo check start.....");
-                synchronized (eventMeshHTTPServer.localClientInfoMapping) {
-                    Map<String, List<Client>> clientInfoMap =
-                        eventMeshHTTPServer.localClientInfoMapping;
-                    if (clientInfoMap.size() > 0) {
-                        for (String key : clientInfoMap.keySet()) {
-                            String consumerGroup = key.split("@")[0];
-                            String topic = key.split("@")[1];
-                            List<Client> clientList = clientInfoMap.get(key);
-                            Iterator<Client> clientIterator = 
clientList.iterator();
-                            boolean isChange = false;
-                            while (clientIterator.hasNext()) {
-                                Client client = clientIterator.next();
-                                //The time difference is greater than 3 
heartbeat cycles
-                                if (System.currentTimeMillis() - 
client.lastUpTime.getTime()
-                                    > DEFAULT_UPDATE_TIME) {
-                                    logger.warn(
-                                        "client {} lastUpdate time {} over 
three heartbeat cycles",
-                                        JsonUtils.serialize(client), 
client.lastUpTime);
-                                    clientIterator.remove();
-                                    isChange = true;
-                                }
+        scheduledExecutorService.scheduleAtFixedRate(() -> {
+            logger.info("clientInfo check start.....");
+            synchronized (eventMeshHTTPServer.localClientInfoMapping) {
+                Map<String, List<Client>> clientInfoMap =
+                    eventMeshHTTPServer.localClientInfoMapping;
+                if (clientInfoMap.size() > 0) {
+                    for (String key : clientInfoMap.keySet()) {
+                        String consumerGroup = key.split("@")[0];
+                        String topic = key.split("@")[1];
+                        List<Client> clientList = clientInfoMap.get(key);
+                        Iterator<Client> clientIterator = 
clientList.iterator();
+                        boolean isChange = false;
+                        while (clientIterator.hasNext()) {
+                            Client client = clientIterator.next();
+                            //The time difference is greater than 3 heartbeat 
cycles
+                            if (System.currentTimeMillis() - 
client.lastUpTime.getTime()
+                                > DEFAULT_UPDATE_TIME) {
+                                logger.warn(
+                                    "client {} lastUpdate time {} over three 
heartbeat cycles",
+                                    JsonUtils.serialize(client), 
client.lastUpTime);
+                                clientIterator.remove();
+                                isChange = true;
                             }
-                            if (isChange) {
-                                if (clientList.size() > 0) {
-                                    //change url
-                                    logger.info("consumerGroup {} client info 
changing",
-                                        consumerGroup);
-                                    Map<String, List<String>> idcUrls = new 
HashMap<>();
-                                    Set<String> clientUrls = new HashSet<>();
-                                    for (Client client : clientList) {
-                                        clientUrls.add(client.url);
-                                        if (idcUrls.containsKey(client.idc)) {
-                                            idcUrls.get(client.idc)
-                                                
.add(StringUtils.deleteWhitespace(client.url));
-                                        } else {
-                                            List<String> urls = new 
ArrayList<>();
-                                            urls.add(client.url);
-                                            idcUrls.put(client.idc, urls);
-                                        }
+                        }
+                        if (isChange) {
+                            if (clientList.size() > 0) {
+                                //change url
+                                logger.info("consumerGroup {} client info 
changing",
+                                    consumerGroup);
+                                Map<String, List<String>> idcUrls = new 
HashMap<>();
+                                Set<String> clientUrls = new HashSet<>();
+                                for (Client client : clientList) {
+                                    clientUrls.add(client.url);
+                                    if (idcUrls.containsKey(client.idc)) {
+                                        idcUrls.get(client.idc)
+                                            
.add(StringUtils.deleteWhitespace(client.url));
+                                    } else {
+                                        List<String> urls = new ArrayList<>();
+                                        urls.add(client.url);
+                                        idcUrls.put(client.idc, urls);
                                     }
-                                    synchronized 
(eventMeshHTTPServer.localConsumerGroupMapping) {
-                                        ConsumerGroupConf consumerGroupConf =
-                                            
eventMeshHTTPServer.localConsumerGroupMapping
-                                                .get(consumerGroup);
-                                        Map<String, ConsumerGroupTopicConf> 
map =
-                                            
consumerGroupConf.getConsumerGroupTopicConf();
-                                        for (String topicKey : map.keySet()) {
-                                            if (StringUtils.equals(topic, 
topicKey)) {
-                                                ConsumerGroupTopicConf 
latestTopicConf =
-                                                    new 
ConsumerGroupTopicConf();
-                                                
latestTopicConf.setConsumerGroup(consumerGroup);
-                                                
latestTopicConf.setTopic(topic);
-                                                
latestTopicConf.setSubscriptionItem(
-                                                    
map.get(topicKey).getSubscriptionItem());
-                                                
latestTopicConf.setUrls(clientUrls);
-
-                                                
latestTopicConf.setIdcUrls(idcUrls);
-
-                                                map.put(topic, 
latestTopicConf);
-                                            }
-                                        }
+                                }
+                                synchronized 
(eventMeshHTTPServer.localConsumerGroupMapping) {
+                                    ConsumerGroupConf consumerGroupConf =
                                         
eventMeshHTTPServer.localConsumerGroupMapping
-                                            .put(consumerGroup, 
consumerGroupConf);
-                                        logger.info(
-                                            "consumerGroup {} client info 
changed, "
-                                                + "consumerGroupConf {}", 
consumerGroup,
-                                            
JsonUtils.serialize(consumerGroupConf));
-                                        try {
-                                            
notifyConsumerManager(consumerGroup, consumerGroupConf);
-                                        } catch (Exception e) {
-                                            e.printStackTrace();
+                                            .get(consumerGroup);
+                                    Map<String, ConsumerGroupTopicConf> map =
+                                        
consumerGroupConf.getConsumerGroupTopicConf();
+                                    for (String topicKey : map.keySet()) {
+                                        if (StringUtils.equals(topic, 
topicKey)) {
+                                            ConsumerGroupTopicConf 
latestTopicConf =
+                                                new ConsumerGroupTopicConf();
+                                            
latestTopicConf.setConsumerGroup(consumerGroup);
+                                            latestTopicConf.setTopic(topic);
+                                            
latestTopicConf.setSubscriptionItem(
+                                                
map.get(topicKey).getSubscriptionItem());
+                                            
latestTopicConf.setUrls(clientUrls);
+
+                                            
latestTopicConf.setIdcUrls(idcUrls);
+
+                                            map.put(topic, latestTopicConf);
                                         }
                                     }
-
-                                } else {
-                                    logger.info("consumerGroup {} client info 
removed",
-                                        consumerGroup);
-                                    //remove
+                                    
eventMeshHTTPServer.localConsumerGroupMapping
+                                        .put(consumerGroup, consumerGroupConf);
+                                    logger.info(
+                                        "consumerGroup {} client info changed, 
"
+                                            + "consumerGroupConf {}", 
consumerGroup,
+                                        
JsonUtils.serialize(consumerGroupConf));
                                     try {
-                                        notifyConsumerManager(consumerGroup, 
null);
+                                        notifyConsumerManager(consumerGroup, 
consumerGroupConf);
                                     } catch (Exception e) {
                                         e.printStackTrace();

Review comment:
       Please use the logger instead of e.printStackTrace() here; this avoids 
bloating the .out file.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to