ruanwenjun commented on a change in pull request #685:
URL:
https://github.com/apache/incubator-eventmesh/pull/685#discussion_r776941857
##########
File path:
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java
##########
@@ -73,100 +73,97 @@ public void init() throws Exception {
public void start() throws Exception {
logger.info("consumerManager started......");
- scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- logger.info("clientInfo check start.....");
- synchronized (eventMeshHTTPServer.localClientInfoMapping) {
- Map<String, List<Client>> clientInfoMap =
- eventMeshHTTPServer.localClientInfoMapping;
- if (clientInfoMap.size() > 0) {
- for (String key : clientInfoMap.keySet()) {
- String consumerGroup = key.split("@")[0];
- String topic = key.split("@")[1];
- List<Client> clientList = clientInfoMap.get(key);
- Iterator<Client> clientIterator =
clientList.iterator();
- boolean isChange = false;
- while (clientIterator.hasNext()) {
- Client client = clientIterator.next();
- //The time difference is greater than 3
heartbeat cycles
- if (System.currentTimeMillis() -
client.lastUpTime.getTime()
- > DEFAULT_UPDATE_TIME) {
- logger.warn(
- "client {} lastUpdate time {} over
three heartbeat cycles",
- JsonUtils.serialize(client),
client.lastUpTime);
- clientIterator.remove();
- isChange = true;
- }
+ scheduledExecutorService.scheduleAtFixedRate(() -> {
+ logger.info("clientInfo check start.....");
+ synchronized (eventMeshHTTPServer.localClientInfoMapping) {
+ Map<String, List<Client>> clientInfoMap =
+ eventMeshHTTPServer.localClientInfoMapping;
+ if (clientInfoMap.size() > 0) {
+ for (String key : clientInfoMap.keySet()) {
+ String consumerGroup = key.split("@")[0];
+ String topic = key.split("@")[1];
+ List<Client> clientList = clientInfoMap.get(key);
+ Iterator<Client> clientIterator =
clientList.iterator();
+ boolean isChange = false;
+ while (clientIterator.hasNext()) {
+ Client client = clientIterator.next();
+ //The time difference is greater than 3 heartbeat
cycles
+ if (System.currentTimeMillis() -
client.lastUpTime.getTime()
+ > DEFAULT_UPDATE_TIME) {
+ logger.warn(
+ "client {} lastUpdate time {} over three
heartbeat cycles",
+ JsonUtils.serialize(client),
client.lastUpTime);
+ clientIterator.remove();
+ isChange = true;
}
- if (isChange) {
- if (clientList.size() > 0) {
- //change url
- logger.info("consumerGroup {} client info
changing",
- consumerGroup);
- Map<String, List<String>> idcUrls = new
HashMap<>();
- Set<String> clientUrls = new HashSet<>();
- for (Client client : clientList) {
- clientUrls.add(client.url);
- if (idcUrls.containsKey(client.idc)) {
- idcUrls.get(client.idc)
-
.add(StringUtils.deleteWhitespace(client.url));
- } else {
- List<String> urls = new
ArrayList<>();
- urls.add(client.url);
- idcUrls.put(client.idc, urls);
- }
+ }
+ if (isChange) {
+ if (clientList.size() > 0) {
+ //change url
+ logger.info("consumerGroup {} client info
changing",
+ consumerGroup);
+ Map<String, List<String>> idcUrls = new
HashMap<>();
+ Set<String> clientUrls = new HashSet<>();
+ for (Client client : clientList) {
+ clientUrls.add(client.url);
+ if (idcUrls.containsKey(client.idc)) {
+ idcUrls.get(client.idc)
+
.add(StringUtils.deleteWhitespace(client.url));
+ } else {
+ List<String> urls = new ArrayList<>();
+ urls.add(client.url);
+ idcUrls.put(client.idc, urls);
}
- synchronized
(eventMeshHTTPServer.localConsumerGroupMapping) {
- ConsumerGroupConf consumerGroupConf =
-
eventMeshHTTPServer.localConsumerGroupMapping
- .get(consumerGroup);
- Map<String, ConsumerGroupTopicConf>
map =
-
consumerGroupConf.getConsumerGroupTopicConf();
- for (String topicKey : map.keySet()) {
- if (StringUtils.equals(topic,
topicKey)) {
- ConsumerGroupTopicConf
latestTopicConf =
- new
ConsumerGroupTopicConf();
-
latestTopicConf.setConsumerGroup(consumerGroup);
-
latestTopicConf.setTopic(topic);
-
latestTopicConf.setSubscriptionItem(
-
map.get(topicKey).getSubscriptionItem());
-
latestTopicConf.setUrls(clientUrls);
-
-
latestTopicConf.setIdcUrls(idcUrls);
-
- map.put(topic,
latestTopicConf);
- }
- }
+ }
+ synchronized
(eventMeshHTTPServer.localConsumerGroupMapping) {
+ ConsumerGroupConf consumerGroupConf =
eventMeshHTTPServer.localConsumerGroupMapping
- .put(consumerGroup,
consumerGroupConf);
- logger.info(
- "consumerGroup {} client info
changed, "
- + "consumerGroupConf {}",
consumerGroup,
-
JsonUtils.serialize(consumerGroupConf));
- try {
-
notifyConsumerManager(consumerGroup, consumerGroupConf);
- } catch (Exception e) {
- e.printStackTrace();
+ .get(consumerGroup);
+ Map<String, ConsumerGroupTopicConf> map =
+
consumerGroupConf.getConsumerGroupTopicConf();
+ for (String topicKey : map.keySet()) {
+ if (StringUtils.equals(topic,
topicKey)) {
+ ConsumerGroupTopicConf
latestTopicConf =
+ new ConsumerGroupTopicConf();
+
latestTopicConf.setConsumerGroup(consumerGroup);
+ latestTopicConf.setTopic(topic);
+
latestTopicConf.setSubscriptionItem(
+
map.get(topicKey).getSubscriptionItem());
+
latestTopicConf.setUrls(clientUrls);
+
+
latestTopicConf.setIdcUrls(idcUrls);
+
+ map.put(topic, latestTopicConf);
}
}
-
- } else {
- logger.info("consumerGroup {} client info
removed",
- consumerGroup);
- //remove
+
eventMeshHTTPServer.localConsumerGroupMapping
+ .put(consumerGroup, consumerGroupConf);
+ logger.info(
+ "consumerGroup {} client info changed,
"
+ + "consumerGroupConf {}",
consumerGroup,
+
JsonUtils.serialize(consumerGroupConf));
try {
- notifyConsumerManager(consumerGroup,
null);
+ notifyConsumerManager(consumerGroup,
consumerGroupConf);
} catch (Exception e) {
e.printStackTrace();
}
+ }
-
eventMeshHTTPServer.localConsumerGroupMapping.keySet()
- .removeIf(s ->
StringUtils.equals(consumerGroup, s));
+ } else {
+ logger.info("consumerGroup {} client info
removed",
+ consumerGroup);
+ //remove
+ try {
+ notifyConsumerManager(consumerGroup, null);
+ } catch (Exception e) {
+ e.printStackTrace();
Review comment:
Use logger.
##########
File path:
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java
##########
@@ -73,100 +73,97 @@ public void init() throws Exception {
public void start() throws Exception {
logger.info("consumerManager started......");
- scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- logger.info("clientInfo check start.....");
- synchronized (eventMeshHTTPServer.localClientInfoMapping) {
- Map<String, List<Client>> clientInfoMap =
- eventMeshHTTPServer.localClientInfoMapping;
- if (clientInfoMap.size() > 0) {
- for (String key : clientInfoMap.keySet()) {
- String consumerGroup = key.split("@")[0];
- String topic = key.split("@")[1];
- List<Client> clientList = clientInfoMap.get(key);
- Iterator<Client> clientIterator =
clientList.iterator();
- boolean isChange = false;
- while (clientIterator.hasNext()) {
- Client client = clientIterator.next();
- //The time difference is greater than 3
heartbeat cycles
- if (System.currentTimeMillis() -
client.lastUpTime.getTime()
- > DEFAULT_UPDATE_TIME) {
- logger.warn(
- "client {} lastUpdate time {} over
three heartbeat cycles",
- JsonUtils.serialize(client),
client.lastUpTime);
- clientIterator.remove();
- isChange = true;
- }
+ scheduledExecutorService.scheduleAtFixedRate(() -> {
+ logger.info("clientInfo check start.....");
+ synchronized (eventMeshHTTPServer.localClientInfoMapping) {
+ Map<String, List<Client>> clientInfoMap =
+ eventMeshHTTPServer.localClientInfoMapping;
+ if (clientInfoMap.size() > 0) {
+ for (String key : clientInfoMap.keySet()) {
+ String consumerGroup = key.split("@")[0];
+ String topic = key.split("@")[1];
+ List<Client> clientList = clientInfoMap.get(key);
+ Iterator<Client> clientIterator =
clientList.iterator();
+ boolean isChange = false;
+ while (clientIterator.hasNext()) {
+ Client client = clientIterator.next();
+ //The time difference is greater than 3 heartbeat
cycles
+ if (System.currentTimeMillis() -
client.lastUpTime.getTime()
+ > DEFAULT_UPDATE_TIME) {
+ logger.warn(
+ "client {} lastUpdate time {} over three
heartbeat cycles",
+ JsonUtils.serialize(client),
client.lastUpTime);
+ clientIterator.remove();
+ isChange = true;
}
- if (isChange) {
- if (clientList.size() > 0) {
- //change url
- logger.info("consumerGroup {} client info
changing",
- consumerGroup);
- Map<String, List<String>> idcUrls = new
HashMap<>();
- Set<String> clientUrls = new HashSet<>();
- for (Client client : clientList) {
- clientUrls.add(client.url);
- if (idcUrls.containsKey(client.idc)) {
- idcUrls.get(client.idc)
-
.add(StringUtils.deleteWhitespace(client.url));
- } else {
- List<String> urls = new
ArrayList<>();
- urls.add(client.url);
- idcUrls.put(client.idc, urls);
- }
+ }
+ if (isChange) {
+ if (clientList.size() > 0) {
+ //change url
+ logger.info("consumerGroup {} client info
changing",
+ consumerGroup);
+ Map<String, List<String>> idcUrls = new
HashMap<>();
+ Set<String> clientUrls = new HashSet<>();
+ for (Client client : clientList) {
+ clientUrls.add(client.url);
+ if (idcUrls.containsKey(client.idc)) {
+ idcUrls.get(client.idc)
+
.add(StringUtils.deleteWhitespace(client.url));
+ } else {
+ List<String> urls = new ArrayList<>();
+ urls.add(client.url);
+ idcUrls.put(client.idc, urls);
}
- synchronized
(eventMeshHTTPServer.localConsumerGroupMapping) {
- ConsumerGroupConf consumerGroupConf =
-
eventMeshHTTPServer.localConsumerGroupMapping
- .get(consumerGroup);
- Map<String, ConsumerGroupTopicConf>
map =
-
consumerGroupConf.getConsumerGroupTopicConf();
- for (String topicKey : map.keySet()) {
- if (StringUtils.equals(topic,
topicKey)) {
- ConsumerGroupTopicConf
latestTopicConf =
- new
ConsumerGroupTopicConf();
-
latestTopicConf.setConsumerGroup(consumerGroup);
-
latestTopicConf.setTopic(topic);
-
latestTopicConf.setSubscriptionItem(
-
map.get(topicKey).getSubscriptionItem());
-
latestTopicConf.setUrls(clientUrls);
-
-
latestTopicConf.setIdcUrls(idcUrls);
-
- map.put(topic,
latestTopicConf);
- }
- }
+ }
+ synchronized
(eventMeshHTTPServer.localConsumerGroupMapping) {
+ ConsumerGroupConf consumerGroupConf =
eventMeshHTTPServer.localConsumerGroupMapping
- .put(consumerGroup,
consumerGroupConf);
- logger.info(
- "consumerGroup {} client info
changed, "
- + "consumerGroupConf {}",
consumerGroup,
-
JsonUtils.serialize(consumerGroupConf));
- try {
-
notifyConsumerManager(consumerGroup, consumerGroupConf);
- } catch (Exception e) {
- e.printStackTrace();
+ .get(consumerGroup);
+ Map<String, ConsumerGroupTopicConf> map =
+
consumerGroupConf.getConsumerGroupTopicConf();
+ for (String topicKey : map.keySet()) {
+ if (StringUtils.equals(topic,
topicKey)) {
+ ConsumerGroupTopicConf
latestTopicConf =
+ new ConsumerGroupTopicConf();
+
latestTopicConf.setConsumerGroup(consumerGroup);
+ latestTopicConf.setTopic(topic);
+
latestTopicConf.setSubscriptionItem(
+
map.get(topicKey).getSubscriptionItem());
+
latestTopicConf.setUrls(clientUrls);
+
+
latestTopicConf.setIdcUrls(idcUrls);
+
+ map.put(topic, latestTopicConf);
}
}
-
- } else {
- logger.info("consumerGroup {} client info
removed",
- consumerGroup);
- //remove
+
eventMeshHTTPServer.localConsumerGroupMapping
+ .put(consumerGroup, consumerGroupConf);
+ logger.info(
+ "consumerGroup {} client info changed,
"
+ + "consumerGroupConf {}",
consumerGroup,
+
JsonUtils.serialize(consumerGroupConf));
try {
- notifyConsumerManager(consumerGroup,
null);
+ notifyConsumerManager(consumerGroup,
consumerGroupConf);
} catch (Exception e) {
e.printStackTrace();
Review comment:
Please use logger to optimize this print, this can avoid the .out file
expansion.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]