This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new af8a6cdf9 [ISSUE #3348]Modify SessionContext's public attributes to private
new bc2f4a13e Merge pull request #3349 from mxsm/eventmesh-3348
af8a6cdf9 is described below
commit af8a6cdf9d9a8b573466a98e0d798ae3ddf6cd39
Author: mxsm <[email protected]>
AuthorDate: Mon Mar 6 00:01:48 2023 +0800
[ISSUE #3348]Modify SessionContext's public attributes to private
---
.../core/protocol/tcp/client/group/ClientSessionGroupMapping.java | 2 +-
.../runtime/core/protocol/tcp/client/session/Session.java | 8 ++++----
.../runtime/core/protocol/tcp/client/session/SessionContext.java | 8 ++++++--
.../runtime/core/protocol/tcp/client/task/UnSubscribeTask.java | 4 ++--
.../apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java | 6 +++---
5 files changed, 16 insertions(+), 12 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
index 5670bdd3c..9f219a321 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
@@ -277,7 +277,7 @@ public class ClientSessionGroupMapping {
* @param session
*/
private void cleanSubscriptionInSession(Session session) throws Exception {
- for (SubscriptionItem item :
session.getSessionContext().subscribeTopics.values()) {
+ for (SubscriptionItem item :
session.getSessionContext().getSubscribeTopics().values()) {
ClientGroupWrapper clientGroupWrapper =
Objects.requireNonNull(session.getClientGroupWrapper().get());
clientGroupWrapper.removeSubscription(item, session);
if (!clientGroupWrapper.hasSubscription(item.getTopic())) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
index 52fcd5180..d5a8f4d0a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
@@ -169,7 +169,7 @@ public class Session {
public void subscribe(List<SubscriptionItem> items) throws Exception {
for (SubscriptionItem item : items) {
- sessionContext.subscribeTopics.putIfAbsent(item.getTopic(), item);
+ sessionContext.getSubscribeTopics().putIfAbsent(item.getTopic(),
item);
Objects.requireNonNull(clientGroupWrapper.get()).subscribe(item);
Objects.requireNonNull(clientGroupWrapper.get()).getMqProducerWrapper().getMeshMQProducer()
@@ -182,7 +182,7 @@ public class Session {
public void unsubscribe(List<SubscriptionItem> items) throws Exception {
for (SubscriptionItem item : items) {
- sessionContext.subscribeTopics.remove(item.getTopic());
+ sessionContext.getSubscribeTopics().remove(item.getTopic());
Objects.requireNonNull(clientGroupWrapper.get()).removeSubscription(item, this);
if
(!Objects.requireNonNull(clientGroupWrapper.get()).hasSubscription(item.getTopic()))
{
@@ -195,7 +195,7 @@ public class Session {
public EventMeshTcpSendResult upstreamMsg(Header header, CloudEvent event,
SendCallback sendCallback,
long startTime, long taskExecuteTime) {
String topic = event.getSubject();
- sessionContext.sendTopics.putIfAbsent(topic, topic);
+ sessionContext.getSendTopics().putIfAbsent(topic, topic);
return sender.send(header, event, sendCallback, startTime,
taskExecuteTime);
}
@@ -358,7 +358,7 @@ public class Session {
return false;
}
- if (!sessionContext.subscribeTopics.containsKey(topic)) {
+ if (!sessionContext.getSubscribeTopics().containsKey(topic)) {
log.warn("session is not available because session has not
subscribe topic:{},client:{}", topic, client);
return false;
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/SessionContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/SessionContext.java
index 2751df10b..8182bfc59 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/SessionContext.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/SessionContext.java
@@ -24,13 +24,17 @@ import org.apache.commons.lang3.time.DateFormatUtils;
import java.util.concurrent.ConcurrentHashMap;
+import lombok.Getter;
+
public class SessionContext {
private Session session;
- public ConcurrentHashMap<String, String> sendTopics = new
ConcurrentHashMap<String, String>();
+ @Getter
+ private final ConcurrentHashMap<String, String> sendTopics = new
ConcurrentHashMap<>(64);
- public ConcurrentHashMap<String, SubscriptionItem> subscribeTopics = new
ConcurrentHashMap<String, SubscriptionItem>();
+ @Getter
+ private final ConcurrentHashMap<String/*Topic*/, SubscriptionItem>
subscribeTopics = new ConcurrentHashMap<>(64);
public long createTime = System.currentTimeMillis();
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/UnSubscribeTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/UnSubscribeTask.java
index 77dc5cf1b..99bc12f59 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/UnSubscribeTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/UnSubscribeTask.java
@@ -55,8 +55,8 @@ public class UnSubscribeTask extends AbstractTask {
try {
synchronized (session) {
List<SubscriptionItem> topics = new
ArrayList<SubscriptionItem>();
- if
(MapUtils.isNotEmpty(session.getSessionContext().subscribeTopics)) {
- for (Map.Entry<String, SubscriptionItem> entry :
session.getSessionContext().subscribeTopics.entrySet()) {
+ if
(MapUtils.isNotEmpty(session.getSessionContext().getSubscribeTopics())) {
+ for (Map.Entry<String, SubscriptionItem> entry :
session.getSessionContext().getSubscribeTopics().entrySet()) {
topics.add(entry.getValue());
}
session.unsubscribe(topics);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
index 596c4361a..2c4804440 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
@@ -112,14 +112,14 @@ public class EventMeshTcpMonitor {
AtomicLong deliveredMsgsCount =
session.getPusher().getDeliveredMsgsCount();
AtomicLong deliveredFailCount =
session.getPusher().getDeliverFailMsgsCount();
int unAckMsgsCount = session.getPusher().getTotalUnackMsgs();
- int sendTopics = session.getSessionContext().sendTopics.size();
- int subscribeTopics =
session.getSessionContext().subscribeTopics.size();
+ int sendTopics =
session.getSessionContext().getSendTopics().size();
+ int subscribeTopics =
session.getSessionContext().getSubscribeTopics().size();
tcpLogger.info("session|deliveredFailCount={}|deliveredMsgsCount={}|unAckMsgsCount={}|sendTopics={}|subscribeTopics={}|user={}",
deliveredFailCount.longValue(),
deliveredMsgsCount.longValue(),
unAckMsgsCount, sendTopics, subscribeTopics,
session.getClient());
-
topicSet.addAll(session.getSessionContext().subscribeTopics.keySet());
+
topicSet.addAll(session.getSessionContext().getSubscribeTopics().keySet());
}
tcpSummaryMetrics.setSubTopicNum(topicSet.size());
tcpSummaryMetrics.setAllConnections(EventMeshTcpConnectionHandler.connections.get());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]